shrooms-server/shroom_internals/tcp_server.go

146 lines
3.0 KiB
Go

package shroom_internals
import (
"database/sql"
"encoding/json"
"log"
"net"
"sync"
)
func newlinePos(s []byte) int {
for i, v := range s {
if v == '\n' {
return i
}
}
return -1
}
type ShroomData struct {
Time uint64 `json:"time"`
Temperature float32 `json:"temp"`
Humidity float32 `json:"hum"`
HumidifierVolts float32 `json:"hv"`
Status uint32 `json:"status"`
}
type ShroomStatus struct {
sync.RWMutex
HumidifierOn bool
NumConnections int
Wait chan struct{}
}
func (s *ShroomStatus) Update() {
s.Lock()
defer s.Unlock()
close(s.Wait)
s.Wait = make(chan struct{})
}
func parseMsg(line []byte, db *sql.DB, status *ShroomStatus) {
data := ShroomData{
Time: 0,
Temperature: -1,
Humidity: -1,
HumidifierVolts: -1,
Status: 0,
}
err := json.Unmarshal(line, &data)
if err != nil {
log.Println("unable to parse tcp line: ", err)
log.Println(string(line))
log.Println(line)
return
}
if data.Time > 0 && data.Temperature > 0 && data.Humidity > 0 && data.HumidifierVolts > 0 {
err = InsertRow(db, &data)
if err != nil {
log.Println("unable to write to database: ", err)
}
// we got a data packet
status.Update()
} else if data.Status > 0 {
status.Lock()
// TODO change to have more detailed data
status.HumidifierOn = (data.Status & 1) == 1
status.Unlock()
status.Update()
} else {
log.Println("unknown packet: ", line)
}
}
func InitTcpServer(db *sql.DB, status *ShroomStatus) {
// start TCP server for the pipe from the raspberry pi
ln, err := net.Listen("tcp", ":9876")
if err != nil {
log.Fatal("unable to open tcp server: ", err)
}
go func() {
for {
conn, err := ln.Accept()
if err != nil {
log.Println("tcp accept error: ", err)
continue
}
status.Lock()
status.NumConnections += 1
status.Unlock()
defer func() {
status.Lock()
status.NumConnections -= 1
status.Unlock()
log.Println("connection disconnected")
}()
log.Println("connection started")
go func() {
// TODO deal with the write side of the connection
}()
// deal with the read side of the connection
buf := make([]byte, 128)
left := buf
for {
num_read, err := conn.Read(left)
left = left[num_read:]
//log.Println("buf ", buf)
//log.Println("left ", left)
if err != nil {
log.Println("tcp read error: ", err)
_ = conn.Close()
log.Println("disconnected from client")
break
}
// parse the message to see if it's finished
unread := buf[:len(buf)-len(left)]
for newlinePos(unread) != -1 {
end := newlinePos(unread)
line := unread[:end]
unread = unread[end+1:]
//log.Println("line ", line)
//log.Println("unread ", unread)
// skip empty lines
if len(line) == 0 {
continue
}
parseMsg(line, db, status)
}
// shift the remaining data back to the start of the buffer
copy(buf[:len(unread)], unread)
left = buf[len(unread):]
}
}
}()
}