Add caching to reduce load on sqlite

Keep the most recent datapoints in an in-memory ring buffer (DataCache) and serve dumpData reads from it, falling back to sqlite only when the requested range is older than what the cache holds.

commit e56c4835f1 (parent b4b1d2d60b)
@@ -0,0 +1,81 @@
+package shroom_internals
+
+import (
+    "sync"
+)
+
+const MAX_CACHE_POINTS = 1000
+
+// DataCache is a fixed-size ring buffer holding the most recent datapoints.
+// Once the buffer is full, start indexes the oldest entry, which is the next
+// one to be overwritten.
+type DataCache struct {
+    sync.RWMutex
+    data  []Datapoint
+    start int
+}
+
+func (dc *DataCache) Add(data *Datapoint) {
+    dc.Lock()
+    defer dc.Unlock()
+
+    if len(dc.data) < MAX_CACHE_POINTS {
+        dc.data = append(dc.data, *data)
+    } else {
+        // buffer is full: overwrite the oldest entry and advance start
+        dc.data[dc.start] = *data
+        dc.start += 1
+        if dc.start >= len(dc.data) {
+            dc.start = 0
+        }
+    }
+}
+
+// binarySearch returns the first index in [s, e) whose Time is >= target;
+// the slice must be sorted by Time within that range.
+func (dc *DataCache) binarySearch(s int, e int, target uint64) int {
+    for s < e {
+        mid := (s + e) / 2
+        if dc.data[mid].Time < target {
+            s = mid + 1
+        } else {
+            e = mid
+        }
+    }
+    return s
+}
+
+func (dc *DataCache) ReadSince(start_time uint64) []Datapoint {
+    dc.RLock()
+    defer dc.RUnlock()
+
+    if len(dc.data) == 0 {
+        return nil
+    }
+    if dc.data[dc.start].Time <= start_time {
+        // the oldest cached point is at or before start_time, so the cache
+        // covers the whole range; binary search for the point right
+        // at/after start_time
+
+        if len(dc.data) >= MAX_CACHE_POINTS && dc.start > 0 {
+            // the buffer is full and has wrapped around:
+            // [start, len) holds the older points, [0, start) the newer ones
+            if dc.data[0].Time <= start_time {
+                // it's between [0, start)
+                idx := dc.binarySearch(0, dc.start, start_time)
+                ret := []Datapoint{}
+                ret = append(ret, dc.data[idx:dc.start]...)
+                return ret
+            } else {
+                // it's between [start, len(dc.data))
+                // and we can always include [0, start) afterwards
+                idx := dc.binarySearch(dc.start, len(dc.data), start_time)
+                ret := []Datapoint{}
+                ret = append(ret, dc.data[idx:len(dc.data)]...)
+                ret = append(ret, dc.data[0:dc.start]...)
+                return ret
+            }
+        } else {
+            // not wrapped (start == 0), so the whole slice is sorted:
+            // it's between [0, len(dc.data))
+            idx := dc.binarySearch(0, len(dc.data), start_time)
+            ret := []Datapoint{}
+            ret = append(ret, dc.data[idx:len(dc.data)]...)
+            return ret
+        }
+    } else {
+        // the cache does not reach back to start_time; let the caller
+        // fall back to sqlite
+        return nil
+    }
+}
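A minimal usage sketch of the new type (the timestamps are hypothetical; Time appears to carry the same millisecond values that dumpData passes in below). A nil result means the cache cannot answer the query and the caller should fall back to sqlite:

    cache := DataCache{}
    cache.Add(&Datapoint{Time: 1000})
    cache.Add(&Datapoint{Time: 2000})
    recent := cache.ReadSince(1500) // returns just the Time=2000 point
    older := cache.ReadSince(500)   // nil: the range predates the cache, fall back to sqlite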
@@ -0,0 +1,55 @@
+package shroom_internals
+
+import (
+    "testing"
+)
+
+func TestCache(t *testing.T) {
+    cache := DataCache{}
+    for i := 0; i < 2*MAX_CACHE_POINTS; i++ {
+        cache.Add(&Datapoint{Time: uint64(i)})
+        if len(cache.data) > MAX_CACHE_POINTS {
+            t.Errorf("cache expanded past limit at %d, len = %d \n", i, len(cache.data))
+        }
+        if (i % 100) == 0 {
+            for j := 0; j < 3*MAX_CACHE_POINTS; j += 25 {
+                result := cache.ReadSince(uint64(j))
+                if j > (i - MAX_CACHE_POINTS) {
+                    if result == nil {
+                        t.Errorf("read was nil instead of an array at %d, %d\n", i, j)
+                    } else {
+                        expected := i - j + 1
+                        if expected < 0 {
+                            expected = 0
+                        }
+                        if len(result) != expected {
+                            t.Errorf("invalid read len %d != %d\n", len(result), expected)
+                        }
+                    }
+                } else {
+                    if result != nil {
+                        t.Errorf("read should have been nil at %d, %d %v\n", i, j, result)
+                    }
+                }
+            }
+        }
+    }
+}
+
+func BenchmarkCacheAdd(b *testing.B) {
+    cache := DataCache{}
+    dp := Datapoint{}
+    for n := 0; n < b.N; n++ {
+        cache.Add(&dp)
+    }
+}
+
+func BenchmarkCacheRead(b *testing.B) {
+    cache := DataCache{}
+    for i := 0; i < int(1.3*MAX_CACHE_POINTS); i++ {
+        cache.Add(&Datapoint{Time: uint64(i)})
+    }
+    for n := 0; n < b.N; n++ {
+        cache.ReadSince(MAX_CACHE_POINTS)
+    }
+}
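Assuming the usual Go module layout for this package, the new test and benchmarks can be run with something like:

    go test -run TestCache -bench . ./...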
@@ -6,6 +6,15 @@ import (
     "fmt"
 )
 
+func DatapointsToJson(ds []Datapoint) ([]byte, error) {
+    msg, err := json.Marshal(ds)
+    if err != nil {
+        return nil, fmt.Errorf("json marshal error: %w", err)
+    }
+    return msg, nil
+}
+
 func GetRows(db *sql.DB, t int64) ([]byte, error) {
     results, err := QueryHistory(db, t)
     if err != nil {
@@ -33,6 +33,15 @@ type DataJson struct {
     HumidifierVolts *float32 `json:"hv"`
 }
 
+// ToDatapoint assumes every field is non-nil; callers must check first
+// (parseMsg does this before calling it).
+func (dj *DataJson) ToDatapoint() *Datapoint {
+    return &Datapoint{
+        Time:            *dj.Time,
+        Temperature:     *dj.Temperature,
+        Humidity:        *dj.Humidity,
+        HumidifierVolts: *dj.HumidifierVolts,
+    }
+}
+
 type ShroomPacket struct {
     Data   *DataJson   `json:"data"`
     Status *StatusJson `json:"status"`
@@ -51,6 +60,8 @@ type ShroomState struct {
     ParamsWait chan struct{}
 
     Commands chan []byte
+
+    Cache DataCache
 }
 
 func NewShroomState() ShroomState {
@@ -60,6 +71,7 @@ func NewShroomState() ShroomState {
         StatusWait: make(chan struct{}),
         ParamsWait: make(chan struct{}),
         Commands:   make(chan []byte),
+        Cache:      DataCache{},
     }
 }
 
@@ -143,6 +155,9 @@ func QueryParams(state *ShroomState) ([]byte, error) {
 
 }
 
+// incoming data points are also added to the in-memory cache so that
+// most read queries never have to touch sqlite
+
 func parseMsg(line []byte, db *sql.DB, state *ShroomState) {
     packet := ShroomPacket{}
     err := json.Unmarshal(line, &packet)
@@ -156,6 +171,7 @@ func parseMsg(line []byte, db *sql.DB, state *ShroomState) {
     if packet.Data != nil {
         if packet.Data.Time != nil && packet.Data.Temperature != nil && packet.Data.Humidity != nil && packet.Data.HumidifierVolts != nil {
             err = InsertRow(db, packet.Data)
+            state.Cache.Add(packet.Data.ToDatapoint())
             if err != nil {
                 log.Println("unable to write to database: ", err)
             }
@@ -40,7 +40,7 @@ type adminMsg struct {
 
 // returns a function that multiplies the number at the very last segment of the url
 // and returns the data that was collected in the last n*multiplier milliseconds
-func dumpData(db *sql.DB, multiplier int64) func(http.ResponseWriter, *http.Request) {
+func dumpData(db *sql.DB, dc *s.DataCache, multiplier int64) func(http.ResponseWriter, *http.Request) {
     return func(w http.ResponseWriter, req *http.Request) {
         now := time.Now().Unix()
         path := strings.Split(req.URL.Path, "/")
@@ -54,7 +54,19 @@ func dumpData(db *sql.DB, multiplier int64) func(http.ResponseWriter, *http.Requ
         offset := int64(count) * multiplier
         t := now*1000 - offset
         //log.Println("req start ", t)
-        msg, err := s.GetRows(db, t)
+        var msg []byte
+        cachedPoints := dc.ReadSince(uint64(t))
+        if cachedPoints != nil {
+            msg, err = s.DatapointsToJson(cachedPoints)
+            if err != nil {
+                msg = nil
+            }
+        }
+        // fallback to actually reading from the sql
+        if msg == nil {
+            log.Println("unable to read from cache for ", t, " using sql")
+            msg, err = s.GetRows(db, t)
+        }
         if err != nil {
             w.WriteHeader(500)
             w.Write([]byte(err.Error()))
@@ -86,11 +98,11 @@ func main() {
         log.Fatal("unable to use subdirectory of embedded fs: ", err)
     }
 
-    dumpWeek := dumpData(db, 7*24*60*60*1000)
-    dumpDay := dumpData(db, 24*60*60*1000)
-    dumpHour := dumpData(db, 60*60*1000)
-    dumpMinute := dumpData(db, 60*1000)
-    dumpSecond := dumpData(db, 1000)
+    dumpWeek := dumpData(db, &state.Cache, 7*24*60*60*1000)
+    dumpDay := dumpData(db, &state.Cache, 24*60*60*1000)
+    dumpHour := dumpData(db, &state.Cache, 60*60*1000)
+    dumpMinute := dumpData(db, &state.Cache, 60*1000)
+    dumpSecond := dumpData(db, &state.Cache, 1000)
 
     lastPoint := func(w http.ResponseWriter, _req *http.Request) {
         msg, err := s.LastTime(db)