123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- from datetime import datetime, timedelta
- from dateutil.parser import parse as parse_time
- import itertools
- import gnucash
- import sys
- import os
- import psycopg
- from app.data.QueryManager import cursor_as_dict
- from app.data.TransactionView import get_session_transactions_statement as get_statement
- try:
- from db_credentials import HOST, PASSWORD
- host = f'host={HOST}'
- password = f'password={PASSWORD}'
- except:
- host = ''
- password = ''
- STORE_CODES = {
- 'countdown': 'CD',
- 'pak n save': 'PnS',
- 'SEAFOOD BAZAAR': 'SB',
- 'THE ORGANIC FOOD SHOP HAMILTON': 'TOFS',
- 'WHATAWHATA BERRY FARM': 'Farm',
- 'TAUPIRI DAIRY TAUPIRI': 'TD',
- 'THE FARM SHOP': 'GFS',
- 'Dreamview Cre': 'DV',
- 'new world': 'NW',
- 'BIN INN': 'BI',
- 'NEW SAVE': 'NS',
- 'ORGANIC NATION' : 'ON',
- 'The Warehouse' : 'WH',
- 'Goodearth' : 'CO',
- 'REDUCED' : 'RTC',
- }
- user = os.getenv('USER')
- conn = psycopg.connect(f"{host} dbname=grocery user={user} {password}")
- cur = conn.cursor()
- output = []
- def get_record_from_database(date, store):
- cur.execute(get_statement(date, store))
-
- return sum([row['price'] for row in cursor_as_dict(cur)])
- def get_store_code(store, value, user_data=dict()):
- for k, v in STORE_CODES.items():
- if k.lower() in store.lower():
- return v
- if store not in user_data:
- user_data[store] = input(f"Enter code for {store} ({int(value)/100:>6.2f}): ")
- return user_data[store]
- def _unwrap_list(root, blacklist):
- for i in map(lambda x: [
- (':'.join([x[0], c.name]), True) for c in root.get_children()
- ] if x[1] else [ x ], blacklist):
- for j in i: yield j
- def _accounts(root, path, blacklist, whitelist):
-
-
- if root.name in [ w[0] for w in whitelist ]:
- if root.name not in [ b[0] for b in blacklist ]:
- yield path, root
- blacklist = [b for b in itertools.chain(
- _unwrap_list(root, filter(lambda x: (x[0] == root.name), blacklist)),
- filter(lambda x: not (x[0] == root.name), blacklist)
- )]
- whitelist = [b for b in itertools.chain(
- _unwrap_list(root, filter(lambda x: (x[0] == root.name), whitelist)),
- filter(lambda x: not (x[0] == root.name), whitelist)
- )]
- path = [*path, root.name]
- prefix = '{}:'.format(root.name)
-
-
-
- blacklist = [
- b for b in map(
- lambda x: (x[0][len(prefix):], x[1]), filter(
- lambda x: x[0].startswith(prefix), blacklist
- )
- )
- ]
- whitelist = [
- b for b in map(
- lambda x: (x[0][len(prefix):], x[1]), filter(
- lambda x: x[0].startswith(prefix), whitelist
- )
- )
- ]
-
- for a in root.get_children():
- yield from _accounts(a, path, blacklist, whitelist)
- def _path(account):
- if account.get_parent() is not None: return ':'.join([_path(account.get_parent()), account.name])
- else: return account.name
- def _iter(root, blacklist, whitelist, _from, _to):
- for p, a in _accounts(root, [], blacklist, whitelist):
- apath = _path(a)
- for t in a.GetSplitList():
- tpath = _path(t.account)
- ts = t.parent.GetDate()
- if datetime.fromtimestamp(_to) > ts >= datetime.fromtimestamp(_from):
-
-
-
-
- if tpath == apath:
-
- ts = ts.replace(hour=0, minute=0)
- yield p, a.name, t.GetValue().num(), t.parent.GetDescription(), ts
- if __name__ == '__main__':
- args = sys.argv
- session = gnucash.Session(args[1])
- try:
- blacklist = [
-
- ('Root Account:Assets', True),
- ('Root Account:Liabilities', True),
- ('Root Account:Equity', True),
- ('Root Account:Income', True),
- ]
- whitelist = [
-
- ('Root Account:Expenses:NZD:Food:Groceries', True),
- ]
- _from = int(parse_time(args[2]).timestamp())
- _to = int(parse_time(args[3]).timestamp())
- _interval = int(args[4])*60*60*24
- ranges = dict([
- [
- el for el in map(lambda x: datetime.fromtimestamp(x), (i, i+_interval))
- ] for i in range(_from, _to, _interval)
- ])
- tot = dict()
- for p,n,value,desc,d in sorted(_iter(session.book.get_root_account(), blacklist, whitelist, _from, _to), key=lambda x: x[0]):
- account_name = ':'.join([*p, n])
- f = next(( f for (f, t) in ranges.items() if f <= d < t ))
- tot.update({
- f: value if f not in tot else value+tot[f]
- })
- ts = d + timedelta(days=1)
- store = get_store_code(desc, value)
- book = None
- while d + timedelta(days=2) > ts > d - timedelta(days=3):
- book = get_record_from_database(ts, store)
- book = int(book*100)
- if book == int(value):
- break;
- ts = ts - timedelta(days=1)
- book = get_record_from_database(ts, store)
- dbentry = f'{ts} {store:5s}' if book == int(value) else ''
- output.append(f'{d} {desc:35s} | {value/100:>6.2f} | {dbentry}')
- finally:
- session.end()
- for line in output:
- print(line)
|