shrooms-server/shroom_internals/tcp_server.go

323 lines
7.0 KiB
Go

package shroom_internals
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"net"
"os"
"sync"
"time"
)
func newlinePos(s []byte) int {
for i, v := range s {
if v == '\n' {
return i
}
}
return -1
}
type StatusJson struct {
HumOn *bool `json:"humidifier"`
Hum2On *bool `json:"humidifier2"`
ManualMode *bool `json:"manual_mode"`
}
type DataJson struct {
Time *uint64 `json:"time"`
Temperature *float32 `json:"temp"`
Humidity *float32 `json:"hum"`
HumidifierVolts *float32 `json:"hv"`
HumidifierVolts2 *float32 `json:"hv2"`
}
func (dj *DataJson) ToDatapoint() *Datapoint {
return &Datapoint{
Time: *dj.Time,
Temperature: *dj.Temperature,
Humidity: *dj.Humidity,
HumidifierVolts: *dj.HumidifierVolts,
HumidifierVolts2: *dj.HumidifierVolts2,
}
}
type ShroomPacket struct {
Data *DataJson `json:"data"`
Status *StatusJson `json:"status"`
Params map[string]float32 `json:"params"`
}
type ShroomState struct {
sync.RWMutex
HumidifierOn bool
Humidifier2On bool
ManualMode bool
NumConnections int
Params map[string]float32
Wait chan struct{}
StatusWait chan struct{}
ParamsWait chan struct{}
Commands chan []byte
Cache DataCache
}
func NewShroomState() ShroomState {
return ShroomState{
Params: make(map[string]float32),
Wait: make(chan struct{}),
StatusWait: make(chan struct{}),
ParamsWait: make(chan struct{}),
Commands: make(chan []byte),
Cache: DataCache{},
}
}
func (s *ShroomState) Update() {
s.Lock()
defer s.Unlock()
close(s.Wait)
s.Wait = make(chan struct{})
}
func (s *ShroomState) StatusUpdate() {
s.Lock()
defer s.Unlock()
close(s.StatusWait)
s.StatusWait = make(chan struct{})
}
func (s *ShroomState) ParamsUpdate() {
s.Lock()
defer s.Unlock()
close(s.ParamsWait)
s.ParamsWait = make(chan struct{})
}
func WaitForClose(c chan struct{}) {
stillopen := true
for stillopen {
_, stillopen = <-c
}
}
func WaitForCloseWithTimeout(c chan struct{}, t time.Duration) bool {
stillopen := true
timeout := make(chan struct{})
go func() {
time.Sleep(t)
timeout <- struct{}{}
}()
for stillopen {
select {
case _, stillopen = <-c:
if stillopen {
continue
}
case <-timeout:
return false
}
}
return true
}
type queryParams struct {
QueryParams bool `json:"query_params"`
}
func QueryParams(state *ShroomState) ([]byte, error) {
msg, err := json.Marshal(queryParams{
QueryParams: true,
})
if err != nil {
err = fmt.Errorf("unable to marshal query request: %w", err)
return nil, err
}
select {
case state.Commands <- msg:
if !WaitForCloseWithTimeout(state.ParamsWait, 2*time.Second) {
return nil, fmt.Errorf("request timed out")
}
state.RLock()
resp, err := json.Marshal(state.Params)
state.RUnlock()
if err != nil {
err = fmt.Errorf("unable to unmarshal params as json: %w", err)
return nil, err
}
return resp, nil
default:
return nil, fmt.Errorf("unable to forward request; controller may not be connected")
}
}
// used to cache data points to avoid accessing
// sqlite for most queries
func parseMsg(line []byte, db *sql.DB, state *ShroomState) {
packet := ShroomPacket{}
err := json.Unmarshal(line, &packet)
if err != nil {
log.Println("unable to parse tcp line: ", err)
log.Println(string(line))
log.Println(line)
return
}
//log.Println("received data ", data)
if packet.Data != nil {
if packet.Data.Time != nil && packet.Data.Temperature != nil && packet.Data.Humidity != nil && packet.Data.HumidifierVolts != nil && packet.Data.HumidifierVolts2 != nil {
err = InsertRow(db, packet.Data)
state.Cache.Add(packet.Data.ToDatapoint())
if err != nil {
log.Println("unable to write to database: ", err)
}
// we got a data packet
state.Update()
}
} else if packet.Status != nil {
//log.Println("received status ", data.Status)
state.Lock()
if packet.Status.HumOn != nil {
state.HumidifierOn = *packet.Status.HumOn
}
if packet.Status.Hum2On != nil {
state.Humidifier2On = *packet.Status.Hum2On
}
if packet.Status.ManualMode != nil {
state.ManualMode = *packet.Status.ManualMode
}
state.Unlock()
state.StatusUpdate()
} else if packet.Params != nil {
state.Lock()
state.Params = packet.Params
state.Unlock()
state.ParamsUpdate()
} else {
log.Println("unknown packet: ", line, string(line))
}
}
func InitTcpServer(db *sql.DB, state *ShroomState) {
// start TCP server for the pipe from the raspberry pi
ln, err := net.Listen("tcp", ":9876")
if err != nil {
log.Fatal("unable to open tcp server: ", err)
}
go func() {
for {
conn, err := ln.Accept()
if err != nil {
log.Println("tcp accept error: ", err)
return
}
// not spawning a goroutine here
// should limit the number of connections to
// one hopefully
// wrapping in a func() so that I can use defer
// to automatically decrement the number of connections
func() {
state.Lock()
state.NumConnections += 1
state.Unlock()
state.StatusUpdate()
defer func() {
state.Lock()
state.NumConnections -= 1
state.Unlock()
state.StatusUpdate()
log.Println("connection disconnected")
}()
log.Println("connection started")
// write loop; waits for commands and forwards them
exiting := make(chan struct{})
go func() {
for {
select {
case v, ok := <-state.Commands:
if !ok {
return
}
v = append(v, []byte("\n")...)
_, err := conn.Write(v)
if err != nil {
log.Println("tcp write err: ", err)
}
case <-exiting:
return
}
}
}()
defer func() {
close(exiting)
}()
// deal with the read side of the connection
buf := make([]byte, 1024)
left := buf
for {
if len(left) == 0 {
log.Println("overflow detected, truncating data")
left = buf
}
_, err := conn.Write([]byte("\n"))
if err != nil {
log.Println("tcp write error: ", err)
_ = conn.Close()
log.Println("disconnected from client")
break
}
err = conn.SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
log.Println("unable to set read deadline ", err)
_ = conn.Close()
break
}
num_read, err := conn.Read(left)
left = left[num_read:]
if err != nil && !errors.Is(err, os.ErrDeadlineExceeded) {
log.Println("tcp read error: ", err)
_ = conn.Close()
log.Println("disconnected from client")
break
}
// parse the message to see if it's finished
unread := buf[:len(buf)-len(left)]
for newlinePos(unread) != -1 {
end := newlinePos(unread)
line := unread[:end]
unread = unread[end+1:]
// skip empty lines
if len(line) == 0 {
continue
}
parseMsg(line, db, state)
}
// shift the remaining data back to the start of the buffer
copy(buf[:len(unread)], unread)
left = buf[len(unread):]
}
}()
}
}()
}