12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- from bottle import request, FormsDict, redirect
- from typing import Callable, Iterable
- from urllib.parse import urlencode
- from psycopg import Connection
- from psycopg.connection import TupleRow
- from ..data.filter import get_filter, get_query_param
- from . import PARAMS
- from .Cache import Cache
- def normalize_query(query: FormsDict, allow: Iterable[str] = None) -> str:
- param = get_filter(query, allow=allow)
- return urlencode([
- (k, get_query_param(*param[k])) for k in sorted(param) if param[k]
- ])
- def _normalize_decorator(func: Callable, poison_on_reload: bool = False):
- def wrap(*args, **kwargs):
- _, _, path, *_ = request.urlparts
- normalized = normalize_query(request.query, allow=PARAMS)
- if request.query_string != normalized:
- return redirect(f'{path}?{normalized}')
- return func(*args, **kwargs)
- return wrap
- def normalize(*args, **kwargs):
- if not len(args):
- return lambda f: _normalize_decorator(f, **kwargs)
-
- return _normalize_decorator(*args)
- def _poison_decorator(func: Callable, cache: Cache = None):
- def wrap(*args, **kwargs):
- normalized = normalize_query(request.query, allow=PARAMS)
- if request.params.get('reload') == 'true':
- cache.remove(normalized)
- return func(*args, **kwargs)
- return wrap
- def poison(*args, **kwargs):
- if not len(args):
- return lambda f: _poison_decorator(f, **kwargs)
- raise Exception("decorator argument required")
- def _cursor_decorator(func: Callable, connection: Connection[TupleRow] = None):
- def wrap(*args, **kwargs):
- try:
- with connection.cursor() as cur:
- return func(cur, *args, **kwargs)
- finally:
- connection.commit()
- return wrap
- def cursor(*args, **kwargs):
- if not len(args):
- return lambda f: _cursor_decorator(f, **kwargs)
- raise Exception("decorator argument required")
|