Replace mpsc with stream map of watch channels

This commit is contained in:
Skye 2023-12-16 19:19:44 -05:00
parent 381df2e644
commit e4da84bbd4
6 changed files with 31 additions and 30 deletions

1
Cargo.lock generated
View file

@ -1337,6 +1337,7 @@ dependencies = [
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite",
"tokio", "tokio",
"tokio-util",
] ]
[[package]] [[package]]

View file

@ -24,7 +24,7 @@ serde_json = "1.0"
smart-default = "0.7" smart-default = "0.7"
structopt = "0.3" structopt = "0.3"
tokio = { version = "1", features = ["fs", "io-std", "io-util", "time", "rt", "macros", "rt-multi-thread"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "time", "rt", "macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1", features = ["time"]} tokio-stream = { version = "0.1", features = ["sync", "time"]}
toml = "0.8" toml = "0.8"
uuid = { version = "1.4", features = [ "v4" ] } uuid = { version = "1.4", features = [ "v4" ] }

View file

@ -7,6 +7,7 @@ use futures::FutureExt;
use log::error; use log::error;
use serde::{Deserialize, Deserializer}; use serde::{Deserialize, Deserializer};
use smart_default::SmartDefault; use smart_default::SmartDefault;
use tokio::sync::watch;
use std::convert::Infallible; use std::convert::Infallible;
use std::env::var; use std::env::var;
@ -18,7 +19,6 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use structopt::StructOpt; use structopt::StructOpt;
use tokio::sync::mpsc;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
#[derive(Deserialize, Clone, Debug, Default)] #[derive(Deserialize, Clone, Debug, Default)]
@ -218,11 +218,11 @@ where
pub fn launch_tile( pub fn launch_tile(
tile: TileConfig, tile: TileConfig,
sender: mpsc::Sender<TileData>, sender: watch::Sender<TileData>,
tile_id: usize, tile_id: usize,
dbus_conn: &Arc<SyncConnection>, dbus_conn: &Arc<SyncConnection>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let output_chan = OutputChannel::with_random_uuid(sender.clone(), tile_id); let output_chan = OutputChannel::with_random_uuid(sender);
match tile.clone() { match tile.clone() {
TileConfig::Battery(c) => spawn(tiles::battery(c, output_chan), tile_id), TileConfig::Battery(c) => spawn(tiles::battery(c, output_chan), tile_id),
TileConfig::Hostname(c) => { TileConfig::Hostname(c) => {

View file

@ -6,7 +6,9 @@ mod tiles;
use dbus_tokio::connection::new_system_sync; use dbus_tokio::connection::new_system_sync;
use tokio::sync::mpsc; use tile::TileData;
use tokio::sync::watch;
use tokio_stream::{wrappers::WatchStream, StreamMap};
#[tokio::main] #[tokio::main]
async fn main() -> eyre::Result<()> { async fn main() -> eyre::Result<()> {
@ -20,18 +22,20 @@ async fn main() -> eyre::Result<()> {
panic!("Lost connection to D-Bus: {}", err); panic!("Lost connection to D-Bus: {}", err);
}); });
let (sender, receiver) = mpsc::channel(1024); let mut stream_map = StreamMap::new();
let tiles: Vec<_> = config let tiles: Vec<_> = config
.tile .tile
.drain(..) .drain(..)
.enumerate() .enumerate()
.map(|(sender_id, tile)| config::launch_tile(tile, sender.clone(), sender_id, &dbus_conn)) .map(|(sender_id, tile)| {
let (sender, receiver) = watch::channel(TileData { block: None });
stream_map.insert(sender_id, WatchStream::from(receiver));
config::launch_tile(tile, sender, sender_id, &dbus_conn)
})
.collect(); .collect();
let num_tiles = tiles.len(); let num_tiles = tiles.len();
drop(sender); output::run(num_tiles, stream_map).await
output::run(num_tiles, receiver).await
} }

View file

@ -4,53 +4,50 @@ use serde::{ser::SerializeSeq, Serialize};
use std::sync::Arc; use std::sync::Arc;
use tokio::{ use tokio::{
io::{self, AsyncWriteExt}, io::{self, AsyncWriteExt},
sync::mpsc, sync::watch,
}; };
use tokio_stream::{wrappers::WatchStream, StreamExt, StreamMap};
use uuid::Uuid; use uuid::Uuid;
pub struct OutputChannel { pub struct OutputChannel {
sender: mpsc::Sender<TileData>, sender: watch::Sender<TileData>,
sender_id: usize,
instance: Arc<str>, instance: Arc<str>,
} }
impl OutputChannel { impl OutputChannel {
pub fn with_random_uuid(sender: mpsc::Sender<TileData>, sender_id: usize) -> Self { pub fn with_random_uuid(sender: watch::Sender<TileData>) -> Self {
Self { Self {
sender, sender,
sender_id,
instance: Uuid::new_v4().to_string().into(), instance: Uuid::new_v4().to_string().into(),
} }
} }
pub async fn send(&self, mut block: Block) -> Result<(), mpsc::error::SendError<TileData>> { pub async fn send(&self, mut block: Block) -> Result<(), watch::error::SendError<TileData>> {
block.instance = self.instance.clone(); block.instance = self.instance.clone();
self.sender self.sender.send(TileData { block: Some(block) })
.send(TileData {
block,
sender_id: self.sender_id,
})
.await
} }
} }
pub async fn run(num_tiles: usize, mut receiver: mpsc::Receiver<TileData>) -> eyre::Result<()> { pub async fn run(
num_tiles: usize,
mut receiver: StreamMap<usize, WatchStream<TileData>>,
) -> eyre::Result<()> {
let mut stdout = io::stdout(); let mut stdout = io::stdout();
stdout.write_all(b"{ \"version\": 1 }\n[").await?; stdout.write_all(b"{ \"version\": 1 }\n[").await?;
let mut blocks = Vec::new(); let mut blocks = Vec::new();
blocks.resize_with(num_tiles, Default::default); blocks.resize_with(num_tiles, Default::default);
loop { loop {
let message = receiver let (sender_id, message) = receiver
.recv() .next()
.await .await
.ok_or_eyre("No more messages to recieve")?; .ok_or_eyre("No more messages to recieve")?;
let Some(block) = blocks.get_mut(message.sender_id) else { let Some(block) = blocks.get_mut(sender_id) else {
eprintln!("Invalid message with sender id {}", message.sender_id); eprintln!("Invalid message with sender id {}", sender_id);
continue; continue;
}; };
*block = Some(message.block); *block = message.block;
let mut serialized = serde_json::to_vec(&NoneSkipper(&blocks)).unwrap(); let mut serialized = serde_json::to_vec(&NoneSkipper(&blocks)).unwrap();
serialized.extend_from_slice(b",\n"); serialized.extend_from_slice(b",\n");
stdout.write_all(&serialized).await?; stdout.write_all(&serialized).await?;

View file

@ -95,6 +95,5 @@ pub struct Block {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct TileData { pub struct TileData {
pub sender_id: usize, pub block: Option<Block>,
pub block: Block,
} }