#
# Copyright (c) Daniel Sheffield 2023
# All rights reserved
#
# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY

# https://www.ietf.org/rfc/rfc3696.txt
"""
Without quotes, local-parts may consist of any combination of
alphabetic characters, digits, or any of the special characters

    ! # $ % & ' * + - / = ? ^ _ ` . { | } ~

period (".") may also appear, but may not be used to start or end the
local part, nor may two or more consecutive periods appear. Stated
differently, any ASCII graphic (printing) character other than the
at-sign ("@"), backslash, double quote, comma, or square brackets may
appear without quoting. If any of that list of excluded characters
are to appear, they must be quoted.
"""
from io import BufferedReader
import mimetypes
from itertools import chain
import os

from bottle import static_file, response, HTTPError, abort, LocalRequest, HTTPResponse
from urllib.parse import urlparse, quote, quote_plus
from .hash_util import blake_file, bytes_to_base32, blake

# according to rfc3696
URL_MUST_ESCAPE = bytes(chain(
    # control characters
    range(0x1F + 1),
    # 0x7F and non 7bit-ASCII
    range(0x7F, 0xFF + 1),
    # specifically excluded
    b'@\\",[]',
))
# so give this list to urllib.parse.quote which follows rfc3986
URL_SAFE = bytes(i for i in range(0xFF + 1) if i not in URL_MUST_ESCAPE)

CLIP_SIZE_LIMIT = 65535


def validate(filename: str, tool: str, root='rest/static/files') -> bytes:
    """Load a stored clip and verify that its content matches its name.

    The clip is looked up as ``<filename>/<filename>.file`` below *root*.
    Its content is hashed with ``blake`` (personalised with *tool*) and the
    base32 form of the digest must equal *filename*.

    :param filename: base32 digest under which the clip is stored.
    :param tool: tool name; used as hash personalisation and in messages.
    :param root: directory the stored files live under.
    :return: the raw content of the clip.

    Aborts with 404 when the file is missing, 418 when it is larger than
    ``CLIP_SIZE_LIMIT`` and 410 when the digest does not match.
    """
    ret = static_file('/'.join([filename] * 2) + '.file', root=root)
    if isinstance(ret, HTTPError):
        return abort(404, f"No such `{tool.title()}`: {filename}")
    if ret.content_length > CLIP_SIZE_LIMIT:
        return abort(418, f"{tool.title()} size exceeds {CLIP_SIZE_LIMIT}")

    body = ret.body
    if isinstance(body, BufferedReader):
        # static_file hands back an open file object; close it once the
        # content has been read so the descriptor is not leaked.
        with body:
            content: bytes = body.read()
    else:
        content = body.encode('utf-8')

    digest = bytes_to_base32(blake(content, person=tool.encode('utf-8')))
    if digest != filename:
        return abort(410, f"{tool.title()} content differs")
    return content


def get_filename(filename: str, root: str = 'rest/static/files'):
    """Return the text stored in the ``.name`` sidecar of an upload.

    :param filename: base32 digest under which the upload is stored.
    :param root: directory the stored files live under.
    :return: the content of ``<root>/<filename>/<filename>.name``, or
        ``None`` when the file cannot be opened or decoded.
    """
    path = '/'.join([filename] * 2)
    try:
        with open(f'{root}/{path}.name', "r") as f:
            return f.read()
    except (OSError, ValueError):
        # Best effort: a missing/unreadable/undecodable sidecar simply
        # means "no name known" (UnicodeDecodeError is a ValueError).
        return None


def get_file_size(filename: str, root: str = 'rest/static/files'):
    """Return the size in bytes of a stored upload.

    :param filename: base32 digest under which the upload is stored.
    :param root: directory the stored files live under.
    :return: size of ``<root>/<filename>/<filename>.file`` in bytes, or
        ``None`` when the file cannot be stat'ed.
    """
    path = '/'.join([filename] * 2)
    try:
        return os.stat(f'{root}/{path}.file').st_size
    except (OSError, ValueError):
        # Best effort: report "unknown size" instead of failing.
        return None


def get_file_mimetype(name):
    """Guess a MIME type from a file name.

    :param name: original file name; may be empty or ``None``.
    :return: ``True`` when *name* is falsy (meaning "no hint available"),
        otherwise the type guessed by ``mimetypes.guess_type`` (non-strict),
        which is ``None`` when nothing can be guessed.
    """
    if not name:
        return True
    return mimetypes.guess_type(name, strict=False)[0]


