|
@@ -4,10 +4,8 @@
|
|
# All rights reserved
|
|
# All rights reserved
|
|
#
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
|
|
# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
|
|
-from decimal import Decimal, InvalidOperation
|
|
|
|
from itertools import chain
|
|
from itertools import chain
|
|
from typing import (
|
|
from typing import (
|
|
- Callable,
|
|
|
|
Union,
|
|
Union,
|
|
Tuple,
|
|
Tuple,
|
|
List,
|
|
List,
|
|
@@ -25,13 +23,18 @@ from urwid import (
|
|
Edit,
|
|
Edit,
|
|
Filler,
|
|
Filler,
|
|
LineBox,
|
|
LineBox,
|
|
- Padding,
|
|
|
|
Pile,
|
|
Pile,
|
|
Text,
|
|
Text,
|
|
)
|
|
)
|
|
|
|
|
|
-from .. import COPYRIGHT
|
|
|
|
|
|
+from ..data.decimal_util import decimal_or_none
|
|
|
|
+from ..data.dataframe_util import get_caption, get_time_range, stats
|
|
from ..data.QueryManager import QueryManager
|
|
from ..data.QueryManager import QueryManager
|
|
|
|
+from .grouped_widget_util import (
|
|
|
|
+ to_numbered_field,
|
|
|
|
+ to_unnumbered_field,
|
|
|
|
+ to_named_value,
|
|
|
|
+)
|
|
from ..widgets import (
|
|
from ..widgets import (
|
|
AutoCompleteEdit,
|
|
AutoCompleteEdit,
|
|
AutoCompleteFloatEdit,
|
|
AutoCompleteFloatEdit,
|
|
@@ -43,31 +46,14 @@ from ..widgets import (
|
|
from . import ActivityManager
|
|
from . import ActivityManager
|
|
from .Rating import Rating
|
|
from .Rating import Rating
|
|
from .NewProduct import NewProduct
|
|
from .NewProduct import NewProduct
|
|
-
|
|
|
|
-def to_numbered_field(x):
|
|
|
|
- if len(x[0].split('#', 1)) > 1:
|
|
|
|
- name, idx = x[0].split('#', 1)
|
|
|
|
- idx = int(idx)
|
|
|
|
- else:
|
|
|
|
- name, idx = x[0], 0
|
|
|
|
-
|
|
|
|
- return (name, int(idx)), x[1]
|
|
|
|
-
|
|
|
|
-def to_unnumbered_field(x):
|
|
|
|
- return x[0][0], x[1]
|
|
|
|
-
|
|
|
|
-def in_same_row(name):
|
|
|
|
- if len(name.split('#', 1)) > 1:
|
|
|
|
- _, row = name.split('#', 1)
|
|
|
|
- else:
|
|
|
|
- row = 0
|
|
|
|
- return lambda x: x[0][1] == int(row)
|
|
|
|
|
|
+from .Banner import banner
|
|
|
|
|
|
def unzip(_iter: List[Tuple[AutoCompleteEdit, Edit]]) -> Tuple[
|
|
def unzip(_iter: List[Tuple[AutoCompleteEdit, Edit]]) -> Tuple[
|
|
List[AutoCompleteEdit], List[Edit]
|
|
List[AutoCompleteEdit], List[Edit]
|
|
]:
|
|
]:
|
|
return zip(*_iter)
|
|
return zip(*_iter)
|
|
|
|
|
|
|
|
+
|
|
def extract_values(x: Union[List[AutoCompleteEdit], List[Edit]]) -> Iterable[str]:
|
|
def extract_values(x: Union[List[AutoCompleteEdit], List[Edit]]) -> Iterable[str]:
|
|
if isinstance(x, (list, tuple)):
|
|
if isinstance(x, (list, tuple)):
|
|
if len(x) == 0:
|
|
if len(x) == 0:
|
|
@@ -75,8 +61,6 @@ def extract_values(x: Union[List[AutoCompleteEdit], List[Edit]]) -> Iterable[str
|
|
return ( v.get_edit_text() for v in x )
|
|
return ( v.get_edit_text() for v in x )
|
|
raise Exception(f"Unsupported type: {type(x)}")
|
|
raise Exception(f"Unsupported type: {type(x)}")
|
|
|
|
|
|
-def to_named_value(name: str) -> Callable[[str], Tuple[str,str]]:
|
|
|
|
- return lambda e: (f'{name}#{e[0]}', e[1])
|
|
|
|
|
|
|
|
def blank_tags_row(idx: int) -> Tuple[AutoCompleteEdit, Edit]:
|
|
def blank_tags_row(idx: int) -> Tuple[AutoCompleteEdit, Edit]:
|
|
return (
|
|
return (
|
|
@@ -110,10 +94,11 @@ class TransactionEditor(FocusWidget):
|
|
|
|
|
|
def apply_choice(self, name, value):
|
|
def apply_choice(self, name, value):
|
|
self.apply_changes(name, value)
|
|
self.apply_changes(name, value)
|
|
- data = dict(#filter(
|
|
|
|
- # in_same_row(name),
|
|
|
|
- map(to_numbered_field, self.data.items())
|
|
|
|
- )#)
|
|
|
|
|
|
+ data = {
|
|
|
|
+ (field, idx): v for field, idx, v in map(
|
|
|
|
+ to_numbered_field, self.data.items()
|
|
|
|
+ )
|
|
|
|
+ }
|
|
for k,v in data.items():
|
|
for k,v in data.items():
|
|
if f'{k[0]}#{k[1]}' == name or v:
|
|
if f'{k[0]}#{k[1]}' == name or v:
|
|
continue
|
|
continue
|
|
@@ -172,16 +157,12 @@ class TransactionEditor(FocusWidget):
|
|
|
|
|
|
|
|
|
|
def init_tags(self):
|
|
def init_tags(self):
|
|
- #_tags = LineBox(Pile([AttrMap(
|
|
|
|
_tags = Pile([AttrMap(
|
|
_tags = Pile([AttrMap(
|
|
AutoCompletePopUp(
|
|
AutoCompletePopUp(
|
|
tag[0],
|
|
tag[0],
|
|
self.apply_choice,
|
|
self.apply_choice,
|
|
lambda: self.activity_manager.show(self.update())
|
|
lambda: self.activity_manager.show(self.update())
|
|
), 'streak') for tag in self._tags])
|
|
), 'streak') for tag in self._tags])
|
|
- # title=f'Tags',
|
|
|
|
- # title_align='left'
|
|
|
|
- #)
|
|
|
|
gutter = Pile([
|
|
gutter = Pile([
|
|
*[ Divider() for _ in self._tags[:-1] ],
|
|
*[ Divider() for _ in self._tags[:-1] ],
|
|
Divider(),
|
|
Divider(),
|
|
@@ -196,7 +177,6 @@ class TransactionEditor(FocusWidget):
|
|
blank_tags_row(len(self._tags))
|
|
blank_tags_row(len(self._tags))
|
|
)
|
|
)
|
|
_tags, gutter = self.init_tags()
|
|
_tags, gutter = self.init_tags()
|
|
- #self.components['tags'][1].original_widget.contents = list(_tags.original_widget.contents)
|
|
|
|
self.components['tags'].contents = list(_tags.contents)
|
|
self.components['tags'].contents = list(_tags.contents)
|
|
self.components['gutter'][1].contents = list(gutter.contents)
|
|
self.components['gutter'][1].contents = list(gutter.contents)
|
|
for widget in self._tags:
|
|
for widget in self._tags:
|
|
@@ -204,16 +184,13 @@ class TransactionEditor(FocusWidget):
|
|
connect_signal(widget[0], 'apply', lambda w, name: self.autocomplete_callback(
|
|
connect_signal(widget[0], 'apply', lambda w, name: self.autocomplete_callback(
|
|
w, name, self.autocomplete_options(name, dict(map(
|
|
w, name, self.autocomplete_options(name, dict(map(
|
|
to_unnumbered_field,
|
|
to_unnumbered_field,
|
|
- #filter(
|
|
|
|
- # in_same_row(name),
|
|
|
|
- map(to_numbered_field, self.data.items()
|
|
|
|
- #)
|
|
|
|
|
|
+ map(to_numbered_field, self.data.items()
|
|
))))
|
|
))))
|
|
))
|
|
))
|
|
|
|
|
|
|
|
|
|
def clear(self):
|
|
def clear(self):
|
|
- self._tags = []
|
|
|
|
|
|
+ self._tags: List[Tuple[AutoCompleteEdit, Edit]] = []
|
|
self.add_tag()
|
|
self.add_tag()
|
|
for (k, ef) in self.edit_fields.items():
|
|
for (k, ef) in self.edit_fields.items():
|
|
if k in ('ts', 'store',):
|
|
if k in ('ts', 'store',):
|
|
@@ -249,46 +226,23 @@ class TransactionEditor(FocusWidget):
|
|
).truncate(
|
|
).truncate(
|
|
before=max(0, len(df.index)-self.graph._canvas_width)
|
|
before=max(0, len(df.index)-self.graph._canvas_width)
|
|
)
|
|
)
|
|
- data = df[['$/unit','quantity']].apply(
|
|
|
|
- lambda x: (float(x['$/unit']), float(x['quantity'])),
|
|
|
|
|
|
+ data = df[['price', '$/unit','quantity']].apply(
|
|
|
|
+ lambda x: (float(x['price']), float(x['$/unit']), float(x['quantity'])),
|
|
axis=1, result_type='broadcast'
|
|
axis=1, result_type='broadcast'
|
|
)
|
|
)
|
|
- data['avg'] = (data['$/unit']*data['quantity']).sum()/data['quantity'].sum()
|
|
|
|
- data_max = data.max()['$/unit'] #.max()
|
|
|
|
- assert len(data['avg'].unique()) == 1
|
|
|
|
norm = [ (x,) for x in data['$/unit'] ] #.to_records(index=False)
|
|
norm = [ (x,) for x in data['$/unit'] ] #.to_records(index=False)
|
|
- self.graph.set_data(norm, data_max,
|
|
|
|
- vscale=list(map(float, [
|
|
|
|
- data['$/unit'].min(),
|
|
|
|
- data['$/unit'].median(),
|
|
|
|
- data['avg'].iloc[0],
|
|
|
|
- data_max
|
|
|
|
- ]))
|
|
|
|
- )
|
|
|
|
- #self.graph.set_bar_width(1)
|
|
|
|
- # canvas_width = 10 + pad + pad + 10
|
|
|
|
- date_strlen = (self.graph.canvas_width - 20)
|
|
|
|
- ex = "─" if date_strlen % 2 else ""
|
|
|
|
- plen = date_strlen//2
|
|
|
|
- caption = f"{df['ts_raw'].min():%d/%m/%Y}"
|
|
|
|
- caption += f"{{p:>{plen}}}{ex}{{p:<{plen}}}".format(
|
|
|
|
- p="─")
|
|
|
|
- caption += f"{df['ts_raw'].max():%d/%m/%Y}"
|
|
|
|
|
|
+ scale = stats(data, 'price', 'quantity', '$/unit')
|
|
|
|
+ self.graph.set_data(norm, scale[-1], vscale=scale)
|
|
|
|
+ time_range = get_time_range(df, 'ts_raw')
|
|
|
|
+ caption = get_caption(time_range, self.graph.canvas_width)
|
|
self.graph.set_caption(caption)
|
|
self.graph.set_caption(caption)
|
|
|
|
|
|
def update_historic_prices(self, data):
|
|
def update_historic_prices(self, data):
|
|
organic = None if data['organic'] == 'mixed' else data['organic']
|
|
organic = None if data['organic'] == 'mixed' else data['organic']
|
|
#sort = '$/unit' if self.buttons['sort_price'].state else 'ts'
|
|
#sort = '$/unit' if self.buttons['sort_price'].state else 'ts'
|
|
product, unit = data['product'] or None, data['unit'] or None
|
|
product, unit = data['product'] or None, data['unit'] or None
|
|
- try:
|
|
|
|
- price = Decimal(data['price'])
|
|
|
|
- except InvalidOperation:
|
|
|
|
- price = None
|
|
|
|
-
|
|
|
|
- try:
|
|
|
|
- quantity = Decimal(data['quantity'])
|
|
|
|
- except InvalidOperation:
|
|
|
|
- quantity = None
|
|
|
|
|
|
+ price = decimal_or_none(data['price'])
|
|
|
|
+ quantity = decimal_or_none(data['quantity'])
|
|
|
|
|
|
if None in (product, unit):
|
|
if None in (product, unit):
|
|
self.rating.update_rating(None, None, None, unit)
|
|
self.rating.update_rating(None, None, None, unit)
|
|
@@ -341,7 +295,7 @@ class TransactionEditor(FocusWidget):
|
|
query_manager: QueryManager,
|
|
query_manager: QueryManager,
|
|
):
|
|
):
|
|
self.autocomplete_options = lambda name, data: self.query_manager.unique_suggestions(name.split('#', 1)[0], **data)
|
|
self.autocomplete_options = lambda name, data: self.query_manager.unique_suggestions(name.split('#', 1)[0], **data)
|
|
- self.activity_manager = activity_manager
|
|
|
|
|
|
+ self.activity_manager: ActivityManager = activity_manager
|
|
self.query_manager = query_manager
|
|
self.query_manager = query_manager
|
|
self.buttons = {
|
|
self.buttons = {
|
|
'done': Button(('streak', u'Done')),
|
|
'done': Button(('streak', u'Done')),
|
|
@@ -385,26 +339,7 @@ class TransactionEditor(FocusWidget):
|
|
)))
|
|
)))
|
|
self.organic_checkbox = self.checkboxes['organic']
|
|
self.organic_checkbox = self.checkboxes['organic']
|
|
connect_signal(self.organic_checkbox, 'postchange', lambda _,v: self.update())
|
|
connect_signal(self.organic_checkbox, 'postchange', lambda _,v: self.update())
|
|
- layout = [
|
|
|
|
- [ 'ts', 'store', ],
|
|
|
|
- [ 'organic', 'product', ],
|
|
|
|
- [ 'category', 'group', ],
|
|
|
|
- ]
|
|
|
|
- side_pane = [
|
|
|
|
- 'unit',
|
|
|
|
- 'quantity',
|
|
|
|
- 'price',
|
|
|
|
- ]
|
|
|
|
- bottom_pane = [
|
|
|
|
- 'description',
|
|
|
|
- 'dbview',
|
|
|
|
- ]
|
|
|
|
- badge = [
|
|
|
|
- 'rating',
|
|
|
|
- 'spread',
|
|
|
|
- 'marker',
|
|
|
|
- ]
|
|
|
|
- #self.clear()
|
|
|
|
|
|
+
|
|
_widgets = dict(chain(*list(map(lambda x: x.items(), [
|
|
_widgets = dict(chain(*list(map(lambda x: x.items(), [
|
|
self.edit_fields, self.text_fields, self.checkboxes
|
|
self.edit_fields, self.text_fields, self.checkboxes
|
|
])
|
|
])
|
|
@@ -421,10 +356,7 @@ class TransactionEditor(FocusWidget):
|
|
connect_signal(ef, 'apply', lambda w, name: self.autocomplete_callback(
|
|
connect_signal(ef, 'apply', lambda w, name: self.autocomplete_callback(
|
|
w, name, self.autocomplete_options(name, dict(map(
|
|
w, name, self.autocomplete_options(name, dict(map(
|
|
to_unnumbered_field,
|
|
to_unnumbered_field,
|
|
- #filter(
|
|
|
|
- # in_same_row(name),
|
|
|
|
map(to_numbered_field, self.data.items()
|
|
map(to_numbered_field, self.data.items()
|
|
- #)
|
|
|
|
))))
|
|
))))
|
|
))
|
|
))
|
|
|
|
|
|
@@ -438,24 +370,20 @@ class TransactionEditor(FocusWidget):
|
|
title=k.title(), title_align='left'
|
|
title=k.title(), title_align='left'
|
|
) for k in self.edit_fields if k != 'product'
|
|
) for k in self.edit_fields if k != 'product'
|
|
})
|
|
})
|
|
- header = Text(u'Fill Transaction', 'center')
|
|
|
|
- _copyright = Text(COPYRIGHT, 'center')
|
|
|
|
|
|
|
|
self.components.update({
|
|
self.components.update({
|
|
'bottom_button_bar': Columns(
|
|
'bottom_button_bar': Columns(
|
|
[(8, self.buttons['done']), Divider(), (9, self.buttons['clear'])]
|
|
[(8, self.buttons['done']), Divider(), (9, self.buttons['clear'])]
|
|
),
|
|
),
|
|
- 'badge': Pile(map(
|
|
|
|
- lambda x: _widgets[x] if x is not None else Divider,
|
|
|
|
- badge
|
|
|
|
- )),
|
|
|
|
|
|
+ 'badge': Pile([
|
|
|
|
+ _widgets[x] for x in ['rating', 'spread', 'marker']
|
|
|
|
+ ]),
|
|
})
|
|
})
|
|
self.components.update({
|
|
self.components.update({
|
|
'bottom_pane': Columns([
|
|
'bottom_pane': Columns([
|
|
- Pile(map(
|
|
|
|
- lambda x: _widgets[x] if x is not None else Divider(),
|
|
|
|
- bottom_pane
|
|
|
|
- )),
|
|
|
|
|
|
+ Pile([
|
|
|
|
+ _widgets[x] for x in ['description', 'dbview']
|
|
|
|
+ ]),
|
|
(self.graph.total_width+2, Pile([
|
|
(self.graph.total_width+2, Pile([
|
|
LineBox(
|
|
LineBox(
|
|
AttrMap(self.components['badge'], 'badge'),
|
|
AttrMap(self.components['badge'], 'badge'),
|
|
@@ -472,11 +400,6 @@ class TransactionEditor(FocusWidget):
|
|
connect_signal(self.buttons['clear'], 'click', lambda _: self.clear())
|
|
connect_signal(self.buttons['clear'], 'click', lambda _: self.clear())
|
|
connect_signal(self.buttons['add'], 'click', lambda _: self.add_tag())
|
|
connect_signal(self.buttons['add'], 'click', lambda _: self.add_tag())
|
|
|
|
|
|
- banner = Pile([
|
|
|
|
- Padding(header, 'center', width=('relative', 100)),
|
|
|
|
- Padding(_copyright, 'center', width=('relative', 100)),
|
|
|
|
- ])
|
|
|
|
- banner = AttrMap(banner, 'banner')
|
|
|
|
_widgets.update({
|
|
_widgets.update({
|
|
'product': LineBox(Columns([
|
|
'product': LineBox(Columns([
|
|
AttrMap(AutoCompletePopUp(
|
|
AttrMap(AutoCompletePopUp(
|
|
@@ -489,25 +412,23 @@ class TransactionEditor(FocusWidget):
|
|
})
|
|
})
|
|
|
|
|
|
self.components['side_pane'] = (12, Pile([
|
|
self.components['side_pane'] = (12, Pile([
|
|
- _widgets[r] if r is not None else Divider() for r in side_pane
|
|
|
|
|
|
+ _widgets[r] for r in ['unit', 'quantity', 'price']
|
|
]))
|
|
]))
|
|
- self.components['main_pane'] = []
|
|
|
|
- for _, r in enumerate(layout):
|
|
|
|
- col = []
|
|
|
|
- for c in r:
|
|
|
|
- if c is not None:
|
|
|
|
- if c == 'organic':
|
|
|
|
- continue
|
|
|
|
- col.append(_widgets[c])
|
|
|
|
- else:
|
|
|
|
- col.append(Divider())
|
|
|
|
- self.components['main_pane'].append(Columns(col))
|
|
|
|
-
|
|
|
|
- self.components['main_pane'] = Pile(self.components['main_pane'])
|
|
|
|
|
|
+
|
|
|
|
+ self.components['main_pane'] = Pile([
|
|
|
|
+ Columns([
|
|
|
|
+ _widgets[c] for c in ['ts', 'store']
|
|
|
|
+ ]),
|
|
|
|
+ _widgets['product'],
|
|
|
|
+ Columns([
|
|
|
|
+ _widgets[c] for c in ['category', 'group']
|
|
|
|
+ ])
|
|
|
|
+ ])
|
|
|
|
+
|
|
self.add_tag()
|
|
self.add_tag()
|
|
|
|
|
|
widget = Pile([
|
|
widget = Pile([
|
|
- banner,
|
|
|
|
|
|
+ banner(u'Fill Transaction'),
|
|
Divider(),
|
|
Divider(),
|
|
Columns([
|
|
Columns([
|
|
self.components['main_pane'],
|
|
self.components['main_pane'],
|