diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 792b8bb..808c91a 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -7,11 +7,11 @@ use async_tungstenite::{ use color_eyre::eyre::WrapErr; use color_eyre::eyre::{self, bail, OptionExt}; use futures::{SinkExt as _, TryStreamExt}; -use protobuf::Enum as _; -use protobuf::Message as _; use serde::Deserialize; use vapore_proto::{enums_clientserver, steammessages_base}; +use crate::message::{CMProtoBufMessage, CMRawProtoBufMessage}; + #[derive(Debug, Deserialize)] struct GetCMListResult<'a> { // No need to check servers, we're only implementing websockets @@ -40,106 +40,6 @@ pub async fn bootstrap_find_servers() -> eyre::Result> { .collect()) } -/// A message sent over the socket. Can be either sent or recieved -#[derive(Debug, Clone)] -pub struct CMProtoBufMessage { - pub action: enums_clientserver::EMsg, - pub header: steammessages_base::CMsgProtoBufHeader, - pub body: T, -} - -impl CMProtoBufMessage { - pub fn serialize(&self) -> eyre::Result> { - // 4 bytes for type, 4 bytes for header length, then header and body - // No alignment requirements - let length = 4 + 4 + self.header.compute_size() + self.body.compute_size(); - let mut out = Vec::with_capacity(length.try_into()?); - - out.extend_from_slice(&(self.action.value() as u32 | 0x80000000).to_le_bytes()); - out.extend_from_slice(&self.header.cached_size().to_le_bytes()); - self.header.write_to_vec(&mut out)?; - self.body.write_to_vec(&mut out)?; - - Ok(out) - } - - pub fn deserialize(raw: CMRawProtoBufMessage) -> eyre::Result { - let body = T::parse_from_bytes(&raw.body)?; - - Ok(Self { - action: raw.action, - header: raw.header, - body, - }) - } -} - -/// A message sent over the socket, but the body is still serialized -#[derive(Debug, Clone)] -pub struct CMRawProtoBufMessage { - pub action: enums_clientserver::EMsg, - pub header: steammessages_base::CMsgProtoBufHeader, - pub body: Vec, -} - -impl CMRawProtoBufMessage { - pub fn try_parse(binary: &[u8]) -> eyre::Result { - if binary.len() < 8 { - bail!("Message too short for type"); - } - let raw_action = u32::from_le_bytes(binary[0..4].try_into().unwrap()) & !0x8000_0000; - let action = enums_clientserver::EMsg::from_i32(raw_action as i32) - .ok_or_else(|| eyre::eyre!("Unknown message action {}", raw_action))?; - - let header_length = u32::from_le_bytes(binary[4..8].try_into().unwrap()); - let header_end = 8 + header_length as usize; - if binary.len() < header_end { - bail!("Message too short for header") - } - - let header = - steammessages_base::CMsgProtoBufHeader::parse_from_bytes(&binary[8..header_end])?; - let body = binary[header_end..].to_vec(); - - Ok(Self { - action, - header, - body, - }) - } - - pub fn try_parse_multi(binary: &[u8]) -> eyre::Result> { - let root_raw = Self::try_parse(binary)?; - if root_raw.action != enums_clientserver::EMsg::k_EMsgMulti { - return Ok(vec![root_raw]); - } - - // Multi messages are a bunch of [full_length][action][header_length][header][body] inside - // possibly gzipped bytes inside a protobuf. - // why, valve - let root = CMProtoBufMessage::::deserialize(root_raw)?; - if root.body.size_unzipped.is_some() { - todo!("gzip support in CMsgMulti") - } - - let mut items = Vec::new(); - let mut body = root.body.message_body(); - while body.len() >= 4 { - let full_length = u32::from_le_bytes(body[0..4].try_into().unwrap()); - let message_end = 4 + full_length as usize; - if body.len() < message_end { - eyre::bail!("sub-message too short") - } - - items.push(Self::try_parse(&body[4..message_end])?); - - body = &body[message_end..]; - } - - Ok(items) - } -} - pub struct CMSession { socket: WebSocketStream, /// Steam ID of current user. When set to None we are not logged in diff --git a/daemon/src/main.rs b/daemon/src/main.rs index 34a5100..9b4e14f 100644 --- a/daemon/src/main.rs +++ b/daemon/src/main.rs @@ -11,6 +11,7 @@ use vapore_proto::{ }; mod connection; +mod message; #[tokio::main] pub async fn main() -> eyre::Result<()> { diff --git a/daemon/src/message.rs b/daemon/src/message.rs new file mode 100644 index 0000000..ddc37eb --- /dev/null +++ b/daemon/src/message.rs @@ -0,0 +1,105 @@ +use color_eyre::eyre; +use protobuf::{Enum as _, Message as _}; +use vapore_proto::{ + enums_clientserver::EMsg, + steammessages_base::{CMsgMulti, CMsgProtoBufHeader}, +}; + +/// A message sent over the socket. Can be either sent or recieved +#[derive(Debug, Clone)] +pub struct CMProtoBufMessage { + pub action: EMsg, + pub header: CMsgProtoBufHeader, + pub body: T, +} + +impl CMProtoBufMessage { + pub fn serialize(&self) -> eyre::Result> { + // 4 bytes for type, 4 bytes for header length, then header and body + // No alignment requirements + let length = 4 + 4 + self.header.compute_size() + self.body.compute_size(); + let mut out = Vec::with_capacity(length.try_into()?); + + out.extend_from_slice(&(self.action.value() as u32 | 0x80000000).to_le_bytes()); + out.extend_from_slice(&self.header.cached_size().to_le_bytes()); + self.header.write_to_vec(&mut out)?; + self.body.write_to_vec(&mut out)?; + + Ok(out) + } + + pub fn deserialize(raw: CMRawProtoBufMessage) -> eyre::Result { + let body = T::parse_from_bytes(&raw.body)?; + + Ok(Self { + action: raw.action, + header: raw.header, + body, + }) + } +} + +/// A message sent over the socket, but the body is still serialized +#[derive(Debug, Clone)] +pub struct CMRawProtoBufMessage { + pub action: EMsg, + pub header: CMsgProtoBufHeader, + pub body: Vec, +} + +impl CMRawProtoBufMessage { + pub fn try_parse(binary: &[u8]) -> eyre::Result { + if binary.len() < 8 { + eyre::bail!("Message too short for type"); + } + let raw_action = u32::from_le_bytes(binary[0..4].try_into().unwrap()) & !0x8000_0000; + let action = EMsg::from_i32(raw_action as i32) + .ok_or_else(|| eyre::eyre!("Unknown message action {}", raw_action))?; + + let header_length = u32::from_le_bytes(binary[4..8].try_into().unwrap()); + let header_end = 8 + header_length as usize; + if binary.len() < header_end { + eyre::bail!("Message too short for header") + } + + let header = CMsgProtoBufHeader::parse_from_bytes(&binary[8..header_end])?; + let body = binary[header_end..].to_vec(); + + Ok(Self { + action, + header, + body, + }) + } + + pub fn try_parse_multi(binary: &[u8]) -> eyre::Result> { + let root_raw = Self::try_parse(binary)?; + if root_raw.action != EMsg::k_EMsgMulti { + return Ok(vec![root_raw]); + } + + // Multi messages are a bunch of [full_length][action][header_length][header][body] inside + // possibly gzipped bytes inside a protobuf. + // why, valve + let root = CMProtoBufMessage::::deserialize(root_raw)?; + if root.body.size_unzipped.is_some() { + todo!("gzip support in CMsgMulti") + } + + let mut items = Vec::new(); + let mut body = root.body.message_body(); + while body.len() >= 4 { + let full_length = u32::from_le_bytes(body[0..4].try_into().unwrap()); + let message_end = 4 + full_length as usize; + if body.len() < message_end { + eyre::bail!("sub-message too short") + } + + items.push(Self::try_parse(&body[4..message_end])?); + + body = &body[message_end..]; + } + + Ok(items) + } +}