# route_decorators.py
#
# Copyright (c) Daniel Sheffield 2023
# All rights reserved
#
# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
from functools import wraps
from typing import Callable, Iterable
from urllib.parse import urlencode

from bottle import request, FormsDict, redirect
from psycopg import Connection
from psycopg.connection import TupleRow

from ..data.filter import get_filter, get_query_param
from . import PARAMS
from .Cache import Cache


  14. def normalize_query(query: FormsDict, allow: Iterable[str] = None) -> str:
  15. param = get_filter(query, allow=allow)
  16. return urlencode([
  17. (k, get_query_param(*param[k])) for k in sorted(param) if param[k]
  18. ])
  19. def _normalize_decorator(func: Callable, poison_on_reload: bool = False):
  20. def wrap(*args, **kwargs):
  21. _, _, path, *_ = request.urlparts
  22. normalized = normalize_query(request.query, allow=PARAMS)
  23. if request.query_string != normalized:
  24. return redirect(f'{path}?{normalized}')
  25. return func(*args, **kwargs)
  26. return wrap
  27. def normalize(*args, **kwargs):
  28. if not len(args):
  29. return lambda f: _normalize_decorator(f, **kwargs)
  30. return _normalize_decorator(*args)
  31. def _poison_decorator(func: Callable, cache: Cache = None):
  32. def wrap(*args, **kwargs):
  33. normalized = normalize_query(request.query, allow=PARAMS)
  34. if request.params.get('reload') == 'true':
  35. cache.remove(normalized)
  36. return func(*args, **kwargs)
  37. return wrap
  38. def poison(*args, **kwargs):
  39. if not len(args):
  40. return lambda f: _poison_decorator(f, **kwargs)
  41. raise Exception("decorator argument required")
  42. def _cursor_decorator(func: Callable, connection: Connection[TupleRow] = None):
  43. def wrap(*args, **kwargs):
  44. try:
  45. with connection.cursor() as cur:
  46. return func(cur, *args, **kwargs)
  47. finally:
  48. connection.commit()
  49. return wrap
  50. def cursor(*args, **kwargs):
  51. if not len(args):
  52. return lambda f: _cursor_decorator(f, **kwargs)
  53. raise Exception("decorator argument required")