# db_utils.py
  1. #
  2. # Copyright (c) Daniel Sheffield 2021 - 2022
  3. #
  4. # All rights reserved
  5. #
  6. # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
  7. from sqlite3 import Cursor
  8. import time
  9. from typing import Any, Callable
  10. from .txn_view import (
  11. get_table_statement,
  12. get_transactions_statement,
  13. get_session_transactions_statement,
  14. )
  15. from .price_view import(
  16. get_historic_prices_statement,
  17. )
  18. from dateutil.parser import parse as parse_time
  19. import pandas as pd
  # Column names used as the default ``exclude`` list by ``suggestions`` and
  # ``unique_suggestions``.
  NON_IDENTIFIER_COLUMNS = [
      'ts',
      'store',
      'quantity',
      'unit',
      'price',
      'organic',
  ]


  def _format_time(value, fmt):
      """Format ``value`` with ``time.strftime`` at minute resolution.

      Only the ``year``, ``month``, ``day``, ``hour`` and ``minute``
      attributes of ``value`` are read; seconds, weekday, day-of-year and the
      DST flag are passed as zero.
      """
      return time.strftime(fmt, (
          value.year, value.month, value.day, value.hour, value.minute,
          0, 0, 0, 0,
      ))


  # Display formatters keyed by column name; each takes the raw value and
  # returns the text to show.
  display_map = {
      'ts': lambda x: _format_time(x, '%Y-%m-%d %H:%M'),
      # NOTE(review): this entry is keyed by its own format string, and
      # ``%_I`` / ``%P`` are not among the strftime directives Python
      # documents as portable -- confirm the target platform supports them.
      '%d/%m/%y %_I%P': lambda x: _format_time(x, '%d/%m/%y %_I%P'),
      'price': lambda x: f'{x:.4f}',
      'quantity': lambda x: f'{x:.2f}',
      'organic': lambda x: 'yes' if x else 'no',
  }


  def display_mapper(data: Any, name: str) -> Any:
      """Return ``data`` formatted for display in column ``name``.

      Args:
          data: Raw value taken from a result row.
          name: Column name used to look up a formatter in ``display_map``.

      Returns:
          The formatter's result when ``name`` has an entry in
          ``display_map``; otherwise ``data`` unchanged.
      """
      formatter = display_map.get(name)
      return data if formatter is None else formatter(data)
  def cursor_as_dict(cur):
      """Yield every remaining row of an executed cursor as a ``dict``.

      Column names are read from ``cur.description``. An entry's ``name``
      attribute is used when it has one; otherwise the first item of the
      entry is used, which is where DB-API 2.0 puts the column name (for
      example the plain tuples produced by ``sqlite3``).

      Args:
          cur: A cursor on which a row-returning statement has already been
              executed.

      Yields:
          One ``{column_name: value}`` dict per row returned by
          ``cur.fetchall()``. If two columns share a name, the value of the
          later column is kept.
      """
      columns = [
          col.name if hasattr(col, 'name') else col[0]
          for col in cur.description
      ]
      for row in cur.fetchall():
          yield dict(zip(columns, row))
  def get_data(cursor, statement, display):
      """Execute ``statement`` and yield its rows with display formatting.

      This is a generator, so ``cursor.execute`` does not run until the
      first row is requested.

      Args:
          cursor: Database cursor used to run the statement.
          statement: Statement passed unchanged to ``cursor.execute``.
          display: Callable invoked as ``display(value, column_name)`` for
              every cell; its return value replaces the cell.

      Yields:
          One ``{column_name: displayed_value}`` dict per result row.
      """
      cursor.execute(statement)
      for row in cursor_as_dict(cursor):
          yield {column: display(value, column) for column, value in row.items()}
  def get_session_transactions(cursor, statement, display):
      """Render the rows produced by ``statement`` as a plain-text table.

      Args:
          cursor: Database cursor used to run the statement.
          statement: Query to execute. Its result must contain the columns
              ``id``, ``ts``, ``store`` and ``code``, which are dropped. The
              remaining columns are relabelled with the ten headers below,
              so there must be exactly ten of them.
          display: Per-cell formatter forwarded to ``get_data``.

      Returns:
          The table as a string without the index column, or ``''`` when
          the query returns no rows.
      """
      df = pd.DataFrame(get_data(cursor, statement, display))
      if df.empty:
          return ''
      visible = df.drop(columns=['id', 'ts', 'store', 'code'])
      return visible.to_string(
          header=[
              'Description', 'Volume', 'Unit', 'Price', '$/unit', 'Total',
              'Group', 'Category', 'Product', 'Organic',
          ],
          justify='justify-all',
          max_colwidth=60,
          index=False,
      )
  def record_matches(record, strict=None, **kwargs):
      """Return whether ``record`` satisfies every non-empty filter.

      Matching is case-insensitive. A filter matches when each
      whitespace-separated term of the query occurs as a substring of the
      record's value for that column. Columns listed in ``strict`` must
      additionally equal the query exactly (ignoring case).

      Args:
          record: Mapping of column name to value.
          strict: Optional iterable of column names that require an exact
              match.
          **kwargs: ``column=query`` filters. Empty or ``None`` queries are
              ignored without looking at the record.

      Returns:
          ``True`` when all non-empty filters match, ``False`` otherwise.

      Raises:
          KeyError: If a non-empty filter names a column missing from
              ``record``.
      """
      strict_keys = {key.lower() for key in (strict or [])}
      for key, query in kwargs.items():
          # Skip unset filters before calling any string method, so ``None``
          # queries and ``None`` values in unrelated columns cannot crash.
          if not query:
              continue
          query = str(query).lower()
          value = record[key]
          # NULL column values are treated as empty text; other non-string
          # values are compared through their ``str`` form.
          candidate = '' if value is None else str(value).lower()
          if key.lower() in strict_keys and query != candidate:
              return False
          if any(term not in candidate for term in query.split()):
              return False
      return True
  def unique_suggestions(cur, statement, name, display, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
      """Return the sorted distinct values of column ``name`` that match.

      Rows come from ``suggestions``. When that yields nothing and ``name``
      is one of ``product``, ``category``, ``group``, ``unit`` or ``store``,
      the values are read from ``get_table_statement(name)`` instead and
      filtered by the ``name`` term only.

      Args:
          cur: Database cursor.
          statement: Query whose rows are searched first.
          name: Column whose values are returned.
          display: Per-cell formatter forwarded to ``get_data``.
          exclude: Column names whose filters are discarded. The filter on
              ``name`` itself is kept unless ``name`` is ``'ts'``.
          **kwargs: ``column=query`` filters.

      Returns:
          A sorted list of unique values of column ``name``.
      """
      # Build a real list: a one-shot ``filter`` iterator would already be
      # exhausted by the time it is forwarded to ``suggestions`` below.
      exclude = [column for column in exclude if column != name or name == 'ts']
      for column in exclude:
          kwargs.pop(column, None)
      items = suggestions(cur, statement, name, display, exclude=exclude, **kwargs)
      ret = sorted({item[name] for item in items})
      tables = {
          'product',
          'category',
          'group',
          'unit',
          'store',
      }
      if ret or name not in tables:
          return ret
      # Fall back to the lookup table for ``name``. With no term supplied
      # for ``name`` the empty query matches every row instead of raising
      # KeyError.
      query = kwargs.get(name, '')
      rows = get_data(cur, get_table_statement(name), display)
      return sorted({
          row[name] for row in rows if record_matches(row, **{name: query})
      })
  def suggestions(cur, statement, name, display, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
      """Yield the rows of ``statement`` that match the given filters.

      Filters on excluded columns are discarded first. Every remaining
      filter except the one on ``name`` must match exactly; the filter on
      ``name`` matches term by term (see ``record_matches``).

      Args:
          cur: Database cursor.
          statement: Query whose rows are searched.
          name: Column being completed; its filter is matched loosely.
          display: Per-cell formatter forwarded to ``get_data``.
          exclude: Column names whose filters are discarded. The filter on
              ``name`` itself is kept unless ``name`` is ``'ts'``.
          **kwargs: ``column=query`` filters.

      Yields:
          Each matching row as a ``{column_name: displayed_value}`` dict.
      """
      for column in exclude:
          if column != name or name == 'ts':
              kwargs.pop(column, None)
      # The strict column list does not depend on the row, so build it once.
      strict = [key for key in kwargs if key != name]
      for record in get_data(cur, statement, display):
          if record_matches(record, strict=strict, **kwargs):
              yield record
  def _dollar_quote(value, tag):
      """Wrap ``value`` in a ``$tag$...$tag$`` dollar-quoted literal.

      The tag is extended with underscores until ``$tag`` no longer occurs
      in the text, so the value can neither contain the closing delimiter
      nor complete it together with the delimiter's leading ``$``.
      """
      text = str(value)
      while f'${tag}' in text:
          tag += '_'
      return f'${tag}${text}${tag}$'


  def get_insert_product_statement(product, category, group):
      """Build the ``CALL insert_product(...)`` statement for a new product.

      Each argument is embedded as a dollar-quoted string literal. The
      default tags ``prod``, ``category`` and ``group`` are used unless the
      value itself contains ``$<tag>``; in that case the tag is lengthened
      so the value cannot terminate the literal early and inject SQL.

      Args:
          product: Product name.
          category: Category name.
          group: Group name.

      Returns:
          The SQL statement as a string.
      """
      return 'CALL insert_product({}, {}, {})'.format(
          _dollar_quote(product, 'prod'),
          _dollar_quote(category, 'category'),
          _dollar_quote(group, 'group'),
      )
  class QueryManager:
      """Bundle a database cursor with a display formatter for the queries
      defined in this module."""

      def __init__(self, cursor: Cursor, display: Callable[[Any, str], str]):
          """Store the cursor and the per-cell display callable.

          Args:
              cursor: Database cursor every query is executed on.
              display: Callable invoked as ``display(value, column_name)``
                  for each cell of a result row.
          """
          self.display = display
          self.cursor = cursor

      def get_historic_prices(self, rating_cb, sort, product, unit, organic=None, limit=None):
          """Return historic prices as a text table and report statistics.

          Args:
              rating_cb: Callable receiving ``(avg, min, max)`` as floats
                  taken from the first result row, or
                  ``(None, None, None)`` when the query returns no rows.
              sort, product, unit, organic, limit: Forwarded unchanged to
                  ``get_historic_prices_statement``.

          Returns:
              The table as a string without the ``id``, ``avg``, ``min``
              and ``max`` columns, or ``''`` when there are no rows.
          """
          statement = get_historic_prices_statement(
              sort, product, unit, organic=organic, limit=limit
          )
          df = pd.DataFrame(get_data(self.cursor, statement, self.display))
          if df.empty:
              rating_cb(None, None, None)
              return ''
          _avg, _min, _max = df[['avg', 'min', 'max']].iloc[0].apply(float)
          rating_cb(_avg, _min, _max)
          visible = df.drop(columns=['id', 'avg', 'min', 'max'])
          return visible.to_string(
              header=['Date', 'Store', '$/unit', 'Org'],
              justify='justify-all',
              max_colwidth=16,
              index=False,
          )

      def get_session_transactions(self, date, store):
          """Return the transactions of one session as a text table.

          Args:
              date: Date/time string; parsed with ``dateutil`` before being
                  passed to ``get_session_transactions_statement``.
              store: Store value passed to the statement builder.

          Returns:
              The string produced by the module-level
              ``get_session_transactions``.
          """
          statement = get_session_transactions_statement(
              parse_time(date), store, full_name=True, exact_time=True
          )
          return get_session_transactions(self.cursor, statement, self.display)

      def unique_suggestions(self, name, **kwargs):
          """Return sorted unique values for column ``name``.

          Searches the rows of ``get_transactions_statement()`` through the
          module-level ``unique_suggestions``; ``kwargs`` are forwarded as
          column filters.
          """
          statement = get_transactions_statement()
          return unique_suggestions(self.cursor, statement, name, self.display, **kwargs)

      def insert_new_product(self, product, category, group):
          """Execute the statement built by ``get_insert_product_statement``.

          The values are embedded in the SQL text by that helper rather
          than passed to the driver as parameters.
          """
          self.cursor.execute(get_insert_product_statement(product, category, group))