|
@@ -1,472 +0,0 @@
|
|
|
-#
|
|
|
-# Copyright (c) Daniel Sheffield 2021
|
|
|
-#
|
|
|
-# All rights reserved
|
|
|
-#
|
|
|
-# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
|
|
|
-import urwid
|
|
|
-from decimal import Decimal
|
|
|
-from urwid import numedit
|
|
|
-from collections import (
|
|
|
- OrderedDict,
|
|
|
-)
|
|
|
-
|
|
|
-COPYRIGHT = "Copyright (c) Daniel Sheffield 2021"
|
|
|
-
|
|
|
-class AutoCompleteEdit(urwid.Edit):
|
|
|
- def __init__(self, name, *args, apply_change_func=None, **kwargs):
|
|
|
- if isinstance(name, tuple):
|
|
|
- pallete, title = name
|
|
|
- self.name = title
|
|
|
- title = title.title()
|
|
|
- passthrough = (pallete, u' ' if title.lower() == 'unit' else u'')
|
|
|
- else:
|
|
|
- self.name = name
|
|
|
- title = name.title()
|
|
|
- passthrough = u' ' if name.lower() == 'unit' else u''
|
|
|
-
|
|
|
- super().__init__(passthrough, *args, **kwargs)
|
|
|
- self.apply = apply_change_func
|
|
|
-
|
|
|
- def keypress(self, size, key):
|
|
|
- if key == 'enter':
|
|
|
- self.apply(self.name)
|
|
|
- return
|
|
|
- elif key == 'delete':
|
|
|
- self.set_edit_text('')
|
|
|
- return
|
|
|
-
|
|
|
- return super().keypress(size, key)
|
|
|
-
|
|
|
-
|
|
|
-class AutoCompleteFloatEdit(numedit.FloatEdit):
|
|
|
- def __init__(self, name, *args, apply_change_func=None, **kwargs):
|
|
|
- self.last_val = None
|
|
|
- self.op = '='
|
|
|
- self.pallete = None
|
|
|
- if isinstance(name, tuple):
|
|
|
- self.pallete, self.name = name
|
|
|
- title = self.name.title()
|
|
|
- passthrough = (self.pallete, f'{self.op} ')
|
|
|
- else:
|
|
|
- self.name = name
|
|
|
- title = name.title()
|
|
|
- passthrough = title
|
|
|
-
|
|
|
- super(AutoCompleteFloatEdit, self).__init__(passthrough, *args, **kwargs)
|
|
|
- self.apply = apply_change_func
|
|
|
-
|
|
|
- def update_caption(self):
|
|
|
- if self.pallete is not None:
|
|
|
- self.set_caption((self.pallete, f'{self.op} '))
|
|
|
- else:
|
|
|
- self.set_caption(f'{self.op} ')
|
|
|
-
|
|
|
- def set_op(self, op):
|
|
|
- self.op = op
|
|
|
- self.last_val = self.value()
|
|
|
- self.set_edit_text('')
|
|
|
- self.update_caption()
|
|
|
-
|
|
|
- def calc(self):
|
|
|
- x = self.last_val
|
|
|
- op = self.op
|
|
|
- if op in ('+', '-',):
|
|
|
- y = self.value() or Decimal(0.0)
|
|
|
- if op == '+':
|
|
|
- z = x + y
|
|
|
- else:
|
|
|
- z = x - y
|
|
|
- elif op in ('*', '/'):
|
|
|
- y = self.value() or Decimal(1.0)
|
|
|
- if op == '*':
|
|
|
- z = x * y
|
|
|
- else:
|
|
|
- z = x / y
|
|
|
- else:
|
|
|
- y = self.value() or Decimal(0.0)
|
|
|
- z = y
|
|
|
-
|
|
|
- self.op = '='
|
|
|
- self.update_caption()
|
|
|
- self.set_edit_text(f'{z:.2f}')
|
|
|
-
|
|
|
- def keypress(self, size, key):
|
|
|
- if isinstance(key, tuple):
|
|
|
- return
|
|
|
- ops = ('+', '-', '*', '/',)
|
|
|
-
|
|
|
- if key in ops:
|
|
|
- if self.op in ops:
|
|
|
- self.calc()
|
|
|
- self.set_op(key)
|
|
|
- return
|
|
|
- elif key == 'enter':
|
|
|
- if self.get_edit_text() == '' or self.value() == Decimal(0.0):
|
|
|
- return self.apply(self.name)
|
|
|
- self.calc()
|
|
|
- return
|
|
|
- elif key == '=':
|
|
|
- self.calc()
|
|
|
- return
|
|
|
- elif key == 'delete':
|
|
|
- self.set_edit_text('')
|
|
|
- self.op = '='
|
|
|
- self.update_caption()
|
|
|
- return
|
|
|
-
|
|
|
- return super().keypress(size, key)
|
|
|
-
|
|
|
-class NoTabCheckBox(urwid.CheckBox):
|
|
|
- def keypress(self, size, key):
|
|
|
- if not isinstance(key, tuple) and key == 'tab':
|
|
|
- return
|
|
|
- else:
|
|
|
- return super().keypress(size, key)
|
|
|
-
|
|
|
-class GroceryTransactionEditor(urwid.WidgetPlaceholder):
|
|
|
- def __init__(self, activity_manager, cur, log):
|
|
|
- super().__init__(urwid.SolidFill(u'/'))
|
|
|
- self.activity_manager = activity_manager
|
|
|
- self.cur = cur
|
|
|
- txn = self.activity_manager.get('transaction')
|
|
|
-
|
|
|
- with open(log, 'r') as f:
|
|
|
- date = None
|
|
|
- store = None
|
|
|
- for line in f.readlines():
|
|
|
- if date is None and store is None:
|
|
|
- if '$store$' in line:
|
|
|
- date, store, _= line.split('$store$')
|
|
|
- date = date.split("'")[1]
|
|
|
- else:
|
|
|
- assert None not in (date, store,), \
|
|
|
- "Both date and store should be set or neither should be set"
|
|
|
- if '$store$' in line:
|
|
|
- assert date in line and f'$store${store}$store$' in line, \
|
|
|
- "Date ({date}) and store ({store}) not found in {line}."\
|
|
|
- " Mixing transactions from different dates and stores is not supported"
|
|
|
- #print(self.cur.mogrify(line))
|
|
|
- #input()
|
|
|
- self.cur.execute(line)
|
|
|
-
|
|
|
- if None not in (date, store):
|
|
|
- txn._apply_choice('ts', date)
|
|
|
- txn._apply_choice('store', store)
|
|
|
-
|
|
|
- self.activity_manager.show(self)
|
|
|
- self.activity_manager.show(txn.update())
|
|
|
-
|
|
|
- self.log = self.open(log)
|
|
|
-
|
|
|
- def _open(self, log):
|
|
|
- with open(log, 'a') as f:
|
|
|
- yield f
|
|
|
-
|
|
|
- def open(self, log):
|
|
|
- self._to_close = self._open(log)
|
|
|
- return next(self._to_close)
|
|
|
-
|
|
|
- def close(self):
|
|
|
- if self._to_close is not None:
|
|
|
- self._to_close.close()
|
|
|
- self._to_close = None
|
|
|
-
|
|
|
- def save(self, data):
|
|
|
- ts = data['ts']
|
|
|
- store = data['store']
|
|
|
- description = data['description']
|
|
|
- quantity = data['quantity']
|
|
|
- unit = data['unit']
|
|
|
- price = data['price']
|
|
|
- product = data['product']
|
|
|
- organic = data['organic'] if data['organic'] else 'false'
|
|
|
- statement = \
|
|
|
- f"CALL insert_transaction('{ts}', $store${store}$store$, " \
|
|
|
- f"$descr${description}$descr$, {quantity}, $unit${unit}$unit$, " \
|
|
|
- f"{price}, $produ${product}$produ$, {organic});\n"
|
|
|
- self.log.write(statement)
|
|
|
- self.cur.execute(statement)
|
|
|
-
|
|
|
-class FocusWidget(urwid.WidgetPlaceholder):
|
|
|
-
|
|
|
- def __init__(self, initial_focus, skip_focus):
|
|
|
- super().__init__(urwid.SolidFill(u'/'))
|
|
|
- self._initial_focus = tuple([ i for i in initial_focus ])
|
|
|
- self._skip_focus = tuple([ i for i in skip_focus ])
|
|
|
-
|
|
|
- @property
|
|
|
- def skip_focus(self):
|
|
|
- return self._skip_focus
|
|
|
-
|
|
|
- @property
|
|
|
- def initial_focus(self):
|
|
|
- return list(self._initial_focus)
|
|
|
-
|
|
|
- @property
|
|
|
- def container(self):
|
|
|
- return self.original_widget.original_widget.original_widget
|
|
|
-
|
|
|
- def _set_focus_path(self, path):
|
|
|
- try:
|
|
|
- self.container.set_focus_path(path)
|
|
|
- return
|
|
|
- except IndexError:
|
|
|
- pass
|
|
|
-
|
|
|
- if path[-1] == 0 and len(path) > 1:
|
|
|
- self._set_focus_path(path[:-1])
|
|
|
- return
|
|
|
-
|
|
|
- raise IndexError
|
|
|
-
|
|
|
- def iter_focus_paths(self):
|
|
|
- self._set_focus_path(self.initial_focus)
|
|
|
- while True:
|
|
|
- path = self.container.get_focus_path()
|
|
|
- yield path
|
|
|
- self.advance_focus()
|
|
|
- path = self.container.get_focus_path()
|
|
|
- if path == self.initial_focus:
|
|
|
- break
|
|
|
-
|
|
|
- def advance_focus(self, reverse=False):
|
|
|
-
|
|
|
- path = self.container.get_focus_path()
|
|
|
-
|
|
|
- if reverse:
|
|
|
- paths = [ i for i in self.iter_focus_paths() ]
|
|
|
- zipped_paths = zip(paths, [
|
|
|
- *paths[1:], paths[0]
|
|
|
- ])
|
|
|
- prev_path = map(lambda x: x[0], filter(
|
|
|
- lambda x: x[1] == path,
|
|
|
- zipped_paths
|
|
|
- ))
|
|
|
- p = next(prev_path)
|
|
|
- self._set_focus_path(p)
|
|
|
- return
|
|
|
-
|
|
|
- _iter = [ i for i in enumerate(path) ][::-1]
|
|
|
-
|
|
|
- for idx, part in _iter:
|
|
|
- p = [ i for i in path ]
|
|
|
- if reverse:
|
|
|
- p[idx] -= 1
|
|
|
- else:
|
|
|
- p[idx] += 1
|
|
|
-
|
|
|
- try:
|
|
|
- self._set_focus_path(p)
|
|
|
- if p in self.skip_focus:
|
|
|
- self.advance_focus(reverse=reverse)
|
|
|
- return
|
|
|
- except IndexError:
|
|
|
- path[idx] = 0
|
|
|
-
|
|
|
- self.container.set_focus_path(self.initial_focus)
|
|
|
-
|
|
|
-class TransactionEditor(FocusWidget):
|
|
|
-
|
|
|
- def keypress(self, size, key):
|
|
|
- if isinstance(key, tuple):
|
|
|
- return
|
|
|
-
|
|
|
- if getattr(self.original_widget.original_widget, 'original_widget', None) is None:
|
|
|
- return super().keypress(size, key)
|
|
|
-
|
|
|
- if key == 'tab':
|
|
|
- self.advance_focus()
|
|
|
- elif key == 'shift tab':
|
|
|
- self.advance_focus(reverse=True)
|
|
|
- else:
|
|
|
- return super().keypress(size, key)
|
|
|
-
|
|
|
- def _init_data(self, fields):
|
|
|
- self.data = OrderedDict([
|
|
|
- (k, '') for k in fields
|
|
|
- ])
|
|
|
- self.clear()
|
|
|
-
|
|
|
- def _apply_choice(self, name, value):
|
|
|
- self._apply_changes(name, value)
|
|
|
- for k,v in self.data.items():
|
|
|
- if k == name or v:
|
|
|
- continue
|
|
|
- options = self.query_manager.unique_suggestions(k, **self.data)
|
|
|
- if len(options) == 1 and k != 'ts':
|
|
|
- self._apply_changes(k, list(options)[0])
|
|
|
-
|
|
|
- def apply_choice(self, name):
|
|
|
- return lambda w,x: self._apply_choice(name, x)
|
|
|
-
|
|
|
- def _apply_changes(self, name, value):
|
|
|
- self.data.update({
|
|
|
- name: value,
|
|
|
- })
|
|
|
-
|
|
|
- def apply_changes(self, name):
|
|
|
- return lambda w,x: self._apply_changes(name, x)
|
|
|
-
|
|
|
- def apply_organic_state(self, w, state):
|
|
|
- self.data['organic'] = repr(state).lower()
|
|
|
-
|
|
|
- def clear(self):
|
|
|
- for k in self.data:
|
|
|
- if k in ('ts', 'store',):
|
|
|
- continue
|
|
|
- self.data[k] = ''
|
|
|
- self.organic_checkbox.set_state(False)
|
|
|
-
|
|
|
- def update(self):
|
|
|
- for k in self.edit_fields:
|
|
|
- self.edit_fields[k].set_edit_text(self.data[k])
|
|
|
- date, store = self.data['ts'], self.data['store']
|
|
|
- self.text_fields['dbview'].set_text(
|
|
|
- self.query_manager.get_session_transactions(date, store) if None not in (
|
|
|
- date or None, store or None
|
|
|
- ) else ''
|
|
|
- )
|
|
|
- self.organic_checkbox.set_state(True if self.data['organic'] == 'true' else False)
|
|
|
- return self
|
|
|
-
|
|
|
- def __init__(self, query_manager, fields,
|
|
|
- layout, side_pane, bottom_pane,
|
|
|
- save_and_clear_cb, autocomplete_cb):
|
|
|
- super().__init__([2,0,0,0], [
|
|
|
- [4,],
|
|
|
- [5,],
|
|
|
- [7,],
|
|
|
- ])
|
|
|
- self.organic_checkbox = NoTabCheckBox(
|
|
|
- u"Organic",
|
|
|
- on_state_change=self.apply_organic_state
|
|
|
- )
|
|
|
- self.query_manager = query_manager
|
|
|
- self._init_data(fields)
|
|
|
- self.layout = layout
|
|
|
- self.side_pane = side_pane
|
|
|
- self.bottom_pane = bottom_pane
|
|
|
- self.edit_fields = OrderedDict()
|
|
|
- self.text_fields = OrderedDict()
|
|
|
- for k in self.data:
|
|
|
- if k in self.side_pane and k != 'unit':
|
|
|
- ef = AutoCompleteFloatEdit(('bg', k), apply_change_func=lambda name: autocomplete_cb(name, self.data))
|
|
|
- elif k != 'organic':
|
|
|
- ef = AutoCompleteEdit(('bg', k), apply_change_func=lambda name: autocomplete_cb(name, self.data))
|
|
|
- else:
|
|
|
- continue
|
|
|
- ef.set_edit_text(self.data[k])
|
|
|
- urwid.connect_signal(ef, 'change', self.apply_changes(k))
|
|
|
- self.edit_fields[k] = ef
|
|
|
-
|
|
|
- header = urwid.Text(u'Fill Transaction', 'center')
|
|
|
- _copyright = urwid.Text(COPYRIGHT, 'center')
|
|
|
- done_button = urwid.Button(('streak', u'Done'))
|
|
|
- urwid.connect_signal(done_button, 'click', lambda w: save_and_clear_cb())
|
|
|
- banner = urwid.Pile([
|
|
|
- urwid.Padding(header, 'center', width=('relative', 100)),
|
|
|
- urwid.Padding(_copyright, 'center', width=('relative', 100)),
|
|
|
- ])
|
|
|
- banner = urwid.AttrMap(banner, 'banner')
|
|
|
- fields = dict([
|
|
|
- (k, urwid.LineBox(urwid.AttrMap(self.edit_fields[k], 'streak'), title=k.title(), title_align='left')) for k in self.edit_fields
|
|
|
- ])
|
|
|
- txn_view = urwid.Text('')
|
|
|
- self.text_fields.update({'dbview': txn_view})
|
|
|
- fields.update({
|
|
|
- 'dbview': urwid.LineBox(
|
|
|
- urwid.AttrMap(txn_view, 'streak'),
|
|
|
- title="Session Data",
|
|
|
- title_align='left',
|
|
|
- )
|
|
|
- })
|
|
|
-
|
|
|
- side_pane_widget = (12, urwid.Pile([
|
|
|
- fields[r] if r is not None else urwid.Divider() for r in self.side_pane
|
|
|
- ]))
|
|
|
- main_pane_widgets = []
|
|
|
- for i, r in enumerate(self.layout):
|
|
|
- widgets = []
|
|
|
- for c in r:
|
|
|
- if c is not None:
|
|
|
- if c != 'organic':
|
|
|
- widgets.append(fields[c])
|
|
|
- else:
|
|
|
- widgets.append(urwid.LineBox(urwid.AttrMap(self.organic_checkbox, 'bg')))
|
|
|
- else:
|
|
|
- widgets.append(urwid.Divider())
|
|
|
- main_pane_widgets.append(urwid.Columns(widgets))
|
|
|
-
|
|
|
-
|
|
|
- main_pane_widget = urwid.Pile(main_pane_widgets)
|
|
|
-
|
|
|
- widget = urwid.Pile([
|
|
|
- banner,
|
|
|
- urwid.Divider(),
|
|
|
- urwid.Columns((main_pane_widget, side_pane_widget),
|
|
|
- dividechars=2,
|
|
|
- ),
|
|
|
- *[ fields[c] if c is not None else urwid.Divider() for c in self.bottom_pane ],
|
|
|
- urwid.Divider(),
|
|
|
- done_button,
|
|
|
- ])
|
|
|
- widget = urwid.Filler(widget, 'top')
|
|
|
- widget = urwid.AttrMap(widget, 'bg')
|
|
|
- self.original_widget = widget
|
|
|
-
|
|
|
-class SuggestionPopup(urwid.Overlay):
|
|
|
-
|
|
|
- def __init__(self, under, name, options, apply_cb, esc_cb):
|
|
|
- self.esc_cb = esc_cb
|
|
|
- self.under = under
|
|
|
- body = [urwid.Text(name.title()), urwid.Divider()]
|
|
|
- for c in options:
|
|
|
- button = urwid.Button(c)
|
|
|
- urwid.connect_signal(button, 'click', apply_cb, c)
|
|
|
- body.append(urwid.AttrMap(button, None, focus_map='reversed'))
|
|
|
- walker = urwid.SimpleFocusListWalker(body, wrap_around=False)
|
|
|
- listbox = urwid.ListBox(walker)
|
|
|
- pad = urwid.Padding(listbox, left=2, right=2)
|
|
|
- pad = urwid.AttrMap(pad, 'banner')
|
|
|
- super().__init__(pad, under,
|
|
|
- align='center', width=('relative', 60),
|
|
|
- valign='middle', height=('relative', 60),
|
|
|
- min_width=20, min_height=9)
|
|
|
-
|
|
|
- def keypress(self, size, key):
|
|
|
- if key == 'esc':
|
|
|
- self.esc_cb()
|
|
|
- return
|
|
|
-
|
|
|
- if key == 'tab':
|
|
|
- return
|
|
|
-
|
|
|
- return super().keypress(size, key)
|
|
|
-
|
|
|
-class ActivityManager(object):
|
|
|
-
|
|
|
- def __init__(self):
|
|
|
- self.widgets = dict()
|
|
|
- self.app = None
|
|
|
-
|
|
|
- def add(self, widget, name):
|
|
|
- self.widgets[name] = widget
|
|
|
- return widget
|
|
|
-
|
|
|
- def get(self, name):
|
|
|
- if name in self.widgets:
|
|
|
- return self.widgets[name]
|
|
|
- raise Exception("Widget {name} not found")
|
|
|
-
|
|
|
- def create(self, cls, name, *args, **kwargs):
|
|
|
- widget = cls(*args, **kwargs)
|
|
|
- if name is not None:
|
|
|
- self.add(widget, name)
|
|
|
- return widget
|
|
|
-
|
|
|
- def show(self, widget):
|
|
|
- if self.app is not None:
|
|
|
- self.app.original_widget = widget
|
|
|
- return
|
|
|
- self.app = widget
|