Propagate tile stream errors further before unwrapping

This commit is contained in:
Skye Jensen 2020-08-07 15:00:03 -04:00
parent 63d119a29f
commit ce0ceba2f2
4 changed files with 17 additions and 17 deletions

View file

@ -1,4 +1,5 @@
use crate::tiles; use crate::tiles;
use crate::tiles::TileResult;
use dbus::nonblock::SyncConnection; use dbus::nonblock::SyncConnection;
use futures::{stream::BoxStream, Stream}; use futures::{stream::BoxStream, Stream};
use serde::{Deserialize, Deserializer}; use serde::{Deserialize, Deserializer};
@ -11,7 +12,6 @@ use structopt::StructOpt;
use tokio::fs::File; use tokio::fs::File;
use tokio::prelude::*; use tokio::prelude::*;
use tokio::time::{self, Duration}; use tokio::time::{self, Duration};
use crate::tiles::TileResult;
#[derive(Deserialize, Clone, Debug, Default)] #[derive(Deserialize, Clone, Debug, Default)]
#[serde(default)] #[serde(default)]
@ -119,17 +119,14 @@ pub fn process_tile(
let five_secs = Duration::from_secs(5); let five_secs = Duration::from_secs(5);
match &tile.config_type { match &tile.config_type {
TileConfigType::Battery => wrap(tiles::battery_stream(), tile.update.or(Some(five_secs))), TileConfigType::Battery => wrap(tiles::battery_stream(), tile.update.or(Some(five_secs))),
TileConfigType::Hostname => wrap(tiles::hostname_stream(connection.clone()), tile.update), TileConfigType::Hostname => wrap(tiles::hostname_stream(connection.as_ref()), tile.update),
TileConfigType::Load => wrap(tiles::load_stream(), tile.update.or(Some(five_secs))), TileConfigType::Load => wrap(tiles::load_stream(), tile.update.or(Some(five_secs))),
TileConfigType::Memory => wrap(tiles::memory_stream(), tile.update.or(Some(five_secs))), TileConfigType::Memory => wrap(tiles::memory_stream(), tile.update.or(Some(five_secs))),
TileConfigType::Time(c) => wrap(tiles::time_stream(c.clone()), tile.update), TileConfigType::Time(c) => wrap(tiles::time_stream(c.clone()), tile.update),
} }
} }
fn wrap<'a, S>( fn wrap<'a, S>(stream: S, duration: Option<Duration>) -> BoxStream<'a, TileResult>
stream: S,
duration: Option<Duration>,
) -> BoxStream<'a, TileResult>
where where
S: Stream<Item = TileResult> + Send + 'a, S: Stream<Item = TileResult> + Send + 'a,
{ {

View file

@ -45,20 +45,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
fn spawn_stream<E: 'static>( fn spawn_stream<E: 'static>(
index: usize, index: usize,
stream: BoxStream<'static, Result<tile::Block, E>>, stream: BoxStream<'static, Result<tile::Block, E>>,
sender: Sender<tile::TileData>, sender: Sender<Result<tile::TileData, E>>,
) where ) where
E: Debug, E: Debug + Send,
{ {
tokio::spawn(async move { tokio::spawn(async move {
let instance: Arc<str> = Uuid::new_v4().to_string().into(); let instance: Arc<str> = Uuid::new_v4().to_string().into();
let stream = stream.map(|block| { let stream = stream.map(|block: Result<_, _>| {
Ok(tile::TileData { Ok(block.map(|block| tile::TileData {
block: tile::Block { block: tile::Block {
instance: instance.clone(), instance: instance.clone(),
..block.unwrap() ..block
}, },
sender_id: index, sender_id: index,
}) }))
}); });
let future = stream.forward(sender); let future = stream.forward(sender);
future.await future.await

View file

@ -5,11 +5,14 @@ use futures::StreamExt;
use std::convert::Infallible; use std::convert::Infallible;
use tokio::io::{self, AsyncWriteExt}; use tokio::io::{self, AsyncWriteExt};
pub async fn launch( pub async fn launch<E>(
num_tiles: usize, num_tiles: usize,
mut receiver: Receiver<TileData>, mut receiver: Receiver<Result<TileData, E>>,
_default: DefaultSection, _default: DefaultSection,
) -> io::Result<Infallible> { ) -> io::Result<Infallible>
where
E: Send + std::fmt::Debug,
{
let mut stdout = io::stdout(); let mut stdout = io::stdout();
stdout.write_all(b"{ \"version\": 1 }\n[").await?; stdout.write_all(b"{ \"version\": 1 }\n[").await?;
@ -17,6 +20,7 @@ pub async fn launch(
blocks.resize_with(num_tiles, Default::default); blocks.resize_with(num_tiles, Default::default);
loop { loop {
let message = receiver.next().await.unwrap(); let message = receiver.next().await.unwrap();
let message = message.unwrap();
if message.sender_id < num_tiles { if message.sender_id < num_tiles {
blocks[message.sender_id] = Some(message.block); blocks[message.sender_id] = Some(message.block);
} else { } else {

View file

@ -3,10 +3,9 @@ use crate::tiles::TileResult;
use dbus::nonblock::stdintf::org_freedesktop_dbus::Properties; use dbus::nonblock::stdintf::org_freedesktop_dbus::Properties;
use dbus::nonblock::{Proxy, SyncConnection}; use dbus::nonblock::{Proxy, SyncConnection};
use futures::{FutureExt, Stream}; use futures::{FutureExt, Stream};
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
pub fn hostname_stream(connection: Arc<SyncConnection>) -> impl Stream<Item = TileResult> { pub fn hostname_stream(connection: &SyncConnection) -> impl Stream<Item = TileResult> {
let proxy = Proxy::new( let proxy = Proxy::new(
"org.freedesktop.hostname1", "org.freedesktop.hostname1",
"/org/freedesktop/hostname1", "/org/freedesktop/hostname1",