commit 9c5f646e7c1f1093215d0b39cec8bab6663f5df8
parent d18a25d831d8bb8047ebe47f2c0219c09533546a
Author: Brian C. Lane <bcl@brianlane.com>
Date: Fri, 24 Jun 2022 06:29:44 -0700
Add updating event cache with new events
The queue and API live in separate processes, so use a Pipe to pass the
event paths to the API process so that it can update the EventCache with
new event details.
Diffstat:
3 files changed, 19 insertions(+), 9 deletions(-)
diff --git a/src/strix/__init__.py b/src/strix/__init__.py
@@ -140,9 +140,10 @@ def run():
print("ERROR: %s does not exist. Is motion running?" % queue_path)
return False
queue_quit = mp.Event()
+ queue_rx, queue_tx = mp.Pipe(False)
queue_thread = mp.Process(name="queue-thread",
target=queue.monitor_queue,
- args=(logger_queue, base_dir, queue_quit, opts.max_cores))
+ args=(logger_queue, base_dir, queue_quit, opts.max_cores, queue_tx))
queue_thread.start()
running_threads += [(queue_thread, queue_quit)]
@@ -150,7 +151,7 @@ def run():
api_quit = mp.Event()
api_thread = mp.Process(name="api-thread",
target=api.run_api,
- args=(logger_queue, base_dir, cameras, opts.host, opts.port, opts.debug))
+ args=(logger_queue, base_dir, cameras, opts.host, opts.port, opts.debug, queue_rx))
api_thread.start()
running_threads += [(api_thread, api_quit)]
diff --git a/src/strix/api.py b/src/strix/api.py
@@ -16,7 +16,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from gevent import monkey; monkey.patch_all()
from datetime import datetime
-import multiprocessing as mp
import os
# Fix mimetypes so that it recognized m4v as video/mp4
@@ -25,19 +24,24 @@ mimetypes.add_type("video/mp4", ".m4v")
from bottle import install, route, run, static_file, request, Response, JSONPlugin
from json import dumps
+from threading import Thread
from . import logger
-from .events import camera_events, EventCache
+from .events import camera_events, EventCache, queue_events
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
def timestr_to_dt(rfc_str):
return datetime.strptime(rfc_str, TIME_FORMAT)
-def run_api(logging_queue, base_dir, cameras, host, port, debug):
+def run_api(logging_queue, base_dir, cameras, host, port, debug, queue_rx):
log = logger.log(logging_queue)
log.info("Starting API", base_dir=base_dir, cameras=cameras, host=host, port=port, debug=debug)
EventCache.logger(log)
+ # Listen to queue_rx for new events
+ th = Thread(target=queue_events, args=(log, queue_rx))
+ th.start()
+
@route('/')
@route('/<filename>')
def serve_root(filename="index.html"):
@@ -76,3 +80,5 @@ def run_api(logging_queue, base_dir, cameras, host, port, debug):
# Use str as default in json dumps for objects like datetime
install(JSONPlugin(json_dumps=lambda s: dumps(s, default=str)))
run(host=host, port=port, debug=debug, server="gevent")
+
+ th.join(30)
diff --git a/src/strix/queue.py b/src/strix/queue.py
@@ -95,7 +95,7 @@ def BestThumbnail(path):
## Handle watching the queue and dispatching movie creation and directory moving
-def process_event(log: structlog.BoundLogger, base_dir: str, event: str) -> None:
+def process_event(log: structlog.BoundLogger, base_dir: str, event: str, queue_tx) -> None:
log.info(event_path=event, base_dir=base_dir)
# The actual path is the event with _ replaced by /
@@ -151,13 +151,16 @@ def process_event(log: structlog.BoundLogger, base_dir: str, event: str) -> None
first_time = first_jpg.rsplit("-", 1)[0]
event_path_base = os.path.split(event_path)[0]
dest_path = os.path.join(event_path_base, first_time)
- log.info("Moved event to final location", dest_path=dest_path)
if not os.path.exists(dest_path):
os.rename(event_path, dest_path)
+ log.info("Moved event to final location", dest_path=dest_path)
+
+ # Tell the event thread/process about the new path
+ queue_tx.send(dest_path)
except Exception as e:
log.error("Moving to destination failed", event_path=event_path, exception=str(e))
-def monitor_queue(logging_queue, base_dir, quit, max_threads):
+def monitor_queue(logging_queue, base_dir, quit, max_threads, queue_tx):
threads = []
log = logger.log(logging_queue)
@@ -177,7 +180,7 @@ def monitor_queue(logging_queue, base_dir, quit, max_threads):
os.unlink(event_file)
event = os.path.split(event_file)[-1]
- thread = mp.Process(target=process_event, args=(log, base_dir, event))
+ thread = mp.Process(target=process_event, args=(log, base_dir, event, queue_tx))
threads.append(thread)
thread.start()