commit da8ba9bbfc15096709b26dc0be36231fde4ac63a
parent 7fc7720428940a12d74ae1570bfbd007614e7c56
Author: Brian C. Lane <bcl@brianlane.com>
Date: Sun, 20 Mar 2022 10:04:36 -0700
events: Add expire support to the cache
The cache is kept in memory and on disk. When _check_cache minutes have
elapsed it will check for old data (older than _keep_days) and remove it
from memory and from disk.
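A rough idea of how a caller might wire this up at startup, using the setters the
patch adds (the logger, path, and values here are illustrative assumptions, not
part of the patch):

    # Hypothetical startup wiring using the new EventCache setters.
    EventCache.logger(log)                  # a structlog.BoundLogger (assumed)
    EventCache.base_dir("/var/lib/strix")   # assumed event storage root
    EventCache.keep(30)                     # drop events older than 30 days
    EventCache.check_cache(60)              # run the expire check at most hourly
    preload_cache(log, "/var/lib/strix")    # warm the cache, forcing an expire pass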
Filesystem deletes can be quite expensive, so the directory is renamed to
a temporary location and then deleted in a background process.
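The move-then-delete pattern looks roughly like this minimal sketch (queue_delete,
day_dir, and base_dir are illustrative names, and it assumes delete_queue sits on
the same filesystem as the event directories so the move is a cheap rename):

    import multiprocessing as mp
    import os
    import shutil
    import tempfile

    def queue_delete(day_dir, base_dir):
        """Move day_dir aside cheaply, then remove it in a background process."""
        queue_root = os.path.join(base_dir, "delete_queue")
        os.makedirs(queue_root, exist_ok=True)
        staging = tempfile.mkdtemp(dir=queue_root)
        # The move is a rename on the same filesystem, so it returns quickly;
        # the expensive recursive delete runs in a separate process.
        shutil.move(day_dir, staging)
        mp.Process(target=shutil.rmtree, args=(staging,),
                   kwargs={"ignore_errors": True}).start()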
Diffstat:
M src/strix/events.py | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 159 insertions(+), 10 deletions(-)
diff --git a/src/strix/events.py b/src/strix/events.py
@@ -14,10 +14,14 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from datetime import datetime
+from datetime import datetime, timedelta
from glob import glob
import json
+import multiprocessing as mp
import os
+import re
+import shutil
+import tempfile
import threading
import structlog
@@ -26,6 +30,12 @@ from typing import Dict, List
class EventCacheClass:
def __init__(self):
+ self._log = None
+ self._base_dir = "/invalid/path/for/expire"
+ self._last_check = datetime(1985, 10, 26, 1, 22, 0)
+ self._force_expire = False
+ self._check_cache = 60
+ self._keep_days = 9999
self._lock = threading.Lock()
self._cache = {}
@@ -34,13 +44,139 @@ class EventCacheClass:
return self._cache[key]
def set(self, key, value):
+ # DEBUG
+ if self._log:
+ self._log.info(f"EventCache: {key} = {value}")
+
with self._lock:
+ # Convert start/end to datetime object
+ if "start" in value and type(value["start"]) == type(""):
+ value["start"] = datetime.fromisoformat(value["start"])
+ if "end" in value and type(value["end"]) == type(""):
+ value["end"] = datetime.fromisoformat(value["end"])
+
self._cache[key] = value
+ # This can potentially remove the key just added if it is an old event
+ self._expire_events()
+
+ # Return True if it was added to the cache
+ return key in self._cache
+
+ def base_dir(self, base_dir):
+ with self._lock:
+ self._base_dir = base_dir
+
+ def logger(self, logger):
+ with self._lock:
+ self._log = logger
+
+ def keep(self, days):
+ with self._lock:
+ self._keep_days = days
+
+ def check_cache(self, minutes):
+ with self._lock:
+ self._check_cache = minutes
+
+ def reset_check(self):
+ with self._lock:
+ self._last_check = datetime(1985, 10, 26, 1, 22, 0)
+
+ def force_expire(self, force=False):
+ with self._lock:
+ self._force_expire = force
+
+ def _expire_events(self):
+ start = datetime.now()
+
+ if not self._force_expire:
+ if start - self._last_check < timedelta(minutes=self._check_cache):
+ return
+ self._last_check = datetime.now()
+
+ if self._log and not self._force_expire:
+ self._log.info("Checking cache...")
+
+ remove = {}
+ for e in self._cache:
+ if self._cache[e]["start"] < datetime.now() - timedelta(days=self._keep_days):
+ if "event_path" in self._cache[e] \
+ and self._cache[e]["event_path"].startswith(self._base_dir):
+ daypath = os.path.dirname(self._cache[e]["event_path"].rstrip("/"))
+ if daypath in remove:
+ remove[daypath].append(e)
+ else:
+ remove[daypath] = [e]
+
+ if self._log and not self._force_expire:
+ self._log.info(f"Done checking cache in {datetime.now()-start}")
+
+ if len(remove) == 0:
+ return
+
+ if self._log:
+ self._log.info(f"Removing {len(remove)} days")
+
+ # Create the temporary delete_queue directory
+ delete_queue = tempfile.mkdtemp(dir=os.path.join(self._base_dir, "delete_queue"))
+
+ # Move each day's directory to the temporary delete_queue directory
+ for daypath in remove:
+ # All paths should have a Camera* component
+ cm = re.search(r"(Camera\d+)", daypath)
+ if not cm:
+ if self._log:
+ self._log.error(f"Camera* missing from path {daypath}")
+
+ if cm and os.path.exists(daypath):
+ if self._log:
+ self._log.info("REMOVE: %s", daypath)
+
+ if not os.path.exists(os.path.join(delete_queue, cm.group())):
+ os.makedirs(os.path.join(delete_queue, cm.group()))
+
+ # Move the daily directory tree into the delete_queue/Camera* directory
+ shutil.move(daypath, os.path.join(delete_queue, cm.group()))
+
+ # Remove the events from the cache
+ self._log.info(f"Removing {len(remove[daypath])} events")
+ for e in remove[daypath]:
+ del self._cache[e]
+
+ if self._log:
+ self._log.info(f"Expire of {len(remove)} days took: {datetime.now()-start}")
+
+ def dth_fn(delete_queue):
+ shutil.rmtree(delete_queue, ignore_errors=True)
+
+ # Start a process to do the actual delete in the background
+ dth = mp.Process(name="delete-thread",
+ target=dth_fn,
+ args=(delete_queue,))
+ dth.start()
+
+
# Singleton
EventCache = EventCacheClass()
+def preload_cache(log, base_dir):
+ log.info("Pre-loading event cache...")
+ start = datetime(1985, 10, 26, 1, 22, 0)
+ total = timedelta()
+ EventCache.force_expire(True)
+ for camera in sorted(c for c in os.listdir(base_dir) if c.startswith("Camera")):
+ end = datetime.now()
+ _ = camera_events(log, base_dir, camera, start, end, 0, 0)
+ log.info(f"{camera} event cache loaded in {datetime.now()-end} seconds")
+ total += datetime.now()-end
+ log.info(f"Event cache loaded in {total} seconds")
+ EventCache.force_expire(False)
+
+
def path_to_dt(path: str) -> datetime:
# Use the last 2 elements of the path to construct a Datatime
(date, time) = path.split("/")[-2:]
@@ -72,9 +208,15 @@ def event_details(log: structlog.BoundLogger, event_path: str) -> Dict:
if os.path.exists(event_path+"/.details.json"):
with open(event_path+"/.details.json") as f:
details = json.load(f)
- EventCache.set(event_path, details)
+
+ # Adding to the cache can potentially expire old events
+ ok = EventCache.set(event_path, details)
+ if ok:
+ return details
+ else:
+ return None
except json.decoder.JSONDecodeError:
- log.warn("Error reading .details.json from %s", event_path)
+ log.error("Error reading .details.json from %s", event_path)
(camera_name, event_date, event_time) = event_path.rsplit("/", 3)[-3:]
@@ -119,18 +261,23 @@ def event_details(log: structlog.BoundLogger, event_path: str) -> Dict:
# video=video, debug_video=debug_video, saved=is_saved)
details = {
- "start": str(start_time),
- "end": str(end_time),
+ "start": start_time,
+ "end": end_time,
"video": video[0],
"debug_video": video[1],
"thumbnail": thumbnail,
"images": [],
- "saved": is_saved
+ "saved": is_saved,
+ "event_path": event_path,
}
- EventCache.set(event_path, details)
- with open(event_path+"/.details.json", "w") as f:
- json.dump(details, f)
+ # Adding to the cache can potentially expire it if it was an old event
+ ok = EventCache.set(event_path, details)
+ if not ok:
+ return None
+
+ with open(event_path+"/.details.json", "w") as f:
+ json.dump(details, f, default=str)
return details
@@ -151,7 +298,9 @@ def camera_events(log: structlog.BoundLogger, base_dir: str, camera: str,
skipped += 1
continue
- events.insert(0, event_details(log, event_path))
+ details = event_details(log, event_path)
+ if details is not None:
+ events.insert(0, details)
added += 1
if limit > 0 and added >= limit: