|
@@ -0,0 +1,241 @@
|
|
|
+#!/usr/bin/python3
|
|
|
+#
|
|
|
+# Copyright (c) Daniel Sheffield 2021
|
|
|
+#
|
|
|
+# All rights reserved
|
|
|
+#
|
|
|
+# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
|
|
|
+
|
|
|
+import psycopg2
|
|
|
+import urwid
|
|
|
+import time
|
|
|
+from collections import (
|
|
|
+ OrderedDict,
|
|
|
+)
|
|
|
+
|
|
|
+COPYRIGHT = "Copyright (c) Daniel Sheffield 2021"
|
|
|
+
|
|
|
+conn = psycopg2.connect("dbname=das user=das")
|
|
|
+cur = conn.cursor()
|
|
|
+
|
|
|
+cols = [
|
|
|
+ 'ts',
|
|
|
+ 'store',
|
|
|
+ 'product',
|
|
|
+ 'description',
|
|
|
+ 'quantity',
|
|
|
+ 'unit',
|
|
|
+ 'price',
|
|
|
+ 'organic',
|
|
|
+ 'category',
|
|
|
+ 'group',
|
|
|
+]
|
|
|
+NON_IDENTIFIER_COLUMNS = [
|
|
|
+ 'ts',
|
|
|
+ 'store',
|
|
|
+ 'quantity',
|
|
|
+ 'unit',
|
|
|
+ 'price',
|
|
|
+ 'organic',
|
|
|
+]
|
|
|
+
|
|
|
+display_map = {
|
|
|
+ 'ts': lambda x: f"{time.strftime('%Y-%m-%d %H:%M', (x.year, x.month, x.day, x.hour, x.minute, 0, 0, 0, 0))}",
|
|
|
+ 'price': lambda x: f"{x:.2f}",
|
|
|
+ 'quantity': lambda x: f"{x:.2f}",
|
|
|
+ 'organic': lambda x: "true" if x else "false",
|
|
|
+}
|
|
|
+display = lambda data, name: display_map[name](data) if name in display_map else data
|
|
|
+
|
|
|
+cur.execute("SELECT * FROM transaction_view;")
|
|
|
+col_idx_map = dict([ (d.name, i) for i,d in enumerate(cur.description) if d.name in cols ])
|
|
|
+def records(cursor, col_idx_map):
|
|
|
+ for row in cursor.fetchall():
|
|
|
+ #print(row)
|
|
|
+ #raise(Exception(repr(row)))
|
|
|
+ yield dict([
|
|
|
+ (name, display(row[i], name)) for name, i in col_idx_map.items()
|
|
|
+ ])
|
|
|
+ cur.execute("SELECT * FROM transaction_view;")
|
|
|
+
|
|
|
+
|
|
|
+def record_matches(record, **kwargs):
|
|
|
+ for k,v in kwargs.items():
|
|
|
+ if not v:
|
|
|
+ continue
|
|
|
+ if v.lower() not in record[k].lower():
|
|
|
+ return False
|
|
|
+ return True
|
|
|
+
|
|
|
+def unique_suggestions(name, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
|
|
|
+ [ kwargs.pop(k) for k in exclude if k in kwargs]
|
|
|
+ return sorted(set(map(lambda x: x[name], suggestions(name, exclude=exclude, **kwargs))))
|
|
|
+
|
|
|
+def suggestions(name, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
|
|
|
+ [ kwargs.pop(k) for k in exclude if k in kwargs]
|
|
|
+ yield from filter(lambda x: record_matches(x, **kwargs), records(cur, col_idx_map))
|
|
|
+
|
|
|
+def show_or_exit(key):
|
|
|
+ if isinstance(key, tuple):
|
|
|
+ return
|
|
|
+ if key in ('esc'):
|
|
|
+ raise urwid.ExitMainLoop()
|
|
|
+
|
|
|
+class AutoCompleteEdit(urwid.Edit):
|
|
|
+ def __init__(self, name, *args, apply_change_func=None, **kwargs):
|
|
|
+ title = name.title()
|
|
|
+ super(AutoCompleteEdit, self).__init__(f"{title:12s}: ", *args, **kwargs)
|
|
|
+ self.name = name
|
|
|
+ self.apply = apply_change_func
|
|
|
+
|
|
|
+ def keypress(self, size, key):
|
|
|
+ if key != 'tab':
|
|
|
+ return super(AutoCompleteEdit, self).keypress(size, key)
|
|
|
+ self.apply(self.name)
|
|
|
+
|
|
|
+class GroceryTransactionEditor(urwid.WidgetPlaceholder):
|
|
|
+ def __init__(self, fields):
|
|
|
+ super(GroceryTransactionEditor, self).__init__(urwid.SolidFill(u'/'))
|
|
|
+ self._init_data(fields)
|
|
|
+ self.widgets = dict()
|
|
|
+ self.show('transaction')
|
|
|
+
|
|
|
+ def _init_data(self, fields):
|
|
|
+ self.data = OrderedDict([
|
|
|
+ (k, '') for k in fields
|
|
|
+ ])
|
|
|
+ self.clear()
|
|
|
+
|
|
|
+ def get_activity(self, name):
|
|
|
+ return getattr(self, name)
|
|
|
+
|
|
|
+ def show(self, name, *args, **kwargs):
|
|
|
+ if name in self.widgets:
|
|
|
+ self.original_widget = self.widgets[name]
|
|
|
+ self.update(name)
|
|
|
+ return
|
|
|
+
|
|
|
+ a = self.get_activity(name)
|
|
|
+ widget = a(*args, **kwargs)
|
|
|
+ if name != 'suggestions':
|
|
|
+ self.widgets[name] = widget
|
|
|
+
|
|
|
+ self.original_widget = widget
|
|
|
+
|
|
|
+ def _apply_choice(self, name, value):
|
|
|
+ self.data.update({
|
|
|
+ name: value,
|
|
|
+ })
|
|
|
+ for k,v in self.data.items():
|
|
|
+ if k == name or v:
|
|
|
+ continue
|
|
|
+ options = unique_suggestions(k, **self.data)
|
|
|
+ if len(options) == 1:
|
|
|
+ self.data.update({
|
|
|
+ k: list(options)[0],
|
|
|
+ })
|
|
|
+ self.show('transaction')
|
|
|
+
|
|
|
+ def apply_choice(self, name):
|
|
|
+ apply = lambda w,x: self._apply_choice(name, x)
|
|
|
+ return apply
|
|
|
+
|
|
|
+ def _apply_changes(self, name, value):
|
|
|
+ self.data.update({
|
|
|
+ name: value,
|
|
|
+ })
|
|
|
+
|
|
|
+ def _autocomplete(self, name):
|
|
|
+ options = unique_suggestions(name, **self.data)
|
|
|
+ if 0 < len(options):
|
|
|
+ self.show('suggestions', name, options=options)
|
|
|
+
|
|
|
+ def apply_changes(self, name):
|
|
|
+ apply = lambda w,x: self._apply_changes(name, x)
|
|
|
+ return apply
|
|
|
+
|
|
|
+ def suggestions(self, name, options=None):
|
|
|
+ body = [urwid.Text(name.title()), urwid.Divider()]
|
|
|
+ for c in options:
|
|
|
+ button = urwid.Button(c)
|
|
|
+ urwid.connect_signal(button, 'click', self.apply_choice(name), c)
|
|
|
+ body.append(urwid.AttrMap(button, None, focus_map='reversed'))
|
|
|
+ walker = urwid.SimpleFocusListWalker(body, wrap_around=False)
|
|
|
+ listbox = urwid.ListBox(walker)
|
|
|
+ pad = urwid.Padding(listbox, left=2, right=2)
|
|
|
+ top = urwid.Overlay(pad, self.original_widget,
|
|
|
+ align='center', width=('relative', 60),
|
|
|
+ valign='middle', height=('relative', 60),
|
|
|
+ min_width=20, min_height=9)
|
|
|
+ def keypress(size, key, original=top.keypress):
|
|
|
+ if key != 'esc':
|
|
|
+ return original(size, key)
|
|
|
+ self.show('transaction')
|
|
|
+ top.keypress = keypress
|
|
|
+ return top
|
|
|
+
|
|
|
+ def save(self):
|
|
|
+ fmt = '%Y-%m-%dT%H%M'
|
|
|
+ ts = self.data['ts']
|
|
|
+ ts_log = time.strptime(ts, '%Y-%m-%d %H:%M')
|
|
|
+ store = self.data['store']
|
|
|
+ description = self.data['description']
|
|
|
+ quantity = self.data['quantity']
|
|
|
+ unit = self.data['unit']
|
|
|
+ price = self.data['price']
|
|
|
+ product = self.data['product']
|
|
|
+ organic = self.data['organic']
|
|
|
+ log = f"{time.strftime(fmt, ts_log)}-{store}.txn"
|
|
|
+ with open(log, 'a') as f:
|
|
|
+ f.write(
|
|
|
+ f"CALL insert_transaction('{ts}', $store${store}$store$, "
|
|
|
+ f"$descr${description}$descr$, {quantity}, $unit${unit}$unit$, "
|
|
|
+ f"{price}, $produ${product}$produ$, {organic});\n"
|
|
|
+ )
|
|
|
+
|
|
|
+ def clear(self):
|
|
|
+ for k in self.data:
|
|
|
+ if k in ('ts', 'store',):
|
|
|
+ continue
|
|
|
+ self.data[k] = ''
|
|
|
+ #self.data['organic'] = 'false'
|
|
|
+
|
|
|
+ def save_and_clear(self):
|
|
|
+ self.save()
|
|
|
+ self.clear()
|
|
|
+ self.show('transaction')
|
|
|
+
|
|
|
+ def update(self, name):
|
|
|
+ getattr(self, f"update_{name}")()
|
|
|
+
|
|
|
+ def update_transaction(self):
|
|
|
+ for k in self.data:
|
|
|
+ self.edit_fields[k].set_edit_text(self.data[k])
|
|
|
+
|
|
|
+ def transaction(self):
|
|
|
+ self.edit_fields = OrderedDict()
|
|
|
+ for k in self.data:
|
|
|
+ self.edit_fields[k] = AutoCompleteEdit(k, apply_change_func=self._autocomplete)
|
|
|
+ self.edit_fields[k].set_edit_text(self.data[k])
|
|
|
+ urwid.connect_signal(self.edit_fields[k], 'change', self.apply_changes(k))
|
|
|
+
|
|
|
+ header = urwid.Text('Fill transaction', 'center')
|
|
|
+ _copyright = urwid.Text(COPYRIGHT, 'center')
|
|
|
+ button = urwid.Button(u'Done')
|
|
|
+ urwid.connect_signal(button, 'click', lambda w: self.save_and_clear())
|
|
|
+ widget = urwid.Pile([
|
|
|
+ urwid.Padding(header, 'center', width=('relative', 30)),
|
|
|
+ urwid.Padding(_copyright, 'center', width=('relative', 30)),
|
|
|
+ *[ self.edit_fields[k] for k in self.data ],
|
|
|
+ urwid.Divider(),
|
|
|
+ button,
|
|
|
+ ])
|
|
|
+ widget = urwid.Filler(widget, 'top')
|
|
|
+ return widget
|
|
|
+
|
|
|
+app = GroceryTransactionEditor(cols)
|
|
|
+loop = urwid.MainLoop(app, unhandled_input=show_or_exit)
|
|
|
+loop.run()
|
|
|
+
|
|
|
+cur.close()
|
|
|
+conn.close()
|