123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- from app.data.QueryManager import QueryManager, display_mapper
- from datetime import date, datetime
- import seaborn as sns
- import pandas as pd
- import matplotlib.pyplot as plt
- import os
- import sys
- from sqlite3 import Cursor
- import psycopg
- from db_credentials import HOST, PASSWORD
- ALL_UNITS = {'g','kg','mL','L','Pieces','Bunches','Bags'}
- host = f'host={HOST}'
- password = f'password={PASSWORD}'
- user = os.getenv('USER')
- conn = psycopg.connect(f"{host} dbname=grocery user={user} {password}")
- cur: Cursor = conn.cursor()
- cur.execute("BEGIN")
- query_manager = QueryManager(cur, display_mapper)
- fields = dict()
- for k in ('group', 'category', 'product', 'unit'):
- options = query_manager.unique_suggestions(k, **fields)
- names = [ o for o in options ]
- matches = '\t'.join(names)
- print(f'{k.title()} names: {matches}')
- while (len(options) >=1):
- v = fields[k] if k in fields else ''
- fields[k] = v + input(f"{k.title()}: {v}")
- options = query_manager.unique_suggestions(k, **fields)
- if k == 'unit':
- options = sorted(set(options) | ALL_UNITS)
- if fields[k] == '':
- choice = ''
- break
- elif len(options) == 1:
- choice = options[0]
- break
- elif fields[k].lower() in map(lambda x: x.lower(), options):
- choice = next(
- filter(lambda x: x.lower() == fields[k].lower(), options)
- )
- break
- elif len(options) == 0 and k == 'unit':
- choice = fields[k]
- break
- matches = '\t'.join(options)
- print(f'Matches ({k}): {matches}')
- if k != 'unit':
- fields[k] = (fields[k] and choice) or ''
- else:
- fields[k] = choice
- if fields[k] == '':
- print(f'Ignoring {k} {choice} as it does not exist')
- else:
- print(f'Selected {k}: {fields[k]}')
- print()
- fields = dict((k,v or None) for k,v in fields.items())
- unit = fields['unit'] or 'kg'
- if unit not in ALL_UNITS:
- print(f'Invalid unit: {unit}')
- exit(2)
- print(f'Getting data for selection:\n ')
- print(f'\n '.join([f'{k.title()}: {v}' for k,v in fields.items()]))
- d = pd.DataFrame(query_manager.get_historic_prices_data(unit, **dict((k,v) for k,v in fields.items() if k != 'unit')))
- if d.empty:
- sys.exit(1)
- print(d.info())
- print(d)
- sns.set_theme()
- now = datetime.now().date()
- d['ts_month'] = d['ts_raw'].apply(lambda x: date(x.date().year, x.date().month,1))
- p = d[d['ts_month'] == date(now.year,now.month,1) ].groupby(['ts_month','group',])['price', 'quantity'].sum()
- if not p.empty:
- p = p.reset_index().set_index('group')
- p['price'] = p['price'].apply(float)
- p['quantity'] = p['quantity'].apply(float)
- print(p.info())
- print(p)
- ax = p.plot.pie(y='quantity', figsize=(5, 5))
- ax.get_legend().remove()
- ax.set_xlabel('')
- ax.set_ylabel('')
- ax.set_title(f'Quantity ({unit})')
- plt.show()
- ax = p.plot.pie(y='price', figsize=(5, 5))
- ax.get_legend().remove()
- ax.set_xlabel('')
- ax.set_ylabel('')
- ax.set_title(f'Price ($)')
- plt.show()
- p = d.pivot_table(index=['ts_raw',], columns=['product',], values=['$/unit'], aggfunc='mean')
- p.columns = p.columns.droplevel()
- print(p.info())
- print(p)
- ax = sns.lineplot(data=p, markers=True)
- ax.set_xlabel('Time')
- ax.set_ylabel(f'$ / {unit}')
- plt.show()
|