Move throttling outside of tiles

This commit is contained in:
Skye Jensen 2020-06-13 22:22:30 -04:00
parent dc78dfe4d6
commit e4e8fc11cc
4 changed files with 9 additions and 18 deletions

View file

@ -89,8 +89,8 @@ pub fn process_tile(
) -> BoxStream<'static, Result<Block, Box<dyn std::error::Error + Send + Sync>>> { ) -> BoxStream<'static, Result<Block, Box<dyn std::error::Error + Send + Sync>>> {
let five_secs = time::Duration::from_secs(5); let five_secs = time::Duration::from_secs(5);
match tile { match tile {
TileConfig::Load => Box::pin(tiles::load_stream(time::interval(five_secs))), TileConfig::Load => Box::pin(time::throttle(five_secs, tiles::load_stream())),
TileConfig::Memory => Box::pin(tiles::memory_stream(time::interval(five_secs))), TileConfig::Memory => Box::pin(time::throttle(five_secs, tiles::memory_stream())),
TileConfig::Hostname => Box::pin(tiles::hostname_stream()), TileConfig::Hostname => Box::pin(tiles::hostname_stream()),
TileConfig::Time(c) => Box::pin(tiles::time_stream(c.clone())), TileConfig::Time(c) => Box::pin(tiles::time_stream(c.clone())),
} }

View file

@ -4,13 +4,9 @@ use tokio::fs::File;
use tokio::prelude::*; use tokio::prelude::*;
use tokio::stream::Stream; use tokio::stream::Stream;
pub fn load_stream<T>( pub fn load_stream() -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>>
clock: T,
) -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>>
where
T: Stream,
{ {
clock.then(|_| async { futures::stream::repeat(()).then(|()| async {
let mut raw = String::new(); let mut raw = String::new();
File::open("/proc/loadavg") File::open("/proc/loadavg")
.await? .await?

View file

@ -1,7 +1,6 @@
use crate::tile::Block; use crate::tile::Block;
use futures::{Stream, StreamExt}; use futures::{stream, Stream, StreamExt};
use std::io; use std::{io, str, u64};
use std::str;
use tokio::fs::File; use tokio::fs::File;
use tokio::prelude::*; use tokio::prelude::*;
@ -41,13 +40,9 @@ fn extract_value(line: &str) -> Result<u64, Box<dyn std::error::Error + Send + S
.parse()?) .parse()?)
} }
pub fn memory_stream<T>( pub fn memory_stream() -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>>
clock: T,
) -> impl Stream<Item = Result<Block, Box<dyn std::error::Error + Send + Sync>>>
where
T: Stream,
{ {
clock.then(|_| async { stream::repeat(()).then(|_| async {
let mut raw = [0u8; 256]; let mut raw = [0u8; 256];
File::open("/proc/meminfo") File::open("/proc/meminfo")
.await? .await?

View file

@ -49,7 +49,7 @@ impl Stream for TimeStream {
type Item = Result<Block, Box<dyn Error + Send + Sync>>; type Item = Result<Block, Box<dyn Error + Send + Sync>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let project = Pin::as_mut(&mut self).project(); let project = Pin::as_mut(&mut self).project();
let () = ready!(Future::poll(project.delay, cx)); ready!(Future::poll(project.delay, cx));
let now = Local::now(); let now = Local::now();
Pin::as_mut(&mut self) Pin::as_mut(&mut self)