323 lines
7.0 KiB
Go
323 lines
7.0 KiB
Go
package shroom_internals
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// newlinePos returns the index of the first '\n' byte in s, or -1 when s
// contains no newline (including when s is empty or nil).
func newlinePos(s []byte) int {
	for idx := 0; idx < len(s); idx++ {
		if s[idx] == '\n' {
			return idx
		}
	}
	return -1
}
|
|
|
|
// StatusJson is the "status" section of a packet received over the TCP
// connection. Every field is a pointer so that a key missing from the JSON
// (nil) can be told apart from an explicit false.
type StatusJson struct {
	// HumOn is decoded from the "humidifier" key.
	HumOn *bool `json:"humidifier"`
	// Hum2On is decoded from the "humidifier2" key.
	Hum2On *bool `json:"humidifier2"`
	// ManualMode is decoded from the "manual_mode" key.
	ManualMode *bool `json:"manual_mode"`
}
|
|
|
|
// DataJson is the "data" section of a packet received over the TCP
// connection: one sensor sample. Every field is a pointer so that a key
// missing from the JSON decodes to nil instead of a zero value.
type DataJson struct {
	// Time is decoded from the "time" key.
	Time *uint64 `json:"time"`
	// Temperature is decoded from the "temp" key.
	Temperature *float32 `json:"temp"`
	// Humidity is decoded from the "hum" key.
	Humidity *float32 `json:"hum"`
	// HumidifierVolts is decoded from the "hv" key.
	HumidifierVolts *float32 `json:"hv"`
	// HumidifierVolts2 is decoded from the "hv2" key.
	HumidifierVolts2 *float32 `json:"hv2"`
}
|
|
|
|
func (dj *DataJson) ToDatapoint() *Datapoint {
|
|
return &Datapoint{
|
|
Time: *dj.Time,
|
|
Temperature: *dj.Temperature,
|
|
Humidity: *dj.Humidity,
|
|
HumidifierVolts: *dj.HumidifierVolts,
|
|
HumidifierVolts2: *dj.HumidifierVolts2,
|
|
}
|
|
}
|
|
|
|
type ShroomPacket struct {
|
|
Data *DataJson `json:"data"`
|
|
Status *StatusJson `json:"status"`
|
|
Params map[string]float32 `json:"params"`
|
|
}
|
|
|
|
type ShroomState struct {
|
|
sync.RWMutex
|
|
HumidifierOn bool
|
|
Humidifier2On bool
|
|
ManualMode bool
|
|
NumConnections int
|
|
|
|
Params map[string]float32
|
|
Wait chan struct{}
|
|
StatusWait chan struct{}
|
|
ParamsWait chan struct{}
|
|
|
|
Commands chan []byte
|
|
|
|
Cache DataCache
|
|
}
|
|
|
|
func NewShroomState() ShroomState {
|
|
return ShroomState{
|
|
Params: make(map[string]float32),
|
|
Wait: make(chan struct{}),
|
|
StatusWait: make(chan struct{}),
|
|
ParamsWait: make(chan struct{}),
|
|
Commands: make(chan []byte),
|
|
Cache: DataCache{},
|
|
}
|
|
}
|
|
|
|
func (s *ShroomState) Update() {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
close(s.Wait)
|
|
s.Wait = make(chan struct{})
|
|
}
|
|
|
|
func (s *ShroomState) StatusUpdate() {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
close(s.StatusWait)
|
|
s.StatusWait = make(chan struct{})
|
|
}
|
|
|
|
func (s *ShroomState) ParamsUpdate() {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
close(s.ParamsWait)
|
|
s.ParamsWait = make(chan struct{})
|
|
}
|
|
|
|
// WaitForClose blocks until c is closed. Any values received before the
// close are discarded. It never returns if c is nil or is never closed.
func WaitForClose(c chan struct{}) {
	for range c {
		// drain until the channel is closed
	}
}
|
|
|
|
// WaitForCloseWithTimeout blocks until c is closed or until t has elapsed,
// whichever happens first. It returns true when c was closed and false on
// timeout. Values received on c before it is closed are discarded and do not
// restart the timeout.
//
// A single time.Timer bounds the whole wait and is stopped on return, so no
// helper goroutine is left behind when c closes before the timeout.
func WaitForCloseWithTimeout(c chan struct{}, t time.Duration) bool {
	timer := time.NewTimer(t)
	defer timer.Stop()

	for {
		select {
		case _, open := <-c:
			if !open {
				return true
			}
			// a plain value is not a close; keep waiting
		case <-timer.C:
			return false
		}
	}
}
|
|
|
|
// queryParams is the JSON request body built by QueryParams; it encodes as
// {"query_params": <bool>}.
type queryParams struct {
	// QueryParams is encoded under the "query_params" key.
	QueryParams bool `json:"query_params"`
}
|
|
|
|
func QueryParams(state *ShroomState) ([]byte, error) {
|
|
msg, err := json.Marshal(queryParams{
|
|
QueryParams: true,
|
|
})
|
|
if err != nil {
|
|
err = fmt.Errorf("unable to marshal query request: %w", err)
|
|
return nil, err
|
|
}
|
|
select {
|
|
case state.Commands <- msg:
|
|
if !WaitForCloseWithTimeout(state.ParamsWait, 2*time.Second) {
|
|
return nil, fmt.Errorf("request timed out")
|
|
}
|
|
state.RLock()
|
|
resp, err := json.Marshal(state.Params)
|
|
state.RUnlock()
|
|
if err != nil {
|
|
err = fmt.Errorf("unable to unmarshal params as json: %w", err)
|
|
return nil, err
|
|
}
|
|
return resp, nil
|
|
default:
|
|
return nil, fmt.Errorf("unable to forward request; controller may not be connected")
|
|
}
|
|
|
|
}
|
|
|
|
// ShroomState.Cache is used to cache data points to avoid accessing
// sqlite for most queries.
|
|
|
|
func parseMsg(line []byte, db *sql.DB, state *ShroomState) {
|
|
packet := ShroomPacket{}
|
|
err := json.Unmarshal(line, &packet)
|
|
if err != nil {
|
|
log.Println("unable to parse tcp line: ", err)
|
|
log.Println(string(line))
|
|
log.Println(line)
|
|
return
|
|
}
|
|
//log.Println("received data ", data)
|
|
if packet.Data != nil {
|
|
if packet.Data.Time != nil && packet.Data.Temperature != nil && packet.Data.Humidity != nil && packet.Data.HumidifierVolts != nil && packet.Data.HumidifierVolts2 != nil {
|
|
err = InsertRow(db, packet.Data)
|
|
state.Cache.Add(packet.Data.ToDatapoint())
|
|
if err != nil {
|
|
log.Println("unable to write to database: ", err)
|
|
}
|
|
// we got a data packet
|
|
state.Update()
|
|
}
|
|
} else if packet.Status != nil {
|
|
//log.Println("received status ", data.Status)
|
|
state.Lock()
|
|
if packet.Status.HumOn != nil {
|
|
state.HumidifierOn = *packet.Status.HumOn
|
|
}
|
|
if packet.Status.Hum2On != nil {
|
|
state.Humidifier2On = *packet.Status.Hum2On
|
|
}
|
|
if packet.Status.ManualMode != nil {
|
|
state.ManualMode = *packet.Status.ManualMode
|
|
}
|
|
state.Unlock()
|
|
state.StatusUpdate()
|
|
} else if packet.Params != nil {
|
|
state.Lock()
|
|
state.Params = packet.Params
|
|
state.Unlock()
|
|
state.ParamsUpdate()
|
|
} else {
|
|
log.Println("unknown packet: ", line, string(line))
|
|
}
|
|
|
|
}
|
|
|
|
func InitTcpServer(db *sql.DB, state *ShroomState) {
|
|
// start TCP server for the pipe from the raspberry pi
|
|
ln, err := net.Listen("tcp", ":9876")
|
|
if err != nil {
|
|
log.Fatal("unable to open tcp server: ", err)
|
|
}
|
|
go func() {
|
|
for {
|
|
conn, err := ln.Accept()
|
|
if err != nil {
|
|
log.Println("tcp accept error: ", err)
|
|
return
|
|
}
|
|
// not spawning a goroutine here
|
|
// should limit the number of connections to
|
|
// one hopefully
|
|
|
|
// wrapping in a func() so that I can use defer
|
|
// to automatically decrement the number of connections
|
|
func() {
|
|
state.Lock()
|
|
state.NumConnections += 1
|
|
state.Unlock()
|
|
state.StatusUpdate()
|
|
|
|
defer func() {
|
|
state.Lock()
|
|
state.NumConnections -= 1
|
|
state.Unlock()
|
|
state.StatusUpdate()
|
|
log.Println("connection disconnected")
|
|
}()
|
|
|
|
log.Println("connection started")
|
|
|
|
// write loop; waits for commands and forwards them
|
|
exiting := make(chan struct{})
|
|
go func() {
|
|
for {
|
|
select {
|
|
case v, ok := <-state.Commands:
|
|
if !ok {
|
|
return
|
|
}
|
|
v = append(v, []byte("\n")...)
|
|
_, err := conn.Write(v)
|
|
if err != nil {
|
|
log.Println("tcp write err: ", err)
|
|
}
|
|
case <-exiting:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
defer func() {
|
|
close(exiting)
|
|
}()
|
|
|
|
// deal with the read side of the connection
|
|
buf := make([]byte, 1024)
|
|
left := buf
|
|
|
|
for {
|
|
if len(left) == 0 {
|
|
log.Println("overflow detected, truncating data")
|
|
left = buf
|
|
}
|
|
|
|
_, err := conn.Write([]byte("\n"))
|
|
if err != nil {
|
|
log.Println("tcp write error: ", err)
|
|
_ = conn.Close()
|
|
log.Println("disconnected from client")
|
|
break
|
|
}
|
|
err = conn.SetReadDeadline(time.Now().Add(time.Second))
|
|
if err != nil {
|
|
log.Println("unable to set read deadline ", err)
|
|
_ = conn.Close()
|
|
break
|
|
}
|
|
num_read, err := conn.Read(left)
|
|
left = left[num_read:]
|
|
|
|
if err != nil && !errors.Is(err, os.ErrDeadlineExceeded) {
|
|
log.Println("tcp read error: ", err)
|
|
_ = conn.Close()
|
|
log.Println("disconnected from client")
|
|
break
|
|
}
|
|
// parse the message to see if it's finished
|
|
unread := buf[:len(buf)-len(left)]
|
|
|
|
for newlinePos(unread) != -1 {
|
|
end := newlinePos(unread)
|
|
line := unread[:end]
|
|
unread = unread[end+1:]
|
|
|
|
// skip empty lines
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
|
|
parseMsg(line, db, state)
|
|
}
|
|
// shift the remaining data back to the start of the buffer
|
|
copy(buf[:len(unread)], unread)
|
|
left = buf[len(unread):]
|
|
}
|
|
}()
|
|
}
|
|
}()
|
|
}
|