#
# Copyright (c) Daniel Sheffield 2021 - 2023
#
# All rights reserved
#
# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
import itertools
from decimal import Decimal, InvalidOperation
from itertools import chain
from typing import Callable, Union

from urwid import (
    connect_signal,
    AttrMap,
    Button,
    Columns,
    Divider,
    Filler,
    LineBox,
    Padding,
    Pile,
    RadioButton,
    Text,
)

from .. import COPYRIGHT
from ..widgets import (
    AutoCompleteEdit,
    AutoCompleteFloatEdit,
    FocusWidget,
    AutoCompletePopUp,
    NoTabCheckBox,
    FlowBarGraphWithVScale,
)
from ..data.QueryManager import QueryManager
from . import ActivityManager, show_or_exit
from .Rating import Rating


class PriceCheck(FocusWidget):
    """Price-check screen.

    The user enters a product, unit, quantity and price (plus an
    "organic" tri-state checkbox).  Every change re-queries the
    ``QueryManager`` for historic prices and refreshes the rating badge,
    the "Historic Prices" text view and the bar graph.
    """

    def keypress(self, size, key):
        """Handle focus cycling with tab / shift-tab.

        Tuple keys are dropped without further processing.  While the
        wrapped widget has no nested ``original_widget`` the key is
        delegated to the parent class unchanged.

        Returns ``None`` when the key was consumed here, otherwise
        whatever the parent implementation returns.
        """
        if isinstance(key, tuple):
            return None

        if getattr(self._w.original_widget, 'original_widget', None) is None:
            return super().keypress(size, key)

        if key == 'tab':
            self.advance_focus()
            return None
        if key == 'shift tab':
            self.advance_focus(reverse=True)
            return None

        return super().keypress(size, key)

    def apply_choice(self, name, value):
        """Apply an auto-complete choice and fill in implied fields.

        After setting ``name`` to ``value``, every other field that is
        still empty/falsy is looked up; when exactly one suggestion is
        consistent with the current form data it is applied as well.
        """
        self.apply_changes(name, value)
        for field, current in self.data.items():
            if field == name or current:
                continue
            # self.data is re-read on purpose: a previous iteration may
            # have filled in another field, narrowing the suggestions.
            options = self.query_manager.unique_suggestions(field, **self.data)
            if len(options) == 1:
                self.apply_changes(field, next(iter(options)))

    def apply_changes(self, name, value):
        """Set a single field, translating ``organic`` values to a state.

        For ``organic`` the value must be one of ``'yes'``, ``'no'``,
        ``True``, ``False`` or ``'mixed'``; any other value raises
        ``KeyError``.
        """
        if name == 'organic':
            value = {
                'yes': True, 'no': False,
                True: True, False: False,
                'mixed': '',
            }[value]
        self.data = {name: value}

    @property
    def data(self):
        """Current form contents as a dict.

        Edit fields contribute their edit text, checkboxes their state.
        """
        ret = {
            name: field.get_edit_text()
            for name, field in self.edit_fields.items()
        }
        ret.update(
            (name, box.state) for name, box in self.checkboxes.items()
        )
        return ret

    @data.setter
    def data(self, _data: dict):
        """Write values into the matching edit fields / checkboxes.

        Widgets are only touched when the value actually differs, so
        unchanged values do not trigger change signals.  Keys that match
        no widget are ignored.
        """
        for name, value in _data.items():
            field = self.edit_fields.get(name)
            if field is not None and value != field.get_edit_text():
                field.set_edit_text(value)
            box = self.checkboxes.get(name)
            if box is not None and value != box.state:
                box.set_state(value)

    def clear(self):
        """Reset every input, text view and the graph; return ``self``."""
        for field in self.edit_fields.values():
            field.set_edit_text('')
        for box in self.checkboxes.values():
            box.set_state('mixed')
        for text in self.text_fields.values():
            text.set_text('')
        self.graph.set_data([], 0)
        return self.update()

    def update(self):
        """Refresh the historic price views from the form; return ``self``."""
        self.update_historic_prices(self.data)
        return self

    def update_graph(self, df):
        """Plot the most recent prices in ``df`` and caption the date range.

        ``df`` is read through its ``'ts_raw'``, ``'$/unit'`` and
        ``'quantity'`` columns.  Only as many of the newest rows as the
        graph reports in ``_canvas_width`` are kept.
        """
        # after truncating, need to recalculate avg(median), min, max
        # NOTE(review): this uses graph._canvas_width while the caption
        # below uses graph.canvas_width -- confirm both are intended.
        first_row = max(0, len(df.index) - self.graph._canvas_width)
        df = df.sort_values(
            'ts_raw', ascending=True, ignore_index=True
        ).truncate(before=first_row)

        data = df[['$/unit', 'quantity']].apply(
            lambda x: (float(x['$/unit']), float(x['quantity'])),
            axis=1, result_type='broadcast'
        )
        # quantity-weighted average price of the rows that are plotted
        data['avg'] = (
            (data['$/unit'] * data['quantity']).sum() / data['quantity'].sum()
        )
        data_max = data.max()['$/unit']
        assert len(data['avg'].unique()) == 1

        bars = [(x,) for x in data['$/unit']]
        vscale = [float(x) for x in (
            data['$/unit'].min(),
            data['$/unit'].median(),
            data['avg'].iloc[0],
            data_max,
        )]
        self.graph.set_data(bars, data_max, vscale=vscale)

        # canvas_width = 10 + pad + pad + 10  (two dd/mm/YYYY dates)
        # Clamped so a canvas narrower than 20 cannot yield a negative
        # (invalid) format width.
        date_strlen = max(0, self.graph.canvas_width - 20)
        ex = "─" if date_strlen % 2 else ""
        plen = date_strlen // 2
        caption = f"{df['ts_raw'].min():%d/%m/%Y}"
        caption += f"{{p:>{plen}}}{ex}{{p:<{plen}}}".format(p="─")
        caption += f"{df['ts_raw'].max():%d/%m/%Y}"
        self.graph.set_caption(caption)

    @staticmethod
    def _to_decimal(text):
        """Return ``Decimal(text)``, or ``None`` when it does not parse."""
        try:
            return Decimal(text)
        except InvalidOperation:
            return None

    def _show_no_history(self, unit):
        """Blank the historic price text and reset the rating badge."""
        self.text_fields['dbview'].set_text('')
        self.rating.update_rating(None, None, None, unit)

    def update_historic_prices(self, data):
        """Query historic prices for the form ``data`` and display them.

        ``data`` is a dict shaped like the ``data`` property.  Without a
        product and a unit, or when the query yields no rows, the text
        view is blanked and the rating reset; the graph is left as is.
        """
        organic = None if data['organic'] == 'mixed' else data['organic']
        sort = '$/unit' if self.buttons['sort_price'].state else 'ts'
        product = data['product'] or None
        unit = data['unit'] or None
        price = self._to_decimal(data['price'])
        quantity = self._to_decimal(data['quantity'])

        if product is None or unit is None:
            self._show_no_history(unit)
            return

        df = self.query_manager.get_historic_prices_data(
            unit, sort=sort, product=product, organic=organic
        ).dropna()
        if df.empty:
            self._show_no_history(unit)
            return

        assert len(df['avg'].unique()) == 1, f"There should be only one average price: {df['avg'].unique()}"
        _avg, _min, _max = [
            float(x) for x in df[['avg', 'min', 'max']].iloc[0]
        ]
        self.rating.update_rating(
            _avg, _min, _max, unit, price=price, quantity=quantity
        )
        self.text_fields['dbview'].set_text(
            self.rating.get_historic_prices(df)
        )
        self.update_graph(df)

    def __init__(
        self,
        activity_manager: ActivityManager,
        query_manager: QueryManager,
    ):
        """Build the widgets, wire their signals and lay out the screen.

        ``activity_manager`` is used to re-show this screen when an
        auto-complete pop-up is dismissed; ``query_manager`` supplies
        suggestions and historic price data.
        """
        button_group = []
        self.buttons = {
            'clear': Button(('streak', 'Clear')),
            'exit': Button(('streak', 'Exit')),
            'sort_price': RadioButton(
                button_group, ('streak', 'Best'), state="first True"),
            'sort_date': RadioButton(
                button_group, ('streak', 'Last'), state="first True"),
        }
        self.edit_fields = {
            'product': AutoCompleteEdit(('bg', 'product')),
            'unit': AutoCompleteEdit(('bg', 'unit')),
            'quantity': AutoCompleteFloatEdit(('bg', 'quantity')),
            'price': AutoCompleteFloatEdit(('bg', 'price')),
        }
        self.checkboxes = {
            'organic': NoTabCheckBox(('bg', "Organic"), state='mixed'),
        }
        self.text_fields = {
            k: Text('') for k in ('dbview', 'spread', 'rating', 'marker')
        }
        # The rating badge writes into a subset of the text fields.
        self.rating = Rating({
            k: w for k, w in self.text_fields.items()
            if k in ('spread', 'rating', 'marker')
        })

        # Widget names shown in each pane, top to bottom.  A None entry
        # stands for a blank spacer row.
        left_pane = ['product', 'organic']
        badge = ['rating', 'spread', 'marker']
        right_pane = ['unit', 'quantity', 'price']

        self.query_manager = query_manager
        self.organic_checkbox = self.checkboxes['organic']
        connect_signal(
            self.organic_checkbox, 'postchange', lambda _, v: self.update())

        # Open the suggestion pop-up only when there is something to show.
        self.autocomplete_callback = (
            lambda widget, options:
            len(options) and widget._emit('open', options)
        )
        for ef in self.edit_fields.values():
            connect_signal(ef, 'postchange', lambda _, v: self.update())
            connect_signal(
                ef, 'apply',
                lambda w, name: self.autocomplete_callback(
                    w, query_manager.unique_suggestions(name, **self.data)
                ))
        for b in button_group:
            connect_signal(b, 'postchange', lambda *_: self.update())

        connect_signal(
            self.buttons['clear'], 'click', lambda x: self.clear().update())
        connect_signal(
            self.buttons['exit'], 'click', lambda x: show_or_exit('esc'))

        self.graph = FlowBarGraphWithVScale(
            50, 14,
            ['bg', 'popup_focus', 'badge_neutral'],
            hatt=['dark red', 'dark red', 'dark red']
        )
        # Must run after the graph exists: clear() resets it.
        self.clear()

        header = Text(u'Price Check', 'center')
        _copyright = Text(COPYRIGHT, 'center')
        banner = Pile([
            Padding(header, 'center', width=('relative', 100)),
            Padding(_copyright, 'center', width=('relative', 100)),
        ])
        banner = AttrMap(banner, 'banner')

        # Name -> widget as placed in the layout.  Edit fields are
        # replaced by their boxed auto-complete pop-up wrappers.
        _widgets = {
            **self.edit_fields,
            **self.text_fields,
            **self.checkboxes,
        }
        _widgets.update(
            (k, LineBox(
                AttrMap(
                    AutoCompletePopUp(
                        self.edit_fields[k],
                        self.apply_choice,
                        lambda: activity_manager.show(self.update())
                    ), 'streak'),
                title=k.title(), title_align='left'))
            for k in self.edit_fields
        )
        _widgets['dbview'] = LineBox(
            AttrMap(self.text_fields['dbview'], 'streak'),
            title="Historic Prices", title_align='center',
        )
        _widgets['graph'] = LineBox(
            self.graph,
            title="Historic Price", title_align='left'
        )

        def pane_widgets(names):
            # A spacer must be a Divider *instance*, not the class.
            return [
                _widgets[n] if n is not None else Divider()
                for n in names
            ]

        top = Columns([
            (9, Pile([
                Divider(),
                AttrMap(self.buttons['clear'], 'streak'),
                Divider(),
            ])),
            LineBox(
                Columns([
                    b for name, b in self.buttons.items() if 'sort' in name
                ]),
                title="Sort price by", title_align='left',
            ),
            (9, Pile([
                Divider(),
                AttrMap(self.buttons['exit'], 'streak'),
                Divider(),
            ]))
        ], dividechars=1)
        right = (16, Pile(pane_widgets(right_pane)))
        badge_box = LineBox(
            AttrMap(Pile(pane_widgets(badge)), 'badge'),
            title="Current Price", title_align='left',
        )
        left = Pile([Pile(pane_widgets(left_pane)), badge_box])
        bottom = Pile([_widgets['graph'], _widgets['dbview']])

        widget = Pile([
            banner,
            Divider(),
            top,
            Columns((left, right), dividechars=0),
            bottom,
        ])
        widget = Filler(widget, 'top')
        widget = AttrMap(widget, 'bg')

        # Tab order.  Buttons are merged last so that, as in a first-match
        # search over buttons, edit fields, checkboxes, they take priority
        # on a (currently non-existent) name clash.
        focusable = {**self.checkboxes, **self.edit_fields, **self.buttons}
        focus_order = [
            'product', 'organic', 'unit', 'quantity', 'price',
            'clear', 'exit', 'sort_price', 'sort_date',
        ]
        super().__init__(widget, map(focusable.__getitem__, focus_order))