package shroom_internals

import (
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"sync"
	"time"
)

// newlinePos returns the index of the first '\n' byte in s, or -1 if s
// contains no newline.
func newlinePos(s []byte) int {
	for i, v := range s {
		if v == '\n' {
			return i
		}
	}
	return -1
}

// StatusJson is the "status" section of a packet received over the TCP link.
// Every field is a pointer so that a key missing from the JSON can be told
// apart from an explicit false.
type StatusJson struct {
	HumOn      *bool `json:"humidifier"`
	Hum2On     *bool `json:"humidifier2"`
	ManualMode *bool `json:"manual_mode"`
}

// DataJson is the "data" section of a packet received over the TCP link.
// Every field is a pointer so that a missing key can be detected after
// decoding.
type DataJson struct {
	Time             *uint64  `json:"time"`
	Temperature      *float32 `json:"temp"`
	Humidity         *float32 `json:"hum"`
	HumidifierVolts  *float32 `json:"hv"`
	HumidifierVolts2 *float32 `json:"hv2"`
}

// ToDatapoint copies the decoded values into a new Datapoint.
//
// Every field of dj is dereferenced unconditionally, so the caller must have
// verified that none of them is nil; otherwise this panics.
func (dj *DataJson) ToDatapoint() *Datapoint {
	return &Datapoint{
		Time:             *dj.Time,
		Temperature:      *dj.Temperature,
		Humidity:         *dj.Humidity,
		HumidifierVolts:  *dj.HumidifierVolts,
		HumidifierVolts2: *dj.HumidifierVolts2,
	}
}

// ShroomPacket is one newline-delimited JSON message read from the TCP link.
// parseMsg acts on the first of Data, Status and Params that is non-nil.
type ShroomPacket struct {
	Data   *DataJson          `json:"data"`
	Status *StatusJson        `json:"status"`
	Params map[string]float32 `json:"params"`
}

// ShroomState is the shared state kept up to date from the TCP link. The
// embedded RWMutex guards the plain fields and the three wait channels, each
// of which is closed and replaced whenever the matching event happens.
type ShroomState struct {
	sync.RWMutex
	HumidifierOn   bool
	Humidifier2On  bool
	ManualMode     bool
	NumConnections int // incremented while a TCP connection is being served
	Params         map[string]float32

	Wait       chan struct{} // closed and replaced by Update
	StatusWait chan struct{} // closed and replaced by StatusUpdate
	ParamsWait chan struct{} // closed and replaced by ParamsUpdate

	// Commands carries raw messages that the connection's write loop
	// forwards to the peer, each followed by a newline.
	Commands chan []byte
	Cache    DataCache
}

// NewShroomState returns a ShroomState with its map and all of its channels
// allocated. Commands is unbuffered.
func NewShroomState() ShroomState {
	return ShroomState{
		Params:     make(map[string]float32),
		Wait:       make(chan struct{}),
		StatusWait: make(chan struct{}),
		ParamsWait: make(chan struct{}),
		Commands:   make(chan []byte),
		Cache:      DataCache{},
	}
}

// Update wakes everything blocked on s.Wait by closing it, then installs a
// fresh channel for the next round of waiters.
func (s *ShroomState) Update() {
	s.Lock()
	defer s.Unlock()
	close(s.Wait)
	s.Wait = make(chan struct{})
}

// StatusUpdate wakes everything blocked on s.StatusWait by closing it, then
// installs a fresh channel for the next round of waiters.
func (s *ShroomState) StatusUpdate() {
	s.Lock()
	defer s.Unlock()
	close(s.StatusWait)
	s.StatusWait = make(chan struct{})
}

// ParamsUpdate wakes everything blocked on s.ParamsWait by closing it, then
// installs a fresh channel for the next round of waiters.
func (s *ShroomState) ParamsUpdate() {
	s.Lock()
	defer s.Unlock()
	close(s.ParamsWait)
	s.ParamsWait = make(chan struct{})
}

