# # Copyright (c) Daniel Sheffield 2023 # All rights reserved # # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY import os from io import BytesIO from threading import Thread from typing import Union from bottle import ( route, request, response, redirect, abort, template, static_file, FormsDict, HTTPError, ) from psycopg import Cursor, connect from psycopg.rows import TupleRow from linkpreview import link_preview from .qr import get_qr_code from .validate import get_file_mimetype, get_filename, validate, validate_file, validate_parameter, validate_url from .hash_util import normalize_base32 from .route_decorators import normalize, poison, cursor from .query_to_xml import get_categories, get_groups, get_products, get_tags from .CachedLoadingPage import CachedLoadingPage from .Cache import Cache from . import trend as worker from .save import save, save_upload host = f"host={os.getenv('HOST')}" db = f"dbname={os.getenv('DB', 'grocery')}" user = f"user={os.getenv('USER', 'das')}" password = f"password={os.getenv('PASSWORD','')}" if not password.split('=',1)[1]: password = '' conn = connect(f"{host} {db} {user} {password}") @route('/grocery/static/') def send_static(filename): return static_file(filename, root='app/rest/static') @route('/grocery/trend', method=['GET', 'POST']) @poison(cache=Cache(10)) @normalize def trend(key: str, forms: FormsDict, cache: Cache): page = cache[key] if page: return page _, _, path, *_ = request.urlparts return cache.add(key, CachedLoadingPage( template("loading", progress=[]), lambda queue: Thread(target=worker.trend, args=( queue, conn, path, forms )).start() )) @route('/grocery/groups', method=['GET', 'POST']) @poison(cache=Cache(10)) @normalize @cursor(connection=conn) def groups(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: Cache): response.content_type = 'application/xhtml+xml; charset=utf-8' return get_groups(cur, forms) @route('/grocery/categories', method=['GET', 'POST']) @poison(cache=Cache(10)) @normalize @cursor(connection=conn) def categories(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: Cache): response.content_type = 'application/xhtml+xml; charset=utf-8' return get_categories(cur, forms) @route('/grocery/products', method=['GET', 'POST']) @poison(cache=Cache(10)) @normalize @cursor(connection=conn) def products(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: Cache): response.content_type = 'application/xhtml+xml; charset=utf-8' return get_products(cur, forms) @route('/grocery/tags', method=['GET', 'POST']) @poison(cache=Cache(10)) @normalize @cursor(connection=conn) def tags(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: Cache): response.content_type = 'application/xhtml+xml; charset=utf-8' return get_tags(cur, forms) SCHEME = "https://" HOST = "" DOMAIN = "shandan.one" PORT = "" LOCATION = SCHEME + (f"{HOST}." if HOST else "") + DOMAIN + (f":{PORT}" if PORT else "") @route('/clip', method=['GET', 'POST']) def clip(): if request.method == 'GET': _hash = request.params.hash if _hash: _hash = normalize_base32(_hash) content = validate(_hash).decode('utf-8') else: content = None link = f'{LOCATION}/clip/{_hash}' if content else f'{LOCATION}/clip' svg = get_qr_code(content, fallback=link) response.content_type = 'text/html; charset=utf-8' form = template( 'clip-form', action='/clip', method='post', content=content, disabled=True if content else False ) return template( 'paste', form=form, svg=svg, link=link, disabled=True if content else False, download=f'/clip/{_hash}' if content else None ) if request.method == 'POST': content = validate_parameter(request, 'paste') if request.params.copy != 'true': _b32 = save(content) return redirect(f'/clip?hash={_b32}') response.content_type = 'text/html; charset=utf-8' form = template( 'clip-form', action='/clip', method='post', content=content, disabled=False ) link = f'{LOCATION}/clip' svg = get_qr_code(content, fallback=link) return template( 'paste', form=form, svg=svg, link=link, disabled=False, download=None ) @route('/clip/', method='GET') def get_clip(filename): filename = filename and normalize_base32(filename) if not request.params.raw.lower() == 'true': # TODO: return a form with timeout to a GET instead ? return redirect(f'/clip?hash={filename}') ret = validate(filename) if isinstance(ret, HTTPError): return ret return static_file('/'.join([filename,]*2) + '.file', root='app/rest/static') @route('/upload', method=['GET', 'POST']) def upload(): if request.method == 'GET': _hash = request.params.hash mimetype = None if _hash: _hash = normalize_base32(_hash) name = get_filename(_hash) mimetype = get_file_mimetype(name) link = f'{LOCATION}/upload/{_hash}' if _hash else f'{LOCATION}/upload' response.content_type = 'text/html; charset=utf-8' form = template('file-form', action='/upload', method='post') svg = get_qr_code(link) return template('upload', form=form, svg=svg, link=link, mimetype=mimetype) if request.method == 'POST': if 'paste' not in request.files: return abort(400, "Parameter 'paste' must be specified") upload = request.files['paste'] if isinstance(upload.file, BytesIO): if len(upload.file.read()) == 0: return abort(400, "File is empty") _b32 = save_upload(upload.raw_filename , upload.file, root='app/rest/static') return redirect(f'/upload?hash={_b32}') @route('/upload/', method='GET') def get_upload(filename): filename = filename and normalize_base32(filename) download = True if request.params.download == "false": download = False return validate_file(filename, download=download) @route('/goto', method=['GET', 'POST']) def goto(): if request.method == 'GET': _hash = request.params.hash if _hash: _hash = normalize_base32(_hash) content = validate(_hash).decode('utf-8') else: content = None if content and request.params.go == 'true': target = validate_url(content) return redirect(target) link = f'{LOCATION}/goto/{_hash}' if content else f'{LOCATION}/goto' svg = get_qr_code(content, fallback=link) disabled = True if content else False response.content_type = 'text/html; charset=utf-8' form = template( 'goto-form', action='/goto', method='post', content=content, disabled=disabled ) preview = dict() if content: try: page = link_preview(link, parser="lxml") preview['title'] = page.title preview['img'] = page.absolute_image preview['domain'] = page.site_name preview['link'] = content except: pass return template( 'goto', form=form, svg=svg, link=link, disabled=disabled, preview=preview, ) if request.method == 'POST': content = validate_parameter(request, 'url') _b32 = save(content) # validate but save content unmodified _ = validate_url(content.decode('utf-8')) return redirect(f'/goto?hash={_b32}') @route('/goto/', method='GET') def redirect_goto(filename): filename = filename and normalize_base32(filename) return redirect(f'/goto?hash={filename}&go=true') @route('//', method='GET') def redirect_trailing_slash(any): return redirect(f'/{any}')