#
# Copyright (c) Daniel Sheffield 2021 - 2023
#
# All rights reserved
#
# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
from collections import (
    OrderedDict,
)
from psycopg.sql import (
    Composable,
    Identifier,
    SQL,
    Literal,
)
from .util import get_select, get_from, get_groupby

# Maps the alias used by callers (and as output column name) to the table
# whose ``name`` column backs it.
ALIAS_TO_TABLE = {
    'product': 'products',
    'category': 'categories',
    'group': 'groups',
    'tags': 'tags',
    'unit': 'units',
    'store': 'stores',
}

# Keys that get_transactions_statement() does not turn into a filter unless
# the key is the field currently being searched (its ``name`` argument).
NON_IDENTIFIER_COLUMNS = [
    'ts',
    'store',
    'quantity',
    'unit',
    'price',
    'organic',
    'tags',
]


def get_table_statement(alias: str) -> Composable:
    """Build ``SELECT <table>.name AS <alias> FROM <table>`` for *alias*.

    :param alias: a key of ``ALIAS_TO_TABLE``.
    :returns: the composed statement.
    :raises KeyError: if *alias* is not a key of ``ALIAS_TO_TABLE``.
    """
    table = ALIAS_TO_TABLE[alias]
    return SQL("SELECT {column} AS {alias} FROM {table}").format(
        column=Identifier(table, 'name'),
        table=Identifier(table),
        alias=Identifier(alias),
    )


def get_transactions_statement(name, **kwargs) -> Composable:
    """Build a transactions query filtered by the given keyword arguments.

    Each keyword contributes exactly one predicate to the ``WHERE`` clause:

    * the keyword equal to *name* is matched as a case-insensitive substring
      (``ILIKE '%value%'``) against ``<table>.name``;
    * any other keyword is matched by equality against ``<table>.name``;
    * a keyword that is not in ``ALIAS_TO_TABLE``, that is listed in
      ``NON_IDENTIFIER_COLUMNS`` (and is not *name*), or whose value is falsy
      contributes the placeholder predicate ``TRUE``.

    When no keyword arguments are supplied the clause is ``WHERE TRUE`` so
    that the statement is still valid SQL.

    :param name: the keyword whose value is treated as a search fragment.
    :param kwargs: alias -> value filters.
    :returns: the composed statement (SELECT, FROM/JOINs and WHERE).
    """
    conditions = []
    for key, value in kwargs.items():
        is_search_field = key == name
        filterable = (
            key in ALIAS_TO_TABLE
            and (key not in NON_IDENTIFIER_COLUMNS or is_search_field)
            and value
        )
        if not filterable:
            # Keep one predicate per keyword so the clause shape is stable.
            conditions.append(SQL('TRUE'))
            continue
        conditions.append(SQL('{key} {condition} {value}').format(
            key=Identifier(ALIAS_TO_TABLE[key], 'name'),
            condition=SQL('ILIKE' if is_search_field else '='),
            value=Literal(f'%{value}%' if is_search_field else value),
        ))

    if not conditions:
        # Without this, an empty filter set would emit a bare "WHERE ",
        # which is a syntax error.
        conditions.append(SQL('TRUE'))

    # No GROUP BY is emitted here, so the aggregated ``tags`` column from
    # SELECT is swapped for the plain tag name; it is placed first.
    columns = {
        'tags': Identifier('tags', 'name'),
        **{k: v for k, v in SELECT.items() if k != 'tags'},
    }
    statement = SQL('\n').join([
        get_select(columns),
        get_from("transactions", JOINS),
        SQL('').join([SQL('WHERE '), SQL('\n AND ').join(conditions)]),
    ])
    return statement


# Output column alias -> SQL expression, in output order.
SELECT = OrderedDict([
    ('id', Identifier('transactions', 'id')),
    ('ts', Identifier('transactions', 'ts')),
    ('store', Identifier('stores', 'name')),
    ('code', Identifier('stores', 'code')),
    ('description', Identifier('transactions', 'description')),
    # NOTE(review): 'volume' and 'quantity' both read the quantity column.
    ('volume', Identifier('quantity')),
    ('unit', Identifier('units', 'name')),
    ('price', Identifier('price')),
    ('quantity', Identifier('quantity')),
    ('$/unit', SQL("""TRUNC(price/quantity,4)""")),
    # Running sum of price within each calendar day, ordered by id.
    ('total', SQL(
        "sum(transactions.price) OVER ( "
        "PARTITION BY transactions.ts::date "
        "ORDER BY transactions.id "
        "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW )"
    )),
    ('group', Identifier('groups', 'name')),
    ('category', Identifier('categories', 'name')),
    ('product', Identifier('products', 'name')),
    ('organic', Identifier('organic')),
    # Tag names collected into an array, skipping NULLs.
    ('tags', SQL(
        """array_agg({tag_name}) FILTER (WHERE {tag_name} IS NOT NULL)"""
    ).format(
        tag_name=Identifier('tags', 'name'),
    ))
])

