Add support for parameter querying and modification

This commit is contained in:
Kelvin Ly 2023-05-16 11:35:06 -04:00
parent 0934e2df8d
commit e5e4289342
3 changed files with 127 additions and 30 deletions

View File

@ -24,15 +24,6 @@ def send_update(msg):
process.stdin.write(bytes(json.dumps(msg) + "\n", "utf8")) process.stdin.write(bytes(json.dumps(msg) + "\n", "utf8"))
process.stdin.flush() process.stdin.flush()
exiting = False
# run thread to process data from process's stdout
def stdout_loop():
while not exiting:
msg = process.stdout.readline()
print("got message ", msg)
stdout_thread = threading.Thread(target=stdout_loop)
stdout_thread.start()
class MockSerial: class MockSerial:
def __init__(self): def __init__(self):
self.humidity = np.zeros(100) self.humidity = np.zeros(100)
@ -118,6 +109,42 @@ target_lower = 0.85
target_upper = 0.90 target_upper = 0.90
feedforward_coeff = 50 feedforward_coeff = 50
exiting = False
# run thread to process data from process's stdout
def stdout_loop():
global process, target_lower, target_upper, feedforward_coeff
while not exiting:
msg = process.stdout.readline()
if len(msg) == 0:
continue
print("got message ", msg)
try:
msg_js = json.loads(msg)
if "query_params" in msg_js:
if msg_js["query_params"]:
send_update({"params": {
"target_lower": target_lower,
"target_upper": target_upper,
"feedforward_coeff": feedforward_coeff
}
})
elif "set_params" in msg_js:
if type(msg_js["set_params"]) is dict:
set_params = msg_js["set_params"]
if "name" in set_params and "value" in set_params:
if type(set_params["value"]) is float:
if set_params["name"] == "target_lower":
target_lower = set_params["value"]
elif set_params["name"] == "target_upper":
target_upper = set_params["value"]
elif set_params["name"] == "feedforward_coeff":
feedforward_coeff = set_params["value"]
except json.JSONDecodeError as e:
print("received bad json ", msg)
stdout_thread = threading.Thread(target=stdout_loop)
stdout_thread.start()
humidifier_history = np.zeros(30) humidifier_history = np.zeros(30)
first_sample = False first_sample = False
frame_num = 0 frame_num = 0

View File

