RecipeEditor.py 17 KB


  1. #
  2. # Copyright (c) Daniel Sheffield 2023
  3. #
  4. # All rights reserved
  5. #
  6. # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
  7. from xml.etree.ElementTree import fromstring, ParseError
  8. from markdown import markdown
  9. from itertools import chain, product
  10. from decimal import Decimal, InvalidOperation
  11. from typing import List, Tuple, Union, Iterable, Callable
  12. from urwid import (
  13. connect_signal,
  14. AttrMap,
  15. Button,
  16. Columns,
  17. Divider,
  18. Edit,
  19. Filler,
  20. LineBox,
  21. Padding,
  22. Pile,
  23. Text,
  24. )
  25. from urwid.numedit import FloatEdit
  26. from .. import COPYRIGHT
  27. from ..widgets import (
  28. AutoCompleteEdit,
  29. AutoCompleteFloatEdit,
  30. FocusWidget,
  31. AutoCompletePopUp,
  32. NoTabCheckBox,
  33. FlowBarGraphWithVScale,
  34. )
  35. from ..db_utils import QueryManager
  36. from . import ActivityManager, show_or_exit
  37. from .Rating import Rating
  38. import yaml
  39. def change_style(style, representer):
  40. def new_representer(dumper, data):
  41. scalar = representer(dumper, data)
  42. scalar.style = style
  43. return scalar
  44. return new_representer
  45. import yaml
  46. from yaml.representer import SafeRepresenter
  47. class folded_str(str): pass
  48. class literal_str(str): pass
  49. # represent_str does handle some corner cases, so use that
  50. # instead of calling represent_scalar directly
  51. represent_folded_str = change_style('>', SafeRepresenter.represent_str)
  52. represent_literal_str = change_style('|', SafeRepresenter.represent_str)
  53. yaml.add_representer(folded_str, represent_folded_str)
  54. yaml.add_representer(literal_str, represent_literal_str)
  55. def depth_first_elements(tree):
  56. for e in tree:
  57. for y in depth_first_elements(e):
  58. yield y
  59. yield tree
  60. def get_products_from_xhtml(md: str):
  61. try:
  62. xhtml = fromstring(
  63. f"""<root>
  64. {md}
  65. </root>
  66. """)
  67. except ParseError:
  68. return
  69. for e in filter(lambda x: x.tag == 'strong', depth_first_elements(xhtml)):
  70. yield e.text
  71. def to_numbered_field(x):
  72. if len(x[0].split('#', 1)) > 1:
  73. name, idx = x[0].split('#', 1)
  74. idx = int(idx)
  75. else:
  76. name, idx = x[0], 0
  77. return (name, int(idx)), x[1]
  78. def to_unnumbered_field(x):
  79. return x[0][0], x[1]
  80. def in_same_row(name):
  81. if len(name.split('#', 1)) > 1:
  82. _, row = name.split('#', 1)
  83. return lambda x: x[0][1] == int(row)
  84. def unzip(iter: List[Tuple[AutoCompleteEdit, FloatEdit, AutoCompleteEdit]]) -> Tuple[
  85. List[AutoCompleteEdit], List[FloatEdit], List[AutoCompleteEdit]
  86. ]:
  87. return zip(*iter)
  88. def extract_values(x: Union[List[AutoCompleteFloatEdit], List[FloatEdit]]) -> Iterable[str]:
  89. if isinstance(x, list) or isinstance(x, tuple):
  90. if len(x) == 0:
  91. return []
  92. return ( v.get_edit_text() for v in x )
  93. raise Exception(f"Unsupported type: {type(x)}")
  94. def to_named_value(name: str) -> Callable[[str], Tuple[str,str]]:
  95. return lambda e: (f'{name}#{e[0]}', e[1])
  96. def blank_ingredients_row(idx: int) -> Tuple[AutoCompleteEdit, FloatEdit, AutoCompleteEdit]:
  97. return (
  98. AutoCompleteEdit(('bg', f'product#{idx}')),
  99. FloatEdit(('bg', f'')),
  100. AutoCompleteEdit(('bg', f'unit#{idx}'))
  101. )
  102. class RecipeEditor(FocusWidget):
  103. def keypress(self, size, key):
  104. if isinstance(key, tuple):
  105. return
  106. if getattr(self._w.original_widget, 'original_widget', None) is None:
  107. return super().keypress(size, key)
  108. if key == 'tab':
  109. self.advance_focus()
  110. elif key == 'shift tab':
  111. self.advance_focus(reverse=True)
  112. elif key == 'ctrl delete':
  113. self.clear()
  114. elif key == 'ctrl w':
  115. self.save()
  116. else:
  117. return super().keypress(size, key)
  118. def apply_choice(self, name, value):
  119. self.apply_changes(name, value)
  120. data = dict(filter(
  121. in_same_row(name),
  122. map(to_numbered_field, self.data.items())
  123. ))
  124. for k,v in data.items():
  125. if f'{k[0]}#{k[1]}' == name or v:
  126. continue
  127. _data = dict(map(lambda x: (x[0][0], x[1]), data.items()))
  128. options = self.query_manager.unique_suggestions(k[0], **_data)
  129. if len(options) == 1:
  130. self.apply_changes(f'{k[0]}#{k[1]}', list(options)[0])
  131. def apply_changes(self, name, value):
  132. self.data = {
  133. name: value if name != 'organic' else {
  134. 'yes': True, 'no': False,
  135. True: True, False: False,
  136. 'mixed': '',
  137. }[value],
  138. }
  139. @property
  140. def data(self):
  141. zipped = zip(
  142. ['product', 'quantity', 'unit'],
  143. map(extract_values, unzip(self.ingredients)),
  144. )
  145. ret = dict(chain(
  146. *[ map(to_named_value(n), enumerate(l)) for n,l in zipped ],
  147. [ ('organic', self.organic.state) ]
  148. ))
  149. return ret
  150. @data.setter
  151. def data(self, _data: dict):
  152. for k,v in _data.items():
  153. if len(k.split('#')) > 1:
  154. name, idx = k.split('#', 1)
  155. w = self.ingredients[int(idx)][ next(( pos for pos, n in zip(
  156. [0, 1, 2],
  157. ['product', 'quantity', 'unit']
  158. ) if n == name ))]
  159. w.set_edit_text(v)
  160. if k == 'organic':
  161. self.organic.set_state(v)
  162. @property
  163. def components(self):
  164. return self._components
  165. @components.setter
  166. def components(self, _data: dict):
  167. self._components = _data
  168. def clear(self):
  169. self.ingredients = []
  170. self.add_ingredient()
  171. self.organic.set_state('mixed')
  172. self.instructions.set_edit_text('')
  173. self.fname.set_edit_text('')
  174. self.notice.set_text('')
  175. self.feeds.edit_text = ''
  176. return self.update()
  177. def init_ingredients(self):
  178. left_pane = [
  179. LineBox(AttrMap(
  180. AutoCompletePopUp(
  181. ingredient[0],
  182. self.apply_choice,
  183. lambda: self.activity_manager.show(self.update(ingredient[0]))
  184. ), 'streak'), title=f'Product', title_align='left'
  185. ) for idx, ingredient in enumerate(self.ingredients)
  186. ]
  187. middle_pane = [
  188. LineBox(
  189. ingredient[1], title=f'Quantity', title_align='left'
  190. ) for idx, ingredient in enumerate(self.ingredients)
  191. ]
  192. right_pane = [
  193. LineBox(AttrMap(
  194. AutoCompletePopUp(
  195. ingredient[2],
  196. self.apply_choice,
  197. lambda: self.activity_manager.show(self.update(ingredient[2]))
  198. ), 'streak'), title=f'Unit', title_align='left'
  199. ) for idx, ingredient in enumerate(self.ingredients)
  200. ]
  201. gutter = [
  202. *[ Divider() for _ in product(
  203. range(3), self.ingredients[:-1]
  204. )],
  205. Divider(),
  206. Divider(),
  207. self.buttons['add'],
  208. ]
  209. return left_pane, middle_pane, right_pane, gutter
  210. def add_ingredient(self):
  211. self.ingredients.append(
  212. blank_ingredients_row(len(self.ingredients))
  213. )
  214. l, m, r, gutter = self.init_ingredients()
  215. self.components['left_pane'].contents = list(map(lambda x: (x, ('weight',1)), l))
  216. self.components['middle_pane'][1].contents = list(map(lambda x: (x, ('weight',1)), m))
  217. self.components['right_pane'][1].contents = list(map(lambda x: (x, ('weight',1)), r))
  218. self.components['gutter'][1].contents = list(map(lambda x: (x, ('weight',1)), gutter))
  219. for idx, widget in enumerate(self.ingredients):
  220. connect_signal(widget[0], 'postchange', lambda w,_: self.update(w))
  221. connect_signal(widget[0], 'apply', lambda w, name: self.autocomplete_callback(
  222. w, self.autocomplete_options(name, dict(map(
  223. to_unnumbered_field,
  224. filter(in_same_row(name), map(to_numbered_field, self.data.items())
  225. ))))
  226. ))
  227. connect_signal(widget[1], 'postchange', lambda w,_: self.update(w))
  228. connect_signal(widget[2], 'postchange', lambda w,_: self.update(w))
  229. connect_signal(widget[2], 'apply', lambda w, name: self.autocomplete_callback(
  230. w, self.autocomplete_options(name, dict(map(
  231. to_unnumbered_field,
  232. filter(in_same_row(name), map(to_numbered_field, self.data.items())
  233. ))))
  234. ))
  235. def save(self):
  236. yml = dict()
  237. yml['ingredients'] = list(map(lambda x: ' '.join(x), filter(
  238. lambda x: None not in map(lambda x: x or None, x), [
  239. (
  240. x[0].get_edit_text(),
  241. x[1].get_text()[0],
  242. x[2].get_edit_text(),
  243. ) for x in self.ingredients
  244. ])))
  245. serves = self.feeds.value()
  246. if serves:
  247. n, d = serves.as_integer_ratio()
  248. yml['feeds'] = float(self.feeds.value()) if d != 1 else n
  249. else:
  250. yml['feeds'] = None
  251. yml['instructions'] = literal_str('\n'.join(map(
  252. lambda x: x.strip(),
  253. self.instructions.get_text()[0].splitlines()
  254. )).strip())
  255. fname = self.fname.get_edit_text()
  256. if not fname:
  257. return
  258. with open(f'{fname}-modified.yaml', 'w') as f:
  259. yaml.dump(yml, f)
  260. def update(self, widget = None):
  261. data = self.data
  262. organic = None if data['organic'] == 'mixed' else data['organic']
  263. sort = 'ts'
  264. not_found = '='
  265. for r in filter(
  266. lambda x: next(filter(lambda x: x is widget or widget is None, x), None),
  267. self.ingredients
  268. ):
  269. product, quantity, unit = map(lambda x: x.get_edit_text(), r)
  270. try:
  271. quantity = Decimal(quantity)
  272. except InvalidOperation:
  273. quantity = None
  274. if (product or None, unit or None, quantity or None) == (None, None, None):
  275. continue
  276. if None in (sort or None, product or None, unit or None, quantity or None):
  277. not_found = '>'
  278. continue
  279. df = self.query_manager.get_historic_prices_data(unit, sort=sort, product=product, organic=organic)
  280. if df.empty:
  281. not_found = '~' if not_found == '=' else not_found
  282. df = self.query_manager.get_historic_prices_data(unit, sort=sort, product=product, organic=None)
  283. if df.empty:
  284. not_found = '>'
  285. continue
  286. assert len(df['avg'].unique()) == 1, f"There should be only one average price: {df['avg'].unique()}"
  287. _avg, _min, _max = list(
  288. map(Decimal,df[['avg','min','max']].iloc[0])
  289. )
  290. self.prices[r[0].get_edit_text()] = [
  291. i*quantity for i in (_min, _avg, _max)
  292. ]
  293. for k in list(self.prices):
  294. if k not in map(lambda x: x[0].get_edit_text(), self.ingredients):
  295. del self.prices[k]
  296. price = [
  297. sum([self.prices[p][i] for p in self.prices]) for i in range(3)
  298. ]
  299. self.price.set_text(
  300. f'Cost: {not_found}{", ".join([str(p) for p in price])}'
  301. )
  302. notice = ''
  303. ingredients = list(filter(lambda x: x, map(lambda x: x[0].get_edit_text(), self.ingredients)))
  304. parsed_products = list(get_products_from_xhtml(markdown(self.instructions.get_edit_text())))
  305. fname = self.fname.get_edit_text()
  306. if not fname:
  307. self.notice.set_text('No file name set')
  308. return self
  309. if not parsed_products:
  310. self.notice.set_text('Failed to parse recipe instructions')
  311. return self
  312. products = set(parsed_products)
  313. for product in products - set(ingredients):
  314. notice += f"Product '{product}' not found in list of ingredients\n";
  315. for ingredient in set(ingredients) - products:
  316. notice += f"Ingredient '{ingredient}' is not used\n";
  317. if len(set(ingredients)) != len(ingredients):
  318. notice += f"Some ingredients listed more than once\n"
  319. self.notice.set_text(notice or 'None')
  320. return self
  321. def __init__(self,
  322. activity_manager: ActivityManager,
  323. query_manager: QueryManager,
  324. fname: str,
  325. recipe: dict,
  326. ):
  327. self.fname = Edit('', fname)
  328. self.prices = dict()
  329. self.components = dict()
  330. self.buttons = {
  331. 'clear': Button(('streak', 'Clear')),
  332. 'exit': Button(('streak', 'Exit')),
  333. 'add': Button(('streak', 'Add')),
  334. 'save': Button(('streak', 'Save')),
  335. }
  336. self.ingredients: List[Tuple[AutoCompleteEdit, FloatEdit, AutoCompleteEdit]] = [
  337. (
  338. AutoCompleteEdit(('bg', f'product#{idx}'), edit_text=ingredient[0]),
  339. FloatEdit(('bg', f''), default=ingredient[1]),
  340. AutoCompleteEdit(('bg', f'unit#{idx}'), edit_text=ingredient[2]),
  341. ) for idx, ingredient in enumerate(recipe['ingredients'])
  342. ] if len(recipe['ingredients']) else [ ]
  343. self.organic = NoTabCheckBox(('bg', "Organic"), state='mixed')
  344. self.instructions = Edit('', edit_text=recipe['instructions'] or u'', multiline=True, allow_tab=True)
  345. self.feeds = FloatEdit(f'Serves: ', f"{recipe['feeds']}" or '')
  346. self.price = Text(f"Cost: 0")
  347. self.notice = Text('')
  348. bottom_pane = [
  349. self.organic,
  350. LineBox(self.instructions, title=f'Instructions'),
  351. self.feeds,
  352. self.price,
  353. LineBox(self.notice, title='Errors'),
  354. ]
  355. self.activity_manager = activity_manager
  356. self.query_manager = query_manager
  357. self.autocomplete_options = lambda name, data: self.query_manager.unique_suggestions(name.split('#', 1)[0], **data)
  358. self.autocomplete_callback = lambda widget, options: len(options) > 0 and widget._emit('open', options)
  359. connect_signal(self.organic, 'postchange', lambda *_: self.update())
  360. connect_signal(self.buttons['save'], 'click', lambda _: self.save())
  361. connect_signal(self.buttons['add'], 'click', lambda _: self.add_ingredient())
  362. connect_signal(self.buttons['clear'], 'click', lambda _: self.clear())
  363. connect_signal(self.buttons['exit'], 'click', lambda _: show_or_exit('esc'))
  364. connect_signal(self.instructions, 'postchange', lambda w,_: self.update(w))
  365. header = Text(u'Recipe Editor', 'center')
  366. _copyright = Text(COPYRIGHT, 'center')
  367. banner = Pile([
  368. Padding(header, 'center', width=('relative', 100)),
  369. Padding(_copyright, 'center', width=('relative', 100)),
  370. ])
  371. banner = AttrMap(banner, 'banner')
  372. left_pane, middle_pane, right_pane, gutter = self.init_ingredients()
  373. self.components = {
  374. 'top_pane': Columns([
  375. (9, Pile([
  376. Divider(),
  377. AttrMap(self.buttons['clear'], 'streak'),
  378. Divider(),
  379. ])),
  380. Divider(),
  381. LineBox(Columns([
  382. self.fname, (8, self.buttons['save'])
  383. ]), title='Recipe'),
  384. Divider(),
  385. (9, Pile([
  386. Divider(),
  387. AttrMap(self.buttons['exit'], 'streak'),
  388. Divider(),
  389. ]))
  390. ], dividechars=1),
  391. 'bottom_pane': Pile(bottom_pane),
  392. 'right_pane': (15, Pile(right_pane)),
  393. 'middle_pane': (12, Pile(middle_pane)),
  394. 'left_pane': Pile(left_pane),
  395. 'gutter': (8, Pile(gutter))
  396. }
  397. self.add_ingredient()
  398. widget = Pile([
  399. banner,
  400. Divider(),
  401. self.components['top_pane'],
  402. Columns([
  403. self.components['left_pane'],
  404. self.components['middle_pane'],
  405. self.components['right_pane'],
  406. (1,Divider()),
  407. self.components['gutter'],
  408. ], dividechars=0),
  409. self.components['bottom_pane'],
  410. ])
  411. widget = Filler(widget, 'top')
  412. widget = AttrMap(widget, 'bg')
  413. super().__init__(widget, map(
  414. lambda x: next(w for n,w in chain(
  415. self.buttons.items(),
  416. [
  417. ('fname', self.fname),
  418. ('ingredients', self.ingredients[-1][0]),
  419. ('instructions', self.instructions),
  420. ('quantity', self.ingredients[-1][1]),
  421. ('units', self.ingredients[-1][2]),
  422. ('organic', self.organic)
  423. ],
  424. ) if x == n),
  425. [
  426. 'instructions',
  427. 'ingredients', 'quantity', 'units', 'add',
  428. 'organic',
  429. 'clear', 'fname', 'save', 'exit',
  430. ]
  431. ))
  432. self.update()