From 68f2d4e3f1439c0fe8ade7dfdf51cbd109c1bc7e Mon Sep 17 00:00:00 2001 From: Skye Jensen Date: Thu, 4 Jun 2020 19:47:16 -0400 Subject: [PATCH] Add stream implementations of tiles --- Cargo.lock | 3 +++ Cargo.toml | 3 +++ src/main.rs | 33 +++++++++++++++++++++++- src/output.rs | 5 ++-- src/tile.rs | 8 +++--- src/tiles/hostname.rs | 20 +++++++++++++++ src/tiles/load.rs | 24 +++++++++++++++++ src/tiles/memory.rs | 40 +++++++++++++++++++++++++++++ src/tiles/time.rs | 60 ++++++++++++++++++++++++++++++++++++++++++- 9 files changed, 189 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5009b62..6bdf43f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -581,6 +581,9 @@ dependencies = [ "chrono", "dbus", "dbus-tokio", + "futures", + "futures-util", + "pin-project", "serde", "serde_json", "smart-default", diff --git a/Cargo.toml b/Cargo.toml index 006f0dd..19e0c58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,9 @@ async-trait = "0.1" chrono = "0.4" dbus = "0.8" dbus-tokio = "0.5" +futures = "0.3" +futures-util = "0.3" +pin-project = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" smart-default = "0.6" diff --git a/src/main.rs b/src/main.rs index e92eced..5ef8197 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,10 @@ pub mod tile; pub mod tiles; use dbus_tokio::connection::new_session_sync; -use tokio::sync::mpsc::channel; +use futures::channel::mpsc::{channel, Sender}; +use futures::{Stream, StreamExt}; +use std::fmt::Debug; +use std::sync::Arc; use uuid::Uuid; #[tokio::main] @@ -45,6 +48,34 @@ async fn main() -> Result<(), Box> { for tile in tiles.into_iter() { tile.spawn(); } + // let format = "%Y-%m-%d %H:%M:%S".into(); + // let short_format = "%H:%M:%S".into(); + // let stream = tiles::time::time_stream(format, short_format); + // spawn_stream(4, stream, sender.clone()); + + drop(sender); match output::launch(num_tiles, receiver, config.default).await? {} } + +#[allow(unused)] +fn spawn_stream(index: usize, stream: S, sender: Sender) +where + S: Stream> + Send + 'static, + E: Debug, +{ + tokio::spawn(async move { + let instance: Arc = Uuid::new_v4().to_string().into(); + let stream = stream.map(|block| { + Ok(tile::TileData { + block: tile::Block { + instance: instance.clone(), + ..block.unwrap() + }, + sender_id: index, + }) + }); + let future = stream.forward(sender); + future.await + }); +} diff --git a/src/output.rs b/src/output.rs index 0145896..cd29b0b 100644 --- a/src/output.rs +++ b/src/output.rs @@ -1,8 +1,9 @@ use crate::config::DefaultSection; use crate::tile::TileData; +use futures::channel::mpsc::Receiver; +use futures::StreamExt; use std::convert::Infallible; use tokio::io::{self, AsyncWriteExt}; -use tokio::sync::mpsc::Receiver; pub async fn launch( num_tiles: usize, @@ -15,7 +16,7 @@ pub async fn launch( let mut blocks = Vec::new(); blocks.resize_with(num_tiles, Default::default); loop { - let message = receiver.recv().await.unwrap(); + let message = receiver.next().await.unwrap(); if message.sender_id < num_tiles { blocks[message.sender_id] = Some(message.block); } else { diff --git a/src/tile.rs b/src/tile.rs index 23225b4..d5833ab 100644 --- a/src/tile.rs +++ b/src/tile.rs @@ -1,9 +1,11 @@ use async_trait::async_trait; +use futures::channel::mpsc::{SendError, Sender}; +use futures::SinkExt; use serde::{ser::Serializer, Serialize}; use smart_default::SmartDefault; use std::fmt::Debug; use std::sync::Arc; -use tokio::sync::mpsc::{error::SendError, Sender}; +//use tokio::sync::mpsc::{error::SendError, Sender}; use tokio::task::JoinHandle; #[derive(Copy, Clone, Debug, Serialize)] @@ -65,7 +67,7 @@ pub struct Block { pub align: Option, pub name: Box, #[serde(serialize_with = "arc_default")] - #[default = r#""".into()"#] + #[default = ""] pub instance: Arc, #[serde(skip_serializing_if = "Option::is_none")] pub urgent: Option, @@ -99,7 +101,7 @@ pub struct BlockSender { } impl BlockSender { - pub async fn send(&mut self, mut block: Block) -> Result<(), SendError> { + pub async fn send(&mut self, mut block: Block) -> Result<(), SendError> { block.instance = self.instance.clone(); let data = TileData { block, diff --git a/src/tiles/hostname.rs b/src/tiles/hostname.rs index d07670c..109b10f 100644 --- a/src/tiles/hostname.rs +++ b/src/tiles/hostname.rs @@ -1,5 +1,7 @@ use crate::tile::{Block, BlockSender, TileModule}; use async_trait::async_trait; +use futures::stream; +use futures::Stream; use tokio::fs::File; use tokio::prelude::*; @@ -33,3 +35,21 @@ impl TileModule for Hostname { Ok(()) } } + +#[allow(unused)] +fn hostname_stream() -> impl Stream>> +{ + stream::once(async { + let mut raw = String::new(); + File::open("/proc/sys/kernel/hostname") + .await? + .read_to_string(&mut raw) + .await?; + let block = Block { + full_text: raw.trim_end_matches('\n').into(), + name: "hostname".into(), + ..Default::default() + }; + Ok(block) + }) +} diff --git a/src/tiles/load.rs b/src/tiles/load.rs index 524fd31..860c7b4 100644 --- a/src/tiles/load.rs +++ b/src/tiles/load.rs @@ -1,8 +1,10 @@ use crate::tile::{Block, BlockSender, TileModule}; use async_trait::async_trait; +use futures::stream::StreamExt; use std::time::Duration; use tokio::fs::File; use tokio::prelude::*; +use tokio::stream::Stream; use tokio::time::interval; #[derive(Debug, Default)] @@ -38,3 +40,25 @@ impl TileModule for Load { } } } + +#[allow(unused)] +fn load_stream( + clock: T, +) -> impl Stream>> +where + T: Stream, +{ + clock.then(|_| async { + let mut raw = String::new(); + File::open("/proc/loadavg") + .await? + .read_to_string(&mut raw) + .await?; + let (load, _rest) = raw.split_at(raw.find(' ').unwrap_or(0)); + Ok(Block { + full_text: load.into(), + name: "load".into(), + ..Default::default() + }) + }) +} diff --git a/src/tiles/memory.rs b/src/tiles/memory.rs index f5ddea2..75f9f3d 100644 --- a/src/tiles/memory.rs +++ b/src/tiles/memory.rs @@ -1,5 +1,6 @@ use crate::tile::{Block, BlockSender, TileModule}; use async_trait::async_trait; +use futures::{Stream, StreamExt}; use std::io; use std::str; use std::time::Duration; @@ -93,3 +94,42 @@ impl TileModule for Memory { } } } + +#[allow(unused)] +fn memory_stream( + clock: T, +) -> impl Stream>> +where + T: Stream, +{ + clock.then(|_| async { + let mut raw = [0u8; 256]; + File::open("/proc/meminfo") + .await? + .read_exact(&mut raw) + .await?; + let string_data = str::from_utf8(&raw)?; + let mut lines = string_data.split('\n'); + let mem_total = Memory::prettify_kib(Memory::extract_value( + lines + .next() + .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?, + )?); + lines.next(); + let mem_avail = Memory::prettify_kib(Memory::extract_value( + lines + .next() + .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?, + )?); + + let full_text = format!("{} avail / {}", mem_avail, mem_total).into_boxed_str(); + let short_text = format!("{} / {}", mem_avail, mem_total).into_boxed_str(); + + Ok(Block { + full_text, + short_text: Some(short_text), + name: "memory".into(), + ..Default::default() + }) + }) +} diff --git a/src/tiles/time.rs b/src/tiles/time.rs index 773ffd1..1371f90 100644 --- a/src/tiles/time.rs +++ b/src/tiles/time.rs @@ -3,8 +3,14 @@ use crate::tile::{Block, BlockSender, TileModule}; use async_trait::async_trait; use chrono::prelude::*; use chrono::DateTime; +use futures::future::Future; +use futures::stream::Stream; +use futures_util::ready; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; -use tokio::time::delay_for; +use tokio::time::{delay_for, delay_until, Delay, Instant}; #[derive(Debug)] pub struct Time { @@ -59,3 +65,55 @@ impl TileModule for Time { } } } + +pub fn time_stream( + format: Box, + short_format: Box, +) -> impl Stream>> { + TimeStream { + format, + short_format, + delay: delay_until(Instant::now()), + } +} + +#[pin_project] +struct TimeStream { + format: Box, + short_format: Box, + #[pin] + delay: Delay, +} + +impl TimeStream { + fn send_time(&self, time: DateTime) -> Block { + Block { + full_text: time.format(&self.format).to_string().into(), + short_text: Some(time.format(&self.short_format).to_string().into()), + name: "time".into(), + ..Default::default() + } + } + + fn wait_for_next_second(now: DateTime) -> Delay { + let next = now.trunc_subsecs(0) + chrono::Duration::seconds(1); + let difference = next - now; + + delay_for(difference.to_std().unwrap()) + } +} + +impl Stream for TimeStream { + type Item = Result>; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let project = Pin::as_mut(&mut self).project(); + ready!(Future::poll(project.delay, cx)); + + let now = Local::now(); + Pin::as_mut(&mut self) + .project() + .delay + .set(TimeStream::wait_for_next_second(now)); + Poll::Ready(Some(Ok(self.send_time(now)))) + } +}