123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- from sqlite3 import Cursor
- import time
- from typing import Any, Callable
- from .txn_view import (
- get_table_statement,
- get_transactions_statement,
- get_session_transactions_statement,
- )
- from .price_view import(
- get_historic_prices_statement,
- )
- from dateutil.parser import parse as parse_time
- import pandas as pd
# Default ``exclude`` list for suggestions() and unique_suggestions(): filter
# keywords carrying these names are dropped before records are matched
# (except the one equal to the requested column, unless that column is 'ts').
NON_IDENTIFIER_COLUMNS = [
    'ts', 'store', 'quantity',
    'unit', 'price', 'organic',
]
def _format_timestamp(ts):
    """Render a datetime-like value as ``YYYY-MM-DD HH:MM``.

    Only ``year``, ``month``, ``day``, ``hour`` and ``minute`` are read from
    *ts*; the remaining time-tuple fields are zero-filled because the format
    string does not use them.
    """
    return time.strftime(
        '%Y-%m-%d %H:%M',
        (ts.year, ts.month, ts.day, ts.hour, ts.minute, 0, 0, 0, 0),
    )


# Per-column formatters: column name -> callable turning the raw value into
# its display string. Columns without an entry are shown unchanged.
display_map = {
    'ts': _format_timestamp,
    'price': lambda x: f'{x:.4f}',
    'quantity': lambda x: f'{x:.2f}',
    'organic': lambda x: 'yes' if x else 'no',
}


def display_mapper(data: Any, name: str) -> Any:
    """Format *data* for display according to the column it belongs to.

    Args:
        data: Raw value of one cell.
        name: Column name used to look up a formatter in ``display_map``.

    Returns:
        The formatted string when ``display_map`` has a formatter for *name*;
        otherwise *data* itself, unchanged (so not necessarily a ``str``).
    """
    formatter = display_map.get(name)
    return data if formatter is None else formatter(data)
def cursor_as_dict(cur):
    """Yield every row of an already-executed cursor as a dict.

    Args:
        cur: Cursor exposing ``description`` and ``fetchall()``. Each
            description entry may be an object with a ``name`` attribute or
            a plain sequence whose first item is the column name.

    Yields:
        One ``{column name: value}`` dict per fetched row. When a column
        name occurs more than once, the value of its last occurrence is kept.
    """
    # Prefer the ``name`` attribute when the driver provides one; otherwise
    # fall back to index 0, which is where plain description tuples (e.g.
    # sqlite3) keep the column name.
    names = [
        col.name if hasattr(col, 'name') else col[0]
        for col in cur.description
    ]
    for row in cur.fetchall():
        yield {name: row[idx] for idx, name in enumerate(names)}
def get_data(cursor, statement, display):
    """Execute *statement* and yield its rows as display-ready dicts.

    Args:
        cursor: Cursor used to run the statement.
        statement: Statement passed verbatim to ``cursor.execute``.
        display: Callable invoked as ``display(value, column_name)`` for
            every cell; its return value replaces the raw value.

    Yields:
        One dict per row, keyed by column name. Nothing is executed until
        the generator is first advanced.
    """
    cursor.execute(statement)
    for record in cursor_as_dict(cursor):
        yield {
            column: display(value, column)
            for column, value in record.items()
        }
def get_session_transactions(cursor, statement, date, store):
    """Run *statement* and render its rows as a plain-text table.

    Args:
        cursor: Cursor used to execute the statement.
        statement: Statement passed verbatim to ``cursor.execute``.
        date: Accepted for the caller's convenience; not read here.
        store: Accepted for the caller's convenience; not read here.

    Returns:
        ``''`` when the statement yields no rows; otherwise the table text
        with the ``id``, ``ts``, ``store`` and ``code`` columns removed and
        the remaining columns given fixed headings.
    """
    cursor.execute(statement)
    frame = pd.DataFrame(cursor_as_dict(cursor))
    if frame.empty:
        return ''

    hidden_columns = ['id', 'ts', 'store', 'code']
    # One heading per column left after dropping; order follows the frame.
    headings = [
        'Description', 'Volume', 'Unit', 'Price', '$/unit', 'Total',
        'Group', 'Category', 'Product', 'Organic',
    ]
    visible = frame.drop(labels=hidden_columns, axis=1)
    return visible.to_string(
        header=headings,
        justify='justify-all',
        max_colwidth=60,
        index=False,
    )