# Columns passed to get_groupby(): the SELECT entries without the computed
# '$/unit', 'total' and 'tags' expressions.
GROUPBY = OrderedDict([
    ('id', Identifier('transactions', 'id')),
    ('ts', Identifier('transactions', 'ts')),
    ('store', Identifier('stores', 'name')),
    ('code', Identifier('stores', 'code')),
    ('description', Identifier('transactions', 'description')),
    ('volume', Identifier('quantity')),
    ('unit', Identifier('units', 'name')),
    ('price', Identifier('price')),
    ('quantity', Identifier('quantity')),
    ('group', Identifier('groups', 'name')),
    ('category', Identifier('categories', 'name')),
    ('product', Identifier('products', 'name')),
    ('organic', Identifier('organic')),
])

# Join specification handed to get_from(): table -> (column, other side).
# NOTE(review): the exact interpretation of each pair lives in util.get_from;
# presumably (joined-table column, referencing column) - confirm there.
JOINS = OrderedDict([
    ('units', ('id', 'unit_id')),
    ('stores', ('id', 'store_id')),
    ('products', ('id', 'product_id')),
    ('categories', ('id', 'category_id')),
    ('groups', ('id', 'group_id')),
    ('tags_map', ('transaction_id', ('transactions', 'id'))),
    ('tags', ('id', ('tags_map', 'tag_id'))),
])


def get_where(date, store, full_name=False, exact_time=False) -> Composable:
    """Build the ``WHERE`` clause selecting one store's session.

    :param date: value rendered with ``str()`` into the timestamp predicate.
    :param store: store to match, or ``None`` to skip the store predicate.
    :param full_name: match *store* against ``stores.name`` instead of
        ``stores.code``.
    :param exact_time: match ``ts`` exactly instead of anywhere within the
        day of *date*.
    :returns: the composed ``WHERE`` clause.
    """
    predicates = []
    if store is not None:
        store_column = Identifier('stores', 'name' if full_name else 'code')
        predicates.append(
            SQL(' ').join([store_column, SQL('='), Literal(store)])
        )

    if exact_time:
        ts_predicate = SQL("{ts} at time zone 'utc' = {date}").format(
            ts=Identifier('ts'),
            date=Literal(str(date)),
        )
    else:
        # NOTE(review): the window ends at 23:59:59, so a timestamp in the
        # final fractional second of the day falls outside it.
        ts_predicate = SQL(
            "{ts} at time zone 'utc' BETWEEN {date}::date AND {date}::date + {interval}::interval"
        ).format(
            ts=Identifier('ts'),
            date=Literal(str(date)),
            interval=Literal('23 hours 59 minutes 59 seconds'),
        )
    predicates.append(ts_predicate)

    return SQL('').join([
        SQL("WHERE" "\n "),
        SQL("\n AND ").join(predicates),
    ])


def get_sort() -> Composable:
    """Build the ``ORDER BY`` clause: newest transaction id first."""
    return SQL('ORDER BY {_id} DESC').format(
        _id=Identifier('transactions', 'id')
    )


def get_session_transactions_statement(
    date, store, full_name=False, exact_time=False
) -> Composable:
    """Build the query returning the transactions of one shopping session.

    :param date: session timestamp; unless *exact_time* is set it must
        support ``replace(hour=, minute=, second=, microsecond=)`` because
        it is reset to midnight before being used.
    :param store: store to match, or ``None`` for any store.
    :param full_name: match *store* by name rather than by code.
    :param exact_time: match the timestamp exactly instead of the whole day.
    :returns: the composed statement (SELECT, FROM/JOINs, WHERE, GROUP BY
        and ORDER BY).
    """
    if exact_time:
        session_date = date
    else:
        session_date = date.replace(
            hour=0, minute=0, second=0, microsecond=0
        )

    statement = SQL('\n').join([
        get_select(SELECT),
        get_from("transactions", JOINS),
        get_where(
            session_date,
            store,
            full_name=full_name,
            exact_time=exact_time,
        ),
        get_groupby(GROUPBY),
        get_sort(),
    ])
    return statement