123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- import itertools
- from decimal import Decimal, InvalidOperation
- from itertools import chain
- from typing import Callable, Union
- from urwid import (
- connect_signal,
- AttrMap,
- Button,
- Columns,
- Divider,
- Filler,
- LineBox,
- Padding,
- Pile,
- RadioButton,
- Text,
- )
- from .. import COPYRIGHT
- from ..widgets import (
- AutoCompleteEdit,
- AutoCompleteFloatEdit,
- FocusWidget,
- AutoCompletePopUp,
- NoTabCheckBox,
- FlowBarGraphWithVScale,
- )
- from ..data.QueryManager import QueryManager
- from . import ActivityManager, show_or_exit
- from .Rating import Rating
class PriceCheck(FocusWidget):
    """Price-check screen.

    Lets the user enter a product, unit, quantity and price (plus an
    "Organic" tri-state checkbox) and shows, for that selection, the rating
    of the entered price, a text listing of historic prices and a bar graph
    of historic unit prices.

    Attributes set up by ``__init__``:
        buttons: ``Clear``/``Exit`` buttons and the two sort radio buttons.
        edit_fields: auto-completing edits keyed by field name.
        checkboxes: the ``organic`` checkbox.
        text_fields: plain ``Text`` widgets (``dbview``, ``spread``,
            ``rating``, ``marker``).
        rating: ``Rating`` helper bound to the badge text widgets.
        graph: ``FlowBarGraphWithVScale`` showing historic unit prices.
        query_manager: data source for suggestions and historic prices.
    """

    # Values accepted for the 'organic' field, mapped to the state that is
    # written to the checkbox.
    # NOTE(review): 'mixed' maps to '' rather than 'mixed'; stock urwid
    # checkboxes only accept True/False/'mixed' -- confirm NoTabCheckBox
    # handles '' before relying on this entry.
    _ORGANIC_STATES = {
        'yes': True, 'no': False,
        True: True, False: False,
        'mixed': '',
    }

    def __init__(self,
                 activity_manager: ActivityManager,
                 query_manager: QueryManager,
                 ):
        """Build all widgets, wire their signals and assemble the layout.

        Args:
            activity_manager: used to re-show this activity after an
                auto-complete pop-up is dismissed.
            query_manager: provides ``unique_suggestions`` and
                ``get_historic_prices_data``.
        """
        button_group = []
        self.buttons = {
            'clear': Button(('streak', 'Clear')),
            'exit': Button(('streak', 'Exit')),
            'sort_price': RadioButton(button_group, ('streak', 'Best'), state="first True"),
            'sort_date': RadioButton(button_group, ('streak', 'Last'), state="first True"),
        }
        self.edit_fields = {
            'product': AutoCompleteEdit(('bg', 'product')),
            'unit': AutoCompleteEdit(('bg', 'unit')),
            'quantity': AutoCompleteFloatEdit(('bg', 'quantity')),
            'price': AutoCompleteFloatEdit(('bg', 'price')),
        }
        self.checkboxes = {
            'organic': NoTabCheckBox(('bg', "Organic"), state='mixed'),
        }
        self.text_fields = {
            name: Text('') for name in ('dbview', 'spread', 'rating', 'marker')
        }
        # The rating helper only writes to the three badge text widgets.
        self.rating = Rating({
            name: text for name, text in self.text_fields.items()
            if name in ('spread', 'rating', 'marker')
        })
        self.query_manager = query_manager
        self.organic_checkbox = self.checkboxes['organic']

        def refresh(*_args):
            # Shared 'postchange' handler; the signal arguments are not needed.
            return self.update()

        connect_signal(self.organic_checkbox, 'postchange', refresh)
        # Only open the suggestion pop-up when there is something to show.
        self.autocomplete_callback = lambda widget, options: len(options) and widget._emit('open', options)
        for edit in self.edit_fields.values():
            connect_signal(edit, 'postchange', refresh)
            connect_signal(edit, 'apply', lambda w, name: self.autocomplete_callback(
                w, query_manager.unique_suggestions(name, **self.data)
            ))
        for radio in button_group:
            connect_signal(radio, 'postchange', refresh)
        # clear() already finishes with update(), so no extra refresh here.
        connect_signal(self.buttons['clear'], 'click', lambda _button: self.clear())
        connect_signal(self.buttons['exit'], 'click', lambda _button: show_or_exit('esc'))

        self.graph = FlowBarGraphWithVScale(
            50, 14,
            ['bg', 'popup_focus', 'badge_neutral'],
            hatt=['dark red', 'dark red', 'dark red']
        )
        # Must run after the signals, rating and graph exist: clearing the
        # fields triggers update().
        self.clear()

        header = Text(u'Price Check', 'center')
        _copyright = Text(COPYRIGHT, 'center')
        banner = AttrMap(
            Pile([
                Padding(header, 'center', width=('relative', 100)),
                Padding(_copyright, 'center', width=('relative', 100)),
            ]),
            'banner',
        )

        # Name -> widget as it should appear in the layout. Raw widgets
        # first, then the decorated versions replace the raw ones.
        _widgets = {**self.edit_fields, **self.text_fields, **self.checkboxes}
        for name, edit in self.edit_fields.items():
            _widgets[name] = LineBox(
                AttrMap(
                    AutoCompletePopUp(
                        edit,
                        self.apply_choice,
                        lambda: activity_manager.show(self.update())
                    ),
                    'streak',
                ),
                title=name.title(), title_align='left',
            )
        _widgets['dbview'] = LineBox(
            AttrMap(self.text_fields['dbview'], 'streak'),
            title="Historic Prices",
            title_align='center',
        )
        _widgets['graph'] = LineBox(
            self.graph,
            title="Historic Price", title_align='left'
        )

        def stack(names):
            # A None entry is a spacer row; it needs a Divider *instance*.
            return Pile([
                Divider() if name is None else _widgets[name]
                for name in names
            ])

        def boxed_button(name):
            # Button vertically padded with blank rows.
            return Pile([
                Divider(),
                AttrMap(self.buttons[name], 'streak'),
                Divider(),
            ])

        top_row = Columns([
            (9, boxed_button('clear')),
            LineBox(
                Columns([b for name, b in self.buttons.items() if 'sort' in name]),
                title="Sort price by",
                title_align='left',
            ),
            (9, boxed_button('exit')),
        ], dividechars=1)

        right_column = (16, stack(['unit', 'quantity', 'price']))
        left_column = Pile([
            stack(['product', 'organic']),
            LineBox(
                AttrMap(stack(['rating', 'spread', 'marker']), 'badge'),
                title="Current Price", title_align='left',
            ),
        ])

        widget = Pile([
            banner,
            Divider(),
            top_row,
            Columns((left_column, right_column), dividechars=0),
            Pile([_widgets['graph'], _widgets['dbview']]),
        ])
        widget = Filler(widget, 'top')
        widget = AttrMap(widget, 'bg')

        # Focus cycles through the raw (undecorated) input widgets in this
        # order. A lazy map is passed, as before.
        focusable = {**self.buttons, **self.edit_fields, **self.checkboxes}
        focus_order = (
            'product', 'organic', 'unit', 'quantity', 'price',
            'clear', 'exit', 'sort_price', 'sort_date',
        )
        super().__init__(widget, map(focusable.__getitem__, focus_order))

    def keypress(self, size, key):
        """Handle tab / shift-tab focus cycling; delegate every other key.

        Returns None for tuple keys and for handled tab keys; otherwise
        returns whatever the parent ``keypress`` returns.
        """
        if isinstance(key, tuple):
            # Tuple "keys" are dropped (presumably mouse events -- verify).
            return
        if getattr(self._w.original_widget, 'original_widget', None) is None:
            # The wrapped widget is not the two-level decoration built in
            # __init__, so leave key handling entirely to the parent.
            return super().keypress(size, key)
        if key == 'tab':
            self.advance_focus()
        elif key == 'shift tab':
            self.advance_focus(reverse=True)
        else:
            return super().keypress(size, key)

    def apply_choice(self, name, value):
        """Apply a chosen value and auto-fill fields left with one option.

        After setting ``name`` to ``value``, every other field whose current
        value is falsy is filled in when the query manager reports exactly
        one suggestion for it given the current data.
        """
        self.apply_changes(name, value)
        # self.data builds a fresh dict on each access: the loop iterates a
        # snapshot while the suggestions see values applied so far.
        for field, current in self.data.items():
            if field == name or current:
                continue
            options = self.query_manager.unique_suggestions(field, **self.data)
            if len(options) == 1:
                self.apply_changes(field, next(iter(options)))

    def apply_changes(self, name, value):
        """Set a single field, translating 'organic' values to checkbox states.

        Raises:
            KeyError: if ``name`` is 'organic' and ``value`` is not one of
                the keys of ``_ORGANIC_STATES``.
        """
        if name == 'organic':
            value = self._ORGANIC_STATES[value]
        self.data = {name: value}

    @property
    def data(self):
        """Current form values: edit texts and checkbox states keyed by name."""
        values = {
            name: edit.get_edit_text()
            for name, edit in self.edit_fields.items()
        }
        values.update(
            (name, box.state) for name, box in self.checkboxes.items()
        )
        return values

    @data.setter
    def data(self, _data: dict):
        """Write the given values into the matching widgets.

        Widgets are only touched when the value actually differs, and keys
        that match neither an edit field nor a checkbox are ignored.
        """
        for name, value in _data.items():
            if name in self.edit_fields and value != self.edit_fields[name].get_edit_text():
                self.edit_fields[name].set_edit_text(value)
            if name in self.checkboxes and value != self.checkboxes[name].state:
                self.checkboxes[name].set_state(value)

    def clear(self):
        """Reset every input and output widget, then refresh; returns self."""
        for edit in self.edit_fields.values():
            edit.set_edit_text('')
        for box in self.checkboxes.values():
            box.set_state('mixed')
        for text in self.text_fields.values():
            text.set_text('')
        self.graph.set_data([], 0)
        return self.update()

    def update(self):
        """Recompute the historic price views from the current form data.

        Returns self so calls can be chained.
        """
        self.update_historic_prices(self.data)
        return self

    def update_graph(self, df):
        """Plot historic unit prices and caption the graph with the date span.

        Args:
            df: price history with at least the columns ``ts_raw``,
                ``$/unit`` and ``quantity``.
        """
        # Keep only the most recent rows that fit into the graph width.
        # NOTE(review): this reads graph._canvas_width while the caption
        # below reads graph.canvas_width -- confirm both are intended.
        first_visible = max(0, len(df.index) - self.graph._canvas_width)
        df = df.sort_values(
            'ts_raw', ascending=True, ignore_index=True
        ).truncate(before=first_visible)

        data = df[['$/unit', 'quantity']].apply(
            lambda row: (float(row['$/unit']), float(row['quantity'])),
            axis=1, result_type='broadcast'
        )
        # Quantity-weighted average unit price, stored as a constant column.
        data['avg'] = (data['$/unit'] * data['quantity']).sum() / data['quantity'].sum()
        data_max = data.max()['$/unit']
        assert len(data['avg'].unique()) == 1

        bars = [(unit_price,) for unit_price in data['$/unit']]
        scale_marks = [float(mark) for mark in (
            data['$/unit'].min(),
            data['$/unit'].median(),
            data['avg'].iloc[0],
            data_max,
        )]
        self.graph.set_data(bars, data_max, vscale=scale_marks)

        # Caption: first date, a centred dash marker padded with spaces,
        # last date. 20 is the width of the two dd/mm/YYYY dates.
        date_strlen = (self.graph.canvas_width - 20)
        ex = "─" if date_strlen % 2 else ""
        plen = date_strlen // 2
        caption = f"{df['ts_raw'].min():%d/%m/%Y}"
        caption += f"{{p:>{plen}}}{ex}{{p:<{plen}}}".format(
            p="─")
        caption += f"{df['ts_raw'].max():%d/%m/%Y}"
        self.graph.set_caption(caption)

    @staticmethod
    def _parse_decimal(text):
        """Return ``text`` as a Decimal, or None when it is not a number."""
        try:
            return Decimal(text)
        except InvalidOperation:
            return None

    def _show_no_history(self, unit):
        """Blank the history listing and reset the rating badge."""
        self.text_fields['dbview'].set_text('')
        self.rating.update_rating(None, None, None, unit)

    def update_historic_prices(self, data):
        """Query historic prices for ``data`` and refresh rating, list, graph.

        Args:
            data: form values as produced by the ``data`` property; the keys
                ``organic``, ``product``, ``unit``, ``price`` and
                ``quantity`` are read.

        Without both a product and a unit, or when the query yields no rows,
        the listing is blanked and the rating reset; the graph is left
        untouched in that case.
        """
        organic = None if data['organic'] == 'mixed' else data['organic']
        sort = '$/unit' if self.buttons['sort_price'].state else 'ts'
        product = data['product'] or None
        unit = data['unit'] or None
        price = self._parse_decimal(data['price'])
        quantity = self._parse_decimal(data['quantity'])

        if product is None or unit is None:
            self._show_no_history(unit)
            return

        df = self.query_manager.get_historic_prices_data(
            unit, sort=sort, product=product, organic=organic
        ).dropna()
        if df.empty:
            self._show_no_history(unit)
            return

        assert len(df['avg'].unique()) == 1, f"There should be only one average price: {df['avg'].unique()}"
        # avg/min/max are read from the first row only.
        _avg, _min, _max = [
            float(x) for x in df[['avg', 'min', 'max']].iloc[0]
        ]
        self.rating.update_rating(_avg, _min, _max, unit, price=price, quantity=quantity)
        self.text_fields['dbview'].set_text(
            self.rating.get_historic_prices(df)
        )
        self.update_graph(df)
|