123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- from psycopg import Cursor
- from urwid import raw_display, WidgetPlaceholder, SolidFill, MainLoop
- from app.activities.ActivityManager import ActivityManager, show_or_exit
- from app.activities.RecipeEditor import RecipeEditor
- from app.data.QueryManager import QueryManager, display_mapper
- from app.palette import high_contrast
- from app import parse_recipe
- import pandas as pd
- import sys
- try:
- from db_credentials import HOST, PASSWORD
- host = f'host={HOST}'
- password = f'password={PASSWORD}'
- except:
- host = ''
- password = ''
- try:
- import os
- import psycopg
- user = os.getenv('USER')
- conn = psycopg.connect(f"{host} dbname=grocery user={user} {password}")
- cur: Cursor = conn.cursor()
- except:
- print('Failed to set up db connection. Entering Mock mode')
- exit(1)
-
- class Recipe(WidgetPlaceholder):
- def __init__(self, activity_manager):
- super().__init__(SolidFill(u'/'))
- self.activity_manager = activity_manager
- recipe = self.activity_manager.get('recipe_editor')
- self.activity_manager.show(self)
- self.activity_manager.show(recipe)
- cur.execute("BEGIN;")
- with open('units.sql') as f:
- to_exec = ''
- quote = False
- conn.add_notice_handler(lambda x: print(f"[{x.severity}] {x.message_primary}"))
- for line in filter(lambda x: x.strip() not in ('BEGIN;','ROLLBACK;'), f):
- if line.startswith("""$$"""):
- quote = True if not quote else False
- to_exec += line
- if not line.strip().endswith(';'):
- continue
- if quote:
- continue
- cur.execute(to_exec)
- try:
- data = pd.DataFrame(cur.fetchall(), columns=[
- x.name for x in cur.description
- ])
- if not data.empty:
- print()
- print(cur._last_query)
- print(data.to_string(index=False))
-
- except psycopg.ProgrammingError:
- pass
- to_exec = ''
-
- input("Press enter to continue")
- activity_manager = ActivityManager()
- query_manager = QueryManager(cur, display_mapper)
- fname = sys.argv[1]
- with open(fname) as f:
- recipe = parse_recipe.parse_recipe(f, query_manager)
- activity_manager.create(RecipeEditor,
- 'recipe_editor',
- activity_manager, query_manager,
- fname, recipe,
- )
- app = Recipe(activity_manager)
- def iter_palettes():
- palettes = [ v for k,v in high_contrast.theme.items() ]
- while True:
- p = palettes.pop(0)
- palettes.append(p)
- yield p
- palettes = iter_palettes()
- screen = raw_display.Screen()
- loop = MainLoop(app, next(palettes), screen=screen,
- unhandled_input=lambda k: show_or_exit(k, screen=screen, palettes=palettes),
- pop_ups=True)
- loop.run()
- cur.execute("ROLLBACK")
- cur.close()
- conn.close()
|