QueryManager.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. #
  2. # Copyright (c) Daniel Sheffield 2021 - 2023
  3. #
  4. # All rights reserved
  5. #
  6. # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
  7. import time
  8. from typing import Any, Callable
  9. from psycopg import Cursor
  10. from psycopg.sql import (
  11. SQL,
  12. Literal,
  13. )
  14. from dateutil.parser import parse as parse_time
  15. import pandas as pd
  16. from .TransactionView import (
  17. get_table_statement,
  18. get_transactions_statement,
  19. get_session_transactions_statement,
  20. NON_IDENTIFIER_COLUMNS,
  21. )
  22. from .PriceView import(
  23. get_historic_prices_statement,
  24. )
  25. display_map = {
  26. 'ts': lambda x: f"{time.strftime('%Y-%m-%d %H:%M', (x.year, x.month, x.day, x.hour, x.minute, 0, 0, 0, 0))}",
  27. '%d/%m/%y %_I%P': lambda x: f"{time.strftime('%d/%m/%y %_I%P', (x.year, x.month, x.day, x.hour, x.minute, 0, 0, 0, 0))}",
  28. 'price': lambda x: f'{x:.4f}',
  29. 'quantity': lambda x: f'{x:.2f}' if x is not None else None,
  30. 'organic': lambda x: 'yes' if x else 'no',
  31. 'tags': lambda x: '' if not x else x,
  32. }
  33. display_mapper: Callable[
  34. [Any, str], str
  35. ] = lambda data, name: display_map[name](data) if name in display_map else data
  36. def cursor_as_dict(cur):
  37. _col_idx_map=dict(map(lambda col: (col[1].name, col[0]), enumerate(cur.description)))
  38. for row in map(lambda row, _map=_col_idx_map: {
  39. name: row[i] for name, i in _map.items()
  40. }, cur.fetchall()):
  41. #print(row)
  42. yield row
  43. def get_data(cursor, statement, display=None):
  44. cursor.execute(statement)
  45. if display is not None:
  46. yield from map(lambda x: dict(
  47. map(lambda k: (k, display(x[k], k)), x)
  48. ), cursor_as_dict(cursor))
  49. else:
  50. yield from cursor_as_dict(cursor)
  51. def get_session_transactions(cursor, statement, display):
  52. #print(cur.mogrify(statement).decode("utf-8"))
  53. #input()
  54. df = pd.DataFrame(get_data(cursor, statement, display))
  55. if df.empty:
  56. return ''
  57. return df.drop(labels=[
  58. 'id', 'ts', 'store', 'code', 'quantity',
  59. ], axis=1).to_string(header=[
  60. 'Description', 'Volume', 'Unit', 'Price', '$/unit', 'Total',
  61. 'Group', 'Category', 'Product', 'Organic', 'Tags'
  62. ], justify='justify-all', max_colwidth=60, index=False)
  63. def record_matches(record, strict=None, **kwargs):
  64. strict = [ x.lower() for x in (strict or []) ]
  65. for key, query, candidate in (
  66. (k.lower(), f"{v}".lower(), f"{record[k]}".lower()) for k, v in kwargs.items()
  67. ):
  68. if not query:
  69. continue
  70. if key in strict and query != candidate:
  71. return False
  72. for term in query.split():
  73. if term not in candidate:
  74. return False
  75. return True
  76. def unique_suggestions(cur, statement, name, display, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
  77. exclude = filter(
  78. lambda x: x != name or name == 'ts',
  79. exclude,
  80. )
  81. [ kwargs.pop(k) for k in exclude if k in kwargs]
  82. items = suggestions(cur, statement, name, display, exclude=exclude, **kwargs)
  83. ret = sorted(set(map(lambda x: x[name], items)))
  84. tables = {
  85. 'product',
  86. 'category',
  87. 'group',
  88. 'unit',
  89. 'store',
  90. 'tags',
  91. }
  92. if len(ret) > 0 or name not in tables:
  93. return ret
  94. items = (i for i in filter(lambda x: record_matches(x, **{ name: kwargs[name] }) if name in kwargs else True,
  95. get_data(cur, get_table_statement(name), display)))
  96. ret = sorted(set(map(lambda x: x[name], items)))
  97. return ret
  98. def suggestions(cur, statement, name, display, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
  99. exclude = filter(
  100. lambda x: x != name or name == 'ts',
  101. exclude,
  102. )
  103. [ kwargs.pop(k) for k in exclude if k in kwargs]
  104. yield from filter(lambda x: record_matches(
  105. x, strict=[ k for k in kwargs if k != name ], **kwargs
  106. ), get_data(cur, statement, display))
  107. def get_insert_product_statement(product, category, group, unit):
  108. unit_sql = f', $unit${unit}$unit$' if unit else ''
  109. return f'CALL insert_product($prod${product}$prod$, $category${category}$category$, $group${group}$group${unit_sql})'
  110. class QueryManager(object):
  111. def __init__(self, cursor: Cursor, display: Callable[
  112. [Any, str], str
  113. ]):
  114. self.display = display
  115. self.cursor = cursor
  116. def get_historic_prices_data(self, unit, sort=None, product=None, category=None, group=None, tag=None, organic=None, limit=None):
  117. statement = get_historic_prices_statement(unit, sort=sort, product=product, category=category, group=group, tag=tag, organic=organic, limit=limit)
  118. data = get_data(self.cursor, statement)
  119. return pd.DataFrame(map(lambda x: dict(
  120. map(lambda k: (k, self.display(x[k], k)), x)
  121. ), data))
  122. def get_session_transactions(self, date, store):
  123. statement = get_session_transactions_statement(
  124. parse_time(date), store, full_name=True, exact_time=True
  125. )
  126. return get_session_transactions(self.cursor, statement, self.display)
  127. def unique_suggestions(self, name, **kwargs):
  128. statement = get_transactions_statement(name, **kwargs)
  129. return unique_suggestions(self.cursor, statement, name, self.display, **kwargs)
  130. def insert_new_product(self, product, category, group, unit):
  131. self.cursor.execute(get_insert_product_statement(product, category, group, unit))
  132. def get_preferred_unit(self, product):
  133. self.cursor.execute(SQL("""SELECT
  134. units.name AS preferred_unit
  135. FROM products
  136. JOIN units ON unit_id = units.id
  137. WHERE products.name = {product}""").format(product=Literal(product)))
  138. res = list(cursor_as_dict(self.cursor))
  139. if len(res) == 0:
  140. return None
  141. assert len(res) == 1
  142. return res[0]['preferred_unit']