#
# Copyright (c) Daniel Sheffield 2023
#
# All rights reserved
#
# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
from decimal import Decimal, InvalidOperation
from itertools import chain, product
from typing import List, Tuple, Union, Iterable, Callable
from xml.etree.ElementTree import fromstring, ParseError

import yaml
from markdown import markdown
from urwid import (
    connect_signal,
    AttrMap,
    Button,
    Columns,
    Divider,
    Edit,
    Filler,
    LineBox,
    Padding,
    Pile,
    Text,
)
from urwid.numedit import FloatEdit
from yaml.representer import SafeRepresenter

from .. import COPYRIGHT
from ..widgets import (
    AutoCompleteEdit,
    AutoCompleteFloatEdit,
    FocusWidget,
    AutoCompletePopUp,
    NoTabCheckBox,
    FlowBarGraphWithVScale,
)
from ..db_utils import QueryManager
from . import ActivityManager, show_or_exit
from .Rating import Rating

# Column order of one ingredient row; the position in this tuple is the
# index of the matching widget inside each row of ``RecipeEditor.ingredients``.
_ROW_FIELDS = ('product', 'quantity', 'unit')


def change_style(style, representer):
    """Wrap a YAML representer so the scalar it produces uses ``style``."""
    def new_representer(dumper, data):
        scalar = representer(dumper, data)
        scalar.style = style
        return scalar
    return new_representer


class folded_str(str):
    """Marker type: dumped by yaml with the folded ('>') scalar style."""


class literal_str(str):
    """Marker type: dumped by yaml with the literal ('|') scalar style."""


# represent_str does handle some corner cases, so use that
# instead of calling represent_scalar directly
represent_folded_str = change_style('>', SafeRepresenter.represent_str)
represent_literal_str = change_style('|', SafeRepresenter.represent_str)
yaml.add_representer(folded_str, represent_folded_str)
yaml.add_representer(literal_str, represent_literal_str)


def depth_first_elements(tree):
    """Yield every element below ``tree`` (children first), then ``tree``."""
    for child in tree:
        yield from depth_first_elements(child)
    yield tree


def get_products_from_xhtml(md: str):
    """Yield the text of every ``<strong>`` element found in ``md``.

    ``md`` is markup (the caller passes the output of ``markdown()``).
    Nothing is yielded when the markup cannot be parsed as XML.
    """
    try:
        # Rendered markdown usually has several top-level elements, which
        # is not well-formed XML on its own; a single wrapping root element
        # makes it parseable. The wrapper is never a 'strong' element, so it
        # does not affect the result.
        xhtml = fromstring(f"<root>{md}</root>")
    except ParseError:
        return
    for element in depth_first_elements(xhtml):
        if element.tag == 'strong':
            yield element.text


def to_numbered_field(x):
    """Turn ``('name#3', value)`` into ``(('name', 3), value)``.

    A key without a ``#`` suffix gets index 0.
    """
    key, value = x[0], x[1]
    parts = key.split('#', 1)
    if len(parts) > 1:
        return (parts[0], int(parts[1])), value
    return (key, 0), value


def to_unnumbered_field(x):
    """Turn ``(('name', idx), value)`` into ``('name', value)``."""
    return x[0][0], x[1]


def in_same_row(name):
    """Build a predicate selecting numbered fields in the same row as ``name``.

    The predicate expects items shaped like the output of
    ``to_numbered_field``. When ``name`` carries no ``#<row>`` suffix, ``None``
    is returned; passed to ``filter`` that keeps every truthy item.
    """
    parts = name.split('#', 1)
    if len(parts) > 1:
        row = parts[1]
        return lambda x: x[0][1] == int(row)
    return None


def unzip(iter: List[Tuple[AutoCompleteEdit, FloatEdit, AutoCompleteEdit]]) -> Tuple[
    List[AutoCompleteEdit], List[FloatEdit], List[AutoCompleteEdit]
]:
    """Transpose a list of rows into per-column sequences (lazy ``zip``)."""
    return zip(*iter)


