Start working on serialization

This commit is contained in:
Artemis Tosini 2024-08-26 01:27:22 +00:00
parent 57125d541e
commit 8b1ad9078a
Signed by: artemist
GPG key ID: EE5227935FE3FF18

View file

@ -40,13 +40,14 @@ pub async fn bootstrap_find_servers() -> eyre::Result<Vec<String>> {
.collect()) .collect())
} }
struct CMProtoBufMessage<'a, T: protobuf::Message> { /// A message sent over the socket. Can be either sent or recieved
pub struct CMProtoBufMessage<T: protobuf::Message> {
pub action: enums_clientserver::EMsg, pub action: enums_clientserver::EMsg,
pub header: &'a steammessages_base::CMsgProtoBufHeader, pub header: steammessages_base::CMsgProtoBufHeader,
pub body: &'a T, pub body: T,
} }
impl<T: protobuf::Message> CMProtoBufMessage<'_, T> { impl<T: protobuf::Message> CMProtoBufMessage<T> {
pub fn serialize(&self) -> eyre::Result<Vec<u8>> { pub fn serialize(&self) -> eyre::Result<Vec<u8>> {
// 4 bytes for type, 4 bytes for header length, then header and body // 4 bytes for type, 4 bytes for header length, then header and body
// No alignment requirements // No alignment requirements
@ -60,6 +61,81 @@ impl<T: protobuf::Message> CMProtoBufMessage<'_, T> {
Ok(out) Ok(out)
} }
pub fn deserialize(raw: CMRawProtoBufMessage) -> eyre::Result<Self> {
let body = T::parse_from_bytes(&raw.body)?;
Ok(Self {
action: raw.action,
header: raw.header,
body,
})
}
}
/// A message sent over the socket, but the body is still serialized
pub struct CMRawProtoBufMessage {
pub action: enums_clientserver::EMsg,
pub header: steammessages_base::CMsgProtoBufHeader,
pub body: Vec<u8>,
}
impl CMRawProtoBufMessage {
pub fn try_parse(binary: &[u8]) -> eyre::Result<Self> {
if binary.len() < 8 {
bail!("Message too short for type");
}
let raw_action = u32::from_le_bytes(binary[0..4].try_into().unwrap()) & !0x8000_0000;
let action = enums_clientserver::EMsg::from_i32(raw_action as i32)
.ok_or_else(|| eyre::eyre!("Unknown message action {}", raw_action))?;
let header_length = u32::from_le_bytes(binary[4..8].try_into().unwrap());
let header_end = 8 + header_length as usize;
if binary.len() < header_end {
bail!("Message too short for header")
}
let header =
steammessages_base::CMsgProtoBufHeader::parse_from_bytes(&binary[8..header_end])?;
let body = binary[header_end..].to_vec();
Ok(Self {
action,
header,
body,
})
}
pub fn try_parse_multi(binary: &[u8]) -> eyre::Result<Vec<Self>> {
let root_raw = Self::try_parse(binary)?;
if root_raw.action != enums_clientserver::EMsg::k_EMsgMulti {
return Ok(vec![root_raw]);
}
// Multi messages are a bunch of [full_length][action][header_length][header][body] inside
// possibly gzipped bytes inside a protobuf.
// why, valve
let root = CMProtoBufMessage::<steammessages_base::CMsgMulti>::deserialize(root_raw)?;
if root.body.size_unzipped.is_some() {
todo!("gzip support in CMsgMulti")
}
let mut items = Vec::new();
let mut body = root.body.message_body();
while body.len() >= 4 {
let full_length = u32::from_le_bytes(body[0..4].try_into().unwrap());
let message_end = 4 + full_length as usize;
if body.len() < message_end {
eyre::bail!("sub-message too short")
}
items.push(Self::try_parse(&body[4..message_end])?);
body = &body[message_end..];
}
Ok(items)
}
} }
pub struct CMSession { pub struct CMSession {
@ -94,8 +170,8 @@ impl CMSession {
pub async fn call_service_method<Request: protobuf::Message, Response: protobuf::Message>( pub async fn call_service_method<Request: protobuf::Message, Response: protobuf::Message>(
&mut self, &mut self,
method: String, method: String,
body: &Request, body: Request,
) -> eyre::Result<Response> { ) -> eyre::Result<CMProtoBufMessage<Response>> {
log::trace!("Calling service method `{}`", method); log::trace!("Calling service method `{}`", method);
let action = if self.is_authed() { let action = if self.is_authed() {
@ -118,24 +194,42 @@ impl CMSession {
let message = CMProtoBufMessage { let message = CMProtoBufMessage {
action, action,
header: &header, header,
body, body,
}; };
let serialized = message.serialize()?; let serialized = message.serialize()?;
self.socket self.socket
.send(tungstenite::protocol::Message::Binary(serialized)) .send(tungstenite::protocol::Message::Binary(serialized))
.await?; .await?;
let response = self let response_message = self
.socket .socket
.try_next() .try_next()
.await? .await?
.ok_or_eyre("No message recieved")?; .ok_or_eyre("No message recieved")?;
let tungstenite::protocol::Message::Binary(response_binary) = response else { let tungstenite::protocol::Message::Binary(response_binary) = response_message else {
bail!("Message recieved was not binary") bail!("Message recieved was not binary")
}; };
todo!() let responses_raw = CMRawProtoBufMessage::try_parse_multi(&response_binary)?;
if responses_raw.len() != 1 {
todo!("Multiple responses")
}
let response_raw = responses_raw.into_iter().next().unwrap();
if response_raw.action != enums_clientserver::EMsg::k_EMsgServiceMethodResponse {
bail!(
"Wanted ServiceMethodResponse, got {:?}",
response_raw.action
);
}
if response_raw.header.jobid_target() != jobid {
bail!("Got wrong jobid")
}
CMProtoBufMessage::<Response>::deserialize(response_raw)
} }
/// Whether the current session is authenticated /// Whether the current session is authenticated