From ce0ceba2f2a77c01facd57c8d78c64d3c22f7ec9 Mon Sep 17 00:00:00 2001 From: Skye Jensen Date: Fri, 7 Aug 2020 15:00:03 -0400 Subject: [PATCH] Propagate tile stream errors further before unwrapping --- src/config.rs | 9 +++------ src/main.rs | 12 ++++++------ src/output.rs | 10 +++++++--- src/tiles/hostname.rs | 3 +-- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/config.rs b/src/config.rs index aef14fa..86cb01e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,5 @@ use crate::tiles; +use crate::tiles::TileResult; use dbus::nonblock::SyncConnection; use futures::{stream::BoxStream, Stream}; use serde::{Deserialize, Deserializer}; @@ -11,7 +12,6 @@ use structopt::StructOpt; use tokio::fs::File; use tokio::prelude::*; use tokio::time::{self, Duration}; -use crate::tiles::TileResult; #[derive(Deserialize, Clone, Debug, Default)] #[serde(default)] @@ -119,17 +119,14 @@ pub fn process_tile( let five_secs = Duration::from_secs(5); match &tile.config_type { TileConfigType::Battery => wrap(tiles::battery_stream(), tile.update.or(Some(five_secs))), - TileConfigType::Hostname => wrap(tiles::hostname_stream(connection.clone()), tile.update), + TileConfigType::Hostname => wrap(tiles::hostname_stream(connection.as_ref()), tile.update), TileConfigType::Load => wrap(tiles::load_stream(), tile.update.or(Some(five_secs))), TileConfigType::Memory => wrap(tiles::memory_stream(), tile.update.or(Some(five_secs))), TileConfigType::Time(c) => wrap(tiles::time_stream(c.clone()), tile.update), } } -fn wrap<'a, S>( - stream: S, - duration: Option, -) -> BoxStream<'a, TileResult> +fn wrap<'a, S>(stream: S, duration: Option) -> BoxStream<'a, TileResult> where S: Stream + Send + 'a, { diff --git a/src/main.rs b/src/main.rs index 32d9b22..9981a80 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,20 +45,20 @@ async fn main() -> Result<(), Box> { fn spawn_stream( index: usize, stream: BoxStream<'static, Result>, - sender: Sender, + sender: Sender>, ) where - E: Debug, + E: Debug + Send, { tokio::spawn(async move { let instance: Arc = Uuid::new_v4().to_string().into(); - let stream = stream.map(|block| { - Ok(tile::TileData { + let stream = stream.map(|block: Result<_, _>| { + Ok(block.map(|block| tile::TileData { block: tile::Block { instance: instance.clone(), - ..block.unwrap() + ..block }, sender_id: index, - }) + })) }); let future = stream.forward(sender); future.await diff --git a/src/output.rs b/src/output.rs index cd29b0b..d005617 100644 --- a/src/output.rs +++ b/src/output.rs @@ -5,11 +5,14 @@ use futures::StreamExt; use std::convert::Infallible; use tokio::io::{self, AsyncWriteExt}; -pub async fn launch( +pub async fn launch( num_tiles: usize, - mut receiver: Receiver, + mut receiver: Receiver>, _default: DefaultSection, -) -> io::Result { +) -> io::Result +where + E: Send + std::fmt::Debug, +{ let mut stdout = io::stdout(); stdout.write_all(b"{ \"version\": 1 }\n[").await?; @@ -17,6 +20,7 @@ pub async fn launch( blocks.resize_with(num_tiles, Default::default); loop { let message = receiver.next().await.unwrap(); + let message = message.unwrap(); if message.sender_id < num_tiles { blocks[message.sender_id] = Some(message.block); } else { diff --git a/src/tiles/hostname.rs b/src/tiles/hostname.rs index 0f3dcb8..4223181 100644 --- a/src/tiles/hostname.rs +++ b/src/tiles/hostname.rs @@ -3,10 +3,9 @@ use crate::tiles::TileResult; use dbus::nonblock::stdintf::org_freedesktop_dbus::Properties; use dbus::nonblock::{Proxy, SyncConnection}; use futures::{FutureExt, Stream}; -use std::sync::Arc; use std::time::Duration; -pub fn hostname_stream(connection: Arc) -> impl Stream { +pub fn hostname_stream(connection: &SyncConnection) -> impl Stream { let proxy = Proxy::new( "org.freedesktop.hostname1", "/org/freedesktop/hostname1",