123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- import itertools
- from decimal import Decimal, InvalidOperation
- from itertools import chain
- from typing import Callable, Union
- from urwid import (
- connect_signal,
- AttrMap,
- Button,
- Columns,
- Divider,
- Filler,
- LineBox,
- Padding,
- Pile,
- Text,
- )
- from .. import COPYRIGHT
- from ..db_utils import QueryManager
- from ..widgets import (
- AutoCompleteEdit,
- AutoCompleteFloatEdit,
- FocusWidget,
- AutoCompletePopUp,
- NoTabCheckBox,
- FlowBarGraphWithVScale,
- )
- from . import ActivityManager
- from .Rating import Rating
- from .NewProduct import NewProduct
class TransactionEditor(FocusWidget):
    """Form used to enter one transaction.

    The form is titled "Fill Transaction" and is made of:

    * auto-completing edit fields (``ts``, ``store``, ``product``,
      ``category``, ``group``, ``description``, ``unit``, ``quantity``,
      ``price``) plus a tri-state ``organic`` checkbox,
    * a "Session Data" box filled from
      ``query_manager.get_session_transactions``,
    * a "Current Price" badge driven by :class:`Rating`,
    * a "Historic Price" bar graph,
    * ``Done`` and ``Clear`` buttons.
    """

    def __init__(self,
                 activity_manager: ActivityManager,
                 query_manager: QueryManager,
                 ):
        """Create the widgets, wire their signals and assemble the layout.

        Args:
            activity_manager: used to look up, create and show activities
                and to reach ``app.save``.
            query_manager: source of auto-complete suggestions, session
                transactions and historic price data.
        """
        self.activity_manager = activity_manager
        self.query_manager = query_manager
        self.buttons = {
            'done': Button(('streak', u'Done')),
            'clear': Button(('streak', u'Clear')),
        }
        self.edit_fields = {
            'ts': AutoCompleteEdit(('bg', 'ts')),
            'store': AutoCompleteEdit(('bg', 'store')),
            'product': AutoCompleteEdit(('bg', 'product')),
            'category': AutoCompleteEdit(('bg', 'category')),
            'group': AutoCompleteEdit(('bg', 'group')),
            'description': AutoCompleteEdit(('bg', 'description')),
            'unit': AutoCompleteEdit(('bg', 'unit')),
            'quantity': AutoCompleteFloatEdit(('bg', 'quantity')),
            'price': AutoCompleteFloatEdit(('bg', 'price')),
        }
        self.checkboxes = {
            'organic': NoTabCheckBox(('bg', "Organic"), state='mixed'),
        }
        self.text_fields = {
            'dbview': Text(''),
            'rating': Text(''),
            'spread': Text(''),
            'marker': Text(''),
        }
        self.graph = FlowBarGraphWithVScale(
            50, 14,
            ['bg', 'popup_focus', 'badge_neutral'],
            hatt=['dark red', 'dark red', 'dark red'],
        )
        # Rating receives only the three badge Text widgets.
        self.rating = Rating({
            name: field for name, field in self.text_fields.items()
            if name in ('spread', 'rating', 'marker')
        })
        self.organic_checkbox = self.checkboxes['organic']
        connect_signal(
            self.organic_checkbox, 'postchange', lambda _, v: self.update()
        )

        # Names placed in each region of the screen; a ``None`` entry
        # stands for a blank Divider.
        layout = [
            ['ts', 'store'],
            ['organic', 'product'],
            ['category', 'group'],
        ]
        side_pane = ['unit', 'quantity', 'price']
        bottom_pane = ['description', 'dbview']
        badge = ['rating', 'spread', 'marker']

        # Reset the form before the edit-field signals are connected below.
        self.clear()

        def boxed_popup(name):
            """Wrap edit field *name* in its auto-complete pop-up."""
            return AttrMap(AutoCompletePopUp(
                self.edit_fields[name],
                self.apply_choice,
                lambda: activity_manager.show(self.update())
            ), 'streak')

        # name -> widget actually placed in the layout (raw widgets first,
        # then replaced by their decorated versions).
        widgets = {**self.edit_fields, **self.text_fields, **self.checkboxes}
        widgets['dbview'] = LineBox(
            AttrMap(self.text_fields['dbview'], 'streak'),
            title="Session Data",
            title_align='left',
        )
        for edit in self.edit_fields.values():
            connect_signal(edit, 'postchange', lambda *_: self.update())
            connect_signal(
                edit, 'apply',
                lambda w, name: self.autocomplete_callback(
                    w, name,
                    query_manager.unique_suggestions(name, **self.data)
                )
            )
        # 'product' gets its own box further down (shared with 'organic').
        widgets.update({
            name: LineBox(
                boxed_popup(name), title=name.title(), title_align='left'
            )
            for name in self.edit_fields if name != 'product'
        })

        header = Text(u'Fill Transaction', 'center')
        _copyright = Text(COPYRIGHT, 'center')

        bottom_button_bar = Columns(
            [(8, self.buttons['done']), Divider(), (9, self.buttons['clear'])]
        )
        # Fixed: the fallback used to be the Divider class itself rather
        # than a Divider() instance.
        badge_pile = Pile([
            widgets[name] if name is not None else Divider()
            for name in badge
        ])
        bottom = Columns([
            Pile([
                widgets[name] if name is not None else Divider()
                for name in bottom_pane
            ]),
            (self.graph.total_width + 2, Pile([
                LineBox(
                    AttrMap(badge_pile, 'badge'),
                    title="Current Price", title_align='left',
                ),
                LineBox(
                    self.graph,
                    title="Historic Price", title_align='left'
                ),
            ])),
        ])

        connect_signal(
            self.buttons['done'], 'click',
            lambda _: self.save_and_clear_callback()
        )
        connect_signal(self.buttons['clear'], 'click', lambda _: self.clear())

        banner = AttrMap(Pile([
            Padding(header, 'center', width=('relative', 100)),
            Padding(_copyright, 'center', width=('relative', 100)),
        ]), 'banner')

        # The product field shares one box with the organic checkbox.
        widgets['product'] = LineBox(Columns([
            boxed_popup('product'),
            self.organic_checkbox,
        ], dividechars=2), title='Product', title_align='left')

        side = (12, Pile([
            widgets[name] if name is not None else Divider()
            for name in side_pane
        ]))

        rows = []
        for row in layout:
            cells = []
            for name in row:
                if name == 'organic':
                    # Already rendered inside the 'product' box.
                    continue
                cells.append(Divider() if name is None else widgets[name])
            rows.append(Columns(cells))
        main_pane = Pile(rows)

        widget = Pile([
            banner,
            Divider(),
            Columns([main_pane, side], dividechars=2),
            bottom,
            Divider(),
            bottom_button_bar,
        ])
        widget = Filler(widget, 'top')
        widget = AttrMap(widget, 'bg')

        # Widgets handed to FocusWidget, in this order. The three source
        # dicts have disjoint keys, so a merged lookup finds the same
        # widget as scanning them one after another.
        focus_order = (
            'product', 'organic',
            'unit', 'quantity', 'price',
            'description',
            'done', 'clear',
            'category', 'group',
            'ts', 'store',
        )
        focusable = {**self.buttons, **self.edit_fields, **self.checkboxes}
        super().__init__(widget, map(focusable.__getitem__, focus_order))

    def keypress(self, size, key):
        """Handle the form's keyboard shortcuts.

        * ``tab`` / ``shift tab``: move focus forwards / backwards
        * ``ctrl delete``: clear the form and focus the product field
        * ``insert``: save the transaction and clear the form
        * ``ctrl p``: focus the product field

        Any other key is passed to the parent class. Tuple "keys" are
        swallowed (``None`` is returned).
        """
        if isinstance(key, tuple):
            return None
        # NOTE(review): presumably this detects that the wrapped widget is
        # not the regular form (no nested ``original_widget``) -- confirm
        # against FocusWidget.
        if getattr(self._w.original_widget, 'original_widget', None) is None:
            return super().keypress(size, key)
        if key == 'tab':
            self.advance_focus()
        elif key == 'shift tab':
            self.advance_focus(reverse=True)
        elif key == 'ctrl delete':
            self.clear()
            self.focus_on(self.edit_fields['product'])
        elif key == 'insert':
            # Fixed: previously called ``save_and_clear_cb``, a name this
            # class does not define; the Done button uses the method below.
            self.save_and_clear_callback()
        elif key == 'ctrl p':
            self.focus_on(self.edit_fields['product'])
        else:
            return super().keypress(size, key)
        return None

    def apply_choice(self, name, value):
        """Store *value* in field *name*, then auto-fill determined fields.

        Every other field that is currently empty/falsy and for which
        ``query_manager.unique_suggestions`` returns exactly one option is
        set to that option. ``ts`` is never auto-filled.
        """
        self.apply_changes(name, value)
        # The field values are snapshotted once for the iteration, but the
        # query is always made with the current form content.
        for field, current in self.data.items():
            if field == name or current:
                continue
            options = self.query_manager.unique_suggestions(field, **self.data)
            if len(options) == 1 and field != 'ts':
                self.apply_changes(field, list(options)[0])

    def apply_changes(self, name, value):
        """Set a single field, translating ``organic`` values to a state.

        Raises:
            KeyError: if *name* is ``'organic'`` and *value* is not one of
                ``'yes'``, ``'no'``, ``True``, ``False`` or ``'mixed'``.
        """
        if name == 'organic':
            # NOTE(review): 'mixed' maps to '' here although the checkbox
            # is created and cleared with state 'mixed' -- confirm this is
            # what NoTabCheckBox.set_state expects.
            value = {
                'yes': True, 'no': False,
                True: True, False: False,
                'mixed': '',
            }[value]
        self.data = {name: value}

    @property
    def data(self):
        """Current form content: edit texts plus checkbox states, by name."""
        values = {
            name: edit.get_edit_text()
            for name, edit in self.edit_fields.items()
        }
        values.update(
            (name, box.state) for name, box in self.checkboxes.items()
        )
        return values

    @data.setter
    def data(self, _data: dict):
        """Update the named widgets, touching only those whose value differs.

        Keys that match neither an edit field nor a checkbox are ignored.
        """
        for name, value in _data.items():
            if (name in self.edit_fields
                    and value != self.edit_fields[name].get_edit_text()):
                self.edit_fields[name].set_edit_text(value)
            if name in self.checkboxes and value != self.checkboxes[name].state:
                self.checkboxes[name].set_state(value)

    def clear(self):
        """Reset the form, keeping ``ts``, ``store`` and the session view.

        Returns:
            ``self`` (the result of :meth:`update`).
        """
        for name, edit in self.edit_fields.items():
            if name in ('ts', 'store'):
                continue
            edit.set_edit_text('')
        for box in self.checkboxes.values():
            box.set_state('mixed')
        for name, text in self.text_fields.items():
            if name != 'dbview':
                text.set_text('')
        self.graph.set_data([], 0)
        return self.update()

    def update(self):
        """Refresh the session view and the price widgets from the form.

        The session view is only queried when both ``ts`` and ``store``
        are non-empty; otherwise it is blanked.

        Returns:
            ``self``.
        """
        data = self.data
        date, store = data['ts'], data['store']
        if date and store:
            session = self.query_manager.get_session_transactions(date, store)
        else:
            session = ''
        self.text_fields['dbview'].set_text(session)
        self.update_historic_prices(data)
        return self

    def update_graph(self, df):
        """Plot the ``$/unit`` history in *df* and caption it with its dates.

        Args:
            df: data frame with at least ``ts_raw``, ``$/unit`` and
                ``quantity`` columns (as returned by
                ``query_manager.get_historic_prices_data``).
        """
        # Keep only the most recent rows, at most one per graph column.
        # NOTE(review): this reads ``_canvas_width`` while the caption code
        # below reads ``canvas_width`` -- confirm both exist on the graph.
        df = df.sort_values(
            'ts_raw', ascending=True, ignore_index=True
        ).truncate(
            before=max(0, len(df.index) - self.graph._canvas_width)
        )
        data = df[['$/unit', 'quantity']].apply(
            lambda x: (float(x['$/unit']), float(x['quantity'])),
            axis=1, result_type='broadcast'
        )
        # Quantity-weighted average price, stored as a constant column.
        data['avg'] = (
            (data['$/unit'] * data['quantity']).sum() / data['quantity'].sum()
        )
        data_max = data.max()['$/unit']
        assert len(data['avg'].unique()) == 1
        norm = [(x,) for x in data['$/unit']]
        self.graph.set_data(
            norm, data_max,
            vscale=[float(x) for x in (
                data['$/unit'].min(),
                data['$/unit'].median(),
                data['avg'].iloc[0],
                data_max,
            )]
        )

        # Caption: "<first date><pad>─[─]─<pad><last date>". The two
        # dd/mm/YYYY dates take 20 characters; the rest is split evenly.
        # Clamped at zero because a negative width is an invalid format
        # spec for strings.
        date_strlen = max(0, self.graph.canvas_width - 20)
        ex = "─" if date_strlen % 2 else ""
        plen = date_strlen // 2
        caption = f"{df['ts_raw'].min():%d/%m/%Y}"
        caption += f"{{p:>{plen}}}{ex}{{p:<{plen}}}".format(
            p="─")
        caption += f"{df['ts_raw'].max():%d/%m/%Y}"
        self.graph.set_caption(caption)

    @staticmethod
    def _parse_decimal(text):
        """Return ``Decimal(text)``, or ``None`` if it is not a valid number."""
        try:
            return Decimal(text)
        except InvalidOperation:
            return None

    def update_historic_prices(self, data):
        """Update the rating badge and the graph for the current product.

        Without both a product and a unit, or when no historic rows are
        found, the rating is reset and the graph is left untouched.

        Args:
            data: form content as returned by :attr:`data`.
        """
        # 'mixed' means "don't filter on organic".
        organic = None if data['organic'] == 'mixed' else data['organic']

        product, unit = data['product'] or None, data['unit'] or None
        price = self._parse_decimal(data['price'])
        quantity = self._parse_decimal(data['quantity'])
        if None in (product, unit):
            self.rating.update_rating(None, None, None, unit)
            return
        df = self.query_manager.get_historic_prices_data(
            unit, product=product, organic=organic, sort='ts'
        )
        if df.empty:
            self.rating.update_rating(None, None, None, unit)
            return
        assert len(df['avg'].unique()) == 1, f"There should be only one average price: {df['avg'].unique()}"

        _avg, _min, _max = [
            float(x) for x in df[['avg', 'min', 'max']].iloc[0]
        ]
        self.rating.update_rating(
            _avg, _min, _max, unit, price=price, quantity=quantity
        )
        self.update_graph(df)

    def autocomplete_callback(self, widget, name, options):
        """React to an 'apply' signal from an auto-complete edit field.

        With suggestions available, *widget* is asked to open them.
        Without any, and only for ``product``/``category``/``group``
        outside of the ``new_product`` activity, the new-product activity
        is started instead.
        """
        if len(options):
            widget._emit('open', options)
            return

        new_product_activity = self.activity_manager.get('new_product')
        current_activity = self.activity_manager.current()
        if current_activity is not new_product_activity and name in (
            'product', 'category', 'group'
        ):
            self.new_product_callback(name)

    def new_product_callback(self, name):
        """Create and show the ``new_product`` activity for field *name*."""
        cur = self.activity_manager.current()
        txn = self.activity_manager.get('transaction')
        new_product = self.activity_manager.create(
            NewProduct, 'new_product',
            self.activity_manager, self.query_manager,
            cur, name, txn.data,
            txn.apply_changes,
        )
        self.activity_manager.show(new_product)

    def save_and_clear_callback(self):
        """Save the ``transaction`` activity's data, reset it and show it."""
        txn = self.activity_manager.get('transaction')
        self.activity_manager.app.save(txn.data)
        txn.clear()
        txn.focus_on(txn.edit_fields['product'])
        self.activity_manager.show(txn.update())
|