def extract_values(x: Union[List[AutoCompleteFloatEdit], List[FloatEdit]]) -> Iterable[str]:
    """Return the edit text of every widget in ``x``.

    Raises:
        Exception: if ``x`` is neither a list nor a tuple.
    """
    if not isinstance(x, (list, tuple)):
        raise Exception(f"Unsupported type: {type(x)}")
    if len(x) == 0:
        return []
    return (v.get_edit_text() for v in x)


def to_named_value(name: str) -> Callable[[str], Tuple[str,str]]:
    """Return a function mapping ``(idx, value)`` to ``('<name>#<idx>', value)``."""
    return lambda e: (f'{name}#{e[0]}', e[1])


def blank_ingredients_row(idx: int) -> Tuple[AutoCompleteEdit, FloatEdit, AutoCompleteEdit]:
    """Create an empty (product, quantity, unit) widget row for row ``idx``."""
    return (
        AutoCompleteEdit(('bg', f'product#{idx}')),
        FloatEdit(('bg', f'')),
        AutoCompleteEdit(('bg', f'unit#{idx}')),
    )


class RecipeEditor(FocusWidget):
    """Form for editing a recipe: ingredient rows, instructions and servings.

    ``ingredients`` is a list of ``(product, quantity, unit)`` widget tuples.
    ``prices`` maps a product name to its ``[min, avg, max]`` cost for the
    quantity entered.
    """

    def keypress(self, size, key):
        """Handle focus movement and editor shortcuts; defer the rest."""
        if isinstance(key, tuple):
            return
        # NOTE(review): presumably this detects that something other than the
        # plain editor is currently wrapped (e.g. a pop-up) - confirm against
        # FocusWidget.
        if getattr(self._w.original_widget, 'original_widget', None) is None:
            return super().keypress(size, key)
        if key == 'tab':
            self.advance_focus()
        elif key == 'shift tab':
            self.advance_focus(reverse=True)
        elif key == 'ctrl delete':
            self.clear()
        elif key == 'ctrl w':
            self.save()
        else:
            return super().keypress(size, key)

    def apply_choice(self, name, value):
        """Apply a chosen value, then auto-fill sibling fields in the same row.

        Any other empty field in the row that has exactly one suggestion
        from the query manager is filled with that suggestion.
        """
        self.apply_changes(name, value)
        data = dict(filter(
            in_same_row(name), map(to_numbered_field, self.data.items())
        ))
        for k, v in data.items():
            if f'{k[0]}#{k[1]}' == name or v:
                continue
            _data = dict(map(to_unnumbered_field, data.items()))
            options = self.query_manager.unique_suggestions(k[0], **_data)
            if len(options) == 1:
                self.apply_changes(f'{k[0]}#{k[1]}', list(options)[0])

    def apply_changes(self, name, value):
        """Write a single field through the ``data`` setter.

        For ``'organic'`` the value is first translated via a fixed table.
        """
        if name == 'organic':
            value = {
                'yes': True, 'no': False,
                True: True, False: False,
                'mixed': '',
            }[value]
        self.data = {name: value}

    @property
    def data(self):
        """Current form values keyed ``'<field>#<row>'`` plus ``'organic'``."""
        fields = {}
        for name, widgets in zip(_ROW_FIELDS, unzip(self.ingredients)):
            fields.update(
                map(to_named_value(name), enumerate(extract_values(widgets)))
            )
        fields['organic'] = self.organic.state
        return fields

    @data.setter
    def data(self, _data: dict):
        for k, v in _data.items():
            if len(k.split('#')) > 1:
                name, idx = k.split('#', 1)
                # Raises ValueError for a field name that is not a row column.
                w = self.ingredients[int(idx)][_ROW_FIELDS.index(name)]
                w.set_edit_text(v)
            if k == 'organic':
                self.organic.set_state(v)

    @property
    def components(self):
        """Dict of the named layout panes."""
        return self._components

    @components.setter
    def components(self, _data: dict):
        self._components = _data

    def clear(self):
        """Reset the form to a single blank row and empty fields."""
        self.ingredients = []
        # The previous rows are gone, so forget which ones had signals wired.
        self._wired_rows = []
        self.add_ingredient()
        self.organic.set_state('mixed')
        self.instructions.set_edit_text('')
        self.fname.set_edit_text('')
        self.notice.set_text('')
        self.feeds.edit_text = ''
        return self.update()

    def _popup_callback(self, widget):
        """Return the zero-argument callback handed to AutoCompletePopUp.

        A separate factory gives each callback its own ``widget`` binding; a
        lambda written directly inside the comprehension would see only the
        last row's widget (late binding).
        """
        return lambda: self.activity_manager.show(self.update(widget))

    def init_ingredients(self):
        """Build the widgets for the product, quantity, unit and gutter panes.

        Returns:
            ``(left_pane, middle_pane, right_pane, gutter)`` lists of widgets.
        """
        left_pane = [
            LineBox(
                AttrMap(
                    AutoCompletePopUp(
                        row[0],
                        self.apply_choice,
                        self._popup_callback(row[0]),
                    ),
                    'streak',
                ),
                title='Product', title_align='left',
            )
            for row in self.ingredients
        ]
        middle_pane = [
            LineBox(row[1], title='Quantity', title_align='left')
            for row in self.ingredients
        ]
        right_pane = [
            LineBox(
                AttrMap(
                    AutoCompletePopUp(
                        row[2],
                        self.apply_choice,
                        self._popup_callback(row[2]),
                    ),
                    'streak',
                ),
                title='Unit', title_align='left',
            )
            for row in self.ingredients
        ]
        # Three dividers for every row except the last, then two dividers and
        # the 'add' button.
        gutter = [
            *[Divider() for _ in product(range(3), self.ingredients[:-1])],
            Divider(),
            Divider(),
            self.buttons['add'],
        ]
        return left_pane, middle_pane, right_pane, gutter

    def _suggest(self, widget, name):
        """'apply' signal handler: offer suggestions based on the field's row."""
        row_data = dict(map(
            to_unnumbered_field,
            filter(in_same_row(name), map(to_numbered_field, self.data.items())),
        ))
        return self.autocomplete_callback(
            widget, self.autocomplete_options(name, row_data)
        )

    def _connect_row(self, row):
        """Connect the change/apply signals of one ingredient row."""
        connect_signal(row[0], 'postchange', lambda w, _: self.update(w))
        connect_signal(row[0], 'apply', self._suggest)
        connect_signal(row[1], 'postchange', lambda w, _: self.update(w))
        connect_signal(row[2], 'postchange', lambda w, _: self.update(w))
        connect_signal(row[2], 'apply', self._suggest)

    def add_ingredient(self):
        """Append a blank ingredient row and rebuild the ingredient panes."""
        self.ingredients.append(
            blank_ingredients_row(len(self.ingredients))
        )
        l, m, r, gutter = self.init_ingredients()

        def weighted(widgets):
            return [(w, ('weight', 1)) for w in widgets]

        # 'left_pane' is stored as a bare Pile; the others as (width, Pile).
        self.components['left_pane'].contents = weighted(l)
        self.components['middle_pane'][1].contents = weighted(m)
        self.components['right_pane'][1].contents = weighted(r)
        self.components['gutter'][1].contents = weighted(gutter)
        # Wire each row only once: reconnecting every row on every call would
        # stack duplicate handlers on the rows that already existed.
        for row in self.ingredients:
            if any(row is seen for seen in self._wired_rows):
                continue
            self._wired_rows.append(row)
            self._connect_row(row)

    def save(self):
        """Dump the recipe to ``<fname>-modified.yaml``.

        Only rows whose product, quantity and unit are all non-empty are
        written. Nothing is written when no file name is set.
        """
        yml = dict()
        rows = [
            (
                x[0].get_edit_text(),
                x[1].get_text()[0],
                x[2].get_edit_text(),
            )
            for x in self.ingredients
        ]
        yml['ingredients'] = [' '.join(row) for row in rows if all(row)]
        serves = self.feeds.value()
        if serves:
            n, d = serves.as_integer_ratio()
            # Whole numbers are stored as int, everything else as float.
            yml['feeds'] = float(serves) if d != 1 else n
        else:
            yml['feeds'] = None
        yml['instructions'] = literal_str('\n'.join(
            line.strip()
            for line in self.instructions.get_text()[0].splitlines()
        ).strip())
        fname = self.fname.get_edit_text()
        if not fname:
            return
        with open(f'{fname}-modified.yaml', 'w', encoding='utf-8') as f:
            yaml.dump(yml, f)

    def update(self, widget = None):
        """Recompute the cost line and the notice pane.

        Prices are recomputed for the rows containing ``widget``, or for all
        rows when ``widget`` is None; other rows keep the price stored in
        ``self.prices`` by an earlier call.

        Returns:
            ``self``.
        """
        data = self.data
        organic = None if data['organic'] == 'mixed' else data['organic']
        sort = 'ts'
        not_found = '='
        for row in self.ingredients:
            if widget is not None and not any(w is widget for w in row):
                continue
            name, quantity, unit = (w.get_edit_text() for w in row)
            try:
                quantity = Decimal(quantity)
            except InvalidOperation:
                quantity = None
            if not (name or unit or quantity):
                # Completely blank row: ignore it.
                continue
            if not (sort and name and unit and quantity):
                # Partially filled row: it cannot be priced.
                not_found = '>'
                continue
            df = self.query_manager.get_historic_prices_data(
                unit, sort=sort, product=name, organic=organic
            )
            if df.empty:
                # Retry without the organic filter.
                not_found = '~' if not_found == '=' else not_found
                df = self.query_manager.get_historic_prices_data(
                    unit, sort=sort, product=name, organic=None
                )
            if df.empty:
                not_found = '>'
                continue
            assert len(df['avg'].unique()) == 1, f"There should be only one average price: {df['avg'].unique()}"
            _avg, _min, _max = list(
                map(Decimal, df[['avg', 'min', 'max']].iloc[0])
            )
            self.prices[name] = [
                i * quantity for i in (_min, _avg, _max)
            ]

        # Drop prices of products that are no longer listed.
        listed = {r[0].get_edit_text() for r in self.ingredients}
        for k in list(self.prices):
            if k not in listed:
                del self.prices[k]
        price = [
            sum([self.prices[p][i] for p in self.prices]) for i in range(3)
        ]
        self.price.set_text(
            f'Cost: {not_found}{", ".join([str(p) for p in price])}'
        )

        notice = ''
        ingredients = [
            text
            for text in (r[0].get_edit_text() for r in self.ingredients)
            if text
        ]
        parsed_products = list(get_products_from_xhtml(
            markdown(self.instructions.get_edit_text())
        ))
        fname = self.fname.get_edit_text()
        if not fname:
            self.notice.set_text('No file name set')
            return self
        if not parsed_products:
            self.notice.set_text('Failed to parse recipe instructions')
            return self
        products = set(parsed_products)
        for missing in products - set(ingredients):
            notice += f"Product '{missing}' not found in list of ingredients\n"
        for unused in set(ingredients) - products:
            notice += f"Ingredient '{unused}' is not used\n"
        if len(set(ingredients)) != len(ingredients):
            notice += f"Some ingredients listed more than once\n"
        self.notice.set_text(notice or 'None')
        return self

    def __init__(self,
        activity_manager: ActivityManager,
        query_manager: QueryManager,
        fname: str,
        recipe: dict,
    ):
        """Build the editor for ``recipe``.

        Args:
            activity_manager: used to show this editor again from pop-ups.
            query_manager: source of suggestions and historic price data.
            fname: initial text of the file-name field.
            recipe: dict read for the keys ``'ingredients'`` (each item is
                indexed 0..2 as product, quantity, unit), ``'instructions'``
                and ``'feeds'``.
        """
        self.fname = Edit('', fname)
        self.prices = dict()
        self.components = dict()
        # Rows whose signals are already connected (see add_ingredient).
        self._wired_rows = []
        self.buttons = {
            'clear': Button(('streak', 'Clear')),
            'exit': Button(('streak', 'Exit')),
            'add': Button(('streak', 'Add')),
            'save': Button(('streak', 'Save')),
        }
        self.ingredients: List[Tuple[AutoCompleteEdit, FloatEdit, AutoCompleteEdit]] = [
            (
                AutoCompleteEdit(('bg', f'product#{idx}'), edit_text=ingredient[0]),
                FloatEdit(('bg', f''), default=ingredient[1]),
                AutoCompleteEdit(('bg', f'unit#{idx}'), edit_text=ingredient[2]),
            )
            for idx, ingredient in enumerate(recipe['ingredients'])
        ] if len(recipe['ingredients']) else []
        self.organic = NoTabCheckBox(('bg', "Organic"), state='mixed')
        self.instructions = Edit(
            '', edit_text=recipe['instructions'] or u'',
            multiline=True, allow_tab=True,
        )
        # A missing value must become '' explicitly: formatting None yields
        # the non-empty string 'None', which is not a valid number.
        feeds = recipe['feeds']
        self.feeds = FloatEdit(f'Serves: ', '' if feeds is None else f"{feeds}")
        self.price = Text(f"Cost: 0")
        self.notice = Text('')
        bottom_pane = [
            self.organic,
            LineBox(self.instructions, title=f'Instructions'),
            self.feeds,
            self.price,
            LineBox(self.notice, title='Errors'),
        ]
        self.activity_manager = activity_manager
        self.query_manager = query_manager
        self.autocomplete_options = lambda name, data: self.query_manager.unique_suggestions(name.split('#', 1)[0], **data)
        self.autocomplete_callback = lambda widget, options: len(options) > 0 and widget._emit('open', options)

        connect_signal(self.organic, 'postchange', lambda *_: self.update())
        connect_signal(self.buttons['save'], 'click', lambda _: self.save())
        connect_signal(self.buttons['add'], 'click', lambda _: self.add_ingredient())
        connect_signal(self.buttons['clear'], 'click', lambda _: self.clear())
        connect_signal(self.buttons['exit'], 'click', lambda _: show_or_exit('esc'))
        connect_signal(self.instructions, 'postchange', lambda w, _: self.update(w))

        header = Text(u'Recipe Editor', 'center')
        _copyright = Text(COPYRIGHT, 'center')
        banner = Pile([
            Padding(header, 'center', width=('relative', 100)),
            Padding(_copyright, 'center', width=('relative', 100)),
        ])
        banner = AttrMap(banner, 'banner')
        left_pane, middle_pane, right_pane, gutter = self.init_ingredients()
        self.components = {
            'top_pane': Columns([
                (9, Pile([
                    Divider(),
                    AttrMap(self.buttons['clear'], 'streak'),
                    Divider(),
                ])),
                Divider(),
                LineBox(Columns([
                    self.fname,
                    (8, self.buttons['save']),
                ]), title='Recipe'),
                Divider(),
                (9, Pile([
                    Divider(),
                    AttrMap(self.buttons['exit'], 'streak'),
                    Divider(),
                ])),
            ], dividechars=1),
            'bottom_pane': Pile(bottom_pane),
            'right_pane': (15, Pile(right_pane)),
            'middle_pane': (12, Pile(middle_pane)),
            'left_pane': Pile(left_pane),
            'gutter': (8, Pile(gutter)),
        }
        # Always ends with one blank row; this also wires the rows' signals.
        self.add_ingredient()
        widget = Pile([
            banner,
            Divider(),
            self.components['top_pane'],
            Columns([
                self.components['left_pane'],
                self.components['middle_pane'],
                self.components['right_pane'],
                (1, Divider()),
                self.components['gutter'],
            ], dividechars=0),
            self.components['bottom_pane'],
        ])
        widget = Filler(widget, 'top')
        widget = AttrMap(widget, 'bg')

        def focus_target(name):
            # Resolved at call time, so the row entries refer to whichever
            # row is last when the iterator is consumed.
            return next(w for n, w in chain(
                self.buttons.items(),
                [
                    ('fname', self.fname),
                    ('ingredients', self.ingredients[-1][0]),
                    ('instructions', self.instructions),
                    ('quantity', self.ingredients[-1][1]),
                    ('units', self.ingredients[-1][2]),
                    ('organic', self.organic),
                ],
            ) if name == n)

        super().__init__(widget, map(focus_target, [
            'instructions', 'ingredients', 'quantity', 'units', 'add',
            'organic', 'clear', 'fname', 'save', 'exit',
        ]))
        self.update()