shrooms-server/shroom_internals/tcp_server.go

216 lines
4.6 KiB
Go

package shroom_internals
import (
"database/sql"
"encoding/json"
"log"
"net"
"sync"
)
// newlinePos reports the index of the first '\n' byte in s.
// It returns -1 when s holds no newline (including when s is nil or empty).
func newlinePos(s []byte) int {
	pos := -1
	// Walk forward and stop as soon as the first newline has been recorded.
	for idx := 0; idx < len(s) && pos < 0; idx++ {
		if s[idx] == '\n' {
			pos = idx
		}
	}
	return pos
}
// StatusJson is the "status" section of a ShroomPacket.
// Both fields are pointers, so a key that is absent from the JSON stays nil
// and can be told apart from an explicit false.
type StatusJson struct {
// HumOn is decoded from the "humidifier" key; parseMsg copies it into
// ShroomState.HumidifierOn when it is present.
HumOn *bool `json:"humidifier"`
// ManualMode is decoded from the "manual_mode" key. Nothing in this file
// reads it yet.
ManualMode *bool `json:"manual_mode"`
}
// DataJson is the "data" section of a ShroomPacket: one sensor sample.
// Every field is a pointer so that a missing key decodes to nil; parseMsg
// only stores the sample (via InsertRow) when all four fields are present.
type DataJson struct {
// Time is decoded from the "time" key.
// NOTE(review): unit/epoch is not visible in this file - confirm with the sender.
Time *uint64 `json:"time"`
// Temperature is decoded from the "temp" key.
Temperature *float32 `json:"temp"`
// Humidity is decoded from the "hum" key.
Humidity *float32 `json:"hum"`
// HumidifierVolts is decoded from the "hv" key.
// NOTE(review): presumably a voltage reading for the humidifier - confirm.
HumidifierVolts *float32 `json:"hv"`
}
// ParamJson is the "param" section of a ShroomPacket: a named float value.
// Unlike the other sections its fields are plain values, so a missing key
// decodes to the zero value ("" / 0) rather than nil.
type ParamJson struct {
// Name is decoded from the "name" key.
Name string `json:"name"`
// Value is decoded from the JSON key "string".
// NOTE(review): a float32 stored under a key called "string" looks like a
// typo for "value" (or a misplaced `,string` option) - confirm against the
// peer's wire format before changing it, since the tag is part of the protocol.
Value float32 `json:"string"`
}
// ShroomPacket is the top-level JSON object carried by one line of the TCP
// stream. Each section is optional and nil when its key is absent.
// parseMsg checks Data first and Status second; a packet carrying neither
// (including one with only Param) is logged as unknown.
type ShroomPacket struct {
// Data holds a sensor sample ("data" key).
Data *DataJson `json:"data"`
// Status holds the device status ("status" key).
Status *StatusJson `json:"status"`
// Param holds a named parameter value ("param" key).
Param *ParamJson `json:"param"`
}
// ShroomState is the shared, mutex-guarded state of the server.
// The embedded RWMutex protects the fields below; the struct must not be
// copied once it is in use.
type ShroomState struct {
	sync.RWMutex
	// HumidifierOn mirrors the last "humidifier" value seen in a status packet.
	HumidifierOn bool
	// NumConnections is incremented when a TCP client connects and
	// decremented when it disconnects.
	NumConnections int
	// Params maps parameter names to values.
	Params map[string]float32
	// Wait is closed and replaced by Update.
	Wait chan struct{}
	// StatusWait is closed and replaced by StatusUpdate.
	StatusWait chan struct{}
	// Commands carries raw command payloads that the TCP server writes to the
	// connected client, each followed by a newline.
	Commands chan []byte
}

// NewShroomState returns a ShroomState with an empty Params map and fresh
// unbuffered Wait, StatusWait and Commands channels.
func NewShroomState() ShroomState {
	return ShroomState{
		Commands:   make(chan []byte),
		StatusWait: make(chan struct{}),
		Wait:       make(chan struct{}),
		Params:     make(map[string]float32),
	}
}

// renewSignal closes the channel stored at *slot, which releases every
// receiver blocked on it, and stores a fresh open channel in its place.
// The write lock is held for the whole swap.
func (s *ShroomState) renewSignal(slot *chan struct{}) {
	s.Lock()
	defer s.Unlock()
	close(*slot)
	*slot = make(chan struct{})
}

// Update closes the current Wait channel and installs a new one.
func (s *ShroomState) Update() {
	s.renewSignal(&s.Wait)
}

