123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482 |
- from xml.etree.ElementTree import fromstring, ParseError
- from markdown import markdown
- from itertools import chain, product
- from decimal import Decimal, InvalidOperation
- from typing import List, Tuple, Union, Iterable, Callable
- from urwid import (
- connect_signal,
- AttrMap,
- Button,
- Columns,
- Divider,
- Edit,
- Filler,
- LineBox,
- Padding,
- Pile,
- Text,
- )
- from urwid.numedit import FloatEdit
- from .. import COPYRIGHT
- from ..widgets import (
- AutoCompleteEdit,
- AutoCompleteFloatEdit,
- FocusWidget,
- AutoCompletePopUp,
- NoTabCheckBox,
- FlowBarGraphWithVScale,
- )
- from ..db_utils import QueryManager
- from . import ActivityManager, show_or_exit
- from .Rating import Rating
- import yaml
- def change_style(style, representer):
- def new_representer(dumper, data):
- scalar = representer(dumper, data)
- scalar.style = style
- return scalar
- return new_representer
- import yaml
- from yaml.representer import SafeRepresenter
- class folded_str(str): pass
- class literal_str(str): pass
- represent_folded_str = change_style('>', SafeRepresenter.represent_str)
- represent_literal_str = change_style('|', SafeRepresenter.represent_str)
- yaml.add_representer(folded_str, represent_folded_str)
- yaml.add_representer(literal_str, represent_literal_str)
- def depth_first_elements(tree):
- for e in tree:
- for y in depth_first_elements(e):
- yield y
- yield tree
- def get_products_from_xhtml(md: str):
- try:
- xhtml = fromstring(
- f"""<root>
- {md}
- </root>
- """)
- except ParseError:
- return
- for e in filter(lambda x: x.tag == 'strong', depth_first_elements(xhtml)):
- yield e.text
- def to_numbered_field(x):
- if len(x[0].split('#', 1)) > 1:
- name, idx = x[0].split('#', 1)
- idx = int(idx)
- else:
- name, idx = x[0], 0
- return (name, int(idx)), x[1]
- def to_unnumbered_field(x):
- return x[0][0], x[1]
- def in_same_row(name):
- if len(name.split('#', 1)) > 1:
- _, row = name.split('#', 1)
- return lambda x: x[0][1] == int(row)
- def unzip(iter: List[Tuple[AutoCompleteEdit, FloatEdit, AutoCompleteEdit]]) -> Tuple[
- List[AutoCompleteEdit], List[FloatEdit], List[AutoCompleteEdit]
- ]:
- return zip(*iter)
- def extract_values(x: Union[List[AutoCompleteFloatEdit], List[FloatEdit]]) -> Iterable[str]:
- if isinstance(x, list) or isinstance(x, tuple):
- if len(x) == 0:
- return []
- return ( v.get_edit_text() for v in x )
- raise Exception(f"Unsupported type: {type(x)}")
- def to_named_value(name: str) -> Callable[[str], Tuple[str,str]]:
- return lambda e: (f'{name}#{e[0]}', e[1])
- def blank_ingredients_row(idx: int) -> Tuple[AutoCompleteEdit, FloatEdit, AutoCompleteEdit]:
- return (
- AutoCompleteEdit(('bg', f'product#{idx}')),
- FloatEdit(('bg', f'')),
- AutoCompleteEdit(('bg', f'unit#{idx}'))
- )
- class RecipeEditor(FocusWidget):
- def keypress(self, size, key):
- if isinstance(key, tuple):
- return
- if getattr(self._w.original_widget, 'original_widget', None) is None:
- return super().keypress(size, key)
- if key == 'tab':
- self.advance_focus()
- elif key == 'shift tab':
- self.advance_focus(reverse=True)
- elif key == 'ctrl delete':
- self.clear()
- elif key == 'ctrl w':
- self.save()
- else:
- return super().keypress(size, key)
- def apply_choice(self, name, value):
- self.apply_changes(name, value)
- data = dict(filter(
- in_same_row(name),
- map(to_numbered_field, self.data.items())
- ))
- for k,v in data.items():
- if f'{k[0]}#{k[1]}' == name or v:
- continue
- _data = dict(map(lambda x: (x[0][0], x[1]), data.items()))
- options = self.query_manager.unique_suggestions(k[0], **_data)
- if len(options) == 1:
- self.apply_changes(f'{k[0]}#{k[1]}', list(options)[0])
- def apply_changes(self, name, value):
- self.data = {
- name: value if name != 'organic' else {
- 'yes': True, 'no': False,
- True: True, False: False,
- 'mixed': '',
- }[value],
- }
- @property
- def data(self):
- zipped = zip(
- ['product', 'quantity', 'unit'],
- map(extract_values, unzip(self.ingredients)),
- )
- ret = dict(chain(
- *[ map(to_named_value(n), enumerate(l)) for n,l in zipped ],
- [ ('organic', self.organic.state) ]
- ))
- return ret
- @data.setter
- def data(self, _data: dict):
- for k,v in _data.items():
- if len(k.split('#')) > 1:
- name, idx = k.split('#', 1)
- w = self.ingredients[int(idx)][ next(( pos for pos, n in zip(
- [0, 1, 2],
- ['product', 'quantity', 'unit']
- ) if n == name ))]
- w.set_edit_text(v)
- if k == 'organic':
- self.organic.set_state(v)
- @property
- def components(self):
- return self._components
- @components.setter
- def components(self, _data: dict):
- self._components = _data
- def clear(self):
- self.ingredients = []
- self.add_ingredient()
- self.organic.set_state('mixed')
- self.instructions.set_edit_text('')
- self.fname.set_edit_text('')
- self.notice.set_text('')
- self.feeds.edit_text = ''
- return self.update()
- def init_ingredients(self):
- left_pane = [
- LineBox(AttrMap(
- AutoCompletePopUp(
- ingredient[0],
- self.apply_choice,
- lambda: self.activity_manager.show(self.update(ingredient[0]))
- ), 'streak'), title=f'Product', title_align='left'
- ) for idx, ingredient in enumerate(self.ingredients)
- ]
- middle_pane = [
- LineBox(
- ingredient[1], title=f'Quantity', title_align='left'
- ) for idx, ingredient in enumerate(self.ingredients)
- ]
- right_pane = [
- LineBox(AttrMap(
- AutoCompletePopUp(
- ingredient[2],
- self.apply_choice,
- lambda: self.activity_manager.show(self.update(ingredient[2]))
- ), 'streak'), title=f'Unit', title_align='left'
- ) for idx, ingredient in enumerate(self.ingredients)
- ]
- gutter = [
- *[ Divider() for _ in product(
- range(3), self.ingredients[:-1]
- )],
- Divider(),
- Divider(),
- self.buttons['add'],
- ]
- return left_pane, middle_pane, right_pane, gutter
- def add_ingredient(self):
- self.ingredients.append(
- blank_ingredients_row(len(self.ingredients))
- )
- l, m, r, gutter = self.init_ingredients()
- self.components['left_pane'].contents = list(map(lambda x: (x, ('weight',1)), l))
- self.components['middle_pane'][1].contents = list(map(lambda x: (x, ('weight',1)), m))
- self.components['right_pane'][1].contents = list(map(lambda x: (x, ('weight',1)), r))
- self.components['gutter'][1].contents = list(map(lambda x: (x, ('weight',1)), gutter))
- for idx, widget in enumerate(self.ingredients):
- connect_signal(widget[0], 'postchange', lambda w,_: self.update(w))
- connect_signal(widget[0], 'apply', lambda w, name: self.autocomplete_callback(
- w, self.autocomplete_options(name, dict(map(
- to_unnumbered_field,
- filter(in_same_row(name), map(to_numbered_field, self.data.items())
- ))))
- ))
- connect_signal(widget[1], 'postchange', lambda w,_: self.update(w))
- connect_signal(widget[2], 'postchange', lambda w,_: self.update(w))
- connect_signal(widget[2], 'apply', lambda w, name: self.autocomplete_callback(
- w, self.autocomplete_options(name, dict(map(
- to_unnumbered_field,
- filter(in_same_row(name), map(to_numbered_field, self.data.items())
- ))))
- ))
- def save(self):
- yml = dict()
- yml['ingredients'] = list(map(lambda x: ' '.join(x), filter(
- lambda x: None not in map(lambda x: x or None, x), [
- (
- x[0].get_edit_text(),
- x[1].get_text()[0],
- x[2].get_edit_text(),
- ) for x in self.ingredients
- ])))
- serves = self.feeds.value()
- if serves:
- n, d = serves.as_integer_ratio()
- yml['feeds'] = float(self.feeds.value()) if d != 1 else n
- else:
- yml['feeds'] = None
- yml['instructions'] = literal_str('\n'.join(map(
- lambda x: x.strip(),
- self.instructions.get_text()[0].splitlines()
- )).strip())
- fname = self.fname.get_edit_text()
- if not fname:
- return
- with open(f'{fname}-modified.yaml', 'w') as f:
- yaml.dump(yml, f)
- def update(self, widget = None):
- data = self.data
- organic = None if data['organic'] == 'mixed' else data['organic']
- sort = 'ts'
- not_found = '='
- for r in filter(
- lambda x: next(filter(lambda x: x is widget or widget is None, x), None),
- self.ingredients
- ):
- product, quantity, unit = map(lambda x: x.get_edit_text(), r)
- try:
- quantity = Decimal(quantity)
- except InvalidOperation:
- quantity = None
- if (product or None, unit or None, quantity or None) == (None, None, None):
- continue
- if None in (sort or None, product or None, unit or None, quantity or None):
- not_found = '>'
- continue
- df = self.query_manager.get_historic_prices_data(unit, sort=sort, product=product, organic=organic)
- if df.empty:
- not_found = '~' if not_found == '=' else not_found
- df = self.query_manager.get_historic_prices_data(unit, sort=sort, product=product, organic=None)
- if df.empty:
- not_found = '>'
- continue
- assert len(df['avg'].unique()) == 1, f"There should be only one average price: {df['avg'].unique()}"
- _avg, _min, _max = list(
- map(Decimal,df[['avg','min','max']].iloc[0])
- )
- self.prices[r[0].get_edit_text()] = [
- i*quantity for i in (_min, _avg, _max)
- ]
- for k in list(self.prices):
- if k not in map(lambda x: x[0].get_edit_text(), self.ingredients):
- del self.prices[k]
- price = [
- sum([self.prices[p][i] for p in self.prices]) for i in range(3)
- ]
- self.price.set_text(
- f'Cost: {not_found}{", ".join([str(p) for p in price])}'
- )
- notice = ''
- ingredients = list(filter(lambda x: x, map(lambda x: x[0].get_edit_text(), self.ingredients)))
- parsed_products = list(get_products_from_xhtml(markdown(self.instructions.get_edit_text())))
- fname = self.fname.get_edit_text()
- if not fname:
- self.notice.set_text('No file name set')
- return self
- if not parsed_products:
- self.notice.set_text('Failed to parse recipe instructions')
- return self
- products = set(parsed_products)
- for product in products - set(ingredients):
- notice += f"Product '{product}' not found in list of ingredients\n";
- for ingredient in set(ingredients) - products:
- notice += f"Ingredient '{ingredient}' is not used\n";
- if len(set(ingredients)) != len(ingredients):
- notice += f"Some ingredients listed more than once\n"
- self.notice.set_text(notice or 'None')
- return self
- def __init__(self,
- activity_manager: ActivityManager,
- query_manager: QueryManager,
- fname: str,
- recipe: dict,
- ):
- self.fname = Edit('', fname)
- self.prices = dict()
- self.components = dict()
- self.buttons = {
- 'clear': Button(('streak', 'Clear')),
- 'exit': Button(('streak', 'Exit')),
- 'add': Button(('streak', 'Add')),
- 'save': Button(('streak', 'Save')),
- }
- self.ingredients: List[Tuple[AutoCompleteEdit, FloatEdit, AutoCompleteEdit]] = [
- (
- AutoCompleteEdit(('bg', f'product#{idx}'), edit_text=ingredient[0]),
- FloatEdit(('bg', f''), default=ingredient[1]),
- AutoCompleteEdit(('bg', f'unit#{idx}'), edit_text=ingredient[2]),
- ) for idx, ingredient in enumerate(recipe['ingredients'])
- ] if len(recipe['ingredients']) else [ ]
- self.organic = NoTabCheckBox(('bg', "Organic"), state='mixed')
- self.instructions = Edit('', edit_text=recipe['instructions'] or u'', multiline=True, allow_tab=True)
- self.feeds = FloatEdit(f'Serves: ', f"{recipe['feeds']}" or '')
- self.price = Text(f"Cost: 0")
- self.notice = Text('')
- bottom_pane = [
- self.organic,
- LineBox(self.instructions, title=f'Instructions'),
- self.feeds,
- self.price,
- LineBox(self.notice, title='Errors'),
- ]
- self.activity_manager = activity_manager
- self.query_manager = query_manager
- self.autocomplete_options = lambda name, data: self.query_manager.unique_suggestions(name.split('#', 1)[0], **data)
- self.autocomplete_callback = lambda widget, options: len(options) > 0 and widget._emit('open', options)
- connect_signal(self.organic, 'postchange', lambda *_: self.update())
- connect_signal(self.buttons['save'], 'click', lambda _: self.save())
- connect_signal(self.buttons['add'], 'click', lambda _: self.add_ingredient())
- connect_signal(self.buttons['clear'], 'click', lambda _: self.clear())
- connect_signal(self.buttons['exit'], 'click', lambda _: show_or_exit('esc'))
- connect_signal(self.instructions, 'postchange', lambda w,_: self.update(w))
- header = Text(u'Recipe Editor', 'center')
- _copyright = Text(COPYRIGHT, 'center')
- banner = Pile([
- Padding(header, 'center', width=('relative', 100)),
- Padding(_copyright, 'center', width=('relative', 100)),
- ])
- banner = AttrMap(banner, 'banner')
- left_pane, middle_pane, right_pane, gutter = self.init_ingredients()
- self.components = {
- 'top_pane': Columns([
- (9, Pile([
- Divider(),
- AttrMap(self.buttons['clear'], 'streak'),
- Divider(),
- ])),
- Divider(),
- LineBox(Columns([
- self.fname, (8, self.buttons['save'])
- ]), title='Recipe'),
- Divider(),
- (9, Pile([
- Divider(),
- AttrMap(self.buttons['exit'], 'streak'),
- Divider(),
- ]))
- ], dividechars=1),
- 'bottom_pane': Pile(bottom_pane),
- 'right_pane': (15, Pile(right_pane)),
- 'middle_pane': (12, Pile(middle_pane)),
- 'left_pane': Pile(left_pane),
- 'gutter': (8, Pile(gutter))
- }
- self.add_ingredient()
- widget = Pile([
- banner,
- Divider(),
- self.components['top_pane'],
- Columns([
- self.components['left_pane'],
- self.components['middle_pane'],
- self.components['right_pane'],
- (1,Divider()),
- self.components['gutter'],
- ], dividechars=0),
- self.components['bottom_pane'],
- ])
- widget = Filler(widget, 'top')
- widget = AttrMap(widget, 'bg')
- super().__init__(widget, map(
- lambda x: next(w for n,w in chain(
- self.buttons.items(),
- [
- ('fname', self.fname),
- ('ingredients', self.ingredients[-1][0]),
- ('instructions', self.instructions),
- ('quantity', self.ingredients[-1][1]),
- ('units', self.ingredients[-1][2]),
- ('organic', self.organic)
- ],
- ) if x == n),
- [
- 'instructions',
- 'ingredients', 'quantity', 'units', 'add',
- 'organic',
- 'clear', 'fname', 'save', 'exit',
- ]
- ))
- self.update()
|