From a65dcb1ee41293bcdaa67d510a2a6e2215d65f94 Mon Sep 17 00:00:00 2001 From: "Kelvin Ly (from lePotato)" Date: Wed, 21 Aug 2024 11:08:57 -0400 Subject: [PATCH] Make the piping logic more robust and separate it further from the main process to avoid stalling the controller logic; add in staggered restarts --- shroom_controller.py | 53 ++++++++++++++++++++++++++++++++++---------- utils.py | 34 ++++++++++++++++++++++++---- 2 files changed, 71 insertions(+), 16 deletions(-) diff --git a/shroom_controller.py b/shroom_controller.py index 7226d0e..6c870ab 100644 --- a/shroom_controller.py +++ b/shroom_controller.py @@ -113,12 +113,50 @@ def stdout_loop(): stdout_thread = threading.Thread(target=stdout_loop) stdout_thread.start() +def restart_pipe(): + global exiting, stdout_thread + exiting = True + utils.process.kill() + time.sleep(0.1) + stdout_thread.join() + exiting = False + + start_process() + + stdout_thread = threading.Thread(target=stdout_loop) + stdout_thread.start() + + frame_num = 0 last_sample = 0 + +last_pipe_reboot = 0 +last_running = False +pipe_timeout = 10 + try: while True: now = time.time() + + # check to see if the SSH pipe is working + if utils.running(): + if not last_running and utils.running(): + print("pipe is now running, resetting timeout", pipe_timeout) + pipe_timeout = 10 + last_running = utils.running() + + if not utils.running() and (now - last_pipe_reboot) > pipe_timeout: + try: + restart_pipe() + except Exception as e: + print("error restarting pipe: {}".format(repr(e))) + last_pipe_reboot = now + pipe_timeout *= 1.5 + pipe_timeout = min(pipe_timeout, 5*60) + print("new pipe timeout ", pipe_timeout) + + now = time.time() if now - last_sample < SAMPLE_PERIOD: time.sleep(SAMPLE_PERIOD - (now - last_sample) + 0.001) continue @@ -145,7 +183,7 @@ try: controller.update(s, humidity) if frame_num == 0: - #print(humidity, temp, volts) + print(humidity, temp, volts) update = { "data": { "time": int(now*1000), @@ -159,18 +197,9 @@ try: #print("sending update {}".format(update)) frame_num = (frame_num + 1) % DECIMATION_RATE except Exception as e: - print("pipe errored out, restarting: ", e) + print("pipe errored out, restarting: ", repr(e)) # restart the process I guess - exiting = True - utils.process.kill() - time.sleep(0.1) - stdout_thread.join() - exiting = False - - start_process() - - stdout_thread = threading.Thread(target=stdout_loop) - stdout_thread.start() + restart_pipe() finally: # kill ssh connection diff --git a/utils.py b/utils.py index 8ef289b..25abdee 100644 --- a/utils.py +++ b/utils.py @@ -1,10 +1,28 @@ import subprocess +import threading + import os import json +import queue process = None +update_thread = None +update_queue = queue.LifoQueue(maxsize=1) + +def update_loop(p, q): + print("update_loop start") + while p.poll() is None: + msg = q.get() + #print("got msg") + p.stdin.write(bytes(json.dumps(msg) + "\n", "utf8")) + p.stdin.flush() + def start_process(): - global process + print("starting shroom pipe") + global process, update_thread + if process is not None and process.poll() is None: + print("shroom pipe is still running, exiting init") + return try: is_mock = os.environ['MOCK'] @@ -16,9 +34,17 @@ def start_process(): #process = subprocess.Popen(["ssh", "shrooms@threefortiethofonehamster.com", "/usr/bin/env", "python3", "/home/shrooms/shrooms-server/shroom_pipe.py"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) process = subprocess.Popen(["ssh", "shrooms@35.211.7.97", "/usr/bin/env", "python3", "/home/shrooms/shrooms-server/shroom_pipe.py"], stdin=subprocess.PIPE, stdout=subprocess.PIPE) -def send_update(msg): + update_thread = threading.Thread(target=update_loop, args = (process, update_queue)) + update_thread.start() + +def running(): global process - process.stdin.write(bytes(json.dumps(msg) + "\n", "utf8")) - process.stdin.flush() + return process is not None and process.poll() is None +def send_update(msg): + global update_queue + try: + update_queue.put_nowait(msg) + except queue.Full: + print("queue full, skipping message")