# # Copyright (c) Daniel Sheffield 2023 # # All rights reserved # # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY from app.data.QueryManager import QueryManager, display_mapper from datetime import date, datetime import seaborn as sns import pandas as pd import matplotlib.pyplot as plt import os import sys from sqlite3 import Cursor import psycopg from db_credentials import HOST, PASSWORD ALL_UNITS = {'g','kg','mL','L','Pieces','Bunches','Bags'} host = f'host={HOST}' password = f'password={PASSWORD}' user = os.getenv('USER') conn = psycopg.connect(f"{host} dbname=grocery user={user} {password}") cur: Cursor = conn.cursor() cur.execute("BEGIN") query_manager = QueryManager(cur, display_mapper) fields = dict() for k in ('group', 'category', 'product', 'unit'): options = query_manager.unique_suggestions(k, **fields) names = [ o for o in options ] matches = '\t'.join(names) print(f'{k.title()} names: {matches}') while (len(options) >=1): v = fields[k] if k in fields else '' fields[k] = v + input(f"{k.title()}: {v}") options = query_manager.unique_suggestions(k, **fields) if k == 'unit': options = sorted(set(options) | ALL_UNITS) if fields[k] == '': choice = '' break elif len(options) == 1: choice = options[0] break elif fields[k].lower() in map(lambda x: x.lower(), options): choice = next( filter(lambda x: x.lower() == fields[k].lower(), options) ) break elif len(options) == 0 and k == 'unit': choice = fields[k] break matches = '\t'.join(options) print(f'Matches ({k}): {matches}') if k != 'unit': fields[k] = (fields[k] and choice) or '' else: fields[k] = choice if fields[k] == '': print(f'Ignoring {k} {choice} as it does not exist') else: print(f'Selected {k}: {fields[k]}') print() fields = dict((k,v or None) for k,v in fields.items()) unit = fields['unit'] or 'kg' if unit not in ALL_UNITS: print(f'Invalid unit: {unit}') exit(2) print(f'Getting data for selection:\n ') print(f'\n '.join([f'{k.title()}: {v}' for k,v in fields.items()])) d = pd.DataFrame(query_manager.get_historic_prices_data(unit, **dict((k,v) for k,v in fields.items() if k != 'unit'))) if d.empty: sys.exit(1) print(d.info()) print(d) sns.set_theme() now = datetime.now().date() d['ts_month'] = d['ts_raw'].apply(lambda x: date(x.date().year, x.date().month,1)) p = d[d['ts_month'] == date(now.year,now.month,1) ].groupby(['ts_month','group',])['price', 'quantity'].sum() if not p.empty: p = p.reset_index().set_index('group') p['price'] = p['price'].apply(float) p['quantity'] = p['quantity'].apply(float) print(p.info()) print(p) ax = p.plot.pie(y='quantity', figsize=(5, 5)) ax.get_legend().remove() ax.set_xlabel('') ax.set_ylabel('') ax.set_title(f'Quantity ({unit})') plt.show() ax = p.plot.pie(y='price', figsize=(5, 5)) ax.get_legend().remove() ax.set_xlabel('') ax.set_ylabel('') ax.set_title(f'Price ($)') plt.show() p = d.pivot_table(index=['ts_raw',], columns=['product',], values=['$/unit'], aggfunc='mean') p.columns = p.columns.droplevel() print(p.info()) print(p) ax = sns.lineplot(data=p, markers=True) ax.set_xlabel('Time') ax.set_ylabel(f'$ / {unit}') plt.show()