#
# Copyright (c) Daniel Sheffield 2023
# All rights reserved
#
# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
"""Bottle routes for the grocery views and the hash-addressed clip store."""
from io import BufferedReader
import os
from threading import Thread
from typing import Union

from bottle import (
    route, request, response,
    redirect, abort, template, static_file,
    FormsDict, HTTPError,
)
from psycopg import Cursor, connect
from psycopg.rows import TupleRow

from .hash_util import blake, bytes_to_base32, hash_to_base32, hex_to_hash, normalize_base32
from .route_decorators import normalize, normalize_query, poison, cursor
from .query_to_xml import get_categories, get_groups, get_products, get_tags
from .CachedLoadingPage import CachedLoadingPage
from .Cache import Cache
from . import BOOLEAN, PARAMS, trend as worker

# Connection parameters are read from the environment.
# NOTE(review): HOST has no default, so an unset HOST produces the literal
# "host=None" in the connection string -- confirm that this is intended.
host = f"host={os.getenv('HOST')}"
db = f"dbname={os.getenv('DB', 'grocery')}"
user = f"user={os.getenv('USER', 'das')}"
# An empty password is left out of the connection string altogether.
_password_value = os.getenv('PASSWORD', '')
password = f"password={_password_value}" if _password_value else ''
conn = connect(f"{host} {db} {user} {password}")

# Directory that holds both the static assets and the stored pastes.
_STATIC_ROOT = 'app/rest/static'
_XHTML = 'application/xhtml+xml; charset=utf-8'
_HTML = 'text/html; charset=utf-8'


@route('/grocery/static/<filename:path>')
def send_static(filename):
    """Serve *filename* from the static directory.

    The ``<filename:path>`` wildcard is what supplies the ``filename``
    argument; without it Bottle would call this handler with no arguments.
    """
    return static_file(filename, root=_STATIC_ROOT)


@route('/grocery/trend', method=['GET', 'POST'])
@poison(cache=Cache(10))
@normalize
def trend(key: str, forms: FormsDict, cache: Cache):
    """Return the cached page for *key*, or start a worker and cache a loading page.

    ``key``, ``forms`` and ``cache`` are presumably injected by the
    ``poison``/``normalize`` decorators -- verify against ``route_decorators``.
    """
    page = cache[key]
    if page:
        return page

    path = request.urlparts.path

    def start_worker(queue):
        # The worker thread receives the queue handed over by CachedLoadingPage.
        Thread(target=worker.trend, args=(queue, conn, path, forms)).start()

    loading_page = CachedLoadingPage(template("loading", progress=[]), start_worker)
    return cache.add(key, loading_page)


@route('/grocery/groups', method=['GET', 'POST'])
@poison(cache=Cache(10))
@normalize
@cursor(connection=conn)
def groups(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: Cache):
    """Return the result of ``get_groups`` for *forms*, served as XHTML."""
    response.content_type = _XHTML
    return get_groups(cur, forms)


@route('/grocery/categories', method=['GET', 'POST'])
@poison(cache=Cache(10))
@normalize
@cursor(connection=conn)
def categories(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: Cache):
    """Return the result of ``get_categories`` for *forms*, served as XHTML."""
    response.content_type = _XHTML
    return get_categories(cur, forms)


@route('/grocery/products', method=['GET', 'POST'])
@poison(cache=Cache(10))
@normalize
@cursor(connection=conn)
def products(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: Cache):
    """Return the result of ``get_products`` for *forms*, served as XHTML."""
    response.content_type = _XHTML
    return get_products(cur, forms)


@route('/grocery/tags', method=['GET', 'POST'])
@poison(cache=Cache(10))
@normalize
@cursor(connection=conn)
def tags(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: Cache):
    """Return the result of ``get_tags`` for *forms*, served as XHTML."""
    response.content_type = _XHTML
    return get_tags(cur, forms)


CLIP_SIZE_LIMIT = 65535
# TODO: what is correct overhead for form content?
_CLIP_FORM_OVERHEAD = 1024
SCHEME = "http://"  # alternative: "https://"
HOST = ""
DOMAIN = "0.0.0.0"  # alternative: "shandan.one"
PORT = 6772  # alternative: ""
LOCATION = SCHEME + (f"{HOST}." if HOST else "") + DOMAIN + (f":{PORT}" if PORT else "")


def _clip_relpath(name: str) -> str:
    """Return the path of paste *name* relative to the static root."""
    return f'{name}/{name}.file'


