# # Copyright (c) Daniel Sheffield 2021 - 2022 # # All rights reserved # # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY from sqlite3 import Cursor import time from typing import Any, Callable from .txn_view import ( get_table_statement, get_transactions_statement, get_session_transactions_statement, ) from .price_view import( get_historic_prices_statement, ) from dateutil.parser import parse as parse_time import pandas as pd NON_IDENTIFIER_COLUMNS = [ 'ts', 'store', 'quantity', 'unit', 'price', 'organic', ] display_map = { 'ts': lambda x: f"{time.strftime('%Y-%m-%d %H:%M', (x.year, x.month, x.day, x.hour, x.minute, 0, 0, 0, 0))}", 'price': lambda x: f'{x:.4f}', 'quantity': lambda x: f'{x:.2f}', 'organic': lambda x: 'yes' if x else 'no', } display_mapper: Callable[ [Any, str], str ] = lambda data, name: display_map[name](data) if name in display_map else data def cursor_as_dict(cur): _col_idx_map=dict(map(lambda col: (col[1].name, col[0]), enumerate(cur.description))) for row in map(lambda row, _map=_col_idx_map: dict([ (name, row[i]) for name, i in _map.items() ]), cur.fetchall()): #print(row) yield row def get_data(cursor, statement, display): cursor.execute(statement) yield from map(lambda x: dict([ (k, display(v, k)) for k,v in x.items() ]), cursor_as_dict(cursor)) def get_session_transactions(cursor, statement, date, store): #print(cur.mogrify(statement).decode("utf-8")) #input() cursor.execute(statement) df = pd.DataFrame(cursor_as_dict(cursor)) if df.empty: return '' return df.drop(labels=[ 'id', 'ts', 'store', 'code', ], axis=1).to_string(header=[ 'Description', 'Volume', 'Unit', 'Price', '$/unit', 'Total', 'Group', 'Category', 'Product', 'Organic', ], justify='justify-all', max_colwidth=60, index=False) def record_matches(record, strict=None, **kwargs): strict = strict or [] for k,v in kwargs.items(): if not v: continue if k in strict and v.lower() != record[k].lower(): return False if v.lower() not in record[k].lower(): return False return True def unique_suggestions(cur, statement, name, display, exclude=NON_IDENTIFIER_COLUMNS, **kwargs): exclude = filter( lambda x: x != name or name == 'ts', exclude, ) [ kwargs.pop(k) for k in exclude if k in kwargs] items = suggestions(cur, statement, name, display, exclude=exclude, **kwargs) ret = sorted(set(map(lambda x: x[name], items))) tables = { 'product', 'category', 'group', 'unit', 'store', } if len(ret) > 0 or name not in tables: return ret items = (i for i in filter(lambda x: record_matches(x, **{ name: kwargs[name] }), get_data(cur, get_table_statement(name), display))) return sorted(set(map(lambda x: x[name], items))) def suggestions(cur, statement, name, display, exclude=NON_IDENTIFIER_COLUMNS, **kwargs): exclude = filter( lambda x: x != name or name == 'ts', exclude, ) [ kwargs.pop(k) for k in exclude if k in kwargs] yield from filter(lambda x: record_matches( x, strict=[ k for k in kwargs if k != name ], **kwargs ), get_data(cur, statement, display)) def get_insert_product_statement(product, category, group): return f'CALL insert_product($prod${product}$prod$, $category${category}$category$, $group${group}$group$)' class QueryManager(object): def __init__(self, cursor: Cursor, display: Callable[ [Any, str], str ]): self.display = display self.cursor = cursor def get_historic_prices(self, rating_cb, sort, product, unit, organic=None, limit='365 days'): statement = get_historic_prices_statement(sort, product, unit, organic=organic, limit=limit) #print(self.cursor.mogrify(statement).decode('utf-8')) #input() df = pd.DataFrame(get_data(self.cursor, statement, self.display)) if df.empty: rating_cb(None, None, None) return '' _avg, _min, _max = [ x for x in df[['avg','min','max']].iloc[0].apply(float) ] rating_cb(_avg, _min, _max) return df.drop(labels=[ 'id', 'avg', 'min', 'max' ], axis=1).to_string(header=[ 'Date', 'Store', '$/unit', 'Org', ], justify='justify-all', max_colwidth=16, index=False) def get_session_transactions(self, date, store): return get_session_transactions( self.cursor, get_session_transactions_statement( parse_time(date), store, full_name=True, exact_time=True ), date, store) def unique_suggestions(self, name, **kwargs): statement = get_transactions_statement() return unique_suggestions(self.cursor, statement, name, self.display, **kwargs) def insert_new_product(self, product, category, group): self.cursor.execute(get_insert_product_statement(product, category, group))