# # Copyright (c) Daniel Sheffield 2023 # All rights reserved # # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY from bottle import request, FormsDict, redirect from typing import Callable, Iterable from urllib.parse import urlencode from psycopg import Connection from psycopg.connection import TupleRow from ..data.filter import get_filter, get_query_param from . import PARAMS from .Cache import Cache def normalize_query(query: FormsDict, allow: Iterable[str] = None) -> str: param = get_filter(query, allow=allow) return urlencode([ (k, get_query_param(*param[k])) for k in sorted(param) if param[k] ]) def _normalize_decorator(func: Callable, poison_on_reload: bool = False): def wrap(*args, **kwargs): _, _, path, *_ = request.urlparts normalized = normalize_query(request.query, allow=PARAMS) if request.query_string != normalized: return redirect(f'{path}?{normalized}') return func(*args, **kwargs) return wrap def normalize(*args, **kwargs): if not len(args): return lambda f: _normalize_decorator(f, **kwargs) return _normalize_decorator(*args) def _poison_decorator(func: Callable, cache: Cache = None): def wrap(*args, **kwargs): normalized = normalize_query(request.query, allow=PARAMS) if request.params.get('reload') == 'true': cache.remove(normalized) return func(*args, **kwargs) return wrap def poison(*args, **kwargs): if not len(args): return lambda f: _poison_decorator(f, **kwargs) raise Exception("decorator argument required") def _cursor_decorator(func: Callable, connection: Connection[TupleRow] = None): def wrap(*args, **kwargs): try: with connection.cursor() as cur: return func(cur, *args, **kwargs) finally: connection.commit() return wrap def cursor(*args, **kwargs): if not len(args): return lambda f: _cursor_decorator(f, **kwargs) raise Exception("decorator argument required")