def _clip_digest(content: bytes) -> str:
    """Return the base32 text of the ``blake`` digest of *content* (person ``b'clip'``)."""
    return bytes_to_base32(blake(content, person=b'clip'))


def _show_clip():
    """Render the paste page, pre-filled when a valid ``hash`` parameter is given."""
    _hash = request.params.hash
    if _hash:
        _hash = normalize_base32(_hash)
        content = validate(_hash).decode('utf-8')
    else:
        content = None

    # An empty paste is treated like "no paste": the form stays editable.
    has_content = bool(content)
    link = f'{LOCATION}/clip/{_hash}' if has_content else f'{LOCATION}/clip'
    response.content_type = _HTML
    form = template(
        'clip-form',
        action='/clip',
        method='post',
        content=content,
        disabled=has_content,
    )
    return template(
        'paste',
        form=form,
        link=link,
        disabled=has_content,
        download=f'/clip/{_hash}' if has_content else None,
    )


def _store_clip():
    """Store the posted ``paste`` under its digest and redirect to its page."""
    if 'paste' not in request.params:
        return abort(400, "Missing parameter: 'paste'")

    size_error = f"Paste size can not exceed {CLIP_SIZE_LIMIT}"
    # Parentheses spell out the evaluation order Python already applies
    # (``and`` binds tighter than ``or``); the condition itself is unchanged.
    body_length_unknown = 'paste' not in request.query and request.content_length == -1
    body_too_large = request.content_length > CLIP_SIZE_LIMIT + _CLIP_FORM_OVERHEAD
    if body_length_unknown or body_too_large:
        return abort(418, size_error)

    content = request.params['paste'].encode('utf-8')
    if len(content) > CLIP_SIZE_LIMIT:
        return abort(418, size_error)

    _b32 = _clip_digest(content)
    directory = f'{_STATIC_ROOT}/{_b32}'
    try:
        os.mkdir(directory, mode=0o700, dir_fd=None)
    except FileExistsError:
        # Same content was pasted before; the file is simply rewritten.
        pass
    fd = os.open(
        f'{directory}/{_b32}.file',
        os.O_WRONLY | os.O_TRUNC | os.O_CREAT,
        0o600,
    )
    with open(fd, "wb") as f:
        f.write(content)

    # Set before redirecting, as the original handler did.
    response.content_type = _HTML
    return redirect(f'/clip?hash={_b32}')


@route('/clip', method=['GET', 'POST'])
@route('/clip/', method=['GET', 'POST'])
def clip():
    """Show the paste form (GET) or store a new paste (POST)."""
    if request.method == 'GET':
        return _show_clip()
    if request.method == 'POST':
        return _store_clip()


def validate(filename: str) -> bytes:
    """Return the stored bytes of paste *filename* after checking them.

    Aborts with 404 when ``static_file`` reports an error for the paste,
    418 when the stored file exceeds ``CLIP_SIZE_LIMIT`` and 410 when the
    digest of the stored bytes does not equal *filename*.
    """
    ret = static_file(_clip_relpath(filename), root=_STATIC_ROOT)
    if isinstance(ret, HTTPError):
        return abort(404, f"No such paste: {filename}")

    # NOTE(review): static_file also reacts to conditional/range request
    # headers, in which case the body may not be the whole file -- confirm
    # whether reading the file from disk directly would be more robust.
    body = ret.body
    try:
        if ret.content_length > CLIP_SIZE_LIMIT:
            return abort(418, f"Paste size exceeds {CLIP_SIZE_LIMIT}")
        if isinstance(body, BufferedReader):
            content: bytes = body.read()
        else:
            content = body.encode('utf-8')
    finally:
        # The response is discarded here, so close the file it opened
        # instead of leaving that to the garbage collector.
        if isinstance(body, BufferedReader):
            body.close()

    if _clip_digest(content) != filename:
        return abort(410, "Paste content differs")
    return content


@route('/clip/<filename>', method='GET')
def get_clip(filename):
    """Serve paste *filename* raw when ``raw=true``, else redirect to its page.

    The ``<filename>`` wildcard supplies the argument and keeps this rule
    distinct from the ``/clip/`` rule registered on ``clip``.
    """
    filename = normalize_base32(filename)
    if request.params.raw.lower() != 'true':
        # TODO: return a form with timeout to a GET instead ?
        return redirect(f'/clip?hash={filename}')
    # validate() aborts on any problem, so reaching the next line means the
    # stored content is present and matches its name.
    validate(filename)
    return static_file(_clip_relpath(filename), root=_STATIC_ROOT)