Add stream implementations of tiles

This commit is contained in:
Skye Jensen 2020-06-04 19:47:16 -04:00
parent 7db23d0a70
commit 68f2d4e3f1
9 changed files with 189 additions and 7 deletions

3
Cargo.lock generated
View file

@ -581,6 +581,9 @@ dependencies = [
"chrono", "chrono",
"dbus", "dbus",
"dbus-tokio", "dbus-tokio",
"futures",
"futures-util",
"pin-project",
"serde", "serde",
"serde_json", "serde_json",
"smart-default", "smart-default",

View file

@ -10,6 +10,9 @@ async-trait = "0.1"
chrono = "0.4" chrono = "0.4"
dbus = "0.8" dbus = "0.8"
dbus-tokio = "0.5" dbus-tokio = "0.5"
futures = "0.3"
futures-util = "0.3"
pin-project = "0.4"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
smart-default = "0.6" smart-default = "0.6"

View file

@ -4,7 +4,10 @@ pub mod tile;
pub mod tiles; pub mod tiles;
use dbus_tokio::connection::new_session_sync; use dbus_tokio::connection::new_session_sync;
use tokio::sync::mpsc::channel; use futures::channel::mpsc::{channel, Sender};
use futures::{Stream, StreamExt};
use std::fmt::Debug;
use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
#[tokio::main] #[tokio::main]
@ -45,6 +48,34 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
for tile in tiles.into_iter() { for tile in tiles.into_iter() {
tile.spawn(); tile.spawn();
} }
// let format = "%Y-%m-%d %H:%M:%S".into();
// let short_format = "%H:%M:%S".into();
// let stream = tiles::time::time_stream(format, short_format);
// spawn_stream(4, stream, sender.clone());
drop(sender);
match output::launch(num_tiles, receiver, config.default).await? {} match output::launch(num_tiles, receiver, config.default).await? {}
} }
#[allow(unused)]
fn spawn_stream<S, E>(index: usize, stream: S, sender: Sender<tile::TileData>)
where
S: Stream<Item = Result<tile::Block, E>> + Send + 'static,
E: Debug,
{
tokio::spawn(async move {
let instance: Arc<str> = Uuid::new_v4().to_string().into();
let stream = stream.map(|block| {
Ok(tile::TileData {
block: tile::Block {
instance: instance.clone(),
..block.unwrap()
},
sender_id: index,
})
});
let future = stream.forward(sender);
future.await
});
}

View file

