Compare commits
2 commits
1b50452647
...
d539731705
Author | SHA1 | Date | |
---|---|---|---|
Artemis Tosini | d539731705 | ||
Artemis Tosini | 0503e4ecbf |
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1873,6 +1873,7 @@ dependencies = [
|
|||
"reqwest",
|
||||
"rsa",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"vapore-proto",
|
||||
]
|
||||
|
|
|
@ -5,22 +5,23 @@ version.workspace = true
|
|||
|
||||
[dependencies]
|
||||
async-tungstenite = { version = "0.27.0", features = ["tokio-rustls-native-certs"] }
|
||||
base64 = "0.22.1"
|
||||
color-eyre.workspace = true
|
||||
dialoguer = "0.11.0"
|
||||
flate2 = "1.0.33"
|
||||
futures = "0.3.30"
|
||||
hex = "0.4.3"
|
||||
keyvalues-serde.workspace = true
|
||||
log.workspace = true
|
||||
protobuf.workspace = true
|
||||
qrcode = "0.14.1"
|
||||
rand = "0.8.5"
|
||||
reqwest = { version = "0.12", features = ["rustls-tls-native-roots"], default-features = false}
|
||||
rsa = "0.9.6"
|
||||
serde = { version = "1.0.209", features = ["derive"] }
|
||||
thiserror = "1.0.63"
|
||||
tokio = { version = "1.39", features = ["rt", "rt-multi-thread", "macros", "time"]}
|
||||
vapore-proto.path = "../proto"
|
||||
|
||||
[dev-dependencies]
|
||||
base64 = "0.22.1"
|
||||
dialoguer = "0.11.0"
|
||||
env_logger.workspace = true
|
||||
hex = "0.4.3"
|
||||
qrcode = "0.14.1"
|
||||
|
|
|
@ -19,8 +19,6 @@ pub async fn main() -> eyre::Result<()> {
|
|||
env_logger::init();
|
||||
color_eyre::install()?;
|
||||
|
||||
log::info!("Starting vapored");
|
||||
|
||||
let servers = vapore::connection::bootstrap_find_servers().await?;
|
||||
log::debug!("Found servers: {:?}", servers);
|
||||
|
||||
|
|
|
@ -27,8 +27,6 @@ pub async fn main() -> eyre::Result<()> {
|
|||
env_logger::init();
|
||||
color_eyre::install()?;
|
||||
|
||||
log::info!("Starting vapored");
|
||||
|
||||
let servers = vapore::connection::bootstrap_find_servers().await?;
|
||||
log::debug!("Found servers: {:?}", servers);
|
||||
|
||||
|
|
|
@ -12,8 +12,6 @@ use async_tungstenite::{
|
|||
tokio::{connect_async, ConnectStream},
|
||||
tungstenite, WebSocketStream,
|
||||
};
|
||||
use color_eyre::eyre::WrapErr;
|
||||
use color_eyre::eyre::{self, bail, OptionExt};
|
||||
use futures::{SinkExt as _, StreamExt};
|
||||
use serde::Deserialize;
|
||||
use tokio::sync::broadcast;
|
||||
|
@ -22,13 +20,16 @@ use vapore_proto::{
|
|||
steammessages_clientserver_login::CMsgClientHeartBeat,
|
||||
};
|
||||
|
||||
use crate::message::{CMProtoBufMessage, CMRawProtoBufMessage};
|
||||
use crate::{
|
||||
message::{CMProtoBufMessage, CMRawProtoBufMessage},
|
||||
ClientError,
|
||||
};
|
||||
|
||||
/// Maximum number of messages in the buffer for by-message-type subscriptions
|
||||
const CHANNEL_CAPACITY: usize = 16;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct CMServerEntry<'a> {
|
||||
pub struct CMServerEntry<'a> {
|
||||
endpoint: &'a str,
|
||||
legacy_endpoint: &'a str,
|
||||
#[serde(rename = "type")]
|
||||
|
@ -40,13 +41,13 @@ struct CMServerEntry<'a> {
|
|||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct GetCMListForConnectResponse<'a> {
|
||||
pub struct GetCMListForConnectResponse<'a> {
|
||||
serverlist: BTreeMap<u32, CMServerEntry<'a>>,
|
||||
success: u32,
|
||||
message: &'a str,
|
||||
}
|
||||
|
||||
pub async fn bootstrap_find_servers() -> eyre::Result<Vec<String>> {
|
||||
pub async fn bootstrap_find_servers() -> Result<Vec<String>, ClientError> {
|
||||
let response = reqwest::get(
|
||||
"https://api.steampowered.com/ISteamDirectory/GetCMListForConnect/v1/?cellid=0&format=vdf",
|
||||
)
|
||||
|
@ -56,11 +57,10 @@ pub async fn bootstrap_find_servers() -> eyre::Result<Vec<String>> {
|
|||
let result: GetCMListForConnectResponse = keyvalues_serde::from_str(&response)?;
|
||||
|
||||
if result.success != 1 {
|
||||
eyre::bail!(
|
||||
"GetCMList returned bad result {} wtih message {}",
|
||||
return Err(ClientError::EResult(
|
||||
result.success,
|
||||
result.message
|
||||
)
|
||||
result.message.to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(result
|
||||
|
@ -118,16 +118,15 @@ impl Context {
|
|||
fn handle_receive(
|
||||
self: Pin<&mut Self>,
|
||||
message: tungstenite::Result<tungstenite::Message>,
|
||||
) -> eyre::Result<()> {
|
||||
) -> Result<(), ClientError> {
|
||||
// Technically everything should be Binary but I think I saw some Text before
|
||||
let message_data = match message? {
|
||||
tungstenite::Message::Text(t) => t.into_bytes(),
|
||||
tungstenite::Message::Binary(b) => b,
|
||||
_ => eyre::bail!("Unexpected WebSocket frame type"),
|
||||
_ => return Err(ClientError::BadWSMessageType),
|
||||
};
|
||||
|
||||
let raw_messages = CMRawProtoBufMessage::try_parse_multi(&message_data)
|
||||
.wrap_err("Parsing raw messages")?;
|
||||
let raw_messages = CMRawProtoBufMessage::try_parse_multi(&message_data)?;
|
||||
|
||||
let mut session = self.session.lock().expect("Lock was poisoned");
|
||||
|
||||
|
@ -166,10 +165,13 @@ impl Context {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_send(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> eyre::Result<()> {
|
||||
fn handle_send(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context,
|
||||
) -> Result<(), ClientError> {
|
||||
// TODO: figure out how to not cloe the Arc
|
||||
let session_arc = self.session.clone();
|
||||
let mut session = session_arc.lock().expect("Lock was poisoned");
|
||||
let mut session = session_arc.lock()?;
|
||||
|
||||
while !session.send_queue.is_empty() {
|
||||
match self.socket.poll_ready_unpin(cx) {
|
||||
|
@ -188,14 +190,17 @@ impl Context {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_inner(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> eyre::Result<()> {
|
||||
fn poll_inner(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Result<(), ClientError> {
|
||||
{
|
||||
let mut session = self.session.lock().expect("Lock was poisoned");
|
||||
session.send_waker = Some(cx.waker().clone());
|
||||
}
|
||||
|
||||
if let Poll::Ready(maybe_message) = self.as_mut().socket.poll_next_unpin(cx) {
|
||||
let message = maybe_message.ok_or_eyre("Socket was closed while trying to recieve")?;
|
||||
let message = maybe_message.ok_or(ClientError::ClosedSocket)?;
|
||||
if let Err(err) = self.as_mut().handle_receive(message) {
|
||||
log::warn!("Got error while processing message: {:?}", err);
|
||||
}
|
||||
|
@ -209,7 +214,7 @@ impl Context {
|
|||
}
|
||||
|
||||
impl Future for Context {
|
||||
type Output = eyre::Result<()>;
|
||||
type Output = Result<(), ClientError>;
|
||||
|
||||
fn poll(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
|
@ -231,10 +236,8 @@ pub struct CMSession {
|
|||
}
|
||||
|
||||
impl CMSession {
|
||||
pub async fn connect(server: &str) -> eyre::Result<Self> {
|
||||
let (socket, _) = connect_async(server)
|
||||
.await
|
||||
.wrap_err("Connecting to Steam server")?;
|
||||
pub async fn connect(server: &str) -> Result<Self, ClientError> {
|
||||
let (socket, _) = connect_async(server).await?;
|
||||
|
||||
let inner = SessionInner {
|
||||
steam_id: None,
|
||||
|
@ -269,7 +272,7 @@ impl CMSession {
|
|||
tokio::spawn(async move { cloned.send_heartbeat_task(interval).await });
|
||||
}
|
||||
|
||||
async fn send_heartbeat_task(self, interval_secs: u32) -> eyre::Result<()> {
|
||||
async fn send_heartbeat_task(self, interval_secs: u32) -> Result<(), ClientError> {
|
||||
let mut interval = tokio::time::interval(time::Duration::from_secs(interval_secs as u64));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
@ -309,7 +312,7 @@ impl CMSession {
|
|||
&self,
|
||||
action: EMsg,
|
||||
body: T,
|
||||
) -> eyre::Result<()> {
|
||||
) -> Result<(), ClientError> {
|
||||
let mut inner = self.inner.lock().expect("Lock was poisoned");
|
||||
|
||||
log::trace!("Sending notification of type {:?}", action);
|
||||
|
@ -368,13 +371,9 @@ impl<'a, T: protobuf::Message, U: protobuf::Message> CallServiceMethod<'a, T, U>
|
|||
fn finalize_response(
|
||||
&self,
|
||||
response: CMRawProtoBufMessage,
|
||||
) -> eyre::Result<CMProtoBufMessage<U>> {
|
||||
) -> Result<CMProtoBufMessage<U>, ClientError> {
|
||||
if response.action != EMsg::k_EMsgServiceMethodResponse {
|
||||
bail!("Wanted ServiceMethodResponse, got {:?}", response.action);
|
||||
}
|
||||
|
||||
if response.header.jobid_target() != self.jobid.unwrap() {
|
||||
bail!("Got wrong jobid")
|
||||
return Err(ClientError::BadResponseAction(response.action));
|
||||
}
|
||||
|
||||
CMProtoBufMessage::<U>::deserialize(response)
|
||||
|
@ -382,7 +381,7 @@ impl<'a, T: protobuf::Message, U: protobuf::Message> CallServiceMethod<'a, T, U>
|
|||
}
|
||||
|
||||
impl<T: protobuf::Message, U: protobuf::Message> Future for CallServiceMethod<'_, T, U> {
|
||||
type Output = eyre::Result<CMProtoBufMessage<U>>;
|
||||
type Output = Result<CMProtoBufMessage<U>, ClientError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
|
||||
let session_arc = self.session.inner.clone();
|
||||
|
|
52
lib/src/error.rs
Normal file
52
lib/src/error.rs
Normal file
|
@ -0,0 +1,52 @@
|
|||
#[non_exhaustive]
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum ClientError {
|
||||
#[error("Valve returned bad result {0} with message `{1}`")]
|
||||
EResult(u32, String),
|
||||
|
||||
#[error("Request failure")]
|
||||
Reqwest(#[from] reqwest::Error),
|
||||
|
||||
#[error("VDF Deserialization failure")]
|
||||
Vdf(#[from] keyvalues_serde::Error),
|
||||
|
||||
#[error("WebSocket connection error")]
|
||||
WebSocket(#[from] async_tungstenite::tungstenite::Error),
|
||||
|
||||
#[error("WebSocket was closed while trying to recieve")]
|
||||
ClosedSocket,
|
||||
|
||||
#[error("ProtoBuf Deserialization error")]
|
||||
Protobuf(#[from] protobuf::Error),
|
||||
|
||||
#[error("Invalid WebSocket message type from server")]
|
||||
BadWSMessageType,
|
||||
|
||||
#[error("Decompression Error")]
|
||||
DecompressionError(#[source] std::io::Error),
|
||||
|
||||
#[error("Invalid decompressed output (expected {0} bytes, got {1})")]
|
||||
DecompressionInvalid(u32, usize),
|
||||
|
||||
#[error("Invalid message action {0}")]
|
||||
InvalidAction(u32),
|
||||
|
||||
#[error("Invalid message length")]
|
||||
InvalidMessageLength,
|
||||
|
||||
#[error("Message too short (need {0} bytes, got {1})")]
|
||||
MessageTooShort(usize, usize),
|
||||
|
||||
#[error("Lock was poisoned")]
|
||||
LockPoisoned,
|
||||
|
||||
#[error("Expected action ServiceMethodResponse, got {0:?}")]
|
||||
BadResponseAction(vapore_proto::enums_clientserver::EMsg),
|
||||
}
|
||||
|
||||
impl<T> From<std::sync::PoisonError<T>> for ClientError {
|
||||
fn from(_value: std::sync::PoisonError<T>) -> Self {
|
||||
// The guard won't be Send so we can't return it from async functions
|
||||
ClientError::LockPoisoned
|
||||
}
|
||||
}
|
|
@ -1,2 +1,5 @@
|
|||
pub mod connection;
|
||||
pub mod error;
|
||||
pub mod message;
|
||||
|
||||
pub use error::ClientError;
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
use std::io::Read;
|
||||
|
||||
use color_eyre::eyre;
|
||||
use flate2::read::GzDecoder;
|
||||
use protobuf::{Enum as _, Message as _};
|
||||
use vapore_proto::{
|
||||
|
@ -8,6 +7,8 @@ use vapore_proto::{
|
|||
steammessages_base::{CMsgMulti, CMsgProtoBufHeader},
|
||||
};
|
||||
|
||||
use crate::ClientError;
|
||||
|
||||
/// A message sent over the socket. Can be either sent or recieved
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CMProtoBufMessage<T: protobuf::Message> {
|
||||
|
@ -17,11 +18,15 @@ pub struct CMProtoBufMessage<T: protobuf::Message> {
|
|||
}
|
||||
|
||||
impl<T: protobuf::Message> CMProtoBufMessage<T> {
|
||||
pub fn serialize(&self) -> eyre::Result<Vec<u8>> {
|
||||
pub fn serialize(&self) -> Result<Vec<u8>, ClientError> {
|
||||
// 4 bytes for type, 4 bytes for header length, then header and body
|
||||
// No alignment requirements
|
||||
let length = 4 + 4 + self.header.compute_size() + self.body.compute_size();
|
||||
let mut out = Vec::with_capacity(length.try_into()?);
|
||||
let mut out = Vec::with_capacity(
|
||||
length
|
||||
.try_into()
|
||||
.map_err(|_| ClientError::InvalidMessageLength)?,
|
||||
);
|
||||
|
||||
out.extend_from_slice(&(self.action.value() as u32 | 0x80000000).to_le_bytes());
|
||||
out.extend_from_slice(&self.header.cached_size().to_le_bytes());
|
||||
|
@ -31,7 +36,7 @@ impl<T: protobuf::Message> CMProtoBufMessage<T> {
|
|||
Ok(out)
|
||||
}
|
||||
|
||||
pub fn deserialize(raw: CMRawProtoBufMessage) -> eyre::Result<Self> {
|
||||
pub fn deserialize(raw: CMRawProtoBufMessage) -> Result<Self, ClientError> {
|
||||
let body = T::parse_from_bytes(&raw.body)?;
|
||||
|
||||
Ok(Self {
|
||||
|
@ -51,25 +56,18 @@ pub struct CMRawProtoBufMessage {
|
|||
}
|
||||
|
||||
impl CMRawProtoBufMessage {
|
||||
pub fn try_parse(binary: &[u8]) -> eyre::Result<Self> {
|
||||
pub fn try_parse(binary: &[u8]) -> Result<Self, ClientError> {
|
||||
if binary.len() < 8 {
|
||||
eyre::bail!(
|
||||
"Message too short for type (need 8 bytes, was {} bytes)",
|
||||
binary.len()
|
||||
);
|
||||
return Err(ClientError::MessageTooShort(8, binary.len()));
|
||||
}
|
||||
let raw_action = u32::from_le_bytes(binary[0..4].try_into().unwrap()) & !0x8000_0000;
|
||||
let action = EMsg::from_i32(raw_action as i32)
|
||||
.ok_or_else(|| eyre::eyre!("Unknown message action {}", raw_action))?;
|
||||
.ok_or_else(|| ClientError::InvalidAction(raw_action))?;
|
||||
|
||||
let header_length = u32::from_le_bytes(binary[4..8].try_into().unwrap());
|
||||
let header_end = 8 + header_length as usize;
|
||||
if binary.len() < header_end {
|
||||
eyre::bail!(
|
||||
"Message too short for header (need {}, was {})",
|
||||
header_end,
|
||||
binary.len()
|
||||
)
|
||||
return Err(ClientError::MessageTooShort(header_end, binary.len()));
|
||||
}
|
||||
|
||||
let header = CMsgProtoBufHeader::parse_from_bytes(&binary[8..header_end])?;
|
||||
|
@ -82,7 +80,7 @@ impl CMRawProtoBufMessage {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn try_parse_multi(binary: &[u8]) -> eyre::Result<Vec<Self>> {
|
||||
pub fn try_parse_multi(binary: &[u8]) -> Result<Vec<Self>, ClientError> {
|
||||
let root_raw = Self::try_parse(binary)?;
|
||||
if root_raw.action != EMsg::k_EMsgMulti {
|
||||
return Ok(vec![root_raw]);
|
||||
|
@ -98,14 +96,14 @@ impl CMRawProtoBufMessage {
|
|||
gzip_decompressed.reserve(size_unzipped as usize);
|
||||
|
||||
let mut gz = GzDecoder::new(root.body.message_body());
|
||||
gz.read_to_end(&mut gzip_decompressed)?;
|
||||
gz.read_to_end(&mut gzip_decompressed)
|
||||
.map_err(ClientError::DecompressionError)?;
|
||||
|
||||
if gzip_decompressed.len() != size_unzipped as usize {
|
||||
eyre::bail!(
|
||||
"Expected decompressed len {}, got {}",
|
||||
return Err(ClientError::DecompressionInvalid(
|
||||
size_unzipped,
|
||||
gzip_decompressed.len()
|
||||
);
|
||||
gzip_decompressed.len(),
|
||||
));
|
||||
}
|
||||
|
||||
&gzip_decompressed
|
||||
|
@ -118,11 +116,7 @@ impl CMRawProtoBufMessage {
|
|||
let full_length = u32::from_le_bytes(body[0..4].try_into().unwrap());
|
||||
let message_end = 4 + full_length as usize;
|
||||
if body.len() < message_end {
|
||||
eyre::bail!(
|
||||
"sub-message too short (need {}, got {})",
|
||||
message_end,
|
||||
body.len()
|
||||
)
|
||||
return Err(ClientError::MessageTooShort(message_end, body.len()));
|
||||
}
|
||||
|
||||
match Self::try_parse(&body[4..message_end]) {
|
||||
|
|
Loading…
Reference in a new issue