Browse Source

move lock into cached page to lock per page instead of entire cache

Daniel Sheffield 1 year ago
parent
commit
bdfaae2ce6
4 changed files with 44 additions and 50 deletions
  1. 12 2
      app/rest/CachedLoadingPage.py
  2. 27 45
      app/rest/pyapi.py
  3. 4 2
      app/rest/trend.py
  4. 1 1
      test/rest/test_CachedLoadingPage.py

+ 12 - 2
app/rest/CachedLoadingPage.py

@@ -6,17 +6,19 @@
 from queue import Queue, Empty
 from time import time
 from threading import Lock
+from typing import Callable
 
 class CachedLoadingPage():
     
     value: str
 
-    def __init__(self, value: str):
+    def __init__(self, initial_value: str, provider: Callable[[Queue], None]):
         self._created = time()
         self._queue = Queue()
         self._loaded = False
-        self.value = value
+        self.value = initial_value
         self._lock = Lock()
+        self.provider = provider
 
     @property
     def age(self) -> float:
@@ -37,11 +39,19 @@ class CachedLoadingPage():
     @property
     def stale(self) -> bool:
         return self.age > 10*60
+    
+    def _start(self) -> None:
+        if not self.provider:
+            return
+        
+        self.provider(self.queue)
+        self.provider = None
 
     def update(self) -> str:
         if not self._lock.acquire(blocking=True, timeout=0.5):
             return self.value
         try:
+            self._start()
             item = self._queue.get(block=True, timeout=0.5)
             if item is None:
                 self._queue.task_done()

+ 27 - 45
app/rest/pyapi.py

@@ -5,27 +5,20 @@
 # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
 from typing import Iterable, Dict
 import os
-from time import time
 from urllib.parse import urlencode
 from bottle import (
     route,
     request,
-    run,
     response,
     FormsDict,
     redirect,
     template,
     static_file,
-    TEMPLATE_PATH,
 )
-import matplotlib
 from psycopg import connect
 from psycopg.sql import SQL, Literal
-import seaborn as sns
 from threading import Lock, Thread
 
-from queue import Queue, Empty
-
 from ..data.filter import(
     get_filter,
     get_query_param,
@@ -38,10 +31,6 @@ from . import trend as worker
 from . import PARAMS
 from .CachedLoadingPage import CachedLoadingPage
 
-matplotlib.use('agg')
-
-CACHE: Dict[str, CachedLoadingPage] = dict()
-
 host = f"host={os.getenv('HOST')}"
 db = f"dbname={os.getenv('DB', 'grocery')}"
 user = f"user={os.getenv('USER', 'das')}"
@@ -49,7 +38,20 @@ password = f"password={os.getenv('PASSWORD','')}"
 if not password.split('=',1)[1]:
     password = ''
 conn = connect(f"{host} {db} {user} {password}")
-sns.set_theme(style='darkgrid', palette='pastel')
+
+CACHE: Dict[str, CachedLoadingPage] = dict()
+
+def enforce_limit(cache, limit):
+    for idx, (_, k) in enumerate(sorted([
+            (v.age, k) for k, v in cache.items()
+        ])):
+        if idx > limit: del cache[k]
+
+
+def clear_stale(cache):
+    for k in [k for k, v in cache.items() if v.stale]:
+        del cache[k]
+
 
 def get_product_rollup_statement(filters, having=None):
     where = [ get_where_include_exclude(
@@ -92,7 +94,6 @@ def normalize_query(query: FormsDict, allow: Iterable[str] = None) -> str:
 def send_static(filename):
     return static_file(filename, root='app/rest/static')
 
-
 global LOCK
 LOCK = Lock()
 
@@ -110,38 +111,19 @@ def trend():
     if request.query_string in CACHE:
         page = CACHE[request.query_string]
         if not page.stale:
-            if not page.loaded:
-                return page.update()
-            return page.value
-        else:
-            del CACHE[request.query_string]
-    
-    if LOCK.acquire(blocking=True):
-        if request.query_string in CACHE:
-            page = CACHE[request.query_string]
-            if not page.stale:
-                LOCK.release()
-                if not page.loaded:
-                    return page.update()
-                return page.value
-        
-        try:
-            page = CachedLoadingPage(template("loading", progress=[]))
-            for k in [k for k, v in CACHE.items() if v.stale]:
-                del CACHE[k]
-            
-            for idx, (_, k) in enumerate(sorted([
-                (v.age, k) for k, v in CACHE.items()
-            ])):
-                if idx > 10: del CACHE[k]
-            
-            CACHE[request.query_string] = page
-            thread = Thread(target=worker.trend, args=(page.queue, conn, path, request.query))
-            thread.start()
-            return page.value
-        finally:
-            LOCK.release()
-
+            return page.value if page.loaded else page.update()
+        del CACHE[request.query_string]
+
+    page = CachedLoadingPage(
+        template("loading", progress=[]),
+        lambda queue: Thread(target=worker.trend, args=(
+            queue, conn, path, request.query
+        )).start()
+    )
+    clear_stale(CACHE)
+    enforce_limit(CACHE, 10)
+    CACHE[request.query_string] = page
+    return page.value
 
 @route('/grocery/groups')
 def groups():

+ 4 - 2
app/rest/trend.py

@@ -11,6 +11,7 @@ from bottle import (
 )
 from io import StringIO
 import matplotlib.pyplot as plt
+import matplotlib
 import seaborn as sns
 from psycopg import Connection
 from psycopg.connection import TupleRow
@@ -31,6 +32,8 @@ from .form import(
 )
 from . import PARAMS
 
+matplotlib.use('agg')
+
 def abort(code, text):
     return HTTPError(code, text)
 
@@ -66,8 +69,7 @@ def trend_internal(conn: Connection[TupleRow], path: str, query: DictProperty):
             
             pivot = data.pivot_table(index=['ts_raw',], columns=['product',], values=['$/unit'], aggfunc='mean')
             pivot.columns = pivot.columns.droplevel()
-            
-            sns.set(style="darkgrid", palette='pastel', context="talk")
+            sns.set_theme(style='darkgrid', palette='pastel', context="talk")
             plt.style.use("dark_background")
             plt.rcParams.update({
     "lines.color": "#ffffff",

+ 1 - 1
test/rest/test_CachedLoadingPage.py

@@ -15,7 +15,7 @@ from app.rest.CachedLoadingPage import (
 
 @fixture
 def cache():
-    return CachedLoadingPage("start")
+    return CachedLoadingPage("start", lambda _: None)
 
 
 def test_get_age(cache: CachedLoadingPage):