def validate_file(filename: str, root: str = 'rest/static/files', download=True, mimetype=True) -> HTTPResponse:
    """Build a response serving a stored upload after verifying its digest.

    :param filename: base32 digest under which the upload is stored.
    :param root: directory the stored files live under.
    :param download: passed to ``static_file``; when truthy and a stored
        name exists, that name is used as the download file name.
    :param mimetype: explicit MIME type; ``True`` (or any falsy value)
        means "derive it from the stored name".
    :return: the response produced by ``static_file``.

    Aborts with 404 when the file is missing and 410 when the ``blake_file``
    digest of the stored file does not match *filename*.
    """
    path = '/'.join([filename] * 2)
    # Look the sidecar up in the same root the file itself is served from.
    name = get_filename(filename, root=root)

    if not mimetype or mimetype is True:
        mimetype = get_file_mimetype(name)

    ret = static_file(
        f'{path}.file',
        root=root,
        download=name if name and download else download,
        # 'auto' asks static_file to guess the type itself.
        mimetype='auto' if mimetype is True else mimetype,
    )
    if isinstance(ret, HTTPError):
        return abort(404, f"No such `Upload`: {filename}")

    digest = bytes_to_base32(
        blake_file(f'{path}.file', person='upload'.encode('utf-8'), root=root)
    )
    if digest != filename:
        return abort(410, "Uploaded content differs")
    return ret


def validate_parameter(request: LocalRequest, name: str) -> bytes:
    """Extract a size-checked request parameter as bytes.

    A non-empty value in the query string takes precedence; otherwise the
    value is taken from ``request.params``.

    :param request: the current bottle request.
    :param name: name of the parameter to read.
    :return: the parameter value encoded to bytes.

    Aborts with 400 when the parameter is absent, and with 418 when the
    Content-Length is missing or too large, or when the value exceeds
    ``CLIP_SIZE_LIMIT`` bytes.
    """
    if name not in request.params:
        return abort(400, f"Missing parameter: '{name}'")

    # TODO: what is correct overhead for form content?
    OVERHEAD = 1024
    max_content_length = CLIP_SIZE_LIMIT + OVERHEAD

    content_length = request.content_length
    if content_length == -1:
        return abort(418, "Content-Length must be specified")
    if content_length > max_content_length:
        # Report the limit that is actually enforced.
        return abort(418, f"Content-Length can not exceed {max_content_length} bytes")

    # TODO: add test for both query/form param
    query_value = request.query.get(name, None)
    if query_value:
        # Encode so the declared bytes contract holds and the size check
        # below counts bytes, not characters.
        # NOTE(review): assumes query values are latin-1 decoded strings,
        # like the non-multipart branch below - confirm against bottle.
        content: bytes = query_value.encode('latin-1')
    elif 'multipart/form-data' in request.content_type:
        # TODO: what about binary data ?
        content = request.params[name].encode('utf-8')
    else:
        content = request.params[name].encode('latin-1')

    if len(content) > CLIP_SIZE_LIMIT:
        return abort(418, f"Paste can not exceed {CLIP_SIZE_LIMIT} bytes")
    return content


def validate_url(url: str) -> str:
    """Check a URL for required parts and return it percent-escaped.

    Every component is passed through ``urllib.parse.quote`` (``quote_plus``
    for the params component) with ``URL_SAFE`` as the set of characters
    left untouched, then the URL is reassembled.

    :param url: the URL to validate.
    :return: the escaped URL.

    Aborts with 400 when the scheme is missing, when a ``file`` URL has no
    path, or when an ``http``/``https`` URL has no network location.
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)
    if not scheme:
        return abort(400, "URL has no scheme")
    if scheme == 'file' and not path:
        return abort(400, "File URL has no path")
    if scheme in ('http', 'https') and not netloc:
        return abort(400, "HTTP(S) URL has no netloc")

    if netloc:
        # Split on the LAST '@': everything before it is user-info.
        user_info, _, loc = netloc.rpartition('@')
        if user_info:
            # Only the user-info part is escaped here; the host part is
            # passed through unchanged.
            netloc = f"{quote(user_info, safe=URL_SAFE)}@{loc}"
        else:
            # TODO: do this properly, ie, valid dns-name/ip/port etc
            netloc = quote(netloc, safe=URL_SAFE)

    path = quote(path, safe=URL_SAFE)
    params = quote_plus(params, safe=URL_SAFE)
    query = quote(query, safe=URL_SAFE)
    fragment = quote(fragment, safe=URL_SAFE)

    url = f'{scheme}://{netloc}{path}'
    if params:
        # urlparse strips the ';' that introduces the params; restore it.
        url = f'{url};{params}'
    if query:
        url = f'{url}?{query}'
    if fragment:
        url = f'{url}#{fragment}'
    return url