From e56c4835f1fdc6ec0386760f8415bb8aa0411493 Mon Sep 17 00:00:00 2001
From: Kelvin Ly
Date: Mon, 3 Jul 2023 05:12:19 -0400
Subject: [PATCH] Add caching to reduce load on sqlite

---
 shroom_internals/cache.go      | 81 ++++++++++++++++++++++++++++++++++
 shroom_internals/cache_test.go | 55 +++++++++++++++++++++++
 shroom_internals/sql_wrap.go   |  9 ++++
 shroom_internals/tcp_server.go | 16 +++++++
 shroom_server.go               | 26 ++++++++---
 5 files changed, 180 insertions(+), 7 deletions(-)
 create mode 100644 shroom_internals/cache.go
 create mode 100644 shroom_internals/cache_test.go

diff --git a/shroom_internals/cache.go b/shroom_internals/cache.go
new file mode 100644
index 0000000..f1c43f4
--- /dev/null
+++ b/shroom_internals/cache.go
@@ -0,0 +1,81 @@
+package shroom_internals
+
+import (
+	"sync"
+)
+
+const MAX_CACHE_POINTS = 1000
+
+type DataCache struct {
+	sync.RWMutex
+	data  []Datapoint
+	start int
+}
+
+func (dc *DataCache) Add(data *Datapoint) {
+	dc.Lock()
+	defer dc.Unlock()
+
+	if len(dc.data) < MAX_CACHE_POINTS {
+		dc.data = append(dc.data, *data)
+	} else {
+		dc.data[dc.start] = *data
+		dc.start += 1
+		if dc.start >= len(dc.data) {
+			dc.start = 0
+		}
+	}
+}
+
+func (dc *DataCache) binarySearch(s int, e int, target uint64) int {
+	for s < e {
+		mid := (s + e) / 2
+		if dc.data[mid].Time < target {
+			s = mid + 1
+		} else {
+			e = mid
+		}
+	}
+	return s
+}
+
+func (dc *DataCache) ReadSince(start_time uint64) []Datapoint {
+	dc.RLock()
+	defer dc.RUnlock()
+
+	if len(dc.data) == 0 {
+		return nil
+	}
+	if dc.data[dc.start].Time <= start_time {
+		// binary search for the point right at/after
+		// start_time
+
+		if len(dc.data) >= MAX_CACHE_POINTS {
+			// if the data is full length we've been wrapping
+			// around
+			if dc.data[0].Time <= start_time {
+				// it's between [0, start)
+				idx := dc.binarySearch(0, dc.start, start_time)
+				ret := []Datapoint{}
+				ret = append(ret, dc.data[idx:dc.start]...)
+				return ret
+			} else {
+				// it's between [start, len(dc.data))
+				// and we can always include [0, start) afterwards
+				idx := dc.binarySearch(dc.start, len(dc.data), start_time)
+				ret := []Datapoint{}
+				ret = append(ret, dc.data[idx:len(dc.data)]...)
+				ret = append(ret, dc.data[0:dc.start]...)
+				return ret
+			}
+		} else {
+			// it's between [0, len(dc.data))
+			idx := dc.binarySearch(0, len(dc.data), start_time)
+			ret := []Datapoint{}
+			ret = append(ret, dc.data[idx:len(dc.data)]...)
+			return ret
+		}
+	} else {
+		return nil
+	}
+}
diff --git a/shroom_internals/cache_test.go b/shroom_internals/cache_test.go
new file mode 100644
index 0000000..b8e51fc
--- /dev/null
+++ b/shroom_internals/cache_test.go
@@ -0,0 +1,55 @@
+package shroom_internals
+
+import (
+	"testing"
+)
+
+func TestCache(t *testing.T) {
+	cache := DataCache{}
+	for i := 0; i < 2*MAX_CACHE_POINTS; i++ {
+		cache.Add(&Datapoint{Time: uint64(i)})
+		if len(cache.data) > MAX_CACHE_POINTS {
+			t.Errorf("cache expanded past limit at %d, len = %d \n", i, len(cache.data))
+		}
+		if (i % 100) == 0 {
+			for j := 0; j < 3*MAX_CACHE_POINTS; j += 25 {
+				result := cache.ReadSince(uint64(j))
+				if j > (i - MAX_CACHE_POINTS) {
+					if result == nil {
+						t.Errorf("read was nil instead of an array at %d, %d\n", i, j)
+					} else {
+						expected := i - j + 1
+						if expected < 0 {
+							expected = 0
+						}
+						if len(result) != expected {
+							t.Errorf("invalid read len %d != %d\n", len(result), expected)
+						}
+					}
+				} else {
+					if result != nil {
+						t.Errorf("read should have been nil at %d, %d %v\n", i, j, result)
+					}
+				}
+			}
+		}
+	}
+}
+
+func BenchmarkCacheAdd(b *testing.B) {
+	cache := DataCache{}
+	dp := Datapoint{}
+	for n := 0; n < b.N; n++ {
+		cache.Add(&dp)
+	}
+}
+
+func BenchmarkCacheRead(b *testing.B) {
+	cache := DataCache{}
+	for i := 0; i < int(1.3*MAX_CACHE_POINTS); i++ {
+		cache.Add(&Datapoint{Time: uint64(i)})
+	}
+	for n := 0; n < b.N; n++ {
+		cache.ReadSince(MAX_CACHE_POINTS)
+	}
+}
diff --git a/shroom_internals/sql_wrap.go b/shroom_internals/sql_wrap.go
index 446bad1..4c398a8 100644
--- a/shroom_internals/sql_wrap.go
+++ b/shroom_internals/sql_wrap.go
@@ -6,6 +6,15 @@ import (
 	"fmt"
 )
 
+func DatapointsToJson(ds []Datapoint) ([]byte, error) {
+	msg, err := json.Marshal(ds)
+	if err != nil {
+		return nil, fmt.Errorf("json marshal error: %w", err)
+	}
+	return msg, nil
+
+}
+
 func GetRows(db *sql.DB, t int64) ([]byte, error) {
 	results, err := QueryHistory(db, t)
 	if err != nil {
diff --git a/shroom_internals/tcp_server.go b/shroom_internals/tcp_server.go
index 40cff1a..5840472 100644
--- a/shroom_internals/tcp_server.go
+++ b/shroom_internals/tcp_server.go
@@ -33,6 +33,15 @@ type DataJson struct {
 	HumidifierVolts *float32 `json:"hv"`
 }
 
+func (dj *DataJson) ToDatapoint() *Datapoint {
+	return &Datapoint{
+		Time:            *dj.Time,
+		Temperature:     *dj.Temperature,
+		Humidity:        *dj.Humidity,
+		HumidifierVolts: *dj.HumidifierVolts,
+	}
+}
+
 type ShroomPacket struct {
 	Data   *DataJson   `json:"data"`
 	Status *StatusJson `json:"status"`
 }
@@ -51,6 +60,8 @@ type ShroomState struct {
 	ParamsWait chan struct{}
 
 	Commands chan []byte
+
+	Cache DataCache
 }
 
 func NewShroomState() ShroomState {
@@ -60,6 +71,7 @@ func NewShroomState() ShroomState {
 		StatusWait: make(chan struct{}),
 		ParamsWait: make(chan struct{}),
 		Commands:   make(chan []byte),
+		Cache:      DataCache{},
 	}
 }
 
@@ -143,6 +155,9 @@ func QueryParams(state *ShroomState) ([]byte, error) {
 }
 
+// used to cache data points to avoid accessing
+// sqlite for most queries
+
 func parseMsg(line []byte, db *sql.DB, state *ShroomState) {
 	packet := ShroomPacket{}
 	err := json.Unmarshal(line, &packet)
@@ -156,6 +171,7 @@ func parseMsg(line []byte, db *sql.DB, state *ShroomState) {
 	if packet.Data != nil {
 		if packet.Data.Time != nil && packet.Data.Temperature != nil && packet.Data.Humidity != nil && packet.Data.HumidifierVolts != nil {
 			err = InsertRow(db, packet.Data)
+			state.Cache.Add(packet.Data.ToDatapoint())
 			if err != nil {
 				log.Println("unable to write to database: ", err)
 			}
diff --git a/shroom_server.go b/shroom_server.go
index e75de22..5d3146c 100644
--- a/shroom_server.go
+++ b/shroom_server.go
@@ -40,7 +40,7 @@ type adminMsg struct {
 
 // returns a function that multiplies the number at the very last segment of the url
 // and returns the data that was collected in the last n*multiplier milliseconds
-func dumpData(db *sql.DB, multiplier int64) func(http.ResponseWriter, *http.Request) {
+func dumpData(db *sql.DB, dc *s.DataCache, multiplier int64) func(http.ResponseWriter, *http.Request) {
 	return func(w http.ResponseWriter, req *http.Request) {
 		now := time.Now().Unix()
 		path := strings.Split(req.URL.Path, "/")
@@ -54,7 +54,19 @@ func dumpData(db *sql.DB, multiplier int64) func(http.ResponseWriter, *http.Requ
 		offset := int64(count) * multiplier
 		t := now*1000 - offset
 		//log.Println("req start ", t)
-		msg, err := s.GetRows(db, t)
+		var msg []byte
+		cachedPoints := dc.ReadSince(uint64(t))
+		if cachedPoints != nil {
+			msg, err = s.DatapointsToJson(cachedPoints)
+			if err != nil {
+				msg = nil
+			}
+		}
+		// fallback to actually reading from the sql
+		if msg == nil {
+			log.Println("unable to read from cache for ", t, " using sql")
+			msg, err = s.GetRows(db, t)
+		}
 		if err != nil {
 			w.WriteHeader(500)
 			w.Write([]byte(err.Error()))
@@ -86,11 +98,11 @@ func main() {
 		log.Fatal("unable to use subdirectory of embedded fs: ", err)
 	}
 
-	dumpWeek := dumpData(db, 7*24*60*60*1000)
-	dumpDay := dumpData(db, 24*60*60*1000)
-	dumpHour := dumpData(db, 60*60*1000)
-	dumpMinute := dumpData(db, 60*1000)
-	dumpSecond := dumpData(db, 1000)
+	dumpWeek := dumpData(db, &state.Cache, 7*24*60*60*1000)
+	dumpDay := dumpData(db, &state.Cache, 24*60*60*1000)
+	dumpHour := dumpData(db, &state.Cache, 60*60*1000)
+	dumpMinute := dumpData(db, &state.Cache, 60*1000)
+	dumpSecond := dumpData(db, &state.Cache, 1000)
 
 	lastPoint := func(w http.ResponseWriter, _req *http.Request) {
 		msg, err := s.LastTime(db)
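
Reviewer sketch (not part of the patch): the intent of ReadSince is that it only answers queries the ring buffer still fully covers, returning the cached points with Time >= start_time, and returns nil once the requested window reaches past the oldest cached point, which is what makes dumpData fall back to sqlite. A minimal, hypothetical example test alongside the package (file name cache_example_test.go is assumed, not in the patch) would look like:

package shroom_internals

import "fmt"

// Hypothetical example, illustrative only: shows the ring-buffer wrap-around
// and the nil return that triggers the sqlite fallback in dumpData.
func ExampleDataCache_ReadSince() {
	cache := DataCache{}
	// Fill past MAX_CACHE_POINTS so the buffer wraps and the oldest points are evicted.
	for i := 0; i < MAX_CACHE_POINTS+500; i++ {
		cache.Add(&Datapoint{Time: uint64(i)})
	}

	// Window still covered by the cache: served from memory.
	recent := cache.ReadSince(uint64(MAX_CACHE_POINTS + 400))
	fmt.Println(len(recent), recent == nil)

	// Window reaching past the oldest cached point: nil, so callers go to sqlite.
	tooOld := cache.ReadSince(0)
	fmt.Println(tooOld == nil)

	// Output:
	// 100 false
	// true
}

The fixed-size buffer bounds memory and keeps Add constant-time, while ReadSince does a binary search under the read lock; anything older than the newest MAX_CACHE_POINTS samples is always answered from sqlite.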