@ -1,8 +1,9 @@
use crate::config::DefaultSection; use crate::config::DefaultSection;
use crate::tile::TileData; use crate::tile::TileData;
use futures::channel::mpsc::Receiver;
use futures::StreamExt;
use std::convert::Infallible; use std::convert::Infallible;
use tokio::io::{self, AsyncWriteExt}; use tokio::io::{self, AsyncWriteExt};
use tokio::sync::mpsc::Receiver;
pub async fn launch( pub async fn launch(
num_tiles: usize, num_tiles: usize,
@ -15,7 +16,7 @@ pub async fn launch(
let mut blocks = Vec::new(); let mut blocks = Vec::new();
blocks.resize_with(num_tiles, Default::default); blocks.resize_with(num_tiles, Default::default);
loop { loop {
let message = receiver.recv().await.unwrap(); let message = receiver.next().await.unwrap();
if message.sender_id < num_tiles { if message.sender_id < num_tiles {
blocks[message.sender_id] = Some(message.block); blocks[message.sender_id] = Some(message.block);
} else { } else {

View file

@ -1,9 +1,11 @@
use async_trait::async_trait; use async_trait::async_trait;
use futures::channel::mpsc::{SendError, Sender};
use futures::SinkExt;
use serde::{ser::Serializer, Serialize}; use serde::{ser::Serializer, Serialize};
use smart_default::SmartDefault; use smart_default::SmartDefault;
use std::fmt::Debug; use std::fmt::Debug;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::{error::SendError, Sender}; //use tokio::sync::mpsc::{error::SendError, Sender};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
#[derive(Copy, Clone, Debug, Serialize)] #[derive(Copy, Clone, Debug, Serialize)]
@ -65,7 +67,7 @@ pub struct Block {
pub align: Option<Alignment>, pub align: Option<Alignment>,
pub name: Box<str>, pub name: Box<str>,
#[serde(serialize_with = "arc_default")] #[serde(serialize_with = "arc_default")]
#[default = r#""".into()"#] #[default = ""]
pub instance: Arc<str>, pub instance: Arc<str>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub urgent: Option<bool>, pub urgent: Option<bool>,
@ -99,7 +101,7 @@ pub struct BlockSender {
} }
impl BlockSender { impl BlockSender {
pub async fn send(&mut self, mut block: Block) -> Result<(), SendError<TileData>> { pub async fn send(&mut self, mut block: Block) -> Result<(), SendError> {
block.instance = self.instance.clone(); block.instance = self.instance.clone();
let data = TileData { let data = TileData {
block, block,

View file

@ -1,5 +1,7 @@
use crate::tile::{Block, BlockSender, TileModule}; use crate::tile::{Block, BlockSender, TileModule};
use async_trait::async_trait; use async_trait::async_trait;
use futures::stream;
use futures::Stream;
use tokio::fs::File; use tokio::fs::File;
use tokio::prelude::*; use tokio::prelude::*;
@ -33,3 +35,21 @@ impl TileModule for Hostname {
Ok(()) Ok(())
} }
} }
#[allow(unused)]
fn hostname_stream() -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>>
{
stream::once(async {
let mut raw = String::new();
File::open("/proc/sys/kernel/hostname")
.await?
.read_to_string(&mut raw)
.await?;
let block = Block {
full_text: raw.trim_end_matches('\n').into(),
name: "hostname".into(),
..Default::default()
};
Ok(block)
})
}

View file

@ -1,8 +1,10 @@
use crate::tile::{Block, BlockSender, TileModule}; use crate::tile::{Block, BlockSender, TileModule};
use async_trait::async_trait; use async_trait::async_trait;
use futures::stream::StreamExt;
use std::time::Duration; use std::time::Duration;
use tokio::fs::File; use tokio::fs::File;
use tokio::prelude::*; use tokio::prelude::*;
use tokio::stream::Stream;
use tokio::time::interval; use tokio::time::interval;
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -38,3 +40,25 @@ impl TileModule for Load {
} }
} }
} }
#[allow(unused)]
fn load_stream<T>(
clock: T,
) -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>>
where
T: Stream,
{
clock.then(|_| async {
let mut raw = String::new();
File::open("/proc/loadavg")
.await?
.read_to_string(&mut raw)
.await?;
let (load, _rest) = raw.split_at(raw.find(' ').unwrap_or(0));
Ok(Block {
full_text: load.into(),
name: "load".into(),
..Default::default()
})
})
}

View file

@ -1,5 +1,6 @@
use crate::tile::{Block, BlockSender, TileModule}; use crate::tile::{Block, BlockSender, TileModule};
use async_trait::async_trait; use async_trait::async_trait;
use futures::{Stream, StreamExt};
use std::io; use std::io;
use std::str; use std::str;
use std::time::Duration; use std::time::Duration;
@ -93,3 +94,42 @@ impl TileModule for Memory {
} }
} }
} }
#[allow(unused)]
fn memory_stream<T>(
clock: T,
) -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>>
where
T: Stream,
{
clock.then(|_| async {
let mut raw = [0u8; 256];
File::open("/proc/meminfo")
.await?
.read_exact(&mut raw)
.await?;
let string_data = str::from_utf8(&raw)?;
let mut lines = string_data.split('\n');
let mem_total = Memory::prettify_kib(Memory::extract_value(
lines
.next()
.ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?,
)?);
lines.next();
let mem_avail = Memory::prettify_kib(Memory::extract_value(
lines
.next()
.ok_or_else(|| io::Error::from(io::ErrorKind::InvalidData))?,
)?);
let full_text = format!("{} avail / {}", mem_avail, mem_total).into_boxed_str();
let short_text = format!("{} / {}", mem_avail, mem_total).into_boxed_str();
Ok(Block {
full_text,
short_text: Some(short_text),
name: "memory".into(),
..Default::default()
})
})
}

View file

@ -3,8 +3,14 @@ use crate::tile::{Block, BlockSender, TileModule};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::prelude::*; use chrono::prelude::*;
use chrono::DateTime; use chrono::DateTime;
use futures::future::Future;
use futures::stream::Stream;
use futures_util::ready;
use pin_project::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use tokio::time::delay_for; use tokio::time::{delay_for, delay_until, Delay, Instant};
#[derive(Debug)] #[derive(Debug)]
pub struct Time { pub struct Time {
@ -59,3 +65,55 @@ impl TileModule for Time {
} }
} }
} }
pub fn time_stream(
format: Box<str>,
short_format: Box<str>,
) -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>> {
TimeStream {
format,
short_format,
delay: delay_until(Instant::now()),
}
}
#[pin_project]
struct TimeStream {
format: Box<str>,
short_format: Box<str>,
#[pin]
delay: Delay,
}
impl TimeStream {
fn send_time(&self, time: DateTime<Local>) -> Block {
Block {
full_text: time.format(&self.format).to_string().into(),
short_text: Some(time.format(&self.short_format).to_string().into()),
name: "time".into(),
..Default::default()
}
}
fn wait_for_next_second(now: DateTime<Local>) -> Delay {
let next = now.trunc_subsecs(0) + chrono::Duration::seconds(1);
let difference = next - now;
delay_for(difference.to_std().unwrap())
}
}
impl Stream for TimeStream {
type Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let project = Pin::as_mut(&mut self).project();
ready!(Future::poll(project.delay, cx));
let now = Local::now();
Pin::as_mut(&mut self)
.project()
.delay
.set(TimeStream::wait_for_next_second(now));
Poll::Ready(Some(Ok(self.send_time(now))))
}
}