Browse Source

Refactor and ensure mock mode still works

Daniel Sheffield 3 years ago
parent
commit
ba5c2ec194
4 changed files with 65 additions and 31 deletions
  1. 30 22
      grocery_transactions.py
  2. 6 0
      mock.py
  3. 4 1
      txn_view.py
  4. 25 8
      widgets.py

+ 30 - 22
grocery_transactions.py

@@ -5,12 +5,11 @@
 # All rights reserved
 #
 # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
-import pandas as pd
+import time
 import itertools
 import sys
 import urwid
-import time
-from txn_view import get_statement as get_session_transactions_statement
+import pandas as pd
 from dateutil.parser import parse as parse_time
 from widgets import (
     NoTabCheckBox,
@@ -30,14 +29,19 @@ except:
     password = ''
 
 try:
+    from txn_view import (
+        get_transactions_statement,
+        get_session_transactions_statement,
+    )
+    from db_utils import cursor_as_dict
     import psycopg2
     from psycopg2.sql import SQL
-    from db_utils import cursor_as_dict
     import os
     user = os.getenv('USER')
     conn = psycopg2.connect(f"{host} dbname=das user={user} {password}")
     cur = conn.cursor()
 except:
+    print('Failed to set up db connection. Entering Mock mode')
     from mock import *
 
 palette = [
@@ -87,14 +91,11 @@ display_map = {
 }
 display = lambda data, name: display_map[name](data) if name in display_map else data
 
-cur.execute("BEGIN")
-
-def get_session_transactions(date, store):
-    statement = get_session_transactions_statement(parse_time(date), store, full_name=True, exact_time=True)
+def get_session_transactions(cursor, statement, date, store):
     #print(cur.mogrify(statement).decode("utf-8"))
     #input()
-    cur.execute(statement)
-    df = pd.DataFrame(cursor_as_dict(cur))
+    cursor.execute(statement)
+    df = pd.DataFrame(cursor_as_dict(cursor))
     if df.empty:
         return ''
     return df.drop(labels=[
@@ -102,16 +103,14 @@ def get_session_transactions(date, store):
     ], axis=1).to_string(header=[
         'Description', 'Volume', 'Unit', 'Price', '$/unit', 'Total',
         'Group', 'Category', 'Product', 'Organic',
-    ], justify='justify-all', max_colwidth=38, index=False)
+    ], justify='justify-all', max_colwidth=60, index=False)
 
-def get_transactions_statement():
-    return SQL("SELECT * FROM transaction_view")
 
-def get_transactions(cursor):
-    cur.execute(get_transactions_statement())
+def get_transactions(cursor, statement):
+    cursor.execute(statement)
     yield from  map(lambda x: dict([
         (k, display(v, k)) for k,v in x.items()
-    ]), cursor_as_dict(cur))
+    ]), cursor_as_dict(cursor))
 
 def record_matches(record, strict=None, **kwargs):
     strict = strict or []
@@ -127,16 +126,16 @@ def record_matches(record, strict=None, **kwargs):
         
     return True
 
