#
# Copyright (c) Daniel Sheffield 2023
# All rights reserved
#
# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY

from typing import Callable, Iterable, Optional, Tuple
from urllib.parse import urlencode
from bottle import request, FormsDict, redirect, HTTPError
from psycopg import Connection
from psycopg.connection import TupleRow
from .QueryCache import QueryCache, get_hash
from ..data.filter import get_filter, get_query_param
from . import BOOLEAN, PARAMS
from .PageCache import PageCache
from .hash_util import base32_to_hash, hash_to_base32, normalize_base32


def normalize_query(
    query: FormsDict,
    allow: Optional[Iterable[str]] = None,
) -> Tuple[Optional[str], Optional[str]]:
    """Build a canonical query string and extract the ``hash`` parameter.

    The filter parameters are obtained from ``get_filter`` (restricted to
    *allow*, defaulting to ``PARAMS``), empty ones are dropped, and the rest
    are URL-encoded in sorted key order so that equivalent requests produce
    the same string.

    Args:
        query: The request parameters.
        allow: Parameter names passed to ``get_filter``; ``PARAMS`` is used
            when this is ``None`` or empty.

    Returns:
        A ``(normalized, hash)`` tuple. ``hash`` is the output of
        ``normalize_base32`` for ``query.hash``, or ``None`` when no hash
        was supplied. ``normalized`` is ``None`` when a hash was supplied
        and it is the only key in *query*; otherwise it is the encoded
        query string (possibly empty).
    """
    _hash = normalize_base32(query.hash) if query.hash else None

    allow = allow or PARAMS
    param = get_filter(query, allow=allow)
    norm = urlencode([
        (k, get_query_param(*param[k]))
        if k != 'organic' else
        # 'organic' is re-derived from the raw request value through the
        # BOOLEAN table rather than from the filter tuple.
        # NOTE(review): presumably BOOLEAN maps both raw inputs and parsed
        # values to a canonical string - confirm against its definition.
        ("organic", BOOLEAN[BOOLEAN.get(query.organic, None)])
        for k in sorted(param)
        if param[k]
    ])

    # A request that carries only a hash has no query of its own.
    hash_only = bool(_hash) and len(query.keys()) <= 1
    return (None if hash_only else norm), _hash


def _cache_decorator(func: Callable,
                     query_cache: Optional[QueryCache] = None,
                     page_cache: Optional[PageCache] = None):
    """Wrap a request handler so it is resolved through the query cache.

    The wrapped handler is invoked as
    ``func((cached_query, hash_int), page_cache, *args, **kwargs)``.

    Args:
        func: The handler to wrap.
        query_cache: Cache consulted (and populated) with
            ``(query, hash_int)`` keys.
        page_cache: Cache handed to *func*; its entry for the key is
            removed when the request has ``reload=true``.

    Returns:
        The wrapping callable. It returns an ``HTTPError`` 404 when the
        request supplies only a hash that the query cache does not know.
    """
    def wrap(*args, **kwargs):
        _, _, path, *_ = request.urlparts
        endpoint = path.split('/', 2)[-1]

        normalized, _hash = normalize_query(request.params)
        query = f"{endpoint}?{normalized}"

        if not _hash:
            # No hash supplied: derive one from the canonical query.
            hash_int = get_hash(query)
            _hash = hash_to_base32(hash_int)
        else:
            hash_int = base32_to_hash(_hash)

        # key with tuple to avoid ambiguity
        key = (query, hash_int)

        if request.params.reload == "true":
            page_cache.remove(key)

        cached = query_cache.get(key)
        if not cached:
            # ``query`` always contains the endpoint prefix and is therefore
            # always truthy, so test the normalised query itself: a request
            # that carried only an unknown hash has nothing to register.
            if normalized is None:
                return HTTPError(404, f"No query found for hash: {_hash}")
            cached = query_cache.add(key, None)

        if not request.params.hash:
            # Very long queries are redirected to their short hash form.
            if cached and len(cached) > 2000:
                return redirect(f"{path}?hash={_hash}")
            # Anything not already in canonical form is redirected to it.
            if cached and f"{endpoint}?{request.query_string}" != cached:
                return redirect(cached)

        return func((cached, key[1]), page_cache, *args, **kwargs)

    return wrap


def cache(*args, **kwargs):
    """Decorator factory for :func:`_cache_decorator`.

    Must be called with keyword arguments only (they are forwarded to
    ``_cache_decorator``), e.g. ``@cache(query_cache=..., page_cache=...)``.

    Raises:
        Exception: If any positional argument is given, which is what
            happens when the decorator is applied without being called.
    """
    if args:
        raise Exception("decorator argument required")
    return lambda f: _cache_decorator(f, **kwargs)


def _cursor_decorator(func: Callable,
                      connection: Optional[Connection[TupleRow]] = None):
    """Wrap *func* so it receives a fresh cursor as its first argument.

    Args:
        func: The callable to wrap; invoked as
            ``func(cur, *args, **kwargs)``.
        connection: The connection the cursor is opened on.

    Returns:
        The wrapping callable, which returns whatever *func* returns.
    """
    def wrap(*args, **kwargs):
        try:
            with connection.cursor() as cur:
                return func(cur, *args, **kwargs)
        finally:
            # Runs whether func returned or raised.
            # NOTE(review): this also commits after a failure inside func;
            # confirm that is intended rather than a rollback.
            connection.commit()

    return wrap


def cursor(*args, **kwargs):
    """Decorator factory for :func:`_cursor_decorator`.

    Must be called with keyword arguments only (they are forwarded to
    ``_cursor_decorator``), e.g. ``@cursor(connection=...)``.

    Raises:
        Exception: If any positional argument is given, which is what
            happens when the decorator is applied without being called.
    """
    if args:
        raise Exception("decorator argument required")
    return lambda f: _cursor_decorator(f, **kwargs)