|
@@ -0,0 +1,344 @@
|
|
|
+#!/usr/bin/python3
|
|
|
+#
|
|
|
+# Copyright (c) Daniel Sheffield 2021
|
|
|
+#
|
|
|
+# All rights reserved
|
|
|
+#
|
|
|
+# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
|
|
|
+import time
|
|
|
+import itertools
|
|
|
+import sys
|
|
|
+import urwid
|
|
|
+import pandas as pd
|
|
|
+from widgets import COPYRIGHT
|
|
|
+from db_utils import QueryManager
|
|
|
+from dateutil.parser import parse as parse_time
|
|
|
+from widgets import (
|
|
|
+ NoTabCheckBox,
|
|
|
+ AutoCompleteEdit,
|
|
|
+ AutoCompleteFloatEdit,
|
|
|
+ SuggestionPopup,
|
|
|
+ ActivityManager,
|
|
|
+ #_set_focus_path,
|
|
|
+)
|
|
|
+
|
|
|
+from collections import (
|
|
|
+ OrderedDict,
|
|
|
+)
|
|
|
+
|
|
|
+try:
|
|
|
+ from db_credentials import HOST, PASSWORD
|
|
|
+ host = f'host={HOST}'
|
|
|
+ password = f'password={PASSWORD}'
|
|
|
+except:
|
|
|
+ host = ''
|
|
|
+ password = ''
|
|
|
+
|
|
|
+try:
|
|
|
+ import psycopg2
|
|
|
+ from psycopg2.sql import SQL
|
|
|
+ import os
|
|
|
+ user = os.getenv('USER')
|
|
|
+ conn = psycopg2.connect(f"{host} dbname=das user={user} {password}")
|
|
|
+ cur = conn.cursor()
|
|
|
+except:
|
|
|
+ print('Failed to set up db connection. Entering Mock mode')
|
|
|
+ exit(1)
|
|
|
+ #from mock import *
|
|
|
+
|
|
|
+palette = [
|
|
|
+ ('banner', 'light gray', 'dark red'),
|
|
|
+ ('streak', 'light red', 'dark gray'),
|
|
|
+ ('bg', 'light red', 'black'),
|
|
|
+]
|
|
|
+
|
|
|
+top_pane = [
|
|
|
+ 'clear',
|
|
|
+ 'exit',
|
|
|
+ 'sort',
|
|
|
+]
|
|
|
+
|
|
|
+left_pane = [
|
|
|
+ 'product',
|
|
|
+ 'organic',
|
|
|
+ None,
|
|
|
+]
|
|
|
+right_pane = [
|
|
|
+ 'unit',
|
|
|
+ 'quantity',
|
|
|
+ 'price',
|
|
|
+]
|
|
|
+bottom_pane = [
|
|
|
+ #'rating',
|
|
|
+ 'dbview',
|
|
|
+]
|
|
|
+
|
|
|
+inputs = filter(
|
|
|
+ lambda x: x is not None,
|
|
|
+ itertools.chain(
|
|
|
+ left_pane, right_pane
|
|
|
+ )
|
|
|
+)
|
|
|
+inputs_layout = [
|
|
|
+ ['product', 'unit', ],
|
|
|
+ ['organic', 'quantity', ],
|
|
|
+ ['price', None, ],
|
|
|
+]
|
|
|
+
|
|
|
+display_map = {
|
|
|
+ 'price': lambda x: f"{x:.4f}",
|
|
|
+ 'quantity': lambda x: f"{x:.2f}",
|
|
|
+ 'organic': lambda x: "true" if x else "false",
|
|
|
+}
|
|
|
+display = lambda data, name: display_map[name](data) if name in display_map else data
|
|
|
+
|
|
|
+def show_or_exit(key):
|
|
|
+ if isinstance(key, tuple):
|
|
|
+ return
|
|
|
+
|
|
|
+ if key in ('esc',):
|
|
|
+ raise urwid.ExitMainLoop()
|
|
|
+
|
|
|
+def _apply_choice_callback(activity_manager, name, widget, value):
|
|
|
+ txn = activity_manager.get('price_check')
|
|
|
+ txn.apply_choice(name)(widget, value)
|
|
|
+ activity_manager.show(txn.update())
|
|
|
+
|
|
|
+def _show_suggestions_callback(activity_manager, name, options):
|
|
|
+ txn = activity_manager.get('price_check')
|
|
|
+ activity_manager.show(
|
|
|
+ activity_manager.create(SuggestionPopup, None,
|
|
|
+ txn.original_widget, name, options,
|
|
|
+ lambda w,x: _apply_choice_callback(activity_manager, name, w, x),
|
|
|
+ lambda: activity_manager.show(txn.update())
|
|
|
+ ))
|
|
|
+
|
|
|
+def _autocomplete_callback(activity_manager, query_manager, name, data):
|
|
|
+ options = query_manager.unique_suggestions(name, **data)
|
|
|
+ if len(options) > 0:
|
|
|
+ _show_suggestions_callback(activity_manager, name, options)
|
|
|
+
|
|
|
+class GroceryPriceCheck(urwid.WidgetPlaceholder):
|
|
|
+ def __init__(self, activity_manager):
|
|
|
+ super().__init__(urwid.SolidFill(u'/'))
|
|
|
+ self.activity_manager = activity_manager
|
|
|
+ price_check = self.activity_manager.get('price_check')
|
|
|
+
|
|
|
+ self.activity_manager.show(self)
|
|
|
+ self.activity_manager.show(price_check)
|
|
|
+
|
|
|
+class PriceCheck(urwid.WidgetPlaceholder):
|
|
|
+
|
|
|
+ def iter_focus_paths(self):
|
|
|
+ initial = [2,0,0,0]
|
|
|
+ container = self.original_widget.original_widget.original_widget
|
|
|
+ _set_focus_path(container, initial)
|
|
|
+ while True:
|
|
|
+ path = container.get_focus_path()
|
|
|
+ yield path
|
|
|
+ self.advance_focus()
|
|
|
+ path = container.get_focus_path()
|
|
|
+ if path == initial:
|
|
|
+ self.advance_focus()
|
|
|
+ break
|
|
|
+
|
|
|
+ def advance_focus(self, reverse=False):
|
|
|
+ container = self.original_widget.original_widget.original_widget
|
|
|
+
|
|
|
+ path = container.get_focus_path()
|
|
|
+ if reverse:
|
|
|
+ paths = [ i for i in self.iter_focus_paths() ]
|
|
|
+ zipped_paths = zip(paths, [
|
|
|
+ *paths[1:], paths[0]
|
|
|
+ ])
|
|
|
+ prev_path = map(lambda x: x[0], filter(
|
|
|
+ lambda x: x[1] == path,
|
|
|
+ zipped_paths
|
|
|
+ ))
|
|
|
+ _set_focus_path(container, next(prev_path))
|
|
|
+ return
|
|
|
+
|
|
|
+ _iter = [ i for i in enumerate(path) ][::-1]
|
|
|
+
|
|
|
+ for idx, part in _iter:
|
|
|
+ p = [ i for i in path ]
|
|
|
+ if reverse:
|
|
|
+ p[idx] -= 1
|
|
|
+ else:
|
|
|
+ p[idx] += 1
|
|
|
+
|
|
|
+ try:
|
|
|
+ _set_focus_path(container, p)
|
|
|
+ if path == [3]:
|
|
|
+ self.advance_focus(reverse=reverse)
|
|
|
+ return
|
|
|
+ except IndexError:
|
|
|
+ path[idx] = 0
|
|
|
+
|
|
|
+ container.set_focus_path([2,0,0,0])
|
|
|
+
|
|
|
+ def keypress(self, size, key):
|
|
|
+ if isinstance(key, tuple):
|
|
|
+ return
|
|
|
+
|
|
|
+ if getattr(self.original_widget.original_widget, 'original_widget', None) is None:
|
|
|
+ return super().keypress(size, key)
|
|
|
+
|
|
|
+ if key == 'tab':
|
|
|
+ self.advance_focus()
|
|
|
+ elif key == 'shift tab':
|
|
|
+ self.advance_focus(reverse=True)
|
|
|
+ else:
|
|
|
+ return super().keypress(size, key)
|
|
|
+
|
|
|
+ def _init_data(self, fields):
|
|
|
+ self.data = OrderedDict([
|
|
|
+ (k, '') for k in fields
|
|
|
+ ])
|
|
|
+ self.clear()
|
|
|
+
|
|
|
+ def _apply_choice(self, name, value):
|
|
|
+ self._apply_changes(name, value)
|
|
|
+ for k,v in self.data.items():
|
|
|
+ if k == name or v:
|
|
|
+ continue
|
|
|
+ options = self.query_manager.unique_suggestions(k, **self.data)
|
|
|
+ if len(options) == 1 and k != 'ts':
|
|
|
+ self._apply_changes(k, list(options)[0])
|
|
|
+
|
|
|
+ def apply_choice(self, name):
|
|
|
+ return lambda w,x: self._apply_choice(name, x)
|
|
|
+
|
|
|
+ def _apply_changes(self, name, value):
|
|
|
+ self.data.update({
|
|
|
+ name: value,
|
|
|
+ })
|
|
|
+
|
|
|
+ def apply_changes(self, name):
|
|
|
+ return lambda w,x: self._apply_changes(name, x)
|
|
|
+
|
|
|
+ def apply_organic_state(self, w, state):
|
|
|
+ self.data['organic'] = repr(state).lower()
|
|
|
+
|
|
|
+ def clear(self):
|
|
|
+ for k in self.data:
|
|
|
+ if k in ('ts', 'store',):
|
|
|
+ continue
|
|
|
+ self.data[k] = ''
|
|
|
+ self.organic_checkbox.set_state(False)
|
|
|
+
|
|
|
+ def update(self):
|
|
|
+ for k in self.edit_fields:
|
|
|
+ self.edit_fields[k].set_edit_text(self.data[k])
|
|
|
+ date, store = self.data['ts'], self.data['store']
|
|
|
+ self.text_fields['dbview'].set_text(
|
|
|
+ self.query_manager.get_session_transactions(date, store) if None not in (
|
|
|
+ date or None, store or None
|
|
|
+ ) else ''
|
|
|
+ )
|
|
|
+ self.organic_checkbox.set_state(True if self.data['organic'] == 'true' else False)
|
|
|
+ return self
|
|
|
+
|
|
|
+ def __init__(self, query_manager, fields,
|
|
|
+ top_pane, left_pane, right_pane, bottom_pane,
|
|
|
+ autocomplete_cb):
|
|
|
+ super().__init__(urwid.SolidFill(u'/'))
|
|
|
+ self.organic_checkbox = NoTabCheckBox(
|
|
|
+ u"Organic",
|
|
|
+ on_state_change=self.apply_organic_state
|
|
|
+ )
|
|
|
+ self.query_manager = query_manager
|
|
|
+ self._init_data(fields)
|
|
|
+ self.top_pane = top_pane
|
|
|
+ self.left_pane = left_pane
|
|
|
+ self.right_pane = right_pane
|
|
|
+ self.bottom_pane = bottom_pane
|
|
|
+ self.edit_fields = OrderedDict()
|
|
|
+ self.text_fields = OrderedDict()
|
|
|
+ for k in self.data:
|
|
|
+ if k in self.right_pane and k != 'unit':
|
|
|
+ ef = AutoCompleteFloatEdit(('bg', k), apply_change_func=lambda name: autocomplete_cb(name, self.data))
|
|
|
+ elif k != 'organic':
|
|
|
+ ef = AutoCompleteEdit(('bg', k), apply_change_func=lambda name: autocomplete_cb(name, self.data))
|
|
|
+ else:
|
|
|
+ continue
|
|
|
+
|
|
|
+ ef.set_edit_text(self.data[k])
|
|
|
+ urwid.connect_signal(ef, 'change', self.apply_changes(k))
|
|
|
+ self.edit_fields[k] = ef
|
|
|
+
|
|
|
+ header = urwid.Text(u'Price Check', 'center')
|
|
|
+ _copyright = urwid.Text(COPYRIGHT, 'center')
|
|
|
+ self.buttons = OrderedDict()
|
|
|
+ group = []
|
|
|
+ for k in top_pane:
|
|
|
+ if k != 'sort':
|
|
|
+ self.buttons.update({ k: urwid.Button(('streak', f'{k}')) })
|
|
|
+ continue
|
|
|
+ self.buttons.update({ f'{k}_price': urwid.RadioButton(group, ('streak', u'Best Price'), state="first True",
|
|
|
+ on_state_change=None, user_data=[ self.buttons[b] for b in self.buttons if 'sort' in b ] )})
|
|
|
+ self.buttons.update({ f'{k}_date': urwid.RadioButton(group, ('streak', u'Last Price'), state="first True",
|
|
|
+ on_state_change=None, user_data=[ self.buttons[b] for b in self.buttons if 'sort' in b ] )})
|
|
|
+
|
|
|
+ urwid.connect_signal(self.buttons['clear'], 'click', lambda: None)
|
|
|
+ urwid.connect_signal(self.buttons['exit'], 'click', lambda: None)
|
|
|
+
|
|
|
+ banner = urwid.Pile([
|
|
|
+ urwid.Padding(header, 'center', width=('relative', 100)),
|
|
|
+ urwid.Padding(_copyright, 'center', width=('relative', 100)),
|
|
|
+ ])
|
|
|
+ banner = urwid.AttrMap(banner, 'banner')
|
|
|
+ fields = dict([
|
|
|
+ (k, urwid.LineBox(urwid.AttrMap(self.edit_fields[k], 'streak'), title=k.title(), title_align='left')) for k in self.edit_fields
|
|
|
+ ])
|
|
|
+ fields['organic'] = urwid.LineBox(urwid.AttrMap(self.organic_checkbox, 'bg'))
|
|
|
+ dbview = urwid.Text('')
|
|
|
+ self.text_fields.update({'dbview': dbview})
|
|
|
+ fields.update({
|
|
|
+ 'dbview': urwid.LineBox(
|
|
|
+ urwid.AttrMap(dbview, 'streak'),
|
|
|
+ title="Historic Prices",
|
|
|
+ title_align='center',
|
|
|
+ )
|
|
|
+ })
|
|
|
+
|
|
|
+ right_pane_widget = (12, urwid.Pile(map(
|
|
|
+ lambda x: fields[x] if x is not None else urwid.Divider(),
|
|
|
+ self.right_pane
|
|
|
+ )))
|
|
|
+ left_pane_widget = urwid.Pile(map(
|
|
|
+ lambda x: fields[x] if x is not None else urwid.Divider(),
|
|
|
+ self.left_pane
|
|
|
+ ))
|
|
|
+
|
|
|
+ widget = urwid.Pile([
|
|
|
+ banner,
|
|
|
+ urwid.Divider(),
|
|
|
+ urwid.Columns([v for v in self.buttons.values()],
|
|
|
+ dividechars=2,
|
|
|
+ ),
|
|
|
+ urwid.Columns((left_pane_widget, right_pane_widget),
|
|
|
+ dividechars=2,
|
|
|
+ ),
|
|
|
+ *[ fields[c] if c is not None else urwid.Divider() for c in self.bottom_pane ],
|
|
|
+ ])
|
|
|
+ widget = urwid.Filler(widget, 'top')
|
|
|
+ widget = urwid.AttrMap(widget, 'bg')
|
|
|
+ self.original_widget = widget
|
|
|
+
|
|
|
+cur.execute("BEGIN")
|
|
|
+
|
|
|
+activity_manager = ActivityManager()
|
|
|
+query_manager = QueryManager(activity_manager, cur, display)
|
|
|
+
|
|
|
+activity_manager.create(PriceCheck, 'price_check',
|
|
|
+ query_manager, inputs, top_pane, left_pane, right_pane, bottom_pane,
|
|
|
+ lambda name, data: _autocomplete_callback(activity_manager, query_manager, name, data))
|
|
|
+
|
|
|
+app = GroceryPriceCheck(activity_manager)
|
|
|
+
|
|
|
+loop = urwid.MainLoop(app, palette, unhandled_input=show_or_exit)
|
|
|
+loop.run()
|
|
|
+
|
|
|
+cur.close()
|
|
|
+conn.close()
|