# # Copyright (c) Daniel Sheffield 2021 - 2023 # # All rights reserved # # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY import itertools from decimal import Decimal, InvalidOperation from itertools import chain from typing import Callable, Union from urwid import ( connect_signal, AttrMap, Button, Columns, Divider, Filler, LineBox, Padding, Pile, Text, ) from .. import COPYRIGHT from ..db_utils import QueryManager from ..widgets import ( AutoCompleteEdit, AutoCompleteFloatEdit, FocusWidget, AutoCompletePopUp, NoTabCheckBox, FlowBarGraphWithVScale, ) from . import ActivityManager from .Rating import Rating from .NewProduct import NewProduct class TransactionEditor(FocusWidget): def keypress(self, size, key): if isinstance(key, tuple): return if getattr(self._w.original_widget, 'original_widget', None) is None: return super().keypress(size, key) if key == 'tab': self.advance_focus() elif key == 'shift tab': self.advance_focus(reverse=True) elif key == 'ctrl delete': self.clear() self.focus_on(self.edit_fields['product']) elif key == 'insert': self.save_and_clear_cb() elif key == 'ctrl p': self.focus_on(self.edit_fields['product']) else: return super().keypress(size, key) def apply_choice(self, name, value): self.apply_changes(name, value) for k,v in self.data.items(): if k == name or v: continue options = self.query_manager.unique_suggestions(k, **self.data) if len(options) == 1 and k != 'ts': self.apply_changes(k, list(options)[0]) def apply_changes(self, name, value): self.data = { name: value if name != 'organic' else { 'yes': True, 'no': False, True: True, False: False, 'mixed': '', }[value], } @property def data(self): ret = dict(itertools.chain( [(k, v.get_edit_text()) for k,v in self.edit_fields.items()], [(k, v.state) for k,v in self.checkboxes.items()] )) return ret @data.setter def data(self, _data: dict): for k,v in _data.items(): if k in self.edit_fields and v != self.edit_fields[k].get_edit_text(): self.edit_fields[k].set_edit_text(v) if k in self.checkboxes and v != self.checkboxes[k].state: self.checkboxes[k].set_state(v) def clear(self): for (k, ef) in self.edit_fields.items(): if k in ('ts', 'store',): continue ef.set_edit_text('') for (_, cb) in self.checkboxes.items(): cb.set_state('mixed') for (k, tf) in self.text_fields.items(): if k != 'dbview': tf.set_text('') self.graph.set_data([],0) return self.update() def update(self): data = self.data date, store = data['ts'], data['store'] self.text_fields['dbview'].set_text( self.query_manager.get_session_transactions(date, store) if None not in ( date or None, store or None ) else '' ) self.update_historic_prices(data) return self def update_graph(self, df): # after truncating, need to recalculate avg(median), min, max df = df.sort_values( 'ts_raw', ascending=True, ignore_index=True ).truncate( before=max(0, len(df.index)-self.graph._canvas_width) ) data = df[['$/unit','quantity']].apply( lambda x: (float(x['$/unit']), float(x['quantity'])), axis=1, result_type='broadcast' ) data['avg'] = (data['$/unit']*data['quantity']).sum()/data['quantity'].sum() data_max = data.max()['$/unit'] #.max() assert len(data['avg'].unique()) == 1 norm = [ (x,) for x in data['$/unit'] ] #.to_records(index=False) self.graph.set_data(norm, data_max, vscale=[x for x in map(float, [ data['$/unit'].min(), data['$/unit'].median(), data['avg'].iloc[0], data_max ])] ) #self.graph.set_bar_width(1) # canvas_width = 10 + pad + pad + 10 date_strlen = (self.graph.canvas_width - 20) ex = "─" if date_strlen % 2 else "" plen = date_strlen//2 caption = f"{df['ts_raw'].min():%d/%m/%Y}" caption += f"{{p:>{plen}}}{ex}{{p:<{plen}}}".format( p="─") caption += f"{df['ts_raw'].max():%d/%m/%Y}" self.graph.set_caption(caption) def update_historic_prices(self, data): organic = None if data['organic'] == 'mixed' else data['organic'] #sort = '$/unit' if self.buttons['sort_price'].state else 'ts' product, unit = data['product'] or None, data['unit'] or None try: price = Decimal(data['price']) except InvalidOperation: price = None try: quantity = Decimal(data['quantity']) except InvalidOperation: quantity = None if None in (product, unit): self.rating.update_rating(None, None, None, unit) return df = self.query_manager.get_historic_prices_data(unit, product=product, organic=organic, sort='ts') if df.empty: self.rating.update_rating(None, None, None, unit) return assert len(df['avg'].unique()) == 1, f"There should be only one average price: {df['avg'].unique()}" # all time (or all data) avg(mean), min, max _avg, _min, _max = [ float(x) for x in df[['avg','min','max']].iloc[0] ] self.rating.update_rating(_avg, _min, _max, unit, price=price, quantity=quantity) self.update_graph(df) def autocomplete_callback(self, widget, name, options): if len(options): widget._emit('open', options) return new_product_activity = self.activity_manager.get('new_product') current_activity = self.activity_manager.current() if current_activity is not new_product_activity and name in ( 'product', 'category', 'group' ): self.new_product_callback(name) return def new_product_callback(self, name): cur = self.activity_manager.current() txn = self.activity_manager.get('transaction') new_product = self.activity_manager.create( NewProduct, 'new_product', self.activity_manager, self.query_manager, cur, name, txn.data, txn.apply_changes, ) self.activity_manager.show(new_product) def save_and_clear_callback(self): txn = self.activity_manager.get('transaction') self.activity_manager.app.save(txn.data) txn.clear() txn.focus_on(txn.edit_fields['product']) self.activity_manager.show(txn.update()) def __init__(self, activity_manager: ActivityManager, query_manager: QueryManager, ): self.activity_manager = activity_manager self.query_manager = query_manager self.buttons = { 'done': Button(('streak', u'Done')), 'clear': Button(('streak', u'Clear')), } self.edit_fields = { 'ts': AutoCompleteEdit(('bg', 'ts')), 'store': AutoCompleteEdit(('bg', 'store')), 'product': AutoCompleteEdit(('bg', 'product')), 'category': AutoCompleteEdit(('bg', 'category')), 'group': AutoCompleteEdit(('bg', 'group')), 'description': AutoCompleteEdit(('bg', 'description')), 'unit': AutoCompleteEdit(('bg', 'unit')), 'quantity': AutoCompleteFloatEdit(('bg', 'quantity')), 'price': AutoCompleteFloatEdit(('bg', 'price')), } self.checkboxes = { 'organic': NoTabCheckBox(('bg', "Organic"), state='mixed'), } self.text_fields = { 'dbview': Text(''), 'rating': Text(''), 'spread': Text(''), 'marker': Text(''), } self.graph = FlowBarGraphWithVScale( 50, 14, ['bg','popup_focus', 'badge_neutral' ], hatt=['dark red', 'dark red', 'dark red'] ) self.rating = Rating(dict(filter( lambda x: x[0] in ('spread','rating','marker'), self.text_fields.items() ))) self.organic_checkbox = self.checkboxes['organic'] connect_signal(self.organic_checkbox, 'postchange', lambda _,v: self.update()) layout = [ [ 'ts', 'store', ], [ 'organic', 'product', ], [ 'category', 'group', ], ] side_pane = [ 'unit', 'quantity', 'price', ] bottom_pane = [ 'description', 'dbview', ] badge = [ 'rating', 'spread', 'marker', ] self.clear() _widgets = dict(itertools.chain(*[ [(k, v) for k,v in x] for x in map(lambda x: x.items(), [ self.edit_fields, self.text_fields, self.checkboxes ]) ])) _widgets.update({ 'dbview': LineBox( AttrMap(self.text_fields['dbview'], 'streak'), title="Session Data", title_align='left', ) }) for (k, ef) in self.edit_fields.items(): connect_signal(ef, 'postchange', lambda *_: self.update()) connect_signal(ef, 'apply', lambda w, name: self.autocomplete_callback( w, name, query_manager.unique_suggestions(name, **self.data) )) _widgets.update(dict([ (k, LineBox( AttrMap(AutoCompletePopUp( self.edit_fields[k], self.apply_choice, lambda: activity_manager.show(self.update()) ), 'streak'), title=k.title(), title_align='left') ) for k in self.edit_fields if k != 'product' ])) header = Text(u'Fill Transaction', 'center') _copyright = Text(COPYRIGHT, 'center') components = { 'bottom_button_bar': Columns( [(8, self.buttons['done']), Divider(), (9, self.buttons['clear'])] ), 'badge': Pile(map( lambda x: _widgets[x] if x is not None else Divider, badge )), } components.update({ 'bottom_pane': Columns([ Pile(map( lambda x: _widgets[x] if x is not None else Divider(), bottom_pane )), (self.graph.total_width+2, Pile([ LineBox( AttrMap(components['badge'], 'badge'), title="Current Price", title_align='left', ), LineBox( self.graph, title="Historic Price", title_align='left' ), ])), ]) }) connect_signal(self.buttons['done'], 'click', lambda _: self.save_and_clear_callback()) connect_signal(self.buttons['clear'], 'click', lambda _: self.clear()) banner = Pile([ Padding(header, 'center', width=('relative', 100)), Padding(_copyright, 'center', width=('relative', 100)), ]) banner = AttrMap(banner, 'banner') _widgets.update({ 'product': LineBox(Columns([ AttrMap(AutoCompletePopUp( self.edit_fields['product'], self.apply_choice, lambda: activity_manager.show(self.update()) ), 'streak'), self.organic_checkbox ], dividechars=2), title='Product', title_align='left') }) components['side_pane'] = (12, Pile([ _widgets[r] if r is not None else Divider() for r in side_pane ])) components['main_pane'] = [] for _, r in enumerate(layout): col = [] for c in r: if c is not None: if c == 'organic': continue col.append(_widgets[c]) else: col.append(Divider()) components['main_pane'].append(Columns(col)) components['main_pane'] = Pile(components['main_pane']) widget = Pile([ banner, Divider(), Columns([ components['main_pane'], components['side_pane'] ], dividechars=2), components['bottom_pane'], Divider(), components['bottom_button_bar'] ]) widget = Filler(widget, 'top') widget = AttrMap(widget, 'bg') super().__init__(widget, map( lambda x: next(w[1] for w in chain( self.buttons.items(), self.edit_fields.items(), self.checkboxes.items() ) if x == w[0]), [ 'product', 'organic', 'unit', 'quantity', 'price', 'description', 'done', 'clear', 'category', 'group', 'ts', 'store' ]))