# # Copyright (c) Daniel Sheffield 2021 - 2022 # # All rights reserved # # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY import itertools from decimal import Decimal, InvalidOperation from typing import Callable, Union from urwid import ( connect_signal, AttrMap, Button, Columns, Divider, Filler, LineBox, Padding, Pile, RadioButton, Text, ) from .. import COPYRIGHT from ..widgets import ( AutoCompleteEdit, AutoCompleteFloatEdit, FocusWidget, AutoCompletePopUp, NoTabCheckBox ) from ..db_utils import QueryManager from . import ActivityManager, show_or_exit from .Rating import Rating class PriceCheck(FocusWidget): def keypress(self, size, key): if isinstance(key, tuple): return if getattr(self.original_widget.original_widget, 'original_widget', None) is None: return super().keypress(size, key) if key == 'tab': self.advance_focus() elif key == 'shift tab': self.advance_focus(reverse=True) else: return super().keypress(size, key) def apply_choice(self, name, value): self.apply_changes(name, value) for k,v in self.data.items(): if k == name or v: continue options = self.query_manager.unique_suggestions(k, **self.data) if len(options) == 1: self.apply_changes(k, list(options)[0]) def apply_changes(self, name, value): self.data = { name: value if name != 'organic' else { 'yes': True, 'no': False, True: True, False: False, 'mixed': '', }[value], } @property def data(self): ret = dict(itertools.chain( [(k, v.get_edit_text()) for k,v in self.edit_fields.items()], [(k, v.state) for k,v in self.checkboxes.items()] )) return ret @data.setter def data(self, _data: dict): for k,v in _data.items(): if k in self.edit_fields and v != self.edit_fields[k].get_edit_text(): self.edit_fields[k].set_edit_text(v) if k in self.checkboxes and v != self.checkboxes[k].state: self.checkboxes[k].set_state(v) def clear(self): for (_, ef) in self.edit_fields.items(): ef.set_edit_text('') for (_, cb) in self.checkboxes.items(): cb.set_state('mixed') for (_, tf) in self.text_fields.items(): tf.set_text('') self.update() return self def update(self): self.update_historic_prices(self.data) return self def update_historic_prices(self, data): organic = None if data['organic'] == 'mixed' else data['organic'] sort = '$/unit' if self.buttons['sort_price'].state else 'ts' product, unit = data['product'] or None, data['unit'] or None try: price = Decimal(data['price']) except InvalidOperation: price = None try: quantity = Decimal(data['quantity']) except InvalidOperation: quantity = None if None in (sort, product, unit): self.text_fields['dbview'].set_text('') self.rating.update_rating(None, None, None, unit) return df = self.query_manager.get_historic_prices_data(unit, sort=sort, product=product, organic=organic) if df.empty: self.text_fields['dbview'].set_text('') self.rating.update_rating(None, None, None, unit) return assert len(df['avg'].unique()) == 1, f"There should be only one average price: {df['avg'].unique()}" _avg, _min, _max = [ float(x) for x in df[['avg','min','max']].iloc[0] ] self.rating.update_rating(_avg, _min, _max, unit, price=price, quantity=quantity), self.text_fields['dbview'].set_text( self.rating.get_historic_prices(df) ) def __init__(self, activity_manager: ActivityManager, query_manager: QueryManager, autocomplete_cb: Callable[[ Union[AutoCompleteEdit, AutoCompleteFloatEdit], str, dict ], None], ): button_group = [] self.buttons = { 'clear': Button(('streak', 'Clear')), 'exit': Button(('streak', 'Exit')), 'sort_price': RadioButton(button_group, ('streak', 'Best'), state="first True"), 'sort_date': RadioButton(button_group, ('streak', 'Last'), state="first True"), } self.edit_fields = { 'product': AutoCompleteEdit(('bg', 'product')), 'unit': AutoCompleteEdit(('bg', 'unit')), 'quantity': AutoCompleteFloatEdit(('bg', 'quantity')), 'price': AutoCompleteFloatEdit(('bg', 'price')), } self.checkboxes = { 'organic': NoTabCheckBox(('bg', "Organic"), state='mixed'), } self.text_fields = dict(( (k, Text('')) for k in ('dbview', 'spread', 'rating', 'marker') )) self.rating = Rating(dict(filter( lambda x: x[0] in ('spread','rating','marker'), self.text_fields.items() ))) top_pane = [ 'clear', 'exit', ['sort_price', 'sort_date'], ] left_pane = [ 'product', 'organic', ] badge = [ 'rating', 'spread', 'marker', ] right_pane = [ 'unit', 'quantity', 'price', ] bottom_pane = [ 'dbview', ] self.query_manager = query_manager self.organic_checkbox = self.checkboxes['organic'] connect_signal(self.organic_checkbox, 'postchange', lambda _,v: self.update()) for (k, ef) in self.edit_fields.items(): connect_signal(ef, 'postchange', lambda _,v: self.update()) connect_signal(ef, 'apply', lambda w, name: autocomplete_cb(w, name, self.data)) for b in button_group: connect_signal(b, 'postchange',lambda *_: self.update()) connect_signal(self.buttons['clear'], 'click', lambda x: self.clear().update()) connect_signal(self.buttons['exit'], 'click', lambda x: show_or_exit('esc')) self.clear() header = Text(u'Price Check', 'center') _copyright = Text(COPYRIGHT, 'center') banner = Pile([ Padding(header, 'center', width=('relative', 100)), Padding(_copyright, 'center', width=('relative', 100)), ]) banner = AttrMap(banner, 'banner') _widgets = dict(itertools.chain(*[ [(k, v) for k,v in x] for x in map(lambda x: x.items(), [ self.edit_fields, self.text_fields, self.checkboxes ]) ])) _widgets.update([ (k, LineBox(AttrMap( AutoCompletePopUp( self.edit_fields[k], self.apply_choice, lambda: activity_manager.show(self.update()) ), 'streak'), title=k.title(), title_align='left') ) for k in self.edit_fields ]) _widgets.update({ 'dbview': LineBox( AttrMap(self.text_fields['dbview'], 'streak'), title="Historic Prices", title_align='center', ), }) components = { 'top_pane': Columns([ (9, Pile([ Divider(), AttrMap(self.buttons['clear'], 'streak'), Divider(), ])), LineBox( Columns([ v for k,v in self.buttons.items() if 'sort' in k]), title="Sort price by", title_align='left', ), (9, Pile([ Divider(), AttrMap(self.buttons['exit'], 'streak'), Divider(), ])) ], dividechars=1), 'right_pane': (16, Pile(map( lambda x: _widgets[x] if x is not None else Divider, right_pane ))), 'left_pane': Pile(map( lambda x: _widgets[x] if x is not None else Divider, left_pane )), 'badge': Pile(map( lambda x: _widgets[x] if x is not None else Divider, badge )), 'bottom_pane': _widgets['dbview'], } components.update({ 'left_pane': Pile([ components['left_pane'], LineBox( AttrMap(components['badge'], 'badge'), title="Current Price", title_align='left', ) ])}) widget = Pile([ banner, Divider(), components['top_pane'], Columns((components['left_pane'], components['right_pane']), dividechars=0, ), components['bottom_pane'], ]) widget = Filler(widget, 'top') widget = AttrMap(widget, 'bg') widget.original_widget.original_widget.set_focus_path([3,0,0]) super().__init__(widget, 4, [3,0,0,0], [ [0, 0,], [0,1,], [1,], [2,], [2,0,], [2,0,0,], [2,0,2], [2,2,0], [2,2,2], [3,0,], [3,0,0,], [3,0,1,0,], [3,0,1,1,], [3,0,1,2,], [3,1,3,], [4,], ] )