Use stream implementations of the tiles

This commit is contained in:
Skye Jensen 2020-06-04 21:04:06 -04:00
parent 68f2d4e3f1
commit 6b9a5e1609
8 changed files with 77 additions and 327 deletions

View file

@ -1,5 +1,6 @@
use crate::tile::TileModule; use crate::tile::Block;
use crate::tiles; use crate::tiles;
use futures::stream::BoxStream;
use serde::Deserialize; use serde::Deserialize;
use smart_default::SmartDefault; use smart_default::SmartDefault;
use std::env::var; use std::env::var;
@ -8,6 +9,7 @@ use std::path::PathBuf;
use structopt::StructOpt; use structopt::StructOpt;
use tokio::fs::File; use tokio::fs::File;
use tokio::prelude::*; use tokio::prelude::*;
use tokio::time;
#[derive(Deserialize, Clone, Debug, Default)] #[derive(Deserialize, Clone, Debug, Default)]
#[serde(default)] #[serde(default)]
@ -82,11 +84,14 @@ pub async fn read_config() -> Result<Config, Box<dyn std::error::Error>> {
Ok(toml::from_slice(&config_contents)?) Ok(toml::from_slice(&config_contents)?)
} }
pub fn process_tile(tile: &TileConfig) -> Box<dyn TileModule> { pub fn process_tile(
tile: &TileConfig,
) -> BoxStream<'static, Result<Block, Box<dyn std::error::Error + Send + Sync>>> {
let five_secs = time::Duration::from_secs(5);
match tile { match tile {
TileConfig::Load => Box::new(tiles::Hostname::new()), TileConfig::Load => Box::pin(tiles::load_stream(time::interval(five_secs))),
TileConfig::Memory => Box::new(tiles::Memory::new()), TileConfig::Memory => Box::pin(tiles::memory_stream(time::interval(five_secs))),
TileConfig::Hostname => Box::new(tiles::Hostname::new()), TileConfig::Hostname => Box::pin(tiles::hostname_stream()),
TileConfig::Time(c) => Box::new(tiles::Time::from_config(c)), TileConfig::Time(c) => Box::pin(tiles::time_stream(c.clone())),
} }
} }

View file

