use std::fs::File; use std::path::{Path, PathBuf}; use std::time::SystemTime; use crate::proto::Entry; use crate::store::Store; pub type Error = std::io::Error; pub type Result = std::result::Result; #[cfg(not(test))] const MAX_ENTRIES: usize = 4096; #[cfg(test)] const MAX_ENTRIES: usize = 2; // manages a directory of append-only logs // the most recent log is kept open for easy access, the rest are opened as needed // to access old data // a new log is started once the current log is larger than a given size pub struct Datalog { cur_store: Store, path: PathBuf, prefix: String, } fn walk_dir>(path: P, prefix: &str) -> Result> { let mut dir_path = path.as_ref().to_path_buf(); dir_path.push("."); //println!("reading dir {:?}", &dir_path); let contents = std::fs::read_dir(dir_path)?; let mut entries: Vec = contents .into_iter() .filter_map(|dir_entry| dir_entry.ok().map(|de| de.path())) .filter(|path| { path.as_path() .file_name() .and_then(|filen| filen.to_str()) .map(|filen| filen.starts_with(prefix)) .unwrap_or(false) }) .collect(); entries.sort_by_key(|path| { path.file_stem() .and_then(|stem| stem.to_str()) .and_then(|stem| stem[prefix.len()..].parse::().ok()) .unwrap_or(0) }); Ok(entries) } fn in_range(start: u64, end: Option, start2: Option, end2: Option) -> bool { match (start2, end2) { (Some(s2), Some(e2)) => { if e2 < start { return false; } if let Some(e) = end { if e < s2 { return false; } } return true; } _ => false, } } fn new_path>(dir: P, file_prefix: &str) -> PathBuf { let timestamp = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs(); let mut path = dir.as_ref().to_path_buf(); path.push(&format!("{file_prefix}{timestamp}.shrooms")); path } impl Datalog { pub fn new>(dirpath: P, file_prefix: &str) -> Result { let files = walk_dir(&dirpath, file_prefix)?; let cur_store_path = files .last() .map(|path| path.clone()) .unwrap_or_else(|| new_path(&dirpath, file_prefix)); println!("opening most recent store {:?}", &cur_store_path); let cur_store = Store::open(cur_store_path)?; Ok(Datalog { cur_store, path: dirpath.as_ref().to_path_buf(), prefix: file_prefix.to_string(), }) } pub fn lookup_range(&self, start: u64, end: Option) -> Result> { let only_most_recent = self.cur_store.start().map(|ts| ts < start).unwrap_or(false); if only_most_recent { let result = self.cur_store.lookup_range(start, end); return Ok(result); } let mut relevant_stores = Vec::new(); //println!("walking dir {:?}", &self.path); let mut entries = walk_dir(&self.path, &self.prefix)?; //println!("found logs {:?}", &entries); entries.reverse(); for e in entries { let store = Store::open_read_only(e).ok(); if let Some(s) = store { if in_range(start, end, s.start(), s.end()) { relevant_stores.push(s); } } } // reverse to return it to chronological order relevant_stores.reverse(); let mut ret = Vec::new(); for s in relevant_stores { ret.extend(s.lookup_range(start, end)); } Ok(ret) } pub fn add(&mut self, e: &Entry) -> Result<()> { self.cur_store.append(e)?; if self.cur_store.num_entries() >= MAX_ENTRIES { self.cur_store = Store::open(new_path(&self.path, &self.prefix))?; } Ok(()) } } #[cfg(test)] mod tests { use super::{Datalog, Entry}; #[test] fn test_write1() { let temp_dir = tempfile::TempDir::new().unwrap(); let mut datalog = Datalog::new(temp_dir.path(), "test").unwrap(); let entry = Entry { timestamp: Some(7), humidity: vec![0.95], temp: Some(32.2), fan: Some(false), humidifer: vec![false], }; datalog.add(&entry).unwrap(); let entries = datalog.lookup_range(0, None).unwrap(); assert_eq!(entries, vec![entry.clone()]); } #[test] fn overflow() { let temp_dir = tempfile::TempDir::new().unwrap(); let mut datalog = Datalog::new(temp_dir.path(), "test").unwrap(); let entry = Entry { timestamp: Some(7), humidity: vec![0.95], temp: Some(32.2), fan: Some(false), humidifer: vec![false], }; let entry2 = Entry { timestamp: Some(16), humidity: vec![0.92], temp: Some(31.2), fan: Some(true), humidifer: vec![false], }; let entry3 = Entry { timestamp: Some(17), humidity: vec![0.95], temp: Some(31.4), fan: Some(false), humidifer: vec![true], }; let entry4 = Entry { timestamp: Some(31), humidity: vec![0.95], temp: Some(31.4), fan: Some(false), humidifer: vec![true], }; let entry5 = Entry { timestamp: Some(47), humidity: vec![0.95], temp: Some(31.4), fan: Some(false), humidifer: vec![true], }; let entry6 = Entry { timestamp: Some(97), humidity: vec![0.95], temp: Some(31.4), fan: Some(false), humidifer: vec![true], }; datalog.add(&entry).unwrap(); datalog.add(&entry2).unwrap(); datalog.add(&entry3).unwrap(); datalog.add(&entry4).unwrap(); datalog.add(&entry5).unwrap(); datalog.add(&entry6).unwrap(); let entries = datalog.lookup_range(4, Some(8)).unwrap(); assert_eq!(entries, vec![entry.clone()]); let entries = datalog.lookup_range(4, Some(16)).unwrap(); assert_eq!(entries, vec![entry.clone(), entry2.clone()]); let entries = datalog.lookup_range(8, Some(16)).unwrap(); assert_eq!(entries, vec![entry2.clone()]); let entries = datalog.lookup_range(4, Some(17)).unwrap(); assert_eq!(entries, vec![entry.clone(), entry2.clone(), entry3.clone()]); let entries = datalog.lookup_range(8, None).unwrap(); assert_eq!( entries, vec![ entry2.clone(), entry3.clone(), entry4.clone(), entry5.clone(), entry6.clone() ] ); } }