From dcfd5ebc79d911ee836a3c49475d85b0c6a4bc36 Mon Sep 17 00:00:00 2001 From: Kelvin Ly Date: Sun, 16 Feb 2025 09:07:45 -0500 Subject: [PATCH] Implement datalogger --- shroom-server/src/datalog.rs | 235 +++++++++++++++++++++++++++++++++++ shroom-server/src/main.rs | 1 + shroom-server/src/store.rs | 22 +++- 3 files changed, 255 insertions(+), 3 deletions(-) create mode 100644 shroom-server/src/datalog.rs diff --git a/shroom-server/src/datalog.rs b/shroom-server/src/datalog.rs new file mode 100644 index 0000000..301f56a --- /dev/null +++ b/shroom-server/src/datalog.rs @@ -0,0 +1,235 @@ +use std::fs::File; +use std::path::{Path, PathBuf}; +use std::time::SystemTime; + +use crate::proto::Entry; +use crate::store::Store; + +pub type Error = std::io::Error; +pub type Result = std::result::Result; + +#[cfg(not(test))] +const MAX_ENTRIES: usize = 4096; +#[cfg(test)] +const MAX_ENTRIES: usize = 2; + +// manages a directory of append-only logs +// the most recent log is kept open for easy access, the rest are opened as needed +// to access old data +// a new log is started once the current log is larger than a given size +pub struct Datalog { + cur_store: Store, + path: PathBuf, + prefix: String, +} + +fn walk_dir>(path: P, prefix: &str) -> Result> { + let mut dir_path = path.as_ref().to_path_buf(); + dir_path.push("."); + //println!("reading dir {:?}", &dir_path); + let contents = std::fs::read_dir(dir_path)?; + let mut entries: Vec = contents + .into_iter() + .filter_map(|dir_entry| dir_entry.ok().map(|de| de.path())) + .filter(|path| { + path.as_path() + .file_name() + .and_then(|filen| filen.to_str()) + .map(|filen| filen.starts_with(prefix)) + .unwrap_or(false) + }) + .collect(); + entries.sort_by_key(|path| { + path.file_stem() + .and_then(|stem| stem.to_str()) + .and_then(|stem| stem[prefix.len()..].parse::().ok()) + .unwrap_or(0) + }); + + Ok(entries) +} + +fn in_range(start: u64, end: Option, start2: Option, end2: Option) -> bool { + match (start2, end2) { + (Some(s2), Some(e2)) => { + if e2 < start { + return false; + } + if let Some(e) = end { + if e < s2 { + return false; + } + } + return true; + } + _ => false, + } +} + +fn new_path>(dir: P, file_prefix: &str) -> PathBuf { + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let mut path = dir.as_ref().to_path_buf(); + path.push(&format!("{file_prefix}{timestamp}.shrooms")); + + path +} + +impl Datalog { + pub fn new>(dirpath: P, file_prefix: &str) -> Result { + let files = walk_dir(&dirpath, file_prefix)?; + let cur_store_path = files + .last() + .map(|path| path.clone()) + .unwrap_or_else(|| new_path(&dirpath, file_prefix)); + + println!("opening most recent store {:?}", &cur_store_path); + let cur_store = Store::open(cur_store_path)?; + + Ok(Datalog { + cur_store, + path: dirpath.as_ref().to_path_buf(), + prefix: file_prefix.to_string(), + }) + } + + pub fn lookup_range(&self, start: u64, end: Option) -> Result> { + let only_most_recent = self.cur_store.start().map(|ts| ts < start).unwrap_or(false); + if only_most_recent { + let result = self.cur_store.lookup_range(start, end); + return Ok(result); + } + + let mut relevant_stores = Vec::new(); + //println!("walking dir {:?}", &self.path); + let mut entries = walk_dir(&self.path, &self.prefix)?; + //println!("found logs {:?}", &entries); + entries.reverse(); + for e in entries { + let store = Store::open_read_only(e).ok(); + if let Some(s) = store { + if in_range(start, end, s.start(), s.end()) { + relevant_stores.push(s); + } + } + } + // reverse to return it to chronological order + relevant_stores.reverse(); + let mut ret = Vec::new(); + for s in relevant_stores { + ret.extend(s.lookup_range(start, end)); + } + Ok(ret) + } + + pub fn add(&mut self, e: &Entry) -> Result<()> { + self.cur_store.append(e)?; + + if self.cur_store.num_entries() >= MAX_ENTRIES { + self.cur_store = Store::open(new_path(&self.path, &self.prefix))?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::{Datalog, Entry}; + + #[test] + fn test_write1() { + let temp_dir = tempfile::TempDir::new().unwrap(); + let mut datalog = Datalog::new(temp_dir.path(), "test").unwrap(); + + let entry = Entry { + timestamp: Some(7), + humidity: vec![0.95], + temp: Some(32.2), + fan: Some(false), + humidifer: vec![false], + }; + + datalog.add(&entry).unwrap(); + + let entries = datalog.lookup_range(0, None).unwrap(); + assert_eq!(entries, vec![entry.clone()]); + } + #[test] + fn overflow() { + let temp_dir = tempfile::TempDir::new().unwrap(); + let mut datalog = Datalog::new(temp_dir.path(), "test").unwrap(); + + let entry = Entry { + timestamp: Some(7), + humidity: vec![0.95], + temp: Some(32.2), + fan: Some(false), + humidifer: vec![false], + }; + let entry2 = Entry { + timestamp: Some(16), + humidity: vec![0.92], + temp: Some(31.2), + fan: Some(true), + humidifer: vec![false], + }; + let entry3 = Entry { + timestamp: Some(17), + humidity: vec![0.95], + temp: Some(31.4), + fan: Some(false), + humidifer: vec![true], + }; + let entry4 = Entry { + timestamp: Some(31), + humidity: vec![0.95], + temp: Some(31.4), + fan: Some(false), + humidifer: vec![true], + }; + let entry5 = Entry { + timestamp: Some(47), + humidity: vec![0.95], + temp: Some(31.4), + fan: Some(false), + humidifer: vec![true], + }; + let entry6 = Entry { + timestamp: Some(97), + humidity: vec![0.95], + temp: Some(31.4), + fan: Some(false), + humidifer: vec![true], + }; + + datalog.add(&entry).unwrap(); + datalog.add(&entry2).unwrap(); + datalog.add(&entry3).unwrap(); + datalog.add(&entry4).unwrap(); + datalog.add(&entry5).unwrap(); + datalog.add(&entry6).unwrap(); + + let entries = datalog.lookup_range(4, Some(8)).unwrap(); + assert_eq!(entries, vec![entry.clone()]); + let entries = datalog.lookup_range(4, Some(16)).unwrap(); + assert_eq!(entries, vec![entry.clone(), entry2.clone()]); + let entries = datalog.lookup_range(8, Some(16)).unwrap(); + assert_eq!(entries, vec![entry2.clone()]); + let entries = datalog.lookup_range(4, Some(17)).unwrap(); + assert_eq!(entries, vec![entry.clone(), entry2.clone(), entry3.clone()]); + let entries = datalog.lookup_range(8, None).unwrap(); + assert_eq!( + entries, + vec![ + entry2.clone(), + entry3.clone(), + entry4.clone(), + entry5.clone(), + entry6.clone() + ] + ); + } +} diff --git a/shroom-server/src/main.rs b/shroom-server/src/main.rs index 4699e44..c725b75 100644 --- a/shroom-server/src/main.rs +++ b/shroom-server/src/main.rs @@ -1,6 +1,7 @@ mod proto { include!(concat!(env!("OUT_DIR"), "/internalstore.rs")); } +mod datalog; mod store; fn main() { diff --git a/shroom-server/src/store.rs b/shroom-server/src/store.rs index 30b1239..6994f4d 100644 --- a/shroom-server/src/store.rs +++ b/shroom-server/src/store.rs @@ -6,6 +6,9 @@ use prost::Message; pub use crate::proto::Entry; +pub type Error = std::io::Error; +pub type Result = std::result::Result; + const X25: crc::Crc = crc::Crc::::new(&crc::CRC_16_IBM_SDLC); // data is stored in append-only logs @@ -14,7 +17,6 @@ const X25: crc::Crc = crc::Crc::::new(&crc::CRC_16_IBM_SDLC); // actual entry encoded in protocol buffer // upon opening a file, all the entries within it are // read, any corrupted contents at the end truncated/overwritten - pub struct Store { contents: T, entries: Vec, @@ -22,9 +24,23 @@ pub struct Store { end_ts: Option, } -pub type Error = std::io::Error; +impl Store { + pub fn num_entries(&self) -> usize { + self.entries.len() + } -pub type Result = std::result::Result; + pub fn data(&self) -> Vec { + self.entries.clone() + } + + pub fn start(&self) -> Option { + self.start_ts + } + + pub fn end(&self) -> Option { + self.end_ts + } +} impl Store>> { pub fn open_read_only>(p: P) -> Result>>> {