-def unique_suggestions(name, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
+def unique_suggestions(cur, statement, name, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
     exclude = filter(
         lambda x: x != name or name == 'ts',
         exclude,
     )
     [ kwargs.pop(k) for k in exclude if k in kwargs]
-    items = suggestions(name, exclude=exclude, **kwargs)
+    items = suggestions(cur, statement, name, exclude=exclude, **kwargs)
     return sorted(set(map(lambda x: x[name], items)))
 
-def suggestions(name, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
+def suggestions(cur, statement, name, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
     exclude = filter(
         lambda x: x != name or name == 'ts',
         exclude,
@@ -144,7 +143,7 @@ def suggestions(name, exclude=NON_IDENTIFIER_COLUMNS, **kwargs):
     [ kwargs.pop(k) for k in exclude if k in kwargs]
     yield from filter(lambda x: record_matches(
         x, strict=[ k for k in kwargs if k != name ], **kwargs
-    ), get_transactions(cur))
+    ), get_transactions(cur, statement))
 
 def show_or_exit(key):
     if isinstance(key, tuple):
@@ -164,9 +163,18 @@ def interleave(_list, div):
 
 args = sys.argv
 log = args[1]
-app = GroceryTransactionEditor(cur, log,
-    cols, grid_layout,
+
+cur.execute("BEGIN")
+
+app = GroceryTransactionEditor(cur,
+    lambda name, **kwargs: unique_suggestions(cur, get_transactions_statement(), name, **kwargs),
+    lambda date, store, *x, **y: get_session_transactions(
+        cur, get_session_transactions_statement(
+            parse_time(date), store, full_name=True, exact_time=True
+        ), date, store),
+    log, cols, grid_layout,
     side_pane, bottom_pane)
+
 loop = urwid.MainLoop(app, palette, unhandled_input=show_or_exit)
 loop.run()
 

+ 6 - 0
mock.py

@@ -28,7 +28,13 @@ class mock_cur(object):
 
 def cursor_as_dict(cur):
     yield from cur.fetchall()
+
+def get_transactions_statement():
+    return ''
     
+def get_session_transactions_statement(date, store, full_name=True, exact_time=True):
+    return ''
+
 Faker.seed(4321)
 fake = Faker()
 # first, import a similar Provider or use the default one

+ 4 - 1
txn_view.py

@@ -15,6 +15,9 @@ from collections import (
     OrderedDict,
 )
 
+def get_transactions_statement():
+    return SQL("SELECT * FROM transaction_view")
+
 select = OrderedDict([
     ('id', Identifier('transactions','id')),
     ('ts', SQL("""date_part('day',ts)||'/'||date_part('month',ts)||' '||date_part('hour',ts)::int%12""")),
@@ -56,7 +59,7 @@ def get_where(date, store, full_name=False, exact_time=False):
         SQL("\n  AND ").join(where),
     ])
 
-def get_statement(date, store, full_name=False, exact_time=False):
+def get_session_transactions_statement(date, store, full_name=False, exact_time=False):
     statement = SQL('\n').join([
         SQL('').join([
             SQL("SELECT"

+ 25 - 8
widgets.py

@@ -137,7 +137,8 @@ def _set_focus_path(container, path):
     raise IndexError
 
 class GroceryTransactionEditor(urwid.WidgetPlaceholder):
-    def __init__(self, cur, log, fields, layout, side_pane, bottom_pane):
+    def __init__(self, cur, unique_suggestions, get_session_transactions,
+        log, fields, layout, side_pane, bottom_pane):
         super(GroceryTransactionEditor, self).__init__(urwid.SolidFill(u'/'))
         self.organic_checkbox = NoTabCheckBox(
             u"Organic",
@@ -150,12 +151,29 @@ class GroceryTransactionEditor(urwid.WidgetPlaceholder):
         self.bottom_pane = bottom_pane
         self.show('transaction')
         self.cur = cur
+        self.unique_suggestions = unique_suggestions
+        self.get_session_transactions = get_session_transactions
         
         with open(log, 'r') as f:
+            date = None
+            store = None
             for line in f.readlines():
-                #print(cur.mogrify(line))
+                if date is None and store is None:
+                    if '$store$' in line:
+                        date, store, _= line.split('$store$')
+                        date = date.split("'")[1]
+                else:
+                    assert None not in (date, store,), \
+                        "Both date and store should be set or neither should be set"
+                    if '$store$' in line:
+                        assert date in line and f'$store${store}$store$' in line, \
+                            "Date ({date}) and store ({store}) not found in {line}."\
+                            " Mixing transactions from different dates and stores is not supported"
+                #print(self.cur.mogrify(line))
                 #input()
                 self.cur.execute(line)
+        self._apply_choice('ts', date)
+        self._apply_choice('store', store)
 
         self.log = self.open(log)
         
@@ -252,7 +270,7 @@ class GroceryTransactionEditor(urwid.WidgetPlaceholder):
         for k,v in self.data.items():
             if k == name or v:
                 continue
-            options = unique_suggestions(k, **self.data)
+            options = self.unique_suggestions(k, **self.data)
             if len(options) == 1 and k != 'ts':
                 self.data.update({
                     k: list(options)[0],
@@ -260,8 +278,7 @@ class GroceryTransactionEditor(urwid.WidgetPlaceholder):
         self.show('transaction')
     
     def apply_choice(self, name):
-        apply = lambda w,x: self._apply_choice(name, x)
-        return apply
+        return  lambda w,x: self._apply_choice(name, x)
 
     def _apply_changes(self, name, value):
         self.data.update({
@@ -269,7 +286,7 @@ class GroceryTransactionEditor(urwid.WidgetPlaceholder):
         })
 
     def _autocomplete(self, name):
-        options = unique_suggestions(name, **self.data)
+        options = self.unique_suggestions(name, **self.data)
         if 0 < len(options):
             self.show('suggestions', name, options=options)
     
@@ -333,7 +350,7 @@ class GroceryTransactionEditor(urwid.WidgetPlaceholder):
             f"$descr${description}$descr$, {quantity}, $unit${unit}$unit$, " \
             f"{price}, $produ${product}$produ$, {organic});\n"
         self.log.write(statement)
-        cur.execute(statement)
+        self.cur.execute(statement)
 
     def clear(self):
         for k in self.data:
@@ -355,7 +372,7 @@ class GroceryTransactionEditor(urwid.WidgetPlaceholder):
             self.edit_fields[k].set_edit_text(self.data[k])
         date, store = self.data['ts'], self.data['store']
         self.text_fields['dbview'].set_text(
-            get_session_transactions(date, store) if None not in (
+            self.get_session_transactions(date, store) if None not in (
                 date or None, store or None
             ) else ''
         )