diff --git a/shroom_controller.py b/shroom_controller.py index 45dc8c8..aaf1419 100644 --- a/shroom_controller.py +++ b/shroom_controller.py @@ -24,15 +24,6 @@ def send_update(msg): process.stdin.write(bytes(json.dumps(msg) + "\n", "utf8")) process.stdin.flush() -exiting = False -# run thread to process data from process's stdout -def stdout_loop(): - while not exiting: - msg = process.stdout.readline() - print("got message ", msg) -stdout_thread = threading.Thread(target=stdout_loop) -stdout_thread.start() - class MockSerial: def __init__(self): self.humidity = np.zeros(100) @@ -118,6 +109,42 @@ target_lower = 0.85 target_upper = 0.90 feedforward_coeff = 50 +exiting = False +# run thread to process data from process's stdout +def stdout_loop(): + global process, target_lower, target_upper, feedforward_coeff + while not exiting: + msg = process.stdout.readline() + if len(msg) == 0: + continue + print("got message ", msg) + try: + msg_js = json.loads(msg) + if "query_params" in msg_js: + if msg_js["query_params"]: + send_update({"params": { + "target_lower": target_lower, + "target_upper": target_upper, + "feedforward_coeff": feedforward_coeff + } + }) + elif "set_params" in msg_js: + if type(msg_js["set_params"]) is dict: + set_params = msg_js["set_params"] + if "name" in set_params and "value" in set_params: + if type(set_params["value"]) is float: + if set_params["name"] == "target_lower": + target_lower = set_params["value"] + elif set_params["name"] == "target_upper": + target_upper = set_params["value"] + elif set_params["name"] == "feedforward_coeff": + feedforward_coeff = set_params["value"] + except json.JSONDecodeError as e: + print("received bad json ", msg) +stdout_thread = threading.Thread(target=stdout_loop) +stdout_thread.start() + + humidifier_history = np.zeros(30) first_sample = False frame_num = 0 diff --git a/shroom_internals/tcp_server.go b/shroom_internals/tcp_server.go index 0e8dbbf..c5873a5 100644 --- a/shroom_internals/tcp_server.go +++ b/shroom_internals/tcp_server.go @@ -3,9 +3,11 @@ package shroom_internals import ( "database/sql" "encoding/json" + "fmt" "log" "net" "sync" + "time" ) func newlinePos(s []byte) int { @@ -29,15 +31,10 @@ type DataJson struct { HumidifierVolts *float32 `json:"hv"` } -type ParamJson struct { - Name string `json:"name"` - Value float32 `json:"string"` -} - type ShroomPacket struct { - Data *DataJson `json:"data"` - Status *StatusJson `json:"status"` - Param *ParamJson `json:"param"` + Data *DataJson `json:"data"` + Status *StatusJson `json:"status"` + Params map[string]float32 `json:"params"` } type ShroomState struct { @@ -48,6 +45,7 @@ type ShroomState struct { Params map[string]float32 Wait chan struct{} StatusWait chan struct{} + ParamsWait chan struct{} Commands chan []byte } @@ -57,6 +55,7 @@ func NewShroomState() ShroomState { Params: make(map[string]float32), Wait: make(chan struct{}), StatusWait: make(chan struct{}), + ParamsWait: make(chan struct{}), Commands: make(chan []byte), } } @@ -75,6 +74,72 @@ func (s *ShroomState) StatusUpdate() { s.StatusWait = make(chan struct{}) } +func (s *ShroomState) ParamsUpdate() { + s.Lock() + defer s.Unlock() + close(s.ParamsWait) + s.ParamsWait = make(chan struct{}) +} + +func WaitForClose(c chan struct{}) { + stillopen := true + for stillopen { + _, stillopen = <-c + } +} + +func WaitForCloseWithTimeout(c chan struct{}, t time.Duration) bool { + stillopen := true + timeout := make(chan struct{}) + go func() { + time.Sleep(t) + timeout <- struct{}{} + }() + + for stillopen { + select { + case _, stillopen = <-c: + if stillopen { + continue + } + case <-timeout: + return false + } + } + return true +} + +type queryParams struct { + QueryParams bool `json:"query_params"` +} + +func QueryParams(state *ShroomState) ([]byte, error) { + msg, err := json.Marshal(queryParams{ + QueryParams: true, + }) + if err != nil { + err = fmt.Errorf("unable to marshal query request: %w", err) + return nil, err + } + select { + case state.Commands <- msg: + if !WaitForCloseWithTimeout(state.ParamsWait, 2*time.Second) { + return nil, fmt.Errorf("request timed out") + } + state.RLock() + resp, err := json.Marshal(state.Params) + state.RUnlock() + if err != nil { + err = fmt.Errorf("unable to unmarshal params as json: %w", err) + return nil, err + } + return resp, nil + default: + return nil, fmt.Errorf("unable to forward request; controller may not be connected") + } + +} + func parseMsg(line []byte, db *sql.DB, state *ShroomState) { packet := ShroomPacket{} err := json.Unmarshal(line, &packet) @@ -103,6 +168,11 @@ func parseMsg(line []byte, db *sql.DB, state *ShroomState) { } state.Unlock() state.StatusUpdate() + } else if packet.Params != nil { + state.Lock() + state.Params = packet.Params + state.Unlock() + state.ParamsUpdate() } else { log.Println("unknown packet: ", line, string(line)) } @@ -178,10 +248,7 @@ func InitTcpServer(db *sql.DB, state *ShroomState) { } num_read, err := conn.Read(left) - //log.Println("received: ", string(left[:num_read])) left = left[num_read:] - //log.Println("buf ", buf) - //log.Println("left ", left) if err != nil { log.Println("tcp read error: ", err) @@ -197,8 +264,6 @@ func InitTcpServer(db *sql.DB, state *ShroomState) { line := unread[:end] unread = unread[end+1:] - //log.Println("line ", line) - //log.Println("unread ", unread) // skip empty lines if len(line) == 0 { continue diff --git a/shroom_server.go b/shroom_server.go index d5a4ae4..f565cda 100644 --- a/shroom_server.go +++ b/shroom_server.go @@ -161,18 +161,22 @@ func main() { } } - updateHandler := func(w http.ResponseWriter, req *http.Request) { - stillopen := true - for stillopen { - _, stillopen = <-state.Wait + paramsHandler := func(w http.ResponseWriter, req *http.Request) { + msg, err := s.QueryParams(&state) + if err != nil { + w.WriteHeader(500) + w.Write([]byte(err.Error())) + return } + w.Write(msg) + } + + updateHandler := func(w http.ResponseWriter, req *http.Request) { + s.WaitForClose(state.Wait) w.Write([]byte("ok")) } statusUpdateHandler := func(w http.ResponseWriter, req *http.Request) { - stillopen := true - for stillopen { - _, stillopen = <-state.StatusWait - } + s.WaitForClose(state.StatusWait) w.Write([]byte("ok")) } @@ -186,6 +190,7 @@ func main() { http.HandleFunc("/api/latest", lastPoint) http.HandleFunc("/api/status", getStatus) http.HandleFunc("/api/admin", adminHandler) + http.HandleFunc("/api/params", paramsHandler) http.HandleFunc("/api/update", updateHandler) http.HandleFunc("/api/status_update", statusUpdateHandler)