// WaitForClose blocks until c is closed, discarding any values received in
// the meantime.
func WaitForClose(c chan struct{}) {
	for range c {
	}
}

// WaitForCloseWithTimeout blocks until c is closed or t has elapsed,
// discarding any values received on c in the meantime. It returns true if c
// was closed and false if the timeout expired first.
func WaitForCloseWithTimeout(c chan struct{}, t time.Duration) bool {
	// A timer is used rather than a sleeping goroutine that sends on an
	// unbuffered channel: such a goroutine would block forever (and leak)
	// whenever c is closed before the timeout fires.
	timer := time.NewTimer(t)
	defer timer.Stop()

	for {
		select {
		case _, open := <-c:
			if !open {
				return true
			}
		case <-timer.C:
			return false
		}
	}
}

// queryParams is the JSON command asking the peer to report its parameters.
type queryParams struct {
	QueryParams bool `json:"query_params"`
}

// QueryParams asks the connected peer for its parameters and returns them
// encoded as JSON.
//
// The request is handed to state.Commands without blocking; if nothing is
// receiving on that channel an error is returned immediately. Otherwise the
// call waits up to two seconds for the next params update and then marshals
// state.Params.
func QueryParams(state *ShroomState) ([]byte, error) {
	msg, err := json.Marshal(queryParams{QueryParams: true})
	if err != nil {
		return nil, fmt.Errorf("unable to marshal query request: %w", err)
	}

	// Snapshot the wait channel under the lock and before the request is
	// sent. ParamsUpdate replaces the field under the write lock, so an
	// unlocked read is a data race, and reading it only after sending could
	// miss a reply that arrives first and then time out on the new channel.
	state.RLock()
	paramsWait := state.ParamsWait
	state.RUnlock()

	select {
	case state.Commands <- msg:
	default:
		return nil, fmt.Errorf("unable to forward request; controller may not be connected")
	}

	if !WaitForCloseWithTimeout(paramsWait, 2*time.Second) {
		return nil, fmt.Errorf("request timed out")
	}

	state.RLock()
	resp, err := json.Marshal(state.Params)
	state.RUnlock()
	if err != nil {
		return nil, fmt.Errorf("unable to marshal params as json: %w", err)
	}
	return resp, nil
}

// parseMsg decodes one JSON line received over the TCP link and applies it
// to state.
//
//   - A data packet with every field present is written to db via InsertRow,
//     added to state.Cache, and followed by state.Update. A data packet with
//     any field missing is ignored without logging.
//   - A status packet copies whichever flags are present into state and is
//     followed by state.StatusUpdate.
//   - A params packet replaces state.Params and is followed by
//     state.ParamsUpdate.
//
// Lines that are not valid JSON, or that carry none of the three sections,
// are logged and dropped.
func parseMsg(line []byte, db *sql.DB, state *ShroomState) {
	packet := ShroomPacket{}
	if err := json.Unmarshal(line, &packet); err != nil {
		log.Println("unable to parse tcp line: ", err)
		log.Println(string(line))
		log.Println(line)
		return
	}

	switch {
	case packet.Data != nil:
		d := packet.Data
		if d.Time == nil || d.Temperature == nil || d.Humidity == nil ||
			d.HumidifierVolts == nil || d.HumidifierVolts2 == nil {
			return
		}
		// The point is cached even if the database write fails.
		err := InsertRow(db, d)
		state.Cache.Add(d.ToDatapoint())
		if err != nil {
			log.Println("unable to write to database: ", err)
		}
		// we got a data packet
		state.Update()

	case packet.Status != nil:
		st := packet.Status
		state.Lock()
		if st.HumOn != nil {
			state.HumidifierOn = *st.HumOn
		}
		if st.Hum2On != nil {
			state.Humidifier2On = *st.Hum2On
		}
		if st.ManualMode != nil {
			state.ManualMode = *st.ManualMode
		}
		state.Unlock()
		state.StatusUpdate()

	case packet.Params != nil:
		state.Lock()
		state.Params = packet.Params
		state.Unlock()
		state.ParamsUpdate()

	default:
		log.Println("unknown packet: ", line, string(line))
	}
}

