Make the piping logic more robust and separate it further from the main process to avoid stalling the controller logic; add in staggered restarts

This commit is contained in:
Kelvin Ly (from lePotato) 2024-08-21 11:08:57 -04:00
parent 3117487346
commit a65dcb1ee4
2 changed files with 71 additions and 16 deletions

View File

@ -113,12 +113,50 @@ def stdout_loop():
stdout_thread = threading.Thread(target=stdout_loop)
stdout_thread.start()
def restart_pipe():
global exiting, stdout_thread
exiting = True
utils.process.kill()
time.sleep(0.1)
stdout_thread.join()
exiting = False
start_process()
stdout_thread = threading.Thread(target=stdout_loop)
stdout_thread.start()
frame_num = 0
last_sample = 0
last_pipe_reboot = 0
last_running = False
pipe_timeout = 10
try:
while True:
now = time.time()
# check to see if the SSH pipe is working
if utils.running():
if not last_running and utils.running():
print("pipe is now running, resetting timeout", pipe_timeout)
pipe_timeout = 10
last_running = utils.running()
if not utils.running() and (now - last_pipe_reboot) > pipe_timeout:
try:
restart_pipe()
except Exception as e:
print("error restarting pipe: {}".format(repr(e)))
last_pipe_reboot = now
pipe_timeout *= 1.5
pipe_timeout = min(pipe_timeout, 5*60)
print("new pipe timeout ", pipe_timeout)
now = time.time()
if now - last_sample < SAMPLE_PERIOD:
time.sleep(SAMPLE_PERIOD - (now - last_sample) + 0.001)
continue
@ -145,7 +183,7 @@ try:
controller.update(s, humidity)
if frame_num == 0:
#print(humidity, temp, volts)
print(humidity, temp, volts)
update = {
"data": {
"time": int(now*1000),
@ -159,18 +197,9 @@ try:
#print("sending update {}".format(update))
frame_num = (frame_num + 1) % DECIMATION_RATE
except Exception as e:
print("pipe errored out, restarting: ", e)
print("pipe errored out, restarting: ", repr(e))
# restart the process I guess
exiting = True
utils.process.kill()
time.sleep(0.1)
stdout_thread.join()
exiting = False
start_process()
stdout_thread = threading.Thread(target=stdout_loop)
stdout_thread.start()
restart_pipe()
finally:
# kill ssh connection

View File

@ -1,10 +1,28 @@
import subprocess
import threading
import os
import json
import queue
process = None
update_thread = None
update_queue = queue.LifoQueue(maxsize=1)
def update_loop(p, q):
print("update_loop start")
while p.poll() is None:
msg = q.get()
#print("got msg")
p.stdin.write(bytes(json.dumps(msg) + "\n", "utf8"))
p.stdin.flush()
def start_process():
global process
print("starting shroom pipe")
global process, update_thread
if process is not None and process.poll() is None:
print("shroom pipe is still running, exiting init")
return
try:
is_mock = os.environ['MOCK']
@ -16,9 +34,17 @@ def start_process():
#process = subprocess.Popen(["ssh", "shrooms@threefortiethofonehamster.com", "/usr/bin/env", "python3", "/home/shrooms/shrooms-server/shroom_pipe.py"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
process = subprocess.Popen(["ssh", "shrooms@35.211.7.97", "/usr/bin/env", "python3", "/home/shrooms/shrooms-server/shroom_pipe.py"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
def send_update(msg):
update_thread = threading.Thread(target=update_loop, args = (process, update_queue))
update_thread.start()
def running():
global process
process.stdin.write(bytes(json.dumps(msg) + "\n", "utf8"))
process.stdin.flush()
return process is not None and process.poll() is None
def send_update(msg):
global update_queue
try:
update_queue.put_nowait(msg)
except queue.Full:
print("queue full, skipping message")