package shroom_internals import ( "database/sql" "encoding/json" "log" "net" "sync" ) func newlinePos(s []byte) int { for i, v := range s { if v == '\n' { return i } } return -1 } type ShroomData struct { Time uint64 `json:"time"` Temperature float32 `json:"temp"` Humidity float32 `json:"hum"` HumidifierVolts float32 `json:"hv"` Status int32 `json:"status"` } type ShroomStatus struct { sync.RWMutex HumidifierOn bool NumConnections int Wait chan struct{} StatusWait chan struct{} Commands chan []byte } func (s *ShroomStatus) Update() { s.Lock() defer s.Unlock() close(s.Wait) s.Wait = make(chan struct{}) } func (s *ShroomStatus) StatusUpdate() { s.Lock() defer s.Unlock() close(s.StatusWait) s.StatusWait = make(chan struct{}) } func parseMsg(line []byte, db *sql.DB, status *ShroomStatus) { data := ShroomData{ Time: 0, Temperature: -274, Humidity: -1, HumidifierVolts: -1, Status: -1, } err := json.Unmarshal(line, &data) if err != nil { log.Println("unable to parse tcp line: ", err) log.Println(string(line)) log.Println(line) return } //log.Println("received data ", data) if data.Time > 0 && data.Temperature > -275 && data.Humidity > -1 && data.HumidifierVolts > -1 { err = InsertRow(db, &data) if err != nil { log.Println("unable to write to database: ", err) } // we got a data packet status.Update() } else if data.Status != -1 { //log.Println("received status ", data.Status) status.Lock() // TODO change to have more detailed data status.HumidifierOn = (data.Status & 1) == 1 status.Unlock() status.StatusUpdate() } else { log.Println("unknown packet: ", line, string(line)) } } func InitTcpServer(db *sql.DB, status *ShroomStatus) { // start TCP server for the pipe from the raspberry pi ln, err := net.Listen("tcp", ":9876") if err != nil { log.Fatal("unable to open tcp server: ", err) } go func() { for { conn, err := ln.Accept() if err != nil { log.Println("tcp accept error: ", err) return } // not spawning a goroutine here // should limit the number of connections to // one hopefully // wrapping in a func() so that I can use defer // to automatically decrement the number of connections func() { status.Lock() status.NumConnections += 1 status.Unlock() status.StatusUpdate() defer func() { status.Lock() status.NumConnections -= 1 status.Unlock() status.StatusUpdate() log.Println("connection disconnected") }() log.Println("connection started") // write loop; waits for commands and forwards them exiting := make(chan struct{}) go func() { for { select { case v, ok := <-status.Commands: if !ok { return } v = append(v, []byte("\n")...) _, err := conn.Write(v) if err != nil { log.Println("tcp write err: ", err) } case <-exiting: return } } }() defer func() { close(exiting) }() // deal with the read side of the connection buf := make([]byte, 128) left := buf for { num_read, err := conn.Read(left) //log.Println("received: ", string(left[:num_read])) left = left[num_read:] //log.Println("buf ", buf) //log.Println("left ", left) if err != nil { log.Println("tcp read error: ", err) _ = conn.Close() log.Println("disconnected from client") break } // parse the message to see if it's finished unread := buf[:len(buf)-len(left)] for newlinePos(unread) != -1 { end := newlinePos(unread) line := unread[:end] unread = unread[end+1:] //log.Println("line ", line) //log.Println("unread ", unread) // skip empty lines if len(line) == 0 { continue } parseMsg(line, db, status) } // shift the remaining data back to the start of the buffer copy(buf[:len(unread)], unread) left = buf[len(unread):] } }() } }() }