@ -3,9 +3,11 @@ package shroom_internals
import ( import (
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt"
"log" "log"
"net" "net"
"sync" "sync"
"time"
) )
func newlinePos(s []byte) int { func newlinePos(s []byte) int {
@ -29,15 +31,10 @@ type DataJson struct {
HumidifierVolts *float32 `json:"hv"` HumidifierVolts *float32 `json:"hv"`
} }
type ParamJson struct {
Name string `json:"name"`
Value float32 `json:"string"`
}
type ShroomPacket struct { type ShroomPacket struct {
Data *DataJson `json:"data"` Data *DataJson `json:"data"`
Status *StatusJson `json:"status"` Status *StatusJson `json:"status"`
Param *ParamJson `json:"param"` Params map[string]float32 `json:"params"`
} }
type ShroomState struct { type ShroomState struct {
@ -48,6 +45,7 @@ type ShroomState struct {
Params map[string]float32 Params map[string]float32
Wait chan struct{} Wait chan struct{}
StatusWait chan struct{} StatusWait chan struct{}
ParamsWait chan struct{}
Commands chan []byte Commands chan []byte
} }
@ -57,6 +55,7 @@ func NewShroomState() ShroomState {
Params: make(map[string]float32), Params: make(map[string]float32),
Wait: make(chan struct{}), Wait: make(chan struct{}),
StatusWait: make(chan struct{}), StatusWait: make(chan struct{}),
ParamsWait: make(chan struct{}),
Commands: make(chan []byte), Commands: make(chan []byte),
} }
} }
@ -75,6 +74,72 @@ func (s *ShroomState) StatusUpdate() {
s.StatusWait = make(chan struct{}) s.StatusWait = make(chan struct{})
} }
func (s *ShroomState) ParamsUpdate() {
s.Lock()
defer s.Unlock()
close(s.ParamsWait)
s.ParamsWait = make(chan struct{})
}
func WaitForClose(c chan struct{}) {
stillopen := true
for stillopen {
_, stillopen = <-c
}
}
func WaitForCloseWithTimeout(c chan struct{}, t time.Duration) bool {
stillopen := true
timeout := make(chan struct{})
go func() {
time.Sleep(t)
timeout <- struct{}{}
}()
for stillopen {
select {
case _, stillopen = <-c:
if stillopen {
continue
}
case <-timeout:
return false
}
}
return true
}
type queryParams struct {
QueryParams bool `json:"query_params"`
}
func QueryParams(state *ShroomState) ([]byte, error) {
msg, err := json.Marshal(queryParams{
QueryParams: true,
})
if err != nil {
err = fmt.Errorf("unable to marshal query request: %w", err)
return nil, err
}
select {
case state.Commands <- msg:
if !WaitForCloseWithTimeout(state.ParamsWait, 2*time.Second) {
return nil, fmt.Errorf("request timed out")
}
state.RLock()
resp, err := json.Marshal(state.Params)
state.RUnlock()
if err != nil {
err = fmt.Errorf("unable to unmarshal params as json: %w", err)
return nil, err
}
return resp, nil
default:
return nil, fmt.Errorf("unable to forward request; controller may not be connected")
}
}
func parseMsg(line []byte, db *sql.DB, state *ShroomState) { func parseMsg(line []byte, db *sql.DB, state *ShroomState) {
packet := ShroomPacket{} packet := ShroomPacket{}
err := json.Unmarshal(line, &packet) err := json.Unmarshal(line, &packet)
@ -103,6 +168,11 @@ func parseMsg(line []byte, db *sql.DB, state *ShroomState) {
} }
state.Unlock() state.Unlock()
state.StatusUpdate() state.StatusUpdate()
} else if packet.Params != nil {
state.Lock()
state.Params = packet.Params
state.Unlock()
state.ParamsUpdate()
} else { } else {
log.Println("unknown packet: ", line, string(line)) log.Println("unknown packet: ", line, string(line))
} }
@ -178,10 +248,7 @@ func InitTcpServer(db *sql.DB, state *ShroomState) {
} }
num_read, err := conn.Read(left) num_read, err := conn.Read(left)
//log.Println("received: ", string(left[:num_read]))
left = left[num_read:] left = left[num_read:]
//log.Println("buf ", buf)
//log.Println("left ", left)
if err != nil { if err != nil {
log.Println("tcp read error: ", err) log.Println("tcp read error: ", err)
@ -197,8 +264,6 @@ func InitTcpServer(db *sql.DB, state *ShroomState) {
line := unread[:end] line := unread[:end]
unread = unread[end+1:] unread = unread[end+1:]
//log.Println("line ", line)
//log.Println("unread ", unread)
// skip empty lines // skip empty lines
if len(line) == 0 { if len(line) == 0 {
continue continue

View File

@ -161,18 +161,22 @@ func main() {
} }
} }
updateHandler := func(w http.ResponseWriter, req *http.Request) { paramsHandler := func(w http.ResponseWriter, req *http.Request) {
stillopen := true msg, err := s.QueryParams(&state)
for stillopen { if err != nil {
_, stillopen = <-state.Wait w.WriteHeader(500)
w.Write([]byte(err.Error()))
return
} }
w.Write(msg)
}
updateHandler := func(w http.ResponseWriter, req *http.Request) {
s.WaitForClose(state.Wait)
w.Write([]byte("ok")) w.Write([]byte("ok"))
} }
statusUpdateHandler := func(w http.ResponseWriter, req *http.Request) { statusUpdateHandler := func(w http.ResponseWriter, req *http.Request) {
stillopen := true s.WaitForClose(state.StatusWait)
for stillopen {
_, stillopen = <-state.StatusWait
}
w.Write([]byte("ok")) w.Write([]byte("ok"))
} }
@ -186,6 +190,7 @@ func main() {
http.HandleFunc("/api/latest", lastPoint) http.HandleFunc("/api/latest", lastPoint)
http.HandleFunc("/api/status", getStatus) http.HandleFunc("/api/status", getStatus)
http.HandleFunc("/api/admin", adminHandler) http.HandleFunc("/api/admin", adminHandler)
http.HandleFunc("/api/params", paramsHandler)
http.HandleFunc("/api/update", updateHandler) http.HandleFunc("/api/update", updateHandler)
http.HandleFunc("/api/status_update", statusUpdateHandler) http.HandleFunc("/api/status_update", statusUpdateHandler)