|
@@ -6,22 +6,20 @@
|
|
|
#
|
|
|
# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
|
|
|
import pandas as pd
|
|
|
+import itertools
|
|
|
+import sys
|
|
|
+import urwid
|
|
|
+import time
|
|
|
from txn_view import get_statement as get_session_transactions_statement
|
|
|
from dateutil.parser import parse as parse_time
|
|
|
from widgets import (
|
|
|
NoTabCheckBox,
|
|
|
AutoCompleteEdit,
|
|
|
AutoCompleteFloatEdit,
|
|
|
+ GroceryTransactionEditor,
|
|
|
_set_focus_path,
|
|
|
)
|
|
|
-import sys
|
|
|
-import urwid
|
|
|
from decimal import Decimal
|
|
|
-import time
|
|
|
-import itertools
|
|
|
-from collections import (
|
|
|
- OrderedDict,
|
|
|
-)
|
|
|
|
|
|
try:
|
|
|
from db_credentials import HOST, PASSWORD
|
|
@@ -42,8 +40,6 @@ try:
|
|
|
except:
|
|
|
from mock import *
|
|
|
|
|
|
-COPYRIGHT = "Copyright (c) Daniel Sheffield 2021"
|
|
|
-
|
|
|
palette = [
|
|
|
('banner', 'light gray', 'dark red'),
|
|
|
('streak', 'light red', 'dark gray'),
|
|
@@ -162,302 +158,15 @@ def interleave(_list, div):
|
|
|
yield element
|
|
|
yield div
|
|
|
|
|
|
-class GroceryTransactionEditor(urwid.WidgetPlaceholder):
|
|
|
- def __init__(self, log, fields):
|
|
|
- super(GroceryTransactionEditor, self).__init__(urwid.SolidFill(u'/'))
|
|
|
- self.organic_checkbox = NoTabCheckBox(
|
|
|
- u"Organic",
|
|
|
- on_state_change=self.apply_organic_state
|
|
|
- )
|
|
|
- self._init_data(fields)
|
|
|
- self.widgets = dict()
|
|
|
- self.show('transaction')
|
|
|
- with open(log, 'r') as f:
|
|
|
- for line in f.readlines():
|
|
|
- #print(cur.mogrify(line))
|
|
|
- #input()
|
|
|
- cur.execute(line)
|
|
|
-
|
|
|
- self.log = self.open(log)
|
|
|
-
|
|
|
- def iter_focus_paths(self):
|
|
|
- initial = [2,0,0,0]
|
|
|
- container = self.original_widget.original_widget.original_widget
|
|
|
- _set_focus_path(container, initial)
|
|
|
- while True:
|
|
|
- path = container.get_focus_path()
|
|
|
- yield path
|
|
|
- self.advance_focus()
|
|
|
- path = container.get_focus_path()
|
|
|
- if path == initial:
|
|
|
- self.advance_focus()
|
|
|
- break
|
|
|
-
|
|
|
- def advance_focus(self, reverse=False):
|
|
|
- container = self.original_widget.original_widget.original_widget
|
|
|
-
|
|
|
- path = container.get_focus_path()
|
|
|
- if reverse:
|
|
|
- paths = [ i for i in self.iter_focus_paths() ]
|
|
|
- zipped_paths = zip(paths, [
|
|
|
- *paths[1:], paths[0]
|
|
|
- ])
|
|
|
- prev_path = map(lambda x: x[0], filter(
|
|
|
- lambda x: x[1] == path,
|
|
|
- zipped_paths
|
|
|
- ))
|
|
|
- _set_focus_path(container, next(prev_path))
|
|
|
- return
|
|
|
-
|
|
|
- _iter = [ i for i in enumerate(path) ][::-1]
|
|
|
-
|
|
|
- for idx, part in _iter:
|
|
|
- p = [ i for i in path ]
|
|
|
- if reverse:
|
|
|
- p[idx] -= 1
|
|
|
- else:
|
|
|
- p[idx] += 1
|
|
|
-
|
|
|
- try:
|
|
|
- _set_focus_path(container, p)
|
|
|
- if path == [3]:
|
|
|
- self.advance_focus(reverse=reverse)
|
|
|
- return
|
|
|
- except IndexError:
|
|
|
- path[idx] = 0
|
|
|
-
|
|
|
- container.set_focus_path([2,0,0,0])
|
|
|
-
|
|
|
-
|
|
|
- def keypress(self, size, key):
|
|
|
- if isinstance(key, tuple):
|
|
|
- return
|
|
|
-
|
|
|
- if getattr(self.original_widget.original_widget, 'original_widget', None) is None:
|
|
|
- return super(GroceryTransactionEditor, self).keypress(size, key)
|
|
|
-
|
|
|
- if key == 'tab':
|
|
|
- self.advance_focus()
|
|
|
- elif key == 'shift tab':
|
|
|
- self.advance_focus(reverse=True)
|
|
|
- else:
|
|
|
- return super(GroceryTransactionEditor, self).keypress(size, key)
|
|
|
-
|
|
|
-
|
|
|
- def _init_data(self, fields):
|
|
|
- self.data = OrderedDict([
|
|
|
- (k, '') for k in fields
|
|
|
- ])
|
|
|
- self.clear()
|
|
|
-
|
|
|
- def get_activity(self, name):
|
|
|
- return getattr(self, name)
|
|
|
-
|
|
|
- def show(self, name, *args, **kwargs):
|
|
|
- if name in self.widgets:
|
|
|
- self.original_widget = self.widgets[name]
|
|
|
- self.update(name)
|
|
|
- return
|
|
|
-
|
|
|
- a = self.get_activity(name)
|
|
|
- widget = a(*args, **kwargs)
|
|
|
- if name != 'suggestions':
|
|
|
- self.widgets[name] = widget
|
|
|
-
|
|
|
- self.original_widget = widget
|
|
|
-
|
|
|
- def _apply_choice(self, name, value):
|
|
|
- self.data.update({
|
|
|
- name: value,
|
|
|
- })
|
|
|
- for k,v in self.data.items():
|
|
|
- if k == name or v:
|
|
|
- continue
|
|
|
- options = unique_suggestions(k, **self.data)
|
|
|
- if len(options) == 1 and k != 'ts':
|
|
|
- self.data.update({
|
|
|
- k: list(options)[0],
|
|
|
- })
|
|
|
- self.show('transaction')
|
|
|
-
|
|
|
- def apply_choice(self, name):
|
|
|
- apply = lambda w,x: self._apply_choice(name, x)
|
|
|
- return apply
|
|
|
-
|
|
|
- def _apply_changes(self, name, value):
|
|
|
- self.data.update({
|
|
|
- name: value,
|
|
|
- })
|
|
|
-
|
|
|
- def _autocomplete(self, name):
|
|
|
- options = unique_suggestions(name, **self.data)
|
|
|
- if 0 < len(options):
|
|
|
- self.show('suggestions', name, options=options)
|
|
|
-
|
|
|
- def apply_organic_state(self, w, state):
|
|
|
- self.data['organic'] = repr(state).lower()
|
|
|
-
|
|
|
- def apply_changes(self, name):
|
|
|
- apply = lambda w,x: self._apply_changes(name, x)
|
|
|
- return apply
|
|
|
-
|
|
|
- def suggestions(self, name, options=None):
|
|
|
- body = [urwid.Text(name.title()), urwid.Divider()]
|
|
|
- for c in options:
|
|
|
- button = urwid.Button(c)
|
|
|
- urwid.connect_signal(button, 'click', self.apply_choice(name), c)
|
|
|
- body.append(urwid.AttrMap(button, None, focus_map='reversed'))
|
|
|
- walker = urwid.SimpleFocusListWalker(body, wrap_around=False)
|
|
|
- listbox = urwid.ListBox(walker)
|
|
|
- pad = urwid.Padding(listbox, left=2, right=2)
|
|
|
- top = urwid.Overlay(pad, self.original_widget,
|
|
|
- align='center', width=('relative', 60),
|
|
|
- valign='middle', height=('relative', 60),
|
|
|
- min_width=20, min_height=9)
|
|
|
- def keypress(size, key, original=top.keypress):
|
|
|
- if key == 'esc':
|
|
|
- self.show('transaction')
|
|
|
- return
|
|
|
-
|
|
|
- if key == 'tab':
|
|
|
- return
|
|
|
-
|
|
|
- return original(size, key)
|
|
|
-
|
|
|
- top.keypress = keypress
|
|
|
- return urwid.AttrMap(top, 'banner')
|
|
|
-
|
|
|
- def _open(self, log):
|
|
|
- with open(log, 'a') as f:
|
|
|
- yield f
|
|
|
-
|
|
|
- def open(self, log):
|
|
|
- self._to_close = self._open(log)
|
|
|
- return next(self._to_close)
|
|
|
-
|
|
|
- def close(self):
|
|
|
- if self._to_close is not None:
|
|
|
- self._to_close.close()
|
|
|
- self._to_close = None
|
|
|
-
|
|
|
- def save(self):
|
|
|
- ts = self.data['ts']
|
|
|
- store = self.data['store']
|
|
|
- description = self.data['description']
|
|
|
- quantity = self.data['quantity']
|
|
|
- unit = self.data['unit']
|
|
|
- price = self.data['price']
|
|
|
- product = self.data['product']
|
|
|
- organic = self.data['organic'] if self.data['organic'] else 'false'
|
|
|
- statement = \
|
|
|
- f"CALL insert_transaction('{ts}', $store${store}$store$, " \
|
|
|
- f"$descr${description}$descr$, {quantity}, $unit${unit}$unit$, " \
|
|
|
- f"{price}, $produ${product}$produ$, {organic});\n"
|
|
|
- self.log.write(statement)
|
|
|
- cur.execute(statement)
|
|
|
-
|
|
|
- def clear(self):
|
|
|
- for k in self.data:
|
|
|
- if k in ('ts', 'store',):
|
|
|
- continue
|
|
|
- self.data[k] = ''
|
|
|
- self.organic_checkbox.set_state(False)
|
|
|
-
|
|
|
- def save_and_clear(self):
|
|
|
- self.save()
|
|
|
- self.clear()
|
|
|
- self.show('transaction')
|
|
|
-
|
|
|
- def update(self, name):
|
|
|
- getattr(self, f"update_{name}")()
|
|
|
-
|
|
|
- def update_transaction(self):
|
|
|
- for k in self.edit_fields:
|
|
|
- self.edit_fields[k].set_edit_text(self.data[k])
|
|
|
- date, store = self.data['ts'], self.data['store']
|
|
|
- self.text_fields['dbview'].set_text(
|
|
|
- get_session_transactions(date, store) if None not in (
|
|
|
- date or None, store or None
|
|
|
- ) else ''
|
|
|
- )
|
|
|
- self.organic_checkbox.set_state(True if self.data['organic'] == 'true' else False)
|
|
|
-
|
|
|
- def transaction(self):
|
|
|
- self.edit_fields = OrderedDict()
|
|
|
- self.text_fields = OrderedDict()
|
|
|
- for k in self.data:
|
|
|
- if k in side_pane and k != 'unit':
|
|
|
- ef = AutoCompleteFloatEdit(('bg', k), apply_change_func=self._autocomplete)
|
|
|
- elif k != 'organic':
|
|
|
- ef = AutoCompleteEdit(('bg', k), apply_change_func=self._autocomplete)
|
|
|
- else:
|
|
|
- continue
|
|
|
- ef.set_edit_text(self.data[k])
|
|
|
- urwid.connect_signal(ef, 'change', self.apply_changes(k))
|
|
|
- self.edit_fields[k] = ef
|
|
|
-
|
|
|
- header = urwid.Text(u'Fill Transaction', 'center')
|
|
|
- _copyright = urwid.Text(COPYRIGHT, 'center')
|
|
|
- done_button = urwid.Button(('streak', u'Done'))
|
|
|
- urwid.connect_signal(done_button, 'click', lambda w: self.save_and_clear())
|
|
|
- banner = urwid.Pile([
|
|
|
- urwid.Padding(header, 'center', width=('relative', 100)),
|
|
|
- urwid.Padding(_copyright, 'center', width=('relative', 100)),
|
|
|
- ])
|
|
|
- banner = urwid.AttrMap(banner, 'banner')
|
|
|
- fields = dict([
|
|
|
- (k, urwid.LineBox(urwid.AttrMap(self.edit_fields[k], 'streak'), title=k.title(), title_align='left')) for k in self.edit_fields
|
|
|
- ])
|
|
|
- txn_view = urwid.Text('')
|
|
|
- self.text_fields.update({'dbview': txn_view})
|
|
|
- fields.update({
|
|
|
- 'dbview': urwid.LineBox(
|
|
|
- urwid.AttrMap(txn_view, 'streak'),
|
|
|
- title="Session Data",
|
|
|
- title_align='left',
|
|
|
- )
|
|
|
- })
|
|
|
-
|
|
|
- side_pane_widget = (12, urwid.Pile([
|
|
|
- fields[r] if r is not None else urwid.Divider() for r in side_pane
|
|
|
- ]))
|
|
|
- main_pane_widgets = []
|
|
|
- for i, r in enumerate(grid_layout):
|
|
|
- widgets = []
|
|
|
- for c in r:
|
|
|
- if c is not None:
|
|
|
- if c != 'organic':
|
|
|
- widgets.append(fields[c])
|
|
|
- else:
|
|
|
- widgets.append(urwid.LineBox(urwid.AttrMap(self.organic_checkbox, 'bg')))
|
|
|
- else:
|
|
|
- widgets.append(urwid.Divider())
|
|
|
- main_pane_widgets.append(urwid.Columns(widgets))
|
|
|
-
|
|
|
-
|
|
|
- main_pane_widget = urwid.Pile(main_pane_widgets)
|
|
|
-
|
|
|
- widget = urwid.Pile([
|
|
|
- banner,
|
|
|
- urwid.Divider(),
|
|
|
- urwid.Columns((main_pane_widget, side_pane_widget),
|
|
|
- dividechars=2,
|
|
|
- ),
|
|
|
- *[ fields[c] if c is not None else urwid.Divider() for c in bottom_pane ],
|
|
|
- urwid.Divider(),
|
|
|
- done_button,
|
|
|
- ])
|
|
|
- widget = urwid.Filler(widget, 'top')
|
|
|
- widget = urwid.AttrMap(widget, 'bg')
|
|
|
- return widget
|
|
|
-
|
|
|
#screen = urwid.raw_display.Screen()
|
|
|
#screen.set_terminal_properties(colors=256, has_underline=True)
|
|
|
#screen.register_palette(palette)
|
|
|
|
|
|
args = sys.argv
|
|
|
log = args[1]
|
|
|
-app = GroceryTransactionEditor(log, cols)
|
|
|
+app = GroceryTransactionEditor(cur, log,
|
|
|
+ cols, grid_layout,
|
|
|
+ side_pane, bottom_pane)
|
|
|
loop = urwid.MainLoop(app, palette, unhandled_input=show_or_exit)
|
|
|
loop.run()
|
|
|
|