#!/usr/bin/python3 # # Copyright (c) Daniel Sheffield 2021 - 2023 # # All rights reserved # # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY # # usage: ./reconcile.py ~/gnucash/merged.gnucash 2021-11-01 2021-12-10 1 # from datetime import datetime, timedelta from dateutil.parser import parse as parse_time import itertools import gnucash import sys import os import psycopg from app.data.QueryManager import cursor_as_dict from app.data.TransactionView import get_session_transactions_statement as get_statement try: from db_credentials import HOST, PASSWORD host = f'host={HOST}' password = f'password={PASSWORD}' except: host = '' password = '' STORE_CODES = { 'countdown': 'CD', 'pak n save': 'PnS', 'SEAFOOD BAZAAR': 'SB', 'THE ORGANIC FOOD SHOP HAMILTON': 'TOFS', 'WHATAWHATA BERRY FARM': 'Farm', 'TAUPIRI DAIRY TAUPIRI': 'TD', 'THE FARM SHOP': 'GFS', 'Dreamview Cre': 'DV', 'new world': 'NW', 'BIN INN': 'BI', 'NEW SAVE': 'NS', 'ORGANIC NATION' : 'ON', 'The Warehouse' : 'WH', 'Goodearth' : 'CO', 'REDUCED' : 'RTC', } user = os.getenv('USER') conn = psycopg.connect(f"{host} dbname=grocery user={user} {password}") cur = conn.cursor() output = [] def get_record_from_database(date, store): cur.execute(get_statement(date, store)) #print(cur.mogrify(get_statement(date, store))) return sum([row['price'] for row in cursor_as_dict(cur)]) def get_store_code(store, value, user_data=dict()): for k, v in STORE_CODES.items(): if k.lower() in store.lower(): return v if store not in user_data: user_data[store] = input(f"Enter code for {store} ({int(value)/100:>6.2f}): ") return user_data[store] def _unwrap_list(root, blacklist): for i in map(lambda x: [ (':'.join([x[0], c.name]), True) for c in root.get_children() ] if x[1] else [ x ], blacklist): for j in i: yield j def _accounts(root, path, blacklist, whitelist): #print('blacklist1: {}'.format(blacklist)) #print(f'whitelist1 ({root.name}: {whitelist}') if root.name in [ w[0] for w in whitelist ]: if root.name not in [ b[0] for b in blacklist ]: yield path, root blacklist = [b for b in itertools.chain( _unwrap_list(root, filter(lambda x: (x[0] == root.name), blacklist)), filter(lambda x: not (x[0] == root.name), blacklist) )] whitelist = [b for b in itertools.chain( _unwrap_list(root, filter(lambda x: (x[0] == root.name), whitelist)), filter(lambda x: not (x[0] == root.name), whitelist) )] path = [*path, root.name] prefix = '{}:'.format(root.name) #print('root: {}'.format(root.name)) #print('prefix: {}'.format(prefix)) #print('path: {}'.format(':'.join(path))) blacklist = [ b for b in map( lambda x: (x[0][len(prefix):], x[1]), filter( lambda x: x[0].startswith(prefix), blacklist ) ) ] whitelist = [ b for b in map( lambda x: (x[0][len(prefix):], x[1]), filter( lambda x: x[0].startswith(prefix), whitelist ) ) ] #print('blacklist: {}'.format(blacklist)) for a in root.get_children(): yield from _accounts(a, path, blacklist, whitelist) def _path(account): if account.get_parent() is not None: return ':'.join([_path(account.get_parent()), account.name]) else: return account.name def _iter(root, blacklist, whitelist, _from, _to): for p, a in _accounts(root, [], blacklist, whitelist): apath = _path(a) for t in a.GetSplitList(): tpath = _path(t.account) ts = t.parent.GetDate() if datetime.fromtimestamp(_to) > ts >= datetime.fromtimestamp(_from): #if apath.startswith('Root Account:Trading'): # if tpath.startswith('Root Account:Trading:CURRENCY'): # s+=t.GetValue().num() #elif tpath == apath: if tpath == apath: #print(t.parent.GetDescription()) ts = ts.replace(hour=0, minute=0) yield p, a.name, t.GetValue().num(), t.parent.GetDescription(), ts if __name__ == '__main__': args = sys.argv session = gnucash.Session(args[1]) try: blacklist = [ # name, also_children ('Root Account:Assets', True), ('Root Account:Liabilities', True), ('Root Account:Equity', True), ('Root Account:Income', True), ] whitelist = [ # name, also_children ('Root Account:Expenses:NZD:Food:Groceries', True), ] _from = int(parse_time(args[2]).timestamp())# 1554030000 # 1 April 2019 as seconds since epoch _to = int(parse_time(args[3]).timestamp()) _interval = int(args[4])*60*60*24 ranges = dict([ [ el for el in map(lambda x: datetime.fromtimestamp(x), (i, i+_interval)) ] for i in range(_from, _to, _interval) ]) tot = dict() for p,n,value,desc,d in sorted(_iter(session.book.get_root_account(), blacklist, whitelist, _from, _to), key=lambda x: x[0]): account_name = ':'.join([*p, n]) f = next(( f for (f, t) in ranges.items() if f <= d < t )) tot.update({ f: value if f not in tot else value+tot[f] }) ts = d + timedelta(days=1) store = get_store_code(desc, value) book = None while d + timedelta(days=2) > ts > d - timedelta(days=3): book = get_record_from_database(ts, store) book = int(book*100) if book == int(value): break; ts = ts - timedelta(days=1) book = get_record_from_database(ts, store) dbentry = f'{ts} {store:5s}' if book == int(value) else '' output.append(f'{d} {desc:35s} | {value/100:>6.2f} | {dbentry}') finally: session.end() for line in output: print(line)