123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- from collections import (
- OrderedDict,
- )
- from psycopg.sql import (
- Composable,
- Identifier,
- SQL,
- Literal,
- )
- from .util import get_select, get_from, get_groupby
- ALIAS_TO_TABLE = {
- 'product': 'products',
- 'category': 'categories',
- 'group': 'groups',
- 'tags': 'tags',
- 'unit': 'units',
- 'store': 'stores',
- }
- NON_IDENTIFIER_COLUMNS = [
- 'ts',
- 'store',
- 'quantity',
- 'unit',
- 'price',
- 'organic',
- 'tags',
- ]
- def get_table_statement(alias):
- tables = ALIAS_TO_TABLE
- return SQL("SELECT {column} AS {alias} FROM {table}").format(
- column=Identifier(tables[alias], 'name'),
- table=Identifier(tables[alias]),
- alias=Identifier(alias),
- )
- def get_transactions_statement(name, **kwargs):
- where = []
- for k,v in kwargs.items():
- where.append(SQL('{key} {condition} {value}').format(
- key=Identifier(ALIAS_TO_TABLE[k], 'name'),
- condition=SQL('ILIKE' if k == name else '='),
- value=Literal(f'%{v}%' if k == name else v),
- ) if k in ALIAS_TO_TABLE and (k not in NON_IDENTIFIER_COLUMNS or k == name) and v else SQL('TRUE'))
- statement = SQL('\n').join([
- get_select(dict([('tags', Identifier('tags','name')), *[
- (k,v) for k,v in SELECT.items() if k != 'tags'
- ]])),
- get_from("transactions", JOINS),
- SQL('').join([SQL('WHERE '), SQL('\n AND ').join(where)]),
- ])
- return statement
- SELECT = OrderedDict([
- ('id', Identifier('transactions', 'id')),
- ('ts', Identifier('transactions', 'ts')),
- ('store', Identifier('stores', 'name')),
- ('code', Identifier('stores', 'code')),
- ('description', Identifier('transactions', 'description')),
- ('volume', Identifier('quantity')),
- ('unit', Identifier('units', 'name')),
- ('price', Identifier('price')),
- ('quantity', Identifier('quantity')),
- ('$/unit', SQL("""TRUNC(price/quantity,4)""")),
- ('total', SQL("""sum(transactions.price)
- OVER (
- PARTITION BY transactions.ts::date
- ORDER BY transactions.id
- ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
- )""")),
- ('group', Identifier('groups', 'name')),
- ('category', Identifier('categories', 'name')),
- ('product', Identifier('products', 'name')),
- ('organic', Identifier('organic')),
- ('tags', SQL(
- """array_agg({tag_name}) FILTER (WHERE {tag_name} IS NOT NULL)"""
- ).format(
- tag_name=Identifier('tags','name'),
- ))
- ])
- GROUPBY = OrderedDict([
- ('id', Identifier('transactions', 'id')),
- ('ts', Identifier('transactions', 'ts')),
- ('store', Identifier('stores', 'name')),
- ('code', Identifier('stores', 'code')),
- ('description', Identifier('transactions', 'description')),
- ('volume', Identifier('quantity')),
- ('unit', Identifier('units', 'name')),
- ('price', Identifier('price')),
- ('quantity', Identifier('quantity')),
- ('group', Identifier('groups', 'name')),
- ('category', Identifier('categories', 'name')),
- ('product', Identifier('products', 'name')),
- ('organic', Identifier('organic')),
- ])
- JOINS = OrderedDict([
- ('units', ('id', 'unit_id')),
- ('stores', ('id', 'store_id')),
- ('products', ('id', 'product_id')),
- ('categories', ('id', 'category_id')),
- ('groups', ('id', 'group_id')),
- ('tags_map', ('transaction_id', ('transactions','id'))),
- ('tags', ('id', ('tags_map','tag_id'))),
- ])
- def get_where(date, store, full_name=False, exact_time=False):
- where = []
- if store is not None:
- where.append(SQL(' ').join([
- Identifier('stores', 'name' if full_name else 'code'),
- SQL('='),
- Literal(store)
- ]))
- where.append(
- SQL("{ts} at time zone 'utc' BETWEEN {date}::date AND {date}::date + {interval}::interval").format(
- ts=Identifier('ts'),
- date=Literal(str(date)),
- interval=Literal('23 hours 59 minutes 59 seconds'),
- ) if not exact_time else SQL("{ts} at time zone 'utc' = {date}").format(
- ts=Identifier('ts'),
- date=Literal(str(date)),
- )
- )
- return SQL('').join([
- SQL("WHERE"
- "\n "),
- SQL("\n AND ").join(where),
- ])
- def get_sort() -> Composable:
- return SQL('ORDER BY {_id} DESC').format(
- _id=Identifier('transactions', 'id')
- )
- def get_session_transactions_statement(date, store, full_name=False, exact_time=False):
- statement = SQL('\n').join([
- get_select(SELECT),
- get_from("transactions", JOINS),
- get_where(
- date if exact_time else date.replace(
- hour=0, minute=0, second=0, microsecond=0
- ),
- store,
- full_name=full_name,
- exact_time=exact_time
- ),
- get_groupby(GROUPBY),
- get_sort(),
- ])
- return statement
|