123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- import os
- from io import BytesIO
- from threading import Thread
- from typing import Union
- from bottle import (
- route, request, response,
- redirect, abort,
- template, static_file,
- FormsDict, HTTPError,
- )
- from psycopg import Cursor, connect
- from psycopg.rows import TupleRow
- from linkpreview import link_preview
- from .qr import get_qr_code
- from .validate import get_file_mimetype, get_filename, validate, validate_file, validate_parameter, validate_url
- from .hash_util import normalize_base32
- from .route_decorators import normalize, poison, cursor
- from .query_to_xml import get_categories, get_groups, get_products, get_tags
- from .CachedLoadingPage import CachedLoadingPage
- from .Cache import Cache
- from . import trend as worker
- from .save import save, save_upload
- host = f"host={os.getenv('HOST')}"
- db = f"dbname={os.getenv('DB', 'grocery')}"
- user = f"user={os.getenv('USER', 'das')}"
- password = f"password={os.getenv('PASSWORD','')}"
- if not password.split('=',1)[1]:
- password = ''
- conn = connect(f"{host} {db} {user} {password}")
- @route('/grocery/static/<filename:path>')
- def send_static(filename):
- return static_file(filename, root='app/rest/static')
- @route('/grocery/trend', method=['GET', 'POST'])
- @poison(cache=Cache(10))
- @normalize
- def trend(key: str, forms: FormsDict, cache: Cache):
- page = cache[key]
- if page:
- return page
- _, _, path, *_ = request.urlparts
- return cache.add(key, CachedLoadingPage(
- template("loading", progress=[]),
- lambda queue: Thread(target=worker.trend, args=(
- queue, conn, path, forms
- )).start()
- ))
- @route('/grocery/groups', method=['GET', 'POST'])
- @poison(cache=Cache(10))
- @normalize
- @cursor(connection=conn)
- def groups(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: Cache):
- response.content_type = 'application/xhtml+xml; charset=utf-8'
- return get_groups(cur, forms)
- @route('/grocery/categories', method=['GET', 'POST'])
- @poison(cache=Cache(10))
- @normalize
- @cursor(connection=conn)
- def categories(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: Cache):
- response.content_type = 'application/xhtml+xml; charset=utf-8'
- return get_categories(cur, forms)
- @route('/grocery/products', method=['GET', 'POST'])
- @poison(cache=Cache(10))
- @normalize
- @cursor(connection=conn)
- def products(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: Cache):
- response.content_type = 'application/xhtml+xml; charset=utf-8'
- return get_products(cur, forms)
- @route('/grocery/tags', method=['GET', 'POST'])
- @poison(cache=Cache(10))
- @normalize
- @cursor(connection=conn)
- def tags(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: Cache):
- response.content_type = 'application/xhtml+xml; charset=utf-8'
- return get_tags(cur, forms)
- SCHEME = "https://"
- HOST = ""
- DOMAIN = "shandan.one"
- PORT = ""
- LOCATION = SCHEME + (f"{HOST}." if HOST else "") + DOMAIN + (f":{PORT}" if PORT else "")
- @route('/clip', method=['GET', 'POST'])
- def clip():
- if request.method == 'GET':
- _hash = request.params.hash
- if _hash:
- _hash = normalize_base32(_hash)
- content = validate(_hash).decode('utf-8')
- else:
- content = None
- link = f'{LOCATION}/clip/{_hash}' if content else f'{LOCATION}/clip'
- svg = get_qr_code(content, fallback=link)
- response.content_type = 'text/html; charset=utf-8'
- form = template(
- 'clip-form',
- action='/clip',
- method='post',
- content=content,
- disabled=True if content else False
- )
- return template(
- 'paste',
- form=form,
- svg=svg,
- link=link,
- disabled=True if content else False,
- download=f'/clip/{_hash}' if content else None
- )
- if request.method == 'POST':
- content = validate_parameter(request, 'paste')
- if request.params.copy != 'true':
- _b32 = save(content)
- return redirect(f'/clip?hash={_b32}')
- response.content_type = 'text/html; charset=utf-8'
- form = template(
- 'clip-form',
- action='/clip',
- method='post',
- content=content,
- disabled=False
- )
- link = f'{LOCATION}/clip'
- svg = get_qr_code(content, fallback=link)
- return template(
- 'paste',
- form=form,
- svg=svg,
- link=link,
- disabled=False,
- download=None
- )
- @route('/clip/<filename:path>', method='GET')
- def get_clip(filename):
- filename = filename and normalize_base32(filename)
- if not request.params.raw.lower() == 'true':
-
- return redirect(f'/clip?hash={filename}')
- ret = validate(filename)
- if isinstance(ret, HTTPError):
- return ret
- return static_file('/'.join([filename,]*2) + '.file', root='app/rest/static')
- @route('/upload', method=['GET', 'POST'])
- def upload():
- if request.method == 'GET':
- _hash = request.params.hash
- mimetype = None
- if _hash:
- _hash = normalize_base32(_hash)
- name = get_filename(_hash)
- mimetype = get_file_mimetype(name)
-
- link = f'{LOCATION}/upload/{_hash}' if _hash else f'{LOCATION}/upload'
- response.content_type = 'text/html; charset=utf-8'
- form = template('file-form', action='/upload', method='post')
- svg = get_qr_code(link)
-
- return template('upload', form=form, svg=svg, link=link, mimetype=mimetype)
- if request.method == 'POST':
- if 'paste' not in request.files:
- return abort(400, "Parameter 'paste' must be specified")
- upload = request.files['paste']
- if isinstance(upload.file, BytesIO):
- if len(upload.file.read()) == 0:
- return abort(400, "File is empty")
- _b32 = save_upload(upload.raw_filename , upload.file, root='app/rest/static')
- return redirect(f'/upload?hash={_b32}')
- @route('/upload/<filename:path>', method='GET')
- def get_upload(filename):
- filename = filename and normalize_base32(filename)
- download = True
- if request.params.download == "false":
- download = False
- return validate_file(filename, download=download)
- @route('/goto', method=['GET', 'POST'])
- def goto():
- if request.method == 'GET':
- _hash = request.params.hash
- if _hash:
- _hash = normalize_base32(_hash)
- content = validate(_hash).decode('utf-8')
- else:
- content = None
- if content and request.params.go == 'true':
- target = validate_url(content)
- return redirect(target)
- link = f'{LOCATION}/goto/{_hash}' if content else f'{LOCATION}/goto'
- svg = get_qr_code(content, fallback=link)
- disabled = True if content else False
- response.content_type = 'text/html; charset=utf-8'
- form = template(
- 'goto-form',
- action='/goto',
- method='post',
- content=content,
- disabled=disabled
- )
- preview = dict()
- if content:
- try:
- page = link_preview(link, parser="lxml")
- preview['title'] = page.title
- preview['img'] = page.absolute_image
- preview['domain'] = page.site_name
- preview['link'] = content
- except:
- pass
- return template(
- 'goto',
- form=form,
- svg=svg,
- link=link,
- disabled=disabled,
- preview=preview,
- )
- if request.method == 'POST':
- content = validate_parameter(request, 'url')
- _b32 = save(content)
-
- _ = validate_url(content.decode('utf-8'))
- return redirect(f'/goto?hash={_b32}')
- @route('/goto/<filename:path>', method='GET')
- def redirect_goto(filename):
- filename = filename and normalize_base32(filename)
- return redirect(f'/goto?hash={filename}&go=true')
- @route('/<any>/', method='GET')
- def redirect_trailing_slash(any): return redirect(f'/{any}')
|