From e4da84bbd4e6ca25991f69e16185f878a7eccd18 Mon Sep 17 00:00:00 2001 From: Skye Date: Sat, 16 Dec 2023 19:19:44 -0500 Subject: [PATCH] Replace mpsc with stream map of watch channels --- Cargo.lock | 1 + Cargo.toml | 2 +- src/config.rs | 6 +++--- src/main.rs | 16 ++++++++++------ src/output.rs | 33 +++++++++++++++------------------ src/tile.rs | 3 +-- 6 files changed, 31 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6063021..e74511f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1337,6 +1337,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 59bcb6b..aac876c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ serde_json = "1.0" smart-default = "0.7" structopt = "0.3" tokio = { version = "1", features = ["fs", "io-std", "io-util", "time", "rt", "macros", "rt-multi-thread"] } -tokio-stream = { version = "0.1", features = ["time"]} +tokio-stream = { version = "0.1", features = ["sync", "time"]} toml = "0.8" uuid = { version = "1.4", features = [ "v4" ] } diff --git a/src/config.rs b/src/config.rs index fc47240..7373a2d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,6 +7,7 @@ use futures::FutureExt; use log::error; use serde::{Deserialize, Deserializer}; use smart_default::SmartDefault; +use tokio::sync::watch; use std::convert::Infallible; use std::env::var; @@ -18,7 +19,6 @@ use std::sync::Arc; use std::time::Duration; use structopt::StructOpt; -use tokio::sync::mpsc; use tokio::task::JoinHandle; #[derive(Deserialize, Clone, Debug, Default)] @@ -218,11 +218,11 @@ where pub fn launch_tile( tile: TileConfig, - sender: mpsc::Sender, + sender: watch::Sender, tile_id: usize, dbus_conn: &Arc, ) -> JoinHandle<()> { - let output_chan = OutputChannel::with_random_uuid(sender.clone(), tile_id); + let output_chan = OutputChannel::with_random_uuid(sender); match tile.clone() { TileConfig::Battery(c) => spawn(tiles::battery(c, output_chan), tile_id), TileConfig::Hostname(c) => { diff --git a/src/main.rs b/src/main.rs index b862f13..820d42b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,9 @@ mod tiles; use dbus_tokio::connection::new_system_sync; -use tokio::sync::mpsc; +use tile::TileData; +use tokio::sync::watch; +use tokio_stream::{wrappers::WatchStream, StreamMap}; #[tokio::main] async fn main() -> eyre::Result<()> { @@ -20,18 +22,20 @@ async fn main() -> eyre::Result<()> { panic!("Lost connection to D-Bus: {}", err); }); - let (sender, receiver) = mpsc::channel(1024); + let mut stream_map = StreamMap::new(); let tiles: Vec<_> = config .tile .drain(..) .enumerate() - .map(|(sender_id, tile)| config::launch_tile(tile, sender.clone(), sender_id, &dbus_conn)) + .map(|(sender_id, tile)| { + let (sender, receiver) = watch::channel(TileData { block: None }); + stream_map.insert(sender_id, WatchStream::from(receiver)); + config::launch_tile(tile, sender, sender_id, &dbus_conn) + }) .collect(); let num_tiles = tiles.len(); - drop(sender); - - output::run(num_tiles, receiver).await + output::run(num_tiles, stream_map).await } diff --git a/src/output.rs b/src/output.rs index 090f305..5b37a71 100644 --- a/src/output.rs +++ b/src/output.rs @@ -4,53 +4,50 @@ use serde::{ser::SerializeSeq, Serialize}; use std::sync::Arc; use tokio::{ io::{self, AsyncWriteExt}, - sync::mpsc, + sync::watch, }; +use tokio_stream::{wrappers::WatchStream, StreamExt, StreamMap}; use uuid::Uuid; pub struct OutputChannel { - sender: mpsc::Sender, - sender_id: usize, + sender: watch::Sender, instance: Arc, } impl OutputChannel { - pub fn with_random_uuid(sender: mpsc::Sender, sender_id: usize) -> Self { + pub fn with_random_uuid(sender: watch::Sender) -> Self { Self { sender, - sender_id, instance: Uuid::new_v4().to_string().into(), } } - pub async fn send(&self, mut block: Block) -> Result<(), mpsc::error::SendError> { + pub async fn send(&self, mut block: Block) -> Result<(), watch::error::SendError> { block.instance = self.instance.clone(); - self.sender - .send(TileData { - block, - sender_id: self.sender_id, - }) - .await + self.sender.send(TileData { block: Some(block) }) } } -pub async fn run(num_tiles: usize, mut receiver: mpsc::Receiver) -> eyre::Result<()> { +pub async fn run( + num_tiles: usize, + mut receiver: StreamMap>, +) -> eyre::Result<()> { let mut stdout = io::stdout(); stdout.write_all(b"{ \"version\": 1 }\n[").await?; let mut blocks = Vec::new(); blocks.resize_with(num_tiles, Default::default); loop { - let message = receiver - .recv() + let (sender_id, message) = receiver + .next() .await .ok_or_eyre("No more messages to recieve")?; - let Some(block) = blocks.get_mut(message.sender_id) else { - eprintln!("Invalid message with sender id {}", message.sender_id); + let Some(block) = blocks.get_mut(sender_id) else { + eprintln!("Invalid message with sender id {}", sender_id); continue; }; - *block = Some(message.block); + *block = message.block; let mut serialized = serde_json::to_vec(&NoneSkipper(&blocks)).unwrap(); serialized.extend_from_slice(b",\n"); stdout.write_all(&serialized).await?; diff --git a/src/tile.rs b/src/tile.rs index 3330a79..9ea4e67 100644 --- a/src/tile.rs +++ b/src/tile.rs @@ -95,6 +95,5 @@ pub struct Block { #[derive(Clone, Debug)] pub struct TileData { - pub sender_id: usize, - pub block: Block, + pub block: Option, }