|
@@ -1,166 +0,0 @@
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-import time
|
|
|
-from bottle import (
|
|
|
- route, request, response,
|
|
|
- redirect, abort, static_file,
|
|
|
- HTTPResponse,
|
|
|
-)
|
|
|
-from itertools import chain
|
|
|
-from base64 import b64decode, b64encode
|
|
|
-from linkpreview import link_preview
|
|
|
-
|
|
|
-from .hash_util import B32REGEX, normalize_base32, blake, bytes_to_base32
|
|
|
-from .qr import get_qr_code
|
|
|
-from .bar import get_bar_code
|
|
|
-from json import dumps, load, loads
|
|
|
-from sqlite3 import connect
|
|
|
-from datetime import datetime, timezone
|
|
|
-
|
|
|
-SCHEME = "https://"
|
|
|
-HOST = ""
|
|
|
-DOMAIN = "shandan.one"
|
|
|
-PORT = ""
|
|
|
-LOCATION = SCHEME + (f"{HOST}." if HOST else "") + DOMAIN + (f":{PORT}" if PORT else "")
|
|
|
-
|
|
|
-def parse_data_uri(content):
|
|
|
-
|
|
|
- _, *media, data = chain.from_iterable(map(
|
|
|
- lambda x: x.split(',', 1), content.split(':', 1)
|
|
|
- ))
|
|
|
- media = media and media[0]
|
|
|
- mimetype, *params, encoding = media.split(';')
|
|
|
- if '=' in encoding:
|
|
|
- params.append(encoding)
|
|
|
- encoding = None
|
|
|
-
|
|
|
- return {
|
|
|
- 'mimetype': mimetype,
|
|
|
- 'params': dict(map(lambda x: x.split('='), params)),
|
|
|
- 'encoding': encoding,
|
|
|
- 'data': data,
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-def parse_upload_placeholder(rowid):
|
|
|
- rowid = int(rowid)
|
|
|
- con = connect('util.db')
|
|
|
- content = None
|
|
|
- try:
|
|
|
- content = con.cursor().execute("""
|
|
|
-SELECT content FROM upload_temp WHERE rowid = ? LIMIT 1;
|
|
|
-""", (rowid,)).fetchall()[0][0]
|
|
|
- finally:
|
|
|
- con.close()
|
|
|
-
|
|
|
- data = parse_data_uri(content)
|
|
|
- assert data['encoding'] == 'base64', f"unsupported encoding: {data['encoding']}"
|
|
|
- data = b64decode(data['data'] + '==')
|
|
|
- return data
|
|
|
-
|
|
|
-
|
|
|
-@route('/<route:re:(clip|goto|upload|code)>/meta', method=['POST'])
|
|
|
-def get_meta(route):
|
|
|
- response.content_type = 'application/json'
|
|
|
- body = load(request.body)
|
|
|
- person = route
|
|
|
- if route == 'upload':
|
|
|
- data = parse_upload_placeholder(body['data'])
|
|
|
- elif route == 'code':
|
|
|
- data = dumps(body['data'], sort_keys=True).encode('utf-8')
|
|
|
- else:
|
|
|
- data = body['data'].encode('utf-8')
|
|
|
-
|
|
|
- _bytes = blake(data, person = person and person.encode('utf-8'))
|
|
|
- hash = bytes_to_base32(_bytes)
|
|
|
- fallback = f'https://shandan.one/{route}/{hash}'
|
|
|
-
|
|
|
- preview = None
|
|
|
- if route == 'goto':
|
|
|
- link = data.decode('utf-8')
|
|
|
- try:
|
|
|
- page = link_preview(link, parser="lxml")
|
|
|
- preview = {
|
|
|
- 'title': page.title,
|
|
|
- 'img': page.absolute_image,
|
|
|
- 'domain': page.site_name,
|
|
|
- 'link': link,
|
|
|
- }
|
|
|
- except:
|
|
|
- pass
|
|
|
- elif route == 'code':
|
|
|
- if body['data']['format'] == 'QR_CODE':
|
|
|
- preview = b64encode(get_qr_code(body['data']['content'], err_lvl=body['data']['errorCorrectionLevel']))
|
|
|
- else:
|
|
|
- preview = b64encode(get_bar_code(body['data']))
|
|
|
- preview = preview.decode('utf-8')
|
|
|
-
|
|
|
- qr = None
|
|
|
- if route != 'code':
|
|
|
- qr = get_qr_code(data, fallback = fallback).decode('utf-8')
|
|
|
-
|
|
|
- return dumps({
|
|
|
- 'hash': hash,
|
|
|
- 'qr': qr,
|
|
|
- 'preview': preview,
|
|
|
- })
|
|
|
-
|
|
|
-@route('/<route:re:(clip|goto|upload)>/normalize', method=['GET'])
|
|
|
-def normalize(route):
|
|
|
- _hash = request.params.hash
|
|
|
- response.content_type = 'application/json'
|
|
|
- return dumps({
|
|
|
- 'i': _hash,
|
|
|
- 'o': normalize_base32(_hash) if _hash else None,
|
|
|
- })
|
|
|
-
|
|
|
-@route('/static/<filename:path>')
|
|
|
-def send_static(filename):
|
|
|
- return static_file(filename, root='rest/static')
|
|
|
-
|
|
|
-@route(f'/<route:re:(clip|goto|code)>/<hash:re:{B32REGEX}{{1,5}}>', method='GET')
|
|
|
-def get_clip(route, hash):
|
|
|
- hash = hash and normalize_base32(hash)
|
|
|
- return redirect(f'/{route}/?hash={hash}&go=true')
|
|
|
-
|
|
|
-
|
|
|
-@route(f'/upload/<hash:re:{B32REGEX}{{1,5}}>', method='GET')
|
|
|
-def get_upload(hash):
|
|
|
- hash = hash and normalize_base32(hash)
|
|
|
- con = connect('util.db')
|
|
|
- fname, mimetype, content = (None, None, None)
|
|
|
- try:
|
|
|
- fname, mimetype, content, created = con.cursor().execute("""
|
|
|
-SELECT name, mime, content, created FROM upload WHERE hash = ? LIMIT 1;
|
|
|
-""", (hash,)).fetchall()[0]
|
|
|
- finally:
|
|
|
- con.close()
|
|
|
-
|
|
|
- created = datetime.strptime(created, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc).timestamp()
|
|
|
- data = parse_data_uri(content)
|
|
|
- assert data['mimetype'].split(';', 1)[0] == mimetype.split(';', 1)[0].split('+')[0], f"mimetype in db and data uri differ"
|
|
|
- charset = data['params'].get('charset', None)
|
|
|
- assert data['encoding'] == 'base64', f"unsupported encoding: {data['encoding']}"
|
|
|
- content = b64decode(data['data'] + '==')
|
|
|
-
|
|
|
- headers = {}
|
|
|
- headers['Content-Length'] = len(content)
|
|
|
-
|
|
|
-
|
|
|
- headers['Content-Disposition'] = 'attachment; filename="%s"' % (fname or hash)
|
|
|
- headers['Content-Encoding'] = 'application/octet-stream'
|
|
|
-
|
|
|
- if mimetype:
|
|
|
- if mimetype[:5] == 'text/' and charset and 'charset' not in mimetype:
|
|
|
- mimetype += '; charset=%s' % charset
|
|
|
- headers['Content-Type'] = mimetype
|
|
|
- lm = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(created))
|
|
|
- headers['Last-Modified'] = lm
|
|
|
- return HTTPResponse(content, **headers)
|
|
|
-
|
|
|
-@route('/<any>/', method='GET')
|
|
|
-def redirect_trailing_slash(any): return redirect(f'/{any}')
|