From d539731705ee04fc716487e4efb872b1d60cdbf6 Mon Sep 17 00:00:00 2001 From: Artemis Tosini Date: Thu, 5 Sep 2024 01:34:05 +0000 Subject: [PATCH] lib: Replace eyre with thiserror --- Cargo.lock | 1 + lib/Cargo.toml | 1 + lib/src/connection.rs | 59 +++++++++++++++++++++---------------------- lib/src/error.rs | 52 ++++++++++++++++++++++++++++++++++++++ lib/src/lib.rs | 3 +++ lib/src/message.rs | 46 +++++++++++++++------------------ 6 files changed, 106 insertions(+), 56 deletions(-) create mode 100644 lib/src/error.rs diff --git a/Cargo.lock b/Cargo.lock index 7949ff0..9a27e6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1873,6 +1873,7 @@ dependencies = [ "reqwest", "rsa", "serde", + "thiserror", "tokio", "vapore-proto", ] diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 1279531..0f00c44 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -15,6 +15,7 @@ rand = "0.8.5" reqwest = { version = "0.12", features = ["rustls-tls-native-roots"], default-features = false} rsa = "0.9.6" serde = { version = "1.0.209", features = ["derive"] } +thiserror = "1.0.63" tokio = { version = "1.39", features = ["rt", "rt-multi-thread", "macros", "time"]} vapore-proto.path = "../proto" diff --git a/lib/src/connection.rs b/lib/src/connection.rs index 652b05f..88221ce 100644 --- a/lib/src/connection.rs +++ b/lib/src/connection.rs @@ -12,8 +12,6 @@ use async_tungstenite::{ tokio::{connect_async, ConnectStream}, tungstenite, WebSocketStream, }; -use color_eyre::eyre::WrapErr; -use color_eyre::eyre::{self, bail, OptionExt}; use futures::{SinkExt as _, StreamExt}; use serde::Deserialize; use tokio::sync::broadcast; @@ -22,7 +20,10 @@ use vapore_proto::{ steammessages_clientserver_login::CMsgClientHeartBeat, }; -use crate::message::{CMProtoBufMessage, CMRawProtoBufMessage}; +use crate::{ + message::{CMProtoBufMessage, CMRawProtoBufMessage}, + ClientError, +}; /// Maximum number of messages in the buffer for by-message-type subscriptions const CHANNEL_CAPACITY: usize = 16; @@ -46,7 +47,7 @@ pub struct GetCMListForConnectResponse<'a> { message: &'a str, } -pub async fn bootstrap_find_servers() -> eyre::Result> { +pub async fn bootstrap_find_servers() -> Result, ClientError> { let response = reqwest::get( "https://api.steampowered.com/ISteamDirectory/GetCMListForConnect/v1/?cellid=0&format=vdf", ) @@ -56,11 +57,10 @@ pub async fn bootstrap_find_servers() -> eyre::Result> { let result: GetCMListForConnectResponse = keyvalues_serde::from_str(&response)?; if result.success != 1 { - eyre::bail!( - "GetCMList returned bad result {} wtih message {}", + return Err(ClientError::EResult( result.success, - result.message - ) + result.message.to_string(), + )); } Ok(result @@ -118,16 +118,15 @@ impl Context { fn handle_receive( self: Pin<&mut Self>, message: tungstenite::Result, - ) -> eyre::Result<()> { + ) -> Result<(), ClientError> { // Technically everything should be Binary but I think I saw some Text before let message_data = match message? { tungstenite::Message::Text(t) => t.into_bytes(), tungstenite::Message::Binary(b) => b, - _ => eyre::bail!("Unexpected WebSocket frame type"), + _ => return Err(ClientError::BadWSMessageType), }; - let raw_messages = CMRawProtoBufMessage::try_parse_multi(&message_data) - .wrap_err("Parsing raw messages")?; + let raw_messages = CMRawProtoBufMessage::try_parse_multi(&message_data)?; let mut session = self.session.lock().expect("Lock was poisoned"); @@ -166,10 +165,13 @@ impl Context { Ok(()) } - fn handle_send(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> eyre::Result<()> { + fn handle_send( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context, + ) -> Result<(), ClientError> { // TODO: figure out how to not cloe the Arc let session_arc = self.session.clone(); - let mut session = session_arc.lock().expect("Lock was poisoned"); + let mut session = session_arc.lock()?; while !session.send_queue.is_empty() { match self.socket.poll_ready_unpin(cx) { @@ -188,14 +190,17 @@ impl Context { Ok(()) } - fn poll_inner(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> eyre::Result<()> { + fn poll_inner( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Result<(), ClientError> { { let mut session = self.session.lock().expect("Lock was poisoned"); session.send_waker = Some(cx.waker().clone()); } if let Poll::Ready(maybe_message) = self.as_mut().socket.poll_next_unpin(cx) { - let message = maybe_message.ok_or_eyre("Socket was closed while trying to recieve")?; + let message = maybe_message.ok_or(ClientError::ClosedSocket)?; if let Err(err) = self.as_mut().handle_receive(message) { log::warn!("Got error while processing message: {:?}", err); } @@ -209,7 +214,7 @@ impl Context { } impl Future for Context { - type Output = eyre::Result<()>; + type Output = Result<(), ClientError>; fn poll( self: std::pin::Pin<&mut Self>, @@ -231,10 +236,8 @@ pub struct CMSession { } impl CMSession { - pub async fn connect(server: &str) -> eyre::Result { - let (socket, _) = connect_async(server) - .await - .wrap_err("Connecting to Steam server")?; + pub async fn connect(server: &str) -> Result { + let (socket, _) = connect_async(server).await?; let inner = SessionInner { steam_id: None, @@ -269,7 +272,7 @@ impl CMSession { tokio::spawn(async move { cloned.send_heartbeat_task(interval).await }); } - async fn send_heartbeat_task(self, interval_secs: u32) -> eyre::Result<()> { + async fn send_heartbeat_task(self, interval_secs: u32) -> Result<(), ClientError> { let mut interval = tokio::time::interval(time::Duration::from_secs(interval_secs as u64)); loop { interval.tick().await; @@ -309,7 +312,7 @@ impl CMSession { &self, action: EMsg, body: T, - ) -> eyre::Result<()> { + ) -> Result<(), ClientError> { let mut inner = self.inner.lock().expect("Lock was poisoned"); log::trace!("Sending notification of type {:?}", action); @@ -368,13 +371,9 @@ impl<'a, T: protobuf::Message, U: protobuf::Message> CallServiceMethod<'a, T, U> fn finalize_response( &self, response: CMRawProtoBufMessage, - ) -> eyre::Result> { + ) -> Result, ClientError> { if response.action != EMsg::k_EMsgServiceMethodResponse { - bail!("Wanted ServiceMethodResponse, got {:?}", response.action); - } - - if response.header.jobid_target() != self.jobid.unwrap() { - bail!("Got wrong jobid") + return Err(ClientError::BadResponseAction(response.action)); } CMProtoBufMessage::::deserialize(response) @@ -382,7 +381,7 @@ impl<'a, T: protobuf::Message, U: protobuf::Message> CallServiceMethod<'a, T, U> } impl Future for CallServiceMethod<'_, T, U> { - type Output = eyre::Result>; + type Output = Result, ClientError>; fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { let session_arc = self.session.inner.clone(); diff --git a/lib/src/error.rs b/lib/src/error.rs new file mode 100644 index 0000000..52be2e6 --- /dev/null +++ b/lib/src/error.rs @@ -0,0 +1,52 @@ +#[non_exhaustive] +#[derive(thiserror::Error, Debug)] +pub enum ClientError { + #[error("Valve returned bad result {0} with message `{1}`")] + EResult(u32, String), + + #[error("Request failure")] + Reqwest(#[from] reqwest::Error), + + #[error("VDF Deserialization failure")] + Vdf(#[from] keyvalues_serde::Error), + + #[error("WebSocket connection error")] + WebSocket(#[from] async_tungstenite::tungstenite::Error), + + #[error("WebSocket was closed while trying to recieve")] + ClosedSocket, + + #[error("ProtoBuf Deserialization error")] + Protobuf(#[from] protobuf::Error), + + #[error("Invalid WebSocket message type from server")] + BadWSMessageType, + + #[error("Decompression Error")] + DecompressionError(#[source] std::io::Error), + + #[error("Invalid decompressed output (expected {0} bytes, got {1})")] + DecompressionInvalid(u32, usize), + + #[error("Invalid message action {0}")] + InvalidAction(u32), + + #[error("Invalid message length")] + InvalidMessageLength, + + #[error("Message too short (need {0} bytes, got {1})")] + MessageTooShort(usize, usize), + + #[error("Lock was poisoned")] + LockPoisoned, + + #[error("Expected action ServiceMethodResponse, got {0:?}")] + BadResponseAction(vapore_proto::enums_clientserver::EMsg), +} + +impl From> for ClientError { + fn from(_value: std::sync::PoisonError) -> Self { + // The guard won't be Send so we can't return it from async functions + ClientError::LockPoisoned + } +} diff --git a/lib/src/lib.rs b/lib/src/lib.rs index ea305ac..d666841 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,2 +1,5 @@ pub mod connection; +pub mod error; pub mod message; + +pub use error::ClientError; diff --git a/lib/src/message.rs b/lib/src/message.rs index 637e550..6731fde 100644 --- a/lib/src/message.rs +++ b/lib/src/message.rs @@ -1,6 +1,5 @@ use std::io::Read; -use color_eyre::eyre; use flate2::read::GzDecoder; use protobuf::{Enum as _, Message as _}; use vapore_proto::{ @@ -8,6 +7,8 @@ use vapore_proto::{ steammessages_base::{CMsgMulti, CMsgProtoBufHeader}, }; +use crate::ClientError; + /// A message sent over the socket. Can be either sent or recieved #[derive(Debug, Clone)] pub struct CMProtoBufMessage { @@ -17,11 +18,15 @@ pub struct CMProtoBufMessage { } impl CMProtoBufMessage { - pub fn serialize(&self) -> eyre::Result> { + pub fn serialize(&self) -> Result, ClientError> { // 4 bytes for type, 4 bytes for header length, then header and body // No alignment requirements let length = 4 + 4 + self.header.compute_size() + self.body.compute_size(); - let mut out = Vec::with_capacity(length.try_into()?); + let mut out = Vec::with_capacity( + length + .try_into() + .map_err(|_| ClientError::InvalidMessageLength)?, + ); out.extend_from_slice(&(self.action.value() as u32 | 0x80000000).to_le_bytes()); out.extend_from_slice(&self.header.cached_size().to_le_bytes()); @@ -31,7 +36,7 @@ impl CMProtoBufMessage { Ok(out) } - pub fn deserialize(raw: CMRawProtoBufMessage) -> eyre::Result { + pub fn deserialize(raw: CMRawProtoBufMessage) -> Result { let body = T::parse_from_bytes(&raw.body)?; Ok(Self { @@ -51,25 +56,18 @@ pub struct CMRawProtoBufMessage { } impl CMRawProtoBufMessage { - pub fn try_parse(binary: &[u8]) -> eyre::Result { + pub fn try_parse(binary: &[u8]) -> Result { if binary.len() < 8 { - eyre::bail!( - "Message too short for type (need 8 bytes, was {} bytes)", - binary.len() - ); + return Err(ClientError::MessageTooShort(8, binary.len())); } let raw_action = u32::from_le_bytes(binary[0..4].try_into().unwrap()) & !0x8000_0000; let action = EMsg::from_i32(raw_action as i32) - .ok_or_else(|| eyre::eyre!("Unknown message action {}", raw_action))?; + .ok_or_else(|| ClientError::InvalidAction(raw_action))?; let header_length = u32::from_le_bytes(binary[4..8].try_into().unwrap()); let header_end = 8 + header_length as usize; if binary.len() < header_end { - eyre::bail!( - "Message too short for header (need {}, was {})", - header_end, - binary.len() - ) + return Err(ClientError::MessageTooShort(header_end, binary.len())); } let header = CMsgProtoBufHeader::parse_from_bytes(&binary[8..header_end])?; @@ -82,7 +80,7 @@ impl CMRawProtoBufMessage { }) } - pub fn try_parse_multi(binary: &[u8]) -> eyre::Result> { + pub fn try_parse_multi(binary: &[u8]) -> Result, ClientError> { let root_raw = Self::try_parse(binary)?; if root_raw.action != EMsg::k_EMsgMulti { return Ok(vec![root_raw]); @@ -98,14 +96,14 @@ impl CMRawProtoBufMessage { gzip_decompressed.reserve(size_unzipped as usize); let mut gz = GzDecoder::new(root.body.message_body()); - gz.read_to_end(&mut gzip_decompressed)?; + gz.read_to_end(&mut gzip_decompressed) + .map_err(ClientError::DecompressionError)?; if gzip_decompressed.len() != size_unzipped as usize { - eyre::bail!( - "Expected decompressed len {}, got {}", + return Err(ClientError::DecompressionInvalid( size_unzipped, - gzip_decompressed.len() - ); + gzip_decompressed.len(), + )); } &gzip_decompressed @@ -118,11 +116,7 @@ impl CMRawProtoBufMessage { let full_length = u32::from_le_bytes(body[0..4].try_into().unwrap()); let message_end = 4 + full_length as usize; if body.len() < message_end { - eyre::bail!( - "sub-message too short (need {}, got {})", - message_end, - body.len() - ) + return Err(ClientError::MessageTooShort(message_end, body.len())); } match Self::try_parse(&body[4..message_end]) {