def record_matches(record, strict=None, **kwargs):
    """Tell whether *record* satisfies every non-empty filter in *kwargs*.

    Comparison is case-insensitive (via ``str.lower``).

    Args:
        record: Mapping of field name to value.
        strict: Optional collection of field names that must match the
            filter exactly; all other fields match on substring.
        **kwargs: ``field=wanted`` filters. Falsy filter values (``None``,
            ``''``) are ignored.

    Returns:
        ``True`` when all applicable filters match, ``False`` otherwise. A
        record value of ``None`` never matches a non-empty filter.

    Raises:
        KeyError: If a non-empty filter names a field missing from *record*.
    """
    strict = strict or []
    for field, wanted in kwargs.items():
        if not wanted:
            continue

        actual = record[field]
        if actual is None:
            # A missing value (e.g. SQL NULL) cannot match a non-empty
            # filter; previously this raised AttributeError on ``.lower()``.
            return False

        wanted_lc = wanted.lower()
        actual_lc = actual.lower()
        if field in strict:
            if wanted_lc != actual_lc:
                return False
        elif wanted_lc not in actual_lc:
            return False

    return True
def unique_suggestions(cur, statement, name, display, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
    """Return the sorted distinct values of column *name* among matching rows.

    Args:
        cur: Cursor used to run queries.
        statement: Statement whose rows are searched first.
        name: Column whose values are collected.
        display: Callable ``display(value, column_name)`` applied to cells.
        exclude: Filter names to ignore. The entry equal to *name* is kept
            as a filter, unless *name* is ``'ts'``.
        **kwargs: ``column=text`` filters forwarded to ``suggestions``.

    Returns:
        Sorted list of unique values. When *statement* yields no match and
        *name* is one of ``product``, ``category``, ``group``, ``unit`` or
        ``store``, the rows of ``get_table_statement(name)`` are searched
        instead, filtered only by the value given for *name* (if any).
    """
    # Materialise the list: it is walked here and handed on to suggestions(),
    # and a bare ``filter`` iterator would already be exhausted by then.
    ignored = [col for col in exclude if col != name or name == 'ts']
    for col in ignored:
        kwargs.pop(col, None)

    matches = suggestions(cur, statement, name, display, exclude=ignored, **kwargs)
    found = sorted({row[name] for row in matches})

    lookup_tables = {
        'product',
        'category',
        'group',
        'unit',
        'store',
    }
    if found or name not in lookup_tables:
        return found

    # Fallback to the dedicated table for *name*. ``.get`` avoids a KeyError
    # when no filter was supplied for *name*; record_matches skips the
    # resulting falsy filter, so every row of the table qualifies.
    name_filter = {name: kwargs.get(name)}
    table_rows = get_data(cur, get_table_statement(name), display)
    return sorted({
        row[name] for row in table_rows if record_matches(row, **name_filter)
    })
def suggestions(cur, statement, name, display, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
    """Yield the rows of *statement* that match the given filters.

    Args:
        cur: Cursor used to run the statement.
        statement: Statement whose rows are searched.
        name: Column being completed; its filter matches on substring while
            every other remaining filter must match exactly.
        display: Callable ``display(value, column_name)`` applied to cells.
        exclude: Filter names to ignore. The entry equal to *name* is kept
            as a filter, unless *name* is ``'ts'``.
        **kwargs: ``column=text`` filters.

    Yields:
        Row dicts (as produced by ``get_data``) accepted by
        ``record_matches``. Nothing runs until the generator is advanced.
    """
    for col in exclude:
        if col != name or name == 'ts':
            kwargs.pop(col, None)

    # The strict set depends only on the filters, so compute it once rather
    # than once per record.
    strict = [key for key in kwargs if key != name]
    for record in get_data(cur, statement, display):
        if record_matches(record, strict=strict, **kwargs):
            yield record
def _dollar_quote(value, tag):
    """Wrap *value* in a dollar-quoted literal whose tag cannot end it early.

    Underscores are appended to *tag* until the delimiter ``$tag$`` occurs
    neither inside the text nor across the boundary formed by the text and
    the closing delimiter's leading ``$``.
    """
    text = f'{value}'
    while f'${tag}$' in text + '$':
        tag += '_'
    return f'${tag}${text}${tag}$'


def get_insert_product_statement(product, category, group):
    """Build the ``CALL insert_product(...)`` statement for a new product.

    Each argument is embedded as a dollar-quoted string literal. The tags
    are ``prod``, ``category`` and ``group``; a tag is extended only when the
    value itself contains it, so ordinary values produce the same text as
    plain interpolation while a value containing the delimiter cannot
    terminate its literal prematurely.

    Args:
        product: Product name.
        category: Category name.
        group: Group name.

    Returns:
        The statement text, ready to be passed to ``cursor.execute``.
    """
    return (
        'CALL insert_product('
        f"{_dollar_quote(product, 'prod')}, "
        f"{_dollar_quote(category, 'category')}, "
        f"{_dollar_quote(group, 'group')})"
    )
class QueryManager:
    """Bundle a database cursor with a cell formatter for the query helpers.

    Attributes are set directly from the constructor arguments:
    ``cursor`` runs every statement and ``display`` is the
    ``display(value, column_name)`` callable applied to fetched cells.
    """

    def __init__(self, cursor: Cursor, display: Callable[[Any, str], str]):
        """Store *cursor* and *display* for later queries."""
        self.cursor = cursor
        self.display = display

    def get_historic_prices(self, rating_cb, sort, product, unit, organic=None, limit='365 days'):
        """Fetch historic prices and return them as a plain-text table.

        ``rating_cb`` is always called once: with the ``avg``, ``min`` and
        ``max`` values of the first row converted to ``float``, or with three
        ``None`` values when the query yields no rows.

        Returns:
            ``''`` when there are no rows; otherwise the table text without
            the ``id``, ``avg``, ``min`` and ``max`` columns.
        """
        query = get_historic_prices_statement(
            sort, product, unit, organic=organic, limit=limit,
        )
        prices = pd.DataFrame(get_data(self.cursor, query, self.display))
        if prices.empty:
            rating_cb(None, None, None)
            return ''

        first_row_stats = prices[['avg', 'min', 'max']].iloc[0].apply(float)
        average, lowest, highest = first_row_stats
        rating_cb(average, lowest, highest)

        printable = prices.drop(labels=['id', 'avg', 'min', 'max'], axis=1)
        return printable.to_string(
            header=['Date', 'Store', '$/unit', 'Org'],
            justify='justify-all',
            max_colwidth=16,
            index=False,
        )

    def get_session_transactions(self, date, store):
        """Return the text table of transactions for *date* at *store*.

        *date* is parsed with ``dateutil`` before the statement is built.
        """
        statement = get_session_transactions_statement(
            parse_time(date), store, full_name=True, exact_time=True,
        )
        # Resolves to the module-level function of the same name.
        return get_session_transactions(self.cursor, statement, date, store)

    def unique_suggestions(self, name, **kwargs):
        """Return sorted distinct values of column *name* matching *kwargs*."""
        return unique_suggestions(
            self.cursor,
            get_transactions_statement(),
            name,
            self.display,
            **kwargs,
        )

    def insert_new_product(self, product, category, group):
        """Execute the insert-product statement for the given names."""
        statement = get_insert_product_statement(product, category, group)
        self.cursor.execute(statement)
|