From 6b9a5e1609698ebd64fef4a3bec44cd4eca93f75 Mon Sep 17 00:00:00 2001 From: Skye Jensen Date: Thu, 4 Jun 2020 21:04:06 -0400 Subject: [PATCH] Use stream implementations of the tiles --- src/config.rs | 17 +++--- src/main.rs | 42 +++++---------- src/tile.rs | 59 +------------------- src/tiles/hostname.rs | 39 ++------------ src/tiles/load.rs | 42 +-------------- src/tiles/memory.rs | 123 ++++++++++++------------------------------ src/tiles/mod.rs | 8 +-- src/tiles/time.rs | 74 +++---------------------- 8 files changed, 77 insertions(+), 327 deletions(-) diff --git a/src/config.rs b/src/config.rs index 943fe71..339b201 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,6 @@ -use crate::tile::TileModule; +use crate::tile::Block; use crate::tiles; +use futures::stream::BoxStream; use serde::Deserialize; use smart_default::SmartDefault; use std::env::var; @@ -8,6 +9,7 @@ use std::path::PathBuf; use structopt::StructOpt; use tokio::fs::File; use tokio::prelude::*; +use tokio::time; #[derive(Deserialize, Clone, Debug, Default)] #[serde(default)] @@ -82,11 +84,14 @@ pub async fn read_config() -> Result> { Ok(toml::from_slice(&config_contents)?) } -pub fn process_tile(tile: &TileConfig) -> Box { +pub fn process_tile( + tile: &TileConfig, +) -> BoxStream<'static, Result>> { + let five_secs = time::Duration::from_secs(5); match tile { - TileConfig::Load => Box::new(tiles::Hostname::new()), - TileConfig::Memory => Box::new(tiles::Memory::new()), - TileConfig::Hostname => Box::new(tiles::Hostname::new()), - TileConfig::Time(c) => Box::new(tiles::Time::from_config(c)), + TileConfig::Load => Box::pin(tiles::load_stream(time::interval(five_secs))), + TileConfig::Memory => Box::pin(tiles::memory_stream(time::interval(five_secs))), + TileConfig::Hostname => Box::pin(tiles::hostname_stream()), + TileConfig::Time(c) => Box::pin(tiles::time_stream(c.clone())), } } diff --git a/src/main.rs b/src/main.rs index 5ef8197..b55d438 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,11 @@ -pub mod config; -pub mod output; -pub mod tile; -pub mod tiles; +mod config; +mod output; +mod tile; +mod tiles; use dbus_tokio::connection::new_session_sync; use futures::channel::mpsc::{channel, Sender}; -use futures::{Stream, StreamExt}; +use futures::{stream::BoxStream, StreamExt}; use std::fmt::Debug; use std::sync::Arc; use uuid::Uuid; @@ -25,33 +25,15 @@ async fn main() -> Result<(), Box> { let (sender, receiver) = channel(1024); - let mut index = 0usize; - let wrap = |module| { - let tile = tile::Tile::new( - index, - sender.clone(), - Uuid::new_v4().to_string().into(), - module, - ); - index += 1; - tile - }; - - let tiles: Vec = config + let tiles: Vec<_> = config .tile .iter() .map(config::process_tile) - .map(wrap) + .enumerate() + .map(|(index, stream)| spawn_stream(index, stream, sender.clone())) .collect(); let num_tiles = tiles.len(); - for tile in tiles.into_iter() { - tile.spawn(); - } - // let format = "%Y-%m-%d %H:%M:%S".into(); - // let short_format = "%H:%M:%S".into(); - // let stream = tiles::time::time_stream(format, short_format); - // spawn_stream(4, stream, sender.clone()); drop(sender); @@ -59,9 +41,11 @@ async fn main() -> Result<(), Box> { } #[allow(unused)] -fn spawn_stream(index: usize, stream: S, sender: Sender) -where - S: Stream> + Send + 'static, +fn spawn_stream( + index: usize, + stream: BoxStream<'static, Result>, + sender: Sender, +) where E: Debug, { tokio::spawn(async move { diff --git a/src/tile.rs b/src/tile.rs index d5833ab..d277ab6 100644 --- a/src/tile.rs +++ b/src/tile.rs @@ -1,13 +1,10 @@ -use async_trait::async_trait; -use futures::channel::mpsc::{SendError, Sender}; -use futures::SinkExt; use serde::{ser::Serializer, Serialize}; use smart_default::SmartDefault; use std::fmt::Debug; use std::sync::Arc; //use tokio::sync::mpsc::{error::SendError, Sender}; -use tokio::task::JoinHandle; +#[allow(unused)] #[derive(Copy, Clone, Debug, Serialize)] #[serde(rename_all = "lowercase")] pub enum Alignment { @@ -22,6 +19,7 @@ impl Default for Alignment { } } +#[allow(unused)] #[derive(Copy, Clone, Debug, Serialize)] #[serde(rename_all = "lowercase")] pub enum Markup { @@ -84,56 +82,3 @@ pub struct TileData { pub sender_id: usize, pub block: Block, } - -#[async_trait] -pub trait TileModule: Send + std::fmt::Debug { - async fn run( - &mut self, - sender: &mut BlockSender, - ) -> Result<(), Box>; -} - -#[derive(Debug)] -pub struct BlockSender { - sender_id: usize, - sender: Sender, - instance: Arc, -} - -impl BlockSender { - pub async fn send(&mut self, mut block: Block) -> Result<(), SendError> { - block.instance = self.instance.clone(); - let data = TileData { - block, - sender_id: self.sender_id, - }; - self.sender.send(data).await - } -} - -#[derive(Debug)] -pub struct Tile { - sender: BlockSender, - module: Box, -} - -impl Tile { - pub fn new( - sender_id: usize, - sender: Sender, - instance: Arc, - module: Box, - ) -> Self { - Tile { - sender: BlockSender { - sender_id, - sender, - instance, - }, - module, - } - } - pub fn spawn(mut self) -> JoinHandle>> { - tokio::spawn(async move { self.module.run(&mut self.sender).await }) - } -} diff --git a/src/tiles/hostname.rs b/src/tiles/hostname.rs index 109b10f..5b0e9fa 100644 --- a/src/tiles/hostname.rs +++ b/src/tiles/hostname.rs @@ -1,44 +1,11 @@ -use crate::tile::{Block, BlockSender, TileModule}; -use async_trait::async_trait; +use crate::tile::Block; use futures::stream; use futures::Stream; use tokio::fs::File; use tokio::prelude::*; -#[derive(Debug, Default)] -pub struct Hostname; - -impl Hostname { - pub fn new() -> Hostname { - Hostname - } -} - -#[async_trait] -impl TileModule for Hostname { - async fn run( - &mut self, - sender: &mut BlockSender, - ) -> Result<(), Box> { - let mut raw = String::new(); - File::open("/proc/sys/kernel/hostname") - .await? - .read_to_string(&mut raw) - .await?; - let block = Block { - full_text: raw.trim_end_matches('\n').into(), - name: "hostname".into(), - ..Default::default() - }; - sender.send(block).await?; - // What's the hostname gonna do? Change? - Ok(()) - } -} - -#[allow(unused)] -fn hostname_stream() -> impl Stream>> -{ +pub fn hostname_stream( +) -> impl Stream>> { stream::once(async { let mut raw = String::new(); File::open("/proc/sys/kernel/hostname") diff --git a/src/tiles/load.rs b/src/tiles/load.rs index 860c7b4..e7d686b 100644 --- a/src/tiles/load.rs +++ b/src/tiles/load.rs @@ -1,48 +1,10 @@ -use crate::tile::{Block, BlockSender, TileModule}; -use async_trait::async_trait; +use crate::tile::Block; use futures::stream::StreamExt; -use std::time::Duration; use tokio::fs::File; use tokio::prelude::*; use tokio::stream::Stream; -use tokio::time::interval; -#[derive(Debug, Default)] -pub struct Load; - -impl Load { - pub fn new() -> Self { - Load - } -} - -#[async_trait] -impl TileModule for Load { - async fn run( - &mut self, - sender: &mut BlockSender, - ) -> Result<(), Box> { - let mut timer = interval(Duration::from_secs(5)); - loop { - timer.tick().await; - let mut raw = String::new(); - File::open("/proc/loadavg") - .await? - .read_to_string(&mut raw) - .await?; - let (load, _rest) = raw.split_at(raw.find(' ').unwrap_or(0)); - let block = Block { - full_text: load.into(), - name: "load".into(), - ..Default::default() - }; - sender.send(block).await?; - } - } -} - -#[allow(unused)] -fn load_stream( +pub fn load_stream( clock: T, ) -> impl Stream>> where diff --git a/src/tiles/memory.rs b/src/tiles/memory.rs index 75f9f3d..d34087a 100644 --- a/src/tiles/memory.rs +++ b/src/tiles/memory.rs @@ -1,102 +1,47 @@ -use crate::tile::{Block, BlockSender, TileModule}; -use async_trait::async_trait; +use crate::tile::Block; use futures::{Stream, StreamExt}; use std::io; use std::str; -use std::time::Duration; use tokio::fs::File; use tokio::prelude::*; -use tokio::time::interval; -#[derive(Debug, Default)] -pub struct Memory; - -impl Memory { - pub fn new() -> Memory { - Memory +fn prettify_kib(kib: u64) -> Box { + if kib > u64::MAX / 1024 { + panic!("Too much memory"); } - - fn prettify_kib(kib: u64) -> Box { - if kib > u64::MAX / 1024 { - panic!("Too much memory"); + let mut mem = kib; + let mut stages = 0u8; + while mem >= 1024 { + stages += 1; + mem /= 1024; + } + format!( + "{} {}iB", + mem, + match stages { + 0 => 'k', + 1 => 'M', + 2 => 'G', + 3 => 'T', + 4 => 'P', + 5 => 'E', + 6 => 'Z', + _ => panic!("Too much memory, for real this time"), } - let mut mem = kib; - let mut stages = 0u8; - while mem >= 1024 { - stages += 1; - mem /= 1024; - } - format!( - "{} {}iB", - mem, - match stages { - 0 => 'k', - 1 => 'M', - 2 => 'G', - 3 => 'T', - 4 => 'P', - 5 => 'E', - 6 => 'Z', - _ => panic!("Too much memory, for real this time"), - } - ) - .into_boxed_str() - } - - fn extract_value(line: &str) -> Result> { - let mut parts = line.split_whitespace(); - parts.next(); - Ok(parts - .next() - .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))? - .parse()?) - } + ) + .into_boxed_str() } -#[async_trait] -impl TileModule for Memory { - async fn run( - &mut self, - sender: &mut BlockSender, - ) -> Result<(), Box> { - let mut timer = interval(Duration::from_secs(5)); - let mut raw = [0u8; 256]; - loop { - timer.tick().await; - File::open("/proc/meminfo") - .await? - .read_exact(&mut raw) - .await?; - let string_data = str::from_utf8(&raw)?; - let mut lines = string_data.split('\n'); - let mem_total = Memory::prettify_kib(Memory::extract_value( - lines - .next() - .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?, - )?); - lines.next(); - let mem_avail = Memory::prettify_kib(Memory::extract_value( - lines - .next() - .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?, - )?); - - let full_text = format!("{} avail / {}", mem_avail, mem_total).into_boxed_str(); - let short_text = format!("{} / {}", mem_avail, mem_total).into_boxed_str(); - - let block = Block { - full_text, - short_text: Some(short_text), - name: "memory".into(), - ..Default::default() - }; - sender.send(block).await?; - } - } +fn extract_value(line: &str) -> Result> { + let mut parts = line.split_whitespace(); + parts.next(); + Ok(parts + .next() + .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))? + .parse()?) } -#[allow(unused)] -fn memory_stream( +pub fn memory_stream( clock: T, ) -> impl Stream>> where @@ -110,13 +55,13 @@ where .await?; let string_data = str::from_utf8(&raw)?; let mut lines = string_data.split('\n'); - let mem_total = Memory::prettify_kib(Memory::extract_value( + let mem_total = prettify_kib(extract_value( lines .next() .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?, )?); lines.next(); - let mem_avail = Memory::prettify_kib(Memory::extract_value( + let mem_avail = prettify_kib(extract_value( lines .next() .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?, diff --git a/src/tiles/mod.rs b/src/tiles/mod.rs index 4affbe0..13a4c8c 100644 --- a/src/tiles/mod.rs +++ b/src/tiles/mod.rs @@ -2,7 +2,7 @@ pub mod hostname; pub mod load; pub mod memory; pub mod time; -pub use hostname::Hostname; -pub use load::Load; -pub use memory::Memory; -pub use time::Time; +pub use hostname::hostname_stream; +pub use load::load_stream; +pub use memory::memory_stream; +pub use time::time_stream; diff --git a/src/tiles/time.rs b/src/tiles/time.rs index 1371f90..686aa25 100644 --- a/src/tiles/time.rs +++ b/src/tiles/time.rs @@ -1,86 +1,28 @@ use crate::config::TimeConfig; -use crate::tile::{Block, BlockSender, TileModule}; -use async_trait::async_trait; +use crate::tile::Block; use chrono::prelude::*; use chrono::DateTime; use futures::future::Future; use futures::stream::Stream; use futures_util::ready; use pin_project::pin_project; +use std::error::Error; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; use tokio::time::{delay_for, delay_until, Delay, Instant}; -#[derive(Debug)] -pub struct Time { - format: Box, - short_format: Box, -} - -impl Time { - pub fn new() -> Time { - Default::default() - } - - pub fn from_config(config: &TimeConfig) -> Time { - Time { - format: config.format.clone(), - short_format: config.short_format.clone(), - } - } - - fn send_time(&mut self, time: DateTime) -> Block { - Block { - full_text: time.format(&self.format).to_string().into(), - short_text: Some(time.format(&self.short_format).to_string().into()), - name: "time".into(), - ..Default::default() - } - } -} - -impl Default for Time { - fn default() -> Self { - Time { - format: "%Y-%m-%d %H:%M:%S".into(), - short_format: "%H:%M:%S".into(), - } - } -} - -#[async_trait] -impl TileModule for Time { - async fn run( - &mut self, - sender: &mut BlockSender, - ) -> Result<(), Box> { - let mut time = Local::now(); - loop { - sender.send(self.send_time(time)).await?; - time = Local::now(); - let millis_part = time.naive_local().timestamp_subsec_millis() as u64; - let delay_ms = 1000u64 - millis_part % 1000; // Don't crash if we hit a leap second - delay_for(Duration::from_millis(delay_ms)).await; - } - } -} - pub fn time_stream( - format: Box, - short_format: Box, + config: TimeConfig, ) -> impl Stream>> { TimeStream { - format, - short_format, + config, delay: delay_until(Instant::now()), } } #[pin_project] struct TimeStream { - format: Box, - short_format: Box, + config: TimeConfig, #[pin] delay: Delay, } @@ -88,8 +30,8 @@ struct TimeStream { impl TimeStream { fn send_time(&self, time: DateTime) -> Block { Block { - full_text: time.format(&self.format).to_string().into(), - short_text: Some(time.format(&self.short_format).to_string().into()), + full_text: time.format(&self.config.format).to_string().into(), + short_text: Some(time.format(&self.config.short_format).to_string().into()), name: "time".into(), ..Default::default() } @@ -104,7 +46,7 @@ impl TimeStream { } impl Stream for TimeStream { - type Item = Result>; + type Item = Result>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let project = Pin::as_mut(&mut self).project(); ready!(Future::poll(project.delay, cx));