PriceCheck.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. #
  2. # Copyright (c) Daniel Sheffield 2021 - 2023
  3. #
  4. # All rights reserved
  5. #
  6. # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
  7. import itertools
  8. from decimal import Decimal, InvalidOperation
  9. from itertools import chain
  10. from typing import Callable, Union
  11. from urwid import (
  12. connect_signal,
  13. AttrMap,
  14. Button,
  15. Columns,
  16. Divider,
  17. Filler,
  18. LineBox,
  19. Padding,
  20. Pile,
  21. RadioButton,
  22. Text,
  23. )
  24. from .. import COPYRIGHT
  25. from ..widgets import (
  26. AutoCompleteEdit,
  27. AutoCompleteFloatEdit,
  28. FocusWidget,
  29. AutoCompletePopUp,
  30. NoTabCheckBox,
  31. FlowBarGraphWithVScale,
  32. )
  33. from ..data.QueryManager import QueryManager
  34. from . import ActivityManager, show_or_exit
  35. from .Rating import Rating
  36. class PriceCheck(FocusWidget):
  37. def keypress(self, size, key):
  38. if isinstance(key, tuple):
  39. return
  40. if getattr(self._w.original_widget, 'original_widget', None) is None:
  41. return super().keypress(size, key)
  42. if key == 'tab':
  43. self.advance_focus()
  44. elif key == 'shift tab':
  45. self.advance_focus(reverse=True)
  46. else:
  47. return super().keypress(size, key)
  48. def apply_choice(self, name, value):
  49. self.apply_changes(name, value)
  50. for k,v in self.data.items():
  51. if k == name or v:
  52. continue
  53. options = self.query_manager.unique_suggestions(k, **self.data)
  54. if len(options) == 1:
  55. self.apply_changes(k, list(options)[0])
  56. def apply_changes(self, name, value):
  57. self.data = {
  58. name: value if name != 'organic' else {
  59. 'yes': True, 'no': False,
  60. True: True, False: False,
  61. 'mixed': '',
  62. }[value],
  63. }
  64. @property
  65. def data(self):
  66. ret = dict(itertools.chain(
  67. [(k, v.get_edit_text()) for k,v in self.edit_fields.items()],
  68. [(k, v.state) for k,v in self.checkboxes.items()]
  69. ))
  70. return ret
  71. @data.setter
  72. def data(self, _data: dict):
  73. for k,v in _data.items():
  74. if k in self.edit_fields and v != self.edit_fields[k].get_edit_text():
  75. self.edit_fields[k].set_edit_text(v)
  76. if k in self.checkboxes and v != self.checkboxes[k].state:
  77. self.checkboxes[k].set_state(v)
  78. def clear(self):
  79. for (_, ef) in self.edit_fields.items():
  80. ef.set_edit_text('')
  81. for (_, cb) in self.checkboxes.items():
  82. cb.set_state('mixed')
  83. for (_, tf) in self.text_fields.items():
  84. tf.set_text('')
  85. self.graph.set_data([],0)
  86. return self.update()
  87. def update(self):
  88. self.update_historic_prices(self.data)
  89. return self
  90. def update_graph(self, df):
  91. # after truncating, need to recalculate avg(median), min, max
  92. df = df.sort_values(
  93. 'ts_raw', ascending=True, ignore_index=True
  94. ).truncate(
  95. before=max(0, len(df.index)-self.graph._canvas_width)
  96. )
  97. data = df[['$/unit','quantity']].apply(
  98. lambda x: (float(x['$/unit']), float(x['quantity'])),
  99. axis=1, result_type='broadcast'
  100. )
  101. data['avg'] = (data['$/unit']*data['quantity']).sum()/data['quantity'].sum()
  102. data_max = data.max()['$/unit'] #.max()
  103. assert len(data['avg'].unique()) == 1
  104. norm = [ (x,) for x in data['$/unit'] ] #.to_records(index=False)
  105. self.graph.set_data(norm, data_max,
  106. vscale=[x for x in map(float, [
  107. data['$/unit'].min(),
  108. data['$/unit'].median(),
  109. data['avg'].iloc[0],
  110. data_max
  111. ])]
  112. )
  113. #self.graph.set_bar_width(1)
  114. # canvas_width = 10 + pad + pad + 10
  115. date_strlen = (self.graph.canvas_width - 20)
  116. ex = "─" if date_strlen % 2 else ""
  117. plen = date_strlen//2
  118. caption = f"{df['ts_raw'].min():%d/%m/%Y}"
  119. caption += f"{{p:>{plen}}}{ex}{{p:<{plen}}}".format(
  120. p="─")
  121. caption += f"{df['ts_raw'].max():%d/%m/%Y}"
  122. self.graph.set_caption(caption)
  123. def update_historic_prices(self, data):
  124. organic = None if data['organic'] == 'mixed' else data['organic']
  125. sort = '$/unit' if self.buttons['sort_price'].state else 'ts'
  126. product, unit = data['product'] or None, data['unit'] or None
  127. try:
  128. price = Decimal(data['price'])
  129. except InvalidOperation:
  130. price = None
  131. try:
  132. quantity = Decimal(data['quantity'])
  133. except InvalidOperation:
  134. quantity = None
  135. if None in (sort, product, unit):
  136. self.text_fields['dbview'].set_text('')
  137. self.rating.update_rating(None, None, None, unit)
  138. return
  139. df = self.query_manager.get_historic_prices_data(unit, sort=sort, product=product, organic=organic).dropna()
  140. if df.empty:
  141. self.text_fields['dbview'].set_text('')
  142. self.rating.update_rating(None, None, None, unit)
  143. return
  144. assert len(df['avg'].unique()) == 1, f"There should be only one average price: {df['avg'].unique()}"
  145. _avg, _min, _max = [
  146. float(x) for x in df[['avg','min','max']].iloc[0]
  147. ]
  148. self.rating.update_rating(_avg, _min, _max, unit, price=price, quantity=quantity)
  149. self.text_fields['dbview'].set_text(
  150. self.rating.get_historic_prices(df)
  151. )
  152. self.update_graph(df)
  153. def __init__(self,
  154. activity_manager: ActivityManager,
  155. query_manager: QueryManager,
  156. ):
  157. button_group = []
  158. self.buttons = {
  159. 'clear': Button(('streak', 'Clear')),
  160. 'exit': Button(('streak', 'Exit')),
  161. 'sort_price': RadioButton(button_group, ('streak', 'Best'), state="first True"),
  162. 'sort_date': RadioButton(button_group, ('streak', 'Last'), state="first True"),
  163. }
  164. self.edit_fields = {
  165. 'product': AutoCompleteEdit(('bg', 'product')),
  166. 'unit': AutoCompleteEdit(('bg', 'unit')),
  167. 'quantity': AutoCompleteFloatEdit(('bg', 'quantity')),
  168. 'price': AutoCompleteFloatEdit(('bg', 'price')),
  169. }
  170. self.checkboxes = {
  171. 'organic': NoTabCheckBox(('bg', "Organic"), state='mixed'),
  172. }
  173. self.text_fields = dict((
  174. (k, Text('')) for k in ('dbview', 'spread', 'rating', 'marker')
  175. ))
  176. self.rating = Rating(dict(filter(
  177. lambda x: x[0] in ('spread','rating','marker'),
  178. self.text_fields.items()
  179. )))
  180. top_pane = [ 'clear', 'exit', ['sort_price', 'sort_date'], ]
  181. left_pane = [
  182. 'product',
  183. 'organic',
  184. ]
  185. badge = [
  186. 'rating',
  187. 'spread',
  188. 'marker',
  189. ]
  190. right_pane = [
  191. 'unit',
  192. 'quantity',
  193. 'price',
  194. ]
  195. bottom_pane = [ 'graph', 'dbview', ]
  196. self.query_manager = query_manager
  197. self.organic_checkbox = self.checkboxes['organic']
  198. connect_signal(self.organic_checkbox, 'postchange', lambda _,v: self.update())
  199. self.autocomplete_callback = lambda widget, options: len(options) and widget._emit('open', options)
  200. for (k, ef) in self.edit_fields.items():
  201. connect_signal(ef, 'postchange', lambda _,v: self.update())
  202. connect_signal(ef, 'apply', lambda w, name: self.autocomplete_callback(
  203. w, query_manager.unique_suggestions(name, **self.data)
  204. ))
  205. for b in button_group:
  206. connect_signal(b, 'postchange',lambda *_: self.update())
  207. connect_signal(self.buttons['clear'], 'click', lambda x: self.clear().update())
  208. connect_signal(self.buttons['exit'], 'click', lambda x: show_or_exit('esc'))
  209. self.graph = FlowBarGraphWithVScale(
  210. 50, 14,
  211. ['bg','popup_focus', 'badge_neutral' ],
  212. hatt=['dark red', 'dark red', 'dark red']
  213. )
  214. self.clear()
  215. header = Text(u'Price Check', 'center')
  216. _copyright = Text(COPYRIGHT, 'center')
  217. banner = Pile([
  218. Padding(header, 'center', width=('relative', 100)),
  219. Padding(_copyright, 'center', width=('relative', 100)),
  220. ])
  221. banner = AttrMap(banner, 'banner')
  222. _widgets = dict(itertools.chain(*[
  223. [(k, v) for k,v in x] for x in map(lambda x: x.items(), [
  224. self.edit_fields, self.text_fields, self.checkboxes
  225. ])
  226. ]))
  227. _widgets.update([
  228. (k, LineBox(AttrMap(
  229. AutoCompletePopUp(
  230. self.edit_fields[k],
  231. self.apply_choice,
  232. lambda: activity_manager.show(self.update())
  233. ), 'streak'), title=k.title(), title_align='left')
  234. ) for k in self.edit_fields
  235. ])
  236. _widgets.update({
  237. 'dbview': LineBox(
  238. AttrMap(self.text_fields['dbview'], 'streak'),
  239. title="Historic Prices",
  240. title_align='center',
  241. ),
  242. })
  243. components = {
  244. 'top_pane': Columns([
  245. (9, Pile([
  246. Divider(),
  247. AttrMap(self.buttons['clear'], 'streak'),
  248. Divider(),
  249. ])),
  250. LineBox(
  251. Columns([ v for k,v in self.buttons.items() if 'sort' in k]),
  252. title="Sort price by",
  253. title_align='left',
  254. ),
  255. (9, Pile([
  256. Divider(),
  257. AttrMap(self.buttons['exit'], 'streak'),
  258. Divider(),
  259. ]))
  260. ], dividechars=1),
  261. 'right_pane': (16, Pile(map(
  262. lambda x: _widgets[x] if x is not None else Divider,
  263. right_pane
  264. ))),
  265. 'left_pane': Pile(map(
  266. lambda x: _widgets[x] if x is not None else Divider,
  267. left_pane
  268. )),
  269. 'badge': Pile(map(
  270. lambda x: _widgets[x] if x is not None else Divider,
  271. badge
  272. )),
  273. }
  274. _widgets.update({
  275. 'graph': LineBox(
  276. self.graph,
  277. title="Historic Price", title_align='left'
  278. ),
  279. })
  280. components.update({
  281. 'bottom_pane': [ _widgets['graph'], _widgets['dbview'] ],
  282. })
  283. components.update({
  284. 'left_pane': Pile([
  285. components['left_pane'],
  286. LineBox(
  287. AttrMap(components['badge'], 'badge'),
  288. title="Current Price", title_align='left',
  289. )
  290. ])})
  291. widget = Pile([
  292. banner,
  293. Divider(),
  294. components['top_pane'],
  295. Columns((components['left_pane'], components['right_pane']),
  296. dividechars=0,
  297. ),
  298. Pile(components['bottom_pane']),
  299. ])
  300. widget = Filler(widget, 'top')
  301. widget = AttrMap(widget, 'bg')
  302. super().__init__(widget, map(
  303. lambda x: next(w[1] for w in chain(
  304. self.buttons.items(),
  305. self.edit_fields.items(),
  306. self.checkboxes.items(),
  307. ) if x == w[0]),
  308. [
  309. 'product', 'organic', 'unit', 'quantity', 'price',
  310. 'clear', 'exit', 'sort_price', 'sort_date',
  311. ]
  312. ))