# # Copyright (c) Daniel Sheffield 2023 # All rights reserved # # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY import os from typing import Dict, Iterable, Tuple from urllib.parse import urlencode from bottle import FormsDict from ..data.filter import get_filter, get_query_param from . import BOOLEAN, PARAMS from .hash_util import base32_to_hash, blake, bytes_to_hash, hash_to_base32, normalize_base32 def delete_query(name: str, root: str = 'app/rest/static/files'): directory = f'{root}/{name}' try: os.remove(f'{directory}/{name}.query') except FileNotFoundError: pass def save_query(name: str, content: bytes, tool: str, root='app/rest/static/files') -> str: directory = f'{root}/{name}' try: os.mkdir(directory, mode=0o700, dir_fd=None) except FileExistsError: pass fd = os.open(f'{directory}/{name}.query', os.O_WRONLY | os.O_TRUNC | os.O_CREAT, 0o600) with open(fd, "wb") as f: f.write(content) def get_query(name: str, root: str = 'app/rest/static/files') -> str: directory = f'{root}/{name}' try: mtime = os.stat(f'{directory}/{name}.query').st_mtime except: mtime = None # if mtime and time() - mtime > STALE: # delete_query(name) # return None # todo: store query hash and validate it fd = os.open(f'{directory}/{name}.query', os.O_RDONLY, 0o600) with open(fd, "rb") as f: f.seek(0) page = f.read() return page.decode('utf-8') def norm(key): if isinstance(key, tuple): query, _hash = key else: if isinstance(key, int): query, _hash = None, key else: query, _hash = key, None if _hash and not isinstance(_hash, int): _hash = base32_to_hash(normalize_base32(query.hash)) # TODO: normalize should be implicit #_hash = base32_to_hash(query.hash) if None not in (query, _hash): if get_hash(query) != _hash: raise KeyError(f"Invalid key: {key}") return query, _hash if (_hash, query) is (None, None): raise KeyError(f"Invalid key: {key}") return query, _hash if _hash else get_hash(query) def get_hash(key): _bytes = blake(key.encode('utf-8'), person='grocery'.encode('utf-8')) return bytes_to_hash(_bytes) def normalize_query(query: FormsDict, allow: Iterable[str] = None) -> Tuple[str, str]: _hash = query.hash allow = allow or PARAMS param = get_filter(query, allow=allow) norm = urlencode(map( lambda k: ( k, get_query_param(*param[k]) if k != 'organic' else BOOLEAN[ BOOLEAN.get(query.organic, None) ] ), sorted(filter(bool, param)) )) return norm, _hash class QueryCache: def __init__(self, limit) -> None: self._cache: Dict[int, str] = dict() self._limit = limit def __delitem__(self, key): return self.remove(key) def __getitem__(self, key): return self.get(key) def __setitem__(self, key, value): return self.add(key, value) def get(self, key: str) -> str: query, _hash = norm(key) if _hash not in self._cache: if query: return self.add(_hash, query) try: existing = get_query(hash_to_base32(_hash)) except: existing = None if existing: return self.add(_hash, existing) return self.add(_hash, query) value = self._cache[_hash] # if value.stale: # del self._cache[key] # delete_query(hash_to_base32(key)) # return None return value # def _enforce_limit(self, limit): # for idx, (_, k) in enumerate(sorted([ # (v.age, k) for k, v in self._cache.items() # ])): # if idx >= limit: del self[k] # def _clear_stale(self): # for k in [k for k, v in self._cache.items() if v.stale]: # del self[k] def add(self, key: str, value: str) -> str: #self._clear_stale() #self._enforce_limit(self._limit) query, _hash = norm(key) value = value or query if not value: raise ValueError("Invalid query string: {value}") self._cache[_hash] = value save_query(hash_to_base32(_hash), value.encode("utf-8"), 'query') return value def remove(self, key: str): key = norm(key) self._cache.pop(key, None) delete_query(hash_to_base32(key))