// StatusUpdate closes the current StatusWait channel and installs a new one.
func (s *ShroomState) StatusUpdate() {
	s.renewSignal(&s.StatusWait)
}
// parseMsg decodes one newline-delimited JSON line received from the TCP
// client and applies it.
//
//   - line:  the raw line without its trailing newline.
//   - db:    database handle passed through to InsertRow for data packets.
//   - state: shared server state updated by status packets and signalled
//     after every accepted packet.
//
// A packet with a "data" section is stored via InsertRow and followed by
// state.Update(); all four sample fields must be present. A packet with a
// "status" section (and no "data") updates state.HumidifierOn when the
// "humidifier" key is present and is always followed by state.StatusUpdate().
// Anything else is logged and ignored. Nothing is returned; failures are
// only logged.
func parseMsg(line []byte, db *sql.DB, state *ShroomState) {
	var packet ShroomPacket
	if err := json.Unmarshal(line, &packet); err != nil {
		log.Println("unable to parse tcp line: ", err)
		log.Println(string(line))
		log.Println(line)
		return
	}

	switch {
	case packet.Data != nil:
		sample := packet.Data
		if sample.Time == nil || sample.Temperature == nil ||
			sample.Humidity == nil || sample.HumidifierVolts == nil {
			// Previously such packets vanished silently; log them like
			// every other rejected input so sender bugs are visible.
			log.Println("incomplete data packet: ", string(line))
			return
		}
		if err := InsertRow(db, sample); err != nil {
			log.Println("unable to write to database: ", err)
		}
		// we got a data packet; waiters are signalled even when the
		// database write failed
		state.Update()

	case packet.Status != nil:
		// TODO change to have more detailed data
		if packet.Status.HumOn != nil {
			state.Lock()
			state.HumidifierOn = *packet.Status.HumOn
			state.Unlock()
		}
		state.StatusUpdate()

	default:
		log.Println("unknown packet: ", line, string(line))
	}
}
// InitTcpServer starts the TCP listener on port 9876 that carries the pipe
// from the raspberry pi, then returns immediately; connections are served
// from a background goroutine.
//
//   - db:    database handle handed to parseMsg for every received line.
//   - state: shared server state; NumConnections is maintained here and
//     state.Commands is drained and forwarded to the connected client.
//
// The process is terminated via log.Fatal if the port cannot be opened.
// Connections are handled strictly one at a time. The accept loop ends on
// the first Accept error.
func InitTcpServer(db *sql.DB, state *ShroomState) {
	// start TCP server for the pipe from the raspberry pi
	ln, err := net.Listen("tcp", ":9876")
	if err != nil {
		log.Fatal("unable to open tcp server: ", err)
	}
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				log.Println("tcp accept error: ", err)
				return
			}
			// Deliberately not spawning a goroutine here: serving the
			// connection inline keeps the number of active connections
			// at one.
			serveTcpConn(conn, db, state)
		}
	}()
}

// serveTcpConn handles a single client connection until its read side
// fails: it keeps state.NumConnections up to date, forwards queued commands
// to the client, and feeds every complete received line to parseMsg.
func serveTcpConn(conn net.Conn, db *sql.DB, state *ShroomState) {
	state.Lock()
	state.NumConnections += 1
	state.Unlock()
	state.StatusUpdate()
	defer func() {
		state.Lock()
		state.NumConnections -= 1
		state.Unlock()
		state.StatusUpdate()
		log.Println("connection disconnected")
	}()
	log.Println("connection started")

	// write loop; waits for commands and forwards them until exiting is closed
	exiting := make(chan struct{})
	go forwardTcpCommands(conn, state, exiting)
	defer close(exiting)

	// deal with the read side of the connection
	buf := make([]byte, 128)
	used := 0 // bytes at the start of buf that belong to an unfinished line
	for {
		n, err := conn.Read(buf[used:])
		used += n

		// Consume every complete line first. This also covers bytes that
		// arrive together with a read error, and a buffer that was filled
		// exactly by complete lines.
		pending := buf[:used]
		for {
			end := newlinePos(pending)
			if end == -1 {
				break
			}
			line := pending[:end]
			pending = pending[end+1:]
			// skip empty lines
			if len(line) == 0 {
				continue
			}
			parseMsg(line, db, state)
		}

		// shift the remaining partial line back to the start of the buffer
		used = copy(buf, pending)

		// A full buffer without any newline can never complete: drop it so
		// the next Read has room. The rest of the oversized line will be
		// rejected by parseMsg when its newline finally arrives.
		if used == len(buf) {
			log.Println("overflow detected, truncating data")
			used = 0
		}

		if err != nil {
			log.Println("tcp read error: ", err)
			_ = conn.Close() // best effort: the connection is already unusable
			log.Println("disconnected from client")
			break
		}
	}
}

// forwardTcpCommands writes every payload received on state.Commands to
// conn, followed by a newline. It returns when exiting is closed or when
// state.Commands is closed. Write errors are logged and do not stop the loop.
func forwardTcpCommands(conn net.Conn, state *ShroomState, exiting <-chan struct{}) {
	for {
		select {
		case v, ok := <-state.Commands:
			if !ok {
				return
			}
			v = append(v, '\n')
			if _, err := conn.Write(v); err != nil {
				log.Println("tcp write err: ", err)
			}
		case <-exiting:
			return
		}
	}
}