@ -1,11 +1,11 @@
pub mod config; mod config;
pub mod output; mod output;
pub mod tile; mod tile;
pub mod tiles; mod tiles;
use dbus_tokio::connection::new_session_sync; use dbus_tokio::connection::new_session_sync;
use futures::channel::mpsc::{channel, Sender}; use futures::channel::mpsc::{channel, Sender};
use futures::{Stream, StreamExt}; use futures::{stream::BoxStream, StreamExt};
use std::fmt::Debug; use std::fmt::Debug;
use std::sync::Arc; use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
@ -25,33 +25,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (sender, receiver) = channel(1024); let (sender, receiver) = channel(1024);
let mut index = 0usize; let tiles: Vec<_> = config
let wrap = |module| {
let tile = tile::Tile::new(
index,
sender.clone(),
Uuid::new_v4().to_string().into(),
module,
);
index += 1;
tile
};
let tiles: Vec<tile::Tile> = config
.tile .tile
.iter() .iter()
.map(config::process_tile) .map(config::process_tile)
.map(wrap) .enumerate()
.map(|(index, stream)| spawn_stream(index, stream, sender.clone()))
.collect(); .collect();
let num_tiles = tiles.len(); let num_tiles = tiles.len();
for tile in tiles.into_iter() {
tile.spawn();
}
// let format = "%Y-%m-%d %H:%M:%S".into();
// let short_format = "%H:%M:%S".into();
// let stream = tiles::time::time_stream(format, short_format);
// spawn_stream(4, stream, sender.clone());
drop(sender); drop(sender);
@ -59,9 +41,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
#[allow(unused)] #[allow(unused)]
fn spawn_stream<S, E>(index: usize, stream: S, sender: Sender<tile::TileData>) fn spawn_stream<E: 'static>(
where index: usize,
S: Stream<Item = Result<tile::Block, E>> + Send + 'static, stream: BoxStream<'static, Result<tile::Block, E>>,
sender: Sender<tile::TileData>,
) where
E: Debug, E: Debug,
{ {
tokio::spawn(async move { tokio::spawn(async move {

View file

@ -1,13 +1,10 @@
use async_trait::async_trait;
use futures::channel::mpsc::{SendError, Sender};
use futures::SinkExt;
use serde::{ser::Serializer, Serialize}; use serde::{ser::Serializer, Serialize};
use smart_default::SmartDefault; use smart_default::SmartDefault;
use std::fmt::Debug; use std::fmt::Debug;
use std::sync::Arc; use std::sync::Arc;
//use tokio::sync::mpsc::{error::SendError, Sender}; //use tokio::sync::mpsc::{error::SendError, Sender};
use tokio::task::JoinHandle;
#[allow(unused)]
#[derive(Copy, Clone, Debug, Serialize)] #[derive(Copy, Clone, Debug, Serialize)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum Alignment { pub enum Alignment {
@ -22,6 +19,7 @@ impl Default for Alignment {
} }
} }
#[allow(unused)]
#[derive(Copy, Clone, Debug, Serialize)] #[derive(Copy, Clone, Debug, Serialize)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum Markup { pub enum Markup {
@ -84,56 +82,3 @@ pub struct TileData {
pub sender_id: usize, pub sender_id: usize,
pub block: Block, pub block: Block,
} }
#[async_trait]
pub trait TileModule: Send + std::fmt::Debug {
async fn run(
&mut self,
sender: &mut BlockSender,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
#[derive(Debug)]
pub struct BlockSender {
sender_id: usize,
sender: Sender<TileData>,
instance: Arc<str>,
}
impl BlockSender {
pub async fn send(&mut self, mut block: Block) -> Result<(), SendError> {
block.instance = self.instance.clone();
let data = TileData {
block,
sender_id: self.sender_id,
};
self.sender.send(data).await
}
}
#[derive(Debug)]
pub struct Tile {
sender: BlockSender,
module: Box<dyn TileModule>,
}
impl Tile {
pub fn new(
sender_id: usize,
sender: Sender<TileData>,
instance: Arc<str>,
module: Box<dyn TileModule>,
) -> Self {
Tile {
sender: BlockSender {
sender_id,
sender,
instance,
},
module,
}
}
pub fn spawn(mut self) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
tokio::spawn(async move { self.module.run(&mut self.sender).await })
}
}

View file

@ -1,44 +1,11 @@
use crate::tile::{Block, BlockSender, TileModule}; use crate::tile::Block;
use async_trait::async_trait;
use futures::stream; use futures::stream;
use futures::Stream; use futures::Stream;
use tokio::fs::File; use tokio::fs::File;
use tokio::prelude::*; use tokio::prelude::*;
#[derive(Debug, Default)] pub fn hostname_stream(
pub struct Hostname; ) -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>> {
impl Hostname {
pub fn new() -> Hostname {
Hostname
}
}
#[async_trait]
impl TileModule for Hostname {
async fn run(
&mut self,
sender: &mut BlockSender,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut raw = String::new();
File::open("/proc/sys/kernel/hostname")
.await?
.read_to_string(&mut raw)
.await?;
let block = Block {
full_text: raw.trim_end_matches('\n').into(),
name: "hostname".into(),
..Default::default()
};
sender.send(block).await?;
// What's the hostname gonna do? Change?
Ok(())
}
}
#[allow(unused)]
fn hostname_stream() -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>>
{
stream::once(async { stream::once(async {
let mut raw = String::new(); let mut raw = String::new();
File::open("/proc/sys/kernel/hostname") File::open("/proc/sys/kernel/hostname")

View file

@ -1,48 +1,10 @@
use crate::tile::{Block, BlockSender, TileModule}; use crate::tile::Block;
use async_trait::async_trait;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use std::time::Duration;
use tokio::fs::File; use tokio::fs::File;
use tokio::prelude::*; use tokio::prelude::*;
use tokio::stream::Stream; use tokio::stream::Stream;
use tokio::time::interval;
#[derive(Debug, Default)] pub fn load_stream<T>(
pub struct Load;
impl Load {
pub fn new() -> Self {
Load
}
}
#[async_trait]
impl TileModule for Load {
async fn run(
&mut self,
sender: &mut BlockSender,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut timer = interval(Duration::from_secs(5));
loop {
timer.tick().await;
let mut raw = String::new();
File::open("/proc/loadavg")
.await?
.read_to_string(&mut raw)
.await?;
let (load, _rest) = raw.split_at(raw.find(' ').unwrap_or(0));
let block = Block {
full_text: load.into(),
name: "load".into(),
..Default::default()
};
sender.send(block).await?;
}
}
}
#[allow(unused)]
fn load_stream<T>(
clock: T, clock: T,
) -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>> ) -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>>
where where

View file

@ -1,102 +1,47 @@
use crate::tile::{Block, BlockSender, TileModule}; use crate::tile::Block;
use async_trait::async_trait;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use std::io; use std::io;
use std::str; use std::str;
use std::time::Duration;
use tokio::fs::File; use tokio::fs::File;
use tokio::prelude::*; use tokio::prelude::*;
use tokio::time::interval;
#[derive(Debug, Default)] fn prettify_kib(kib: u64) -> Box<str> {
pub struct Memory; if kib > u64::MAX / 1024 {
panic!("Too much memory");
impl Memory {
pub fn new() -> Memory {
Memory
} }
let mut mem = kib;
fn prettify_kib(kib: u64) -> Box<str> { let mut stages = 0u8;
if kib > u64::MAX / 1024 { while mem >= 1024 {
panic!("Too much memory"); stages += 1;
mem /= 1024;
}
format!(
"{} {}iB",
mem,
match stages {
0 => 'k',
1 => 'M',
2 => 'G',
3 => 'T',
4 => 'P',
5 => 'E',
6 => 'Z',
_ => panic!("Too much memory, for real this time"),
} }
let mut mem = kib; )
let mut stages = 0u8; .into_boxed_str()
while mem >= 1024 {
stages += 1;
mem /= 1024;
}
format!(
"{} {}iB",
mem,
match stages {
0 => 'k',
1 => 'M',
2 => 'G',
3 => 'T',
4 => 'P',
5 => 'E',
6 => 'Z',
_ => panic!("Too much memory, for real this time"),
}
)
.into_boxed_str()
}
fn extract_value(line: &str) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let mut parts = line.split_whitespace();
parts.next();
Ok(parts
.next()
.ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?
.parse()?)
}
} }
#[async_trait] fn extract_value(line: &str) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
impl TileModule for Memory { let mut parts = line.split_whitespace();
async fn run( parts.next();
&mut self, Ok(parts
sender: &mut BlockSender, .next()
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?
let mut timer = interval(Duration::from_secs(5)); .parse()?)
let mut raw = [0u8; 256];
loop {
timer.tick().await;
File::open("/proc/meminfo")
.await?
.read_exact(&mut raw)
.await?;
let string_data = str::from_utf8(&raw)?;
let mut lines = string_data.split('\n');
let mem_total = Memory::prettify_kib(Memory::extract_value(
lines
.next()
.ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?,
)?);
lines.next();
let mem_avail = Memory::prettify_kib(Memory::extract_value(
lines
.next()
.ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?,
)?);
let full_text = format!("{} avail / {}", mem_avail, mem_total).into_boxed_str();
let short_text = format!("{} / {}", mem_avail, mem_total).into_boxed_str();
let block = Block {
full_text,
short_text: Some(short_text),
name: "memory".into(),
..Default::default()
};
sender.send(block).await?;
}
}
} }
#[allow(unused)] pub fn memory_stream<T>(
fn memory_stream<T>(
clock: T, clock: T,
) -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>> ) -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>>
where where
@ -110,13 +55,13 @@ where
.await?; .await?;
let string_data = str::from_utf8(&raw)?; let string_data = str::from_utf8(&raw)?;
let mut lines = string_data.split('\n'); let mut lines = string_data.split('\n');
let mem_total = Memory::prettify_kib(Memory::extract_value( let mem_total = prettify_kib(extract_value(
lines lines
.next() .next()
.ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?, .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?,
)?); )?);
lines.next(); lines.next();
let mem_avail = Memory::prettify_kib(Memory::extract_value( let mem_avail = prettify_kib(extract_value(
lines lines
.next() .next()
.ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?, .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?,

View file

@ -2,7 +2,7 @@ pub mod hostname;
pub mod load; pub mod load;
pub mod memory; pub mod memory;
pub mod time; pub mod time;
pub use hostname::Hostname; pub use hostname::hostname_stream;
pub use load::Load; pub use load::load_stream;
pub use memory::Memory; pub use memory::memory_stream;
pub use time::Time; pub use time::time_stream;

View file

@ -1,86 +1,28 @@
use crate::config::TimeConfig; use crate::config::TimeConfig;
use crate::tile::{Block, BlockSender, TileModule}; use crate::tile::Block;
use async_trait::async_trait;
use chrono::prelude::*; use chrono::prelude::*;
use chrono::DateTime; use chrono::DateTime;
use futures::future::Future; use futures::future::Future;
use futures::stream::Stream; use futures::stream::Stream;
use futures_util::ready; use futures_util::ready;
use pin_project::pin_project; use pin_project::pin_project;
use std::error::Error;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::{delay_for, delay_until, Delay, Instant}; use tokio::time::{delay_for, delay_until, Delay, Instant};
#[derive(Debug)]
pub struct Time {
format: Box<str>,
short_format: Box<str>,
}
impl Time {
pub fn new() -> Time {
Default::default()
}
pub fn from_config(config: &TimeConfig) -> Time {
Time {
format: config.format.clone(),
short_format: config.short_format.clone(),
}
}
fn send_time(&mut self, time: DateTime<Local>) -> Block {
Block {
full_text: time.format(&self.format).to_string().into(),
short_text: Some(time.format(&self.short_format).to_string().into()),
name: "time".into(),
..Default::default()
}
}
}
impl Default for Time {
fn default() -> Self {
Time {
format: "%Y-%m-%d %H:%M:%S".into(),
short_format: "%H:%M:%S".into(),
}
}
}
#[async_trait]
impl TileModule for Time {
async fn run(
&mut self,
sender: &mut BlockSender,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut time = Local::now();
loop {
sender.send(self.send_time(time)).await?;
time = Local::now();
let millis_part = time.naive_local().timestamp_subsec_millis() as u64;
let delay_ms = 1000u64 - millis_part % 1000; // Don't crash if we hit a leap second
delay_for(Duration::from_millis(delay_ms)).await;
}
}
}
pub fn time_stream( pub fn time_stream(
format: Box<str>, config: TimeConfig,
short_format: Box<str>,
) -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>> { ) -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>> {
TimeStream { TimeStream {
format, config,
short_format,
delay: delay_until(Instant::now()), delay: delay_until(Instant::now()),
} }
} }
#[pin_project] #[pin_project]
struct TimeStream { struct TimeStream {
format: Box<str>, config: TimeConfig,
short_format: Box<str>,
#[pin] #[pin]
delay: Delay, delay: Delay,
} }
@ -88,8 +30,8 @@ struct TimeStream {
impl TimeStream { impl TimeStream {
fn send_time(&self, time: DateTime<Local>) -> Block { fn send_time(&self, time: DateTime<Local>) -> Block {
Block { Block {
full_text: time.format(&self.format).to_string().into(), full_text: time.format(&self.config.format).to_string().into(),
short_text: Some(time.format(&self.short_format).to_string().into()), short_text: Some(time.format(&self.config.short_format).to_string().into()),
name: "time".into(), name: "time".into(),
..Default::default() ..Default::default()
} }
@ -104,7 +46,7 @@ impl TimeStream {
} }
impl Stream for TimeStream { impl Stream for TimeStream {
type Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>; type Item = Result<Block, Box<dyn Error + Send + Sync>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let project = Pin::as_mut(&mut self).project(); let project = Pin::as_mut(&mut self).project();
ready!(Future::poll(project.delay, cx)); ready!(Future::poll(project.delay, cx));