// forwardControllerCommands is the write side of a connection: it forwards
// each message received on commands to conn followed by a newline, until
// commands is closed or exiting is closed. Write errors are logged and the
// loop keeps running.
func forwardControllerCommands(conn net.Conn, commands <-chan []byte, exiting <-chan struct{}) {
	for {
		select {
		case v, ok := <-commands:
			if !ok {
				return
			}
			v = append(v, '\n')
			if _, err := conn.Write(v); err != nil {
				log.Println("tcp write err: ", err)
			}
		case <-exiting:
			return
		}
	}
}

// serveControllerConn handles a single accepted connection until it fails.
// It keeps state.NumConnections in step with the connection's lifetime, runs
// the command-forwarding write loop in the background, and feeds every
// non-empty newline-terminated line it reads to parseMsg.
func serveControllerConn(conn net.Conn, db *sql.DB, state *ShroomState) {
	state.Lock()
	state.NumConnections += 1
	state.Unlock()
	state.StatusUpdate()
	defer func() {
		state.Lock()
		state.NumConnections -= 1
		state.Unlock()
		state.StatusUpdate()
		log.Println("connection disconnected")
	}()
	log.Println("connection started")

	// write loop; waits for commands and forwards them
	exiting := make(chan struct{})
	go forwardControllerCommands(conn, state.Commands, exiting)
	defer close(exiting)

	// Registered last so it runs first: the socket is closed on every exit
	// path before the write loop is told to stop. The Close error is ignored
	// because the connection is being abandoned either way.
	defer func() { _ = conn.Close() }()

	// deal with the read side of the connection
	buf := make([]byte, 1024)
	left := buf // unused tail of buf that the next Read fills
	for {
		if len(left) == 0 {
			// A full buffer without a newline: drop it and start over.
			log.Println("overflow detected, truncating data")
			left = buf
		}

		// A bare newline is written on every pass; presumably this is a
		// keep-alive that surfaces a dead peer as a write error.
		if _, err := conn.Write([]byte("\n")); err != nil {
			log.Println("tcp write error: ", err)
			log.Println("disconnected from client")
			return
		}

		if err := conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
			log.Println("unable to set read deadline ", err)
			return
		}

		numRead, err := conn.Read(left)
		left = left[numRead:]
		// A deadline timeout only means no data arrived this second.
		if err != nil && !errors.Is(err, os.ErrDeadlineExceeded) {
			log.Println("tcp read error: ", err)
			log.Println("disconnected from client")
			return
		}

		// parse every complete line currently in the buffer
		unread := buf[:len(buf)-len(left)]
		for {
			end := newlinePos(unread)
			if end == -1 {
				break
			}
			line := unread[:end]
			unread = unread[end+1:]
			// skip empty lines
			if len(line) == 0 {
				continue
			}
			parseMsg(line, db, state)
		}

		// shift the remaining partial line back to the start of the buffer
		copy(buf, unread)
		left = buf[len(unread):]
	}
}

// InitTcpServer starts listening on TCP port 9876 and serves incoming
// connections on a background goroutine. It terminates the process via
// log.Fatal if the listener cannot be opened.
//
// Connections are served one at a time: the accept loop does not accept the
// next connection until the current one has ended. The accept loop stops
// permanently on the first Accept error.
func InitTcpServer(db *sql.DB, state *ShroomState) {
	// start TCP server for the pipe from the raspberry pi
	ln, err := net.Listen("tcp", ":9876")
	if err != nil {
		log.Fatal("unable to open tcp server: ", err)
	}

	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				log.Println("tcp accept error: ", err)
				return
			}
			// Deliberately not spawning a goroutine per connection, which
			// limits the server to one active connection at a time.
			serveControllerConn(conn, db, state)
		}
	}()
}