commit 7ddf4463c7b0fce7f26617249ece9e62668f3f78
parent 447473001c1eadb0259a89d8c2e110082fa1b053
Author: Brian C. Lane <bcl@brianlane.com>
Date: Sun, 24 Sep 2017 11:43:06 -0700
Use multiprocessing and gevent
Switch the main threading to just use multiprocessing for all of it
(KISS), and add gevent to the bottle server (which is what really fixed
the crappy responses from the API).
Ends up the default wsgi can only do 1 thing at a time.
Diffstat:
6 files changed, 22 insertions(+), 24 deletions(-)
diff --git a/requirements.txt b/requirements.txt
@@ -1,4 +1,5 @@
bottle
+gevent
mypy
Pillow
pytest
diff --git a/src/strix/__init__.py b/src/strix/__init__.py
@@ -18,7 +18,6 @@ import multiprocessing as mp
import os
import re
import time
-import threading
from . import api
from . import cmdline
@@ -104,10 +103,10 @@ def run() -> bool:
# Start logging thread
logging_queue = mp.Queue() # type: mp.Queue[List[Any]]
- logging_quit = threading.Event()
- logging_thread = threading.Thread(name="logging-thread",
- target=logger.listener,
- args=(logging_queue, logging_quit, opts.log))
+ logging_quit = mp.Event() # type: mp.Event
+ logging_thread = mp.Process(name="logging-thread",
+ target=logger.listener,
+ args=(logging_queue, logging_quit, opts.log))
logging_thread.start()
running_threads = [(logging_thread, logging_quit)]
@@ -116,18 +115,18 @@ def run() -> bool:
if not os.path.exists(queue_path):
print("ERROR: %s does not exist. Is motion running?" % queue_path)
return False
- queue_quit = threading.Event()
- queue_thread = threading.Thread(name="queue-thread",
- target=queue.monitor_queue,
- args=(logging_queue, base_dir, queue_quit))
+ queue_quit = mp.Event()
+ queue_thread = mp.Process(name="queue-thread",
+ target=queue.monitor_queue,
+ args=(logging_queue, base_dir, queue_quit))
queue_thread.start()
running_threads += [(queue_thread, queue_quit)]
# Start API thread (may start its own threads to handle requests)
- api_quit = threading.Event()
- api_thread = threading.Thread(name="api-thread",
- target=api.run_api,
- args=(logging_queue, base_dir, opts.host, opts.port, opts.debug))
+ api_quit = mp.Event()
+ api_thread = mp.Process(name="api-thread",
+ target=api.run_api,
+ args=(logging_queue, base_dir, opts.host, opts.port, opts.debug))
api_thread.start()
running_threads += [(api_thread, api_quit)]
diff --git a/src/strix/api.py b/src/strix/api.py
@@ -14,10 +14,10 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+from gevent import monkey; monkey.patch_all()
from datetime import datetime
import multiprocessing as mp
import os
-import threading
from bottle import route, run, static_file, request, Response
@@ -48,17 +48,17 @@ def run_api(logging_queue: mp.Queue, base_dir: str, host: str, port: int, debug:
offset= int(request.query.get("offset", "0"))
limit = int(request.query.get("limit", "10"))
camera_list = cameras.split(",")
- log.debug("serve_events", camera_list=camera_list, start=str(start), end=str(end), offset=offset, limit=limit)
+# log.debug("serve_events", camera_list=camera_list, start=str(start), end=str(end), offset=offset, limit=limit)
events = {}
for camera in camera_list:
events[camera] = camera_events(log, base_dir, camera, start, end, offset, limit)
- log.debug("serve_events", events=events)
+# log.debug("serve_events", events=events)
return {"start": str(start),
"end": str(end),
"offset": offset,
"limit": limit,
"events": events}
- run(host=host, port=port, debug=debug)
+ run(host=host, port=port, debug=debug, server="gevent")
diff --git a/src/strix/events.py b/src/strix/events.py
@@ -41,7 +41,7 @@ def image_to_dt(event_date: str, image: str) -> datetime:
def event_details(log: structlog.BoundLogger, event_path: str) -> Dict:
- log.info("event_details", path=event_path)
+# log.info("event_details", path=event_path)
(camera_name, event_date, event_time) = event_path.rsplit("/", 3)[-3:]
# Grab the camera, date, and time and build the URL path
@@ -85,8 +85,8 @@ def event_details(log: structlog.BoundLogger, event_path: str) -> Dict:
is_saved = os.path.exists(event_path+"/.saved")
- log.debug("event_details", thumbnail=thumbnail, start_time=str(start_time), end_time=str(end_time),
- video=video, debug_video=debug_video, saved=is_saved)
+# log.debug("event_details", thumbnail=thumbnail, start_time=str(start_time), end_time=str(end_time),
+# video=video, debug_video=debug_video, saved=is_saved)
return {
"start": str(start_time),
diff --git a/src/strix/logger.py b/src/strix/logger.py
@@ -18,7 +18,6 @@ import sys
import logging
from logging.handlers import RotatingFileHandler, QueueListener, QueueHandler
import multiprocessing as mp
-import threading
import structlog
@@ -41,7 +40,7 @@ structlog.configure(
cache_logger_on_first_use=True,
)
-def listener(queue: mp.Queue, stop_event: threading.Event, log_path: str) -> None:
+def listener(queue: mp.Queue, stop_event: mp.Event, log_path: str) -> None:
handler = RotatingFileHandler(log_path, maxBytes=100*1024**2, backupCount=10)
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
diff --git a/src/strix/queue.py b/src/strix/queue.py
@@ -20,7 +20,6 @@ import os
import shutil
import subprocess
import time
-import threading
from PIL import Image
import structlog
@@ -96,7 +95,7 @@ def process_event(log: structlog.BoundLogger, base_dir: str, event: str) -> None
except Exception as e:
log.error("Moving to destination failed", event_path=event_path, exception=str(e))
-def monitor_queue(logging_queue: mp.Queue, base_dir: str, quit: threading.Event) -> None:
+def monitor_queue(logging_queue: mp.Queue, base_dir: str, quit: mp.Event) -> None:
threads = [] # type: List[mp.Process]
log = logger.log(logging_queue)