Move out message parsing

This commit is contained in:
Artemis Tosini 2024-08-26 04:25:29 +00:00
parent 07c6320c71
commit ded292f332
Signed by: artemist
GPG key ID: EE5227935FE3FF18
3 changed files with 108 additions and 102 deletions

View file

@ -7,11 +7,11 @@ use async_tungstenite::{
use color_eyre::eyre::WrapErr; use color_eyre::eyre::WrapErr;
use color_eyre::eyre::{self, bail, OptionExt}; use color_eyre::eyre::{self, bail, OptionExt};
use futures::{SinkExt as _, TryStreamExt}; use futures::{SinkExt as _, TryStreamExt};
use protobuf::Enum as _;
use protobuf::Message as _;
use serde::Deserialize; use serde::Deserialize;
use vapore_proto::{enums_clientserver, steammessages_base}; use vapore_proto::{enums_clientserver, steammessages_base};
use crate::message::{CMProtoBufMessage, CMRawProtoBufMessage};
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
struct GetCMListResult<'a> { struct GetCMListResult<'a> {
// No need to check servers, we're only implementing websockets // No need to check servers, we're only implementing websockets
@ -40,106 +40,6 @@ pub async fn bootstrap_find_servers() -> eyre::Result<Vec<String>> {
.collect()) .collect())
} }
/// A message sent over the socket. Can be either sent or recieved
#[derive(Debug, Clone)]
pub struct CMProtoBufMessage<T: protobuf::Message> {
pub action: enums_clientserver::EMsg,
pub header: steammessages_base::CMsgProtoBufHeader,
pub body: T,
}
impl<T: protobuf::Message> CMProtoBufMessage<T> {
pub fn serialize(&self) -> eyre::Result<Vec<u8>> {
// 4 bytes for type, 4 bytes for header length, then header and body
// No alignment requirements
let length = 4 + 4 + self.header.compute_size() + self.body.compute_size();
let mut out = Vec::with_capacity(length.try_into()?);
out.extend_from_slice(&(self.action.value() as u32 | 0x80000000).to_le_bytes());
out.extend_from_slice(&self.header.cached_size().to_le_bytes());
self.header.write_to_vec(&mut out)?;
self.body.write_to_vec(&mut out)?;
Ok(out)
}
pub fn deserialize(raw: CMRawProtoBufMessage) -> eyre::Result<Self> {
let body = T::parse_from_bytes(&raw.body)?;
Ok(Self {
action: raw.action,
header: raw.header,
body,
})
}
}
/// A message sent over the socket, but the body is still serialized
#[derive(Debug, Clone)]
pub struct CMRawProtoBufMessage {
pub action: enums_clientserver::EMsg,
pub header: steammessages_base::CMsgProtoBufHeader,
pub body: Vec<u8>,
}
impl CMRawProtoBufMessage {
pub fn try_parse(binary: &[u8]) -> eyre::Result<Self> {
if binary.len() < 8 {
bail!("Message too short for type");
}
let raw_action = u32::from_le_bytes(binary[0..4].try_into().unwrap()) & !0x8000_0000;
let action = enums_clientserver::EMsg::from_i32(raw_action as i32)
.ok_or_else(|| eyre::eyre!("Unknown message action {}", raw_action))?;
let header_length = u32::from_le_bytes(binary[4..8].try_into().unwrap());
let header_end = 8 + header_length as usize;
if binary.len() < header_end {
bail!("Message too short for header")
}
let header =
steammessages_base::CMsgProtoBufHeader::parse_from_bytes(&binary[8..header_end])?;
let body = binary[header_end..].to_vec();
Ok(Self {
action,
header,
body,
})
}
pub fn try_parse_multi(binary: &[u8]) -> eyre::Result<Vec<Self>> {
let root_raw = Self::try_parse(binary)?;
if root_raw.action != enums_clientserver::EMsg::k_EMsgMulti {
return Ok(vec![root_raw]);
}
// Multi messages are a bunch of [full_length][action][header_length][header][body] inside
// possibly gzipped bytes inside a protobuf.
// why, valve
let root = CMProtoBufMessage::<steammessages_base::CMsgMulti>::deserialize(root_raw)?;
if root.body.size_unzipped.is_some() {
todo!("gzip support in CMsgMulti")
}
let mut items = Vec::new();
let mut body = root.body.message_body();
while body.len() >= 4 {
let full_length = u32::from_le_bytes(body[0..4].try_into().unwrap());
let message_end = 4 + full_length as usize;
if body.len() < message_end {
eyre::bail!("sub-message too short")
}
items.push(Self::try_parse(&body[4..message_end])?);
body = &body[message_end..];
}
Ok(items)
}
}
pub struct CMSession { pub struct CMSession {
socket: WebSocketStream<ConnectStream>, socket: WebSocketStream<ConnectStream>,
/// Steam ID of current user. When set to None we are not logged in /// Steam ID of current user. When set to None we are not logged in

View file

@ -11,6 +11,7 @@ use vapore_proto::{
}; };
mod connection; mod connection;
mod message;
#[tokio::main] #[tokio::main]
pub async fn main() -> eyre::Result<()> { pub async fn main() -> eyre::Result<()> {

105
daemon/src/message.rs Normal file
View file

@ -0,0 +1,105 @@
use color_eyre::eyre;
use protobuf::{Enum as _, Message as _};
use vapore_proto::{
enums_clientserver::EMsg,
steammessages_base::{CMsgMulti, CMsgProtoBufHeader},
};
/// A message sent over the socket. Can be either sent or recieved
#[derive(Debug, Clone)]
pub struct CMProtoBufMessage<T: protobuf::Message> {
pub action: EMsg,
pub header: CMsgProtoBufHeader,
pub body: T,
}
impl<T: protobuf::Message> CMProtoBufMessage<T> {
pub fn serialize(&self) -> eyre::Result<Vec<u8>> {
// 4 bytes for type, 4 bytes for header length, then header and body
// No alignment requirements
let length = 4 + 4 + self.header.compute_size() + self.body.compute_size();
let mut out = Vec::with_capacity(length.try_into()?);
out.extend_from_slice(&(self.action.value() as u32 | 0x80000000).to_le_bytes());
out.extend_from_slice(&self.header.cached_size().to_le_bytes());
self.header.write_to_vec(&mut out)?;
self.body.write_to_vec(&mut out)?;
Ok(out)
}
pub fn deserialize(raw: CMRawProtoBufMessage) -> eyre::Result<Self> {
let body = T::parse_from_bytes(&raw.body)?;
Ok(Self {
action: raw.action,
header: raw.header,
body,
})
}
}
/// A message sent over the socket, but the body is still serialized
#[derive(Debug, Clone)]
pub struct CMRawProtoBufMessage {
pub action: EMsg,
pub header: CMsgProtoBufHeader,
pub body: Vec<u8>,
}
impl CMRawProtoBufMessage {
pub fn try_parse(binary: &[u8]) -> eyre::Result<Self> {
if binary.len() < 8 {
eyre::bail!("Message too short for type");
}
let raw_action = u32::from_le_bytes(binary[0..4].try_into().unwrap()) & !0x8000_0000;
let action = EMsg::from_i32(raw_action as i32)
.ok_or_else(|| eyre::eyre!("Unknown message action {}", raw_action))?;
let header_length = u32::from_le_bytes(binary[4..8].try_into().unwrap());
let header_end = 8 + header_length as usize;
if binary.len() < header_end {
eyre::bail!("Message too short for header")
}
let header = CMsgProtoBufHeader::parse_from_bytes(&binary[8..header_end])?;
let body = binary[header_end..].to_vec();
Ok(Self {
action,
header,
body,
})
}
pub fn try_parse_multi(binary: &[u8]) -> eyre::Result<Vec<Self>> {
let root_raw = Self::try_parse(binary)?;
if root_raw.action != EMsg::k_EMsgMulti {
return Ok(vec![root_raw]);
}
// Multi messages are a bunch of [full_length][action][header_length][header][body] inside
// possibly gzipped bytes inside a protobuf.
// why, valve
let root = CMProtoBufMessage::<CMsgMulti>::deserialize(root_raw)?;
if root.body.size_unzipped.is_some() {
todo!("gzip support in CMsgMulti")
}
let mut items = Vec::new();
let mut body = root.body.message_body();
while body.len() >= 4 {
let full_length = u32::from_le_bytes(body[0..4].try_into().unwrap());
let message_end = 4 + full_length as usize;
if body.len() < message_end {
eyre::bail!("sub-message too short")
}
items.push(Self::try_parse(&body[4..message_end])?);
body = &body[message_end..];
}
Ok(items)
}
}