Daniel Sheffield 1 rok pred
rodič
commit
7c1f6c7973

+ 11 - 44
app/rest/pyapi.py

@@ -3,21 +3,21 @@
 # All rights reserved
 #
 # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
-from io import BufferedReader
 import os
 from threading import Thread
 from typing import Union
 from bottle import (
     route, request, response,
-    redirect, abort,
+    redirect,
     template, static_file,
-    FormsDict, HTTPError, LocalRequest,
+    FormsDict, HTTPError,
 )
 from psycopg import Cursor, connect
 from psycopg.rows import TupleRow
 
-from .hash_util import blake, bytes_to_base32, hash_to_base32, hex_to_hash, normalize_base32
-from .route_decorators import normalize, normalize_query, poison, cursor
+from .validate import validate, validate_parameter, validate_url
+from .hash_util import blake, bytes_to_base32, normalize_base32
+from .route_decorators import normalize, poison, cursor
 from .query_to_xml import get_categories, get_groups, get_products, get_tags
 from .CachedLoadingPage import CachedLoadingPage
 from .Cache import Cache
@@ -89,49 +89,13 @@ def tags(cur: Cursor[TupleRow], key: Union[int, str], forms: FormsDict, cache: C
     response.content_type = 'application/xhtml+xml; charset=utf-8'
     return get_tags(cur, forms)
 
-CLIP_SIZE_LIMIT = 65535
+
 SCHEME = "https://"
 HOST = ""
 DOMAIN = "shandan.one"
 PORT = ""
 LOCATION = SCHEME + (f"{HOST}." if HOST else "") + DOMAIN + (f":{PORT}" if PORT else "")
 
-def validate(filename: str) -> bytes:
-    ret = static_file('/'.join([filename,]*2) + '.file', root='app/rest/static')
-    if isinstance(ret, HTTPError):
-        return abort(404, f"No such paste: {filename}")
-
-    if ret.content_length > CLIP_SIZE_LIMIT:
-        return abort(418, f"Paste size exceeds {CLIP_SIZE_LIMIT}")
-
-    content: bytes = ret.body.read() if isinstance(ret.body, BufferedReader) else ret.body.encode('utf-8')
-
-    _bytes = blake(content, person='clip'.encode('utf-8'))
-    _b32 = bytes_to_base32(_bytes)
-    if _b32 != filename:
-        return abort(410, f"Paste content differs")
-    return content
-
-
-def validate_parameter(request: LocalRequest, name: str) -> bytes:
-    if name not in request.params:
-        return abort(400, f"Missing parameter: '{name}'")
-    
-    # TODO: what is correct overhead for form content?
-    OVERHEAD = 1024
-    content: bytes = request.query.get(name, None)
-    content_length = request.content_length
-    if content_length == -1:
-        return abort(418, f"Content-Length must be specified")
-    if content_length > CLIP_SIZE_LIMIT + OVERHEAD:
-        return abort(418, f"Content-Length can not exceed {CLIP_SIZE_LIMIT+OVERHEAD}")
-
-    # TODO: add test for both query/form param
-    content: bytes = (content or request.params[name]).encode('latin-1')
-    if len(content) > CLIP_SIZE_LIMIT:
-        return abort(418, f"Paste can not exceed {CLIP_SIZE_LIMIT}")
-    return content
-
 
 def save(content: bytes, root='app/rest/static') -> str:
     _bytes = blake(content, person='clip'.encode('utf-8'))
@@ -226,8 +190,8 @@ def goto():
             content = None
         
         if content and request.params.go == 'true':
-            # TODO: urlencode this !?
-            return redirect(content)
+            target = validate_url(content)
+            return redirect(target)
         
         link = f'{LOCATION}/goto/{_hash}' if content else f'{LOCATION}/goto'
         disabled = True if content else False
@@ -244,6 +208,9 @@ def goto():
     if request.method == 'POST':
         content = validate_parameter(request, 'url')
         _b32 = save(content)
+
+        # validate but save content unmodified
+        _ = validate_url(content.decode('utf-8'))
         return redirect(f'/goto?hash={_b32}')
 
 

+ 8 - 8
app/rest/templates/goto-form.tpl

@@ -3,32 +3,32 @@
 % disabled = (setdefault("disabled", False) and 'readonly="true"') or ""
 <form id="goto" method="{{ method }}" action="{{ action }}">
   <style>
-textarea::-webkit-scrollbar {
+input[type="url"]::-webkit-scrollbar {
   width: 11px;
 }
-textarea {
+input[type="url"] {
   color: #cccccc;
   background-color: #080808;
   scrollbar-width: thin;
   scrollbar-color: var(--thumbBG) var(--scrollbarBG);
 }
-textarea::-webkit-scrollbar-track {
+input[type="url"]::-webkit-scrollbar-track {
   background: var(--scrollbarBG);
 }
-textarea::-webkit-scrollbar-thumb {
+input[type="url"]::-webkit-scrollbar-thumb {
   background-color: var(--thumbBG) ;
   border-radius: 6px;
   border: 3px solid var(--scrollbarBG);
 }
   </style>
-  <textarea
+  <input type="url"
     style="width: 80%"
-    id="paste-text-area"
+    id="input-url"
     name="url"
-    rows="1"
     {{!disabled}}
     required="true"
     autofocus="true"
-  >{{ content }}</textarea>
+    value="{{ content }}"
+  ></input>
 </form>
 <form id="new" method="get" action="{{ action }}"></form>

+ 2 - 2
app/rest/templates/paste.tpl

@@ -33,8 +33,8 @@ body {
         </div>
       </div>
       <div class="pure-u-1">
-        <div class="pure-button" style="margin: 1em 0 1em; background: #f5ab9e; color: #8c3a2b;">
-          <a href="{{!link}}">{{ link }}</a>
+        <div class="pure-button" style="margin: 1em 0 1em; background: #4f8f4f;">
+          <a href="{{!link}}" style="color: floralwhite;">{{ link }}</a>
         </div>
       <div class="pure-u-1">
 {{!form}}

+ 98 - 0
app/rest/validate.py

@@ -0,0 +1,98 @@
+#
+# Copyright (c) Daniel Sheffield 2023
+# All rights reserved
+#
+# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
+
+# https://www.ietf.org/rfc/rfc3696.txt
+
+"""
+   Without quotes, local-parts may consist of any combination of
+   alphabetic characters, digits, or any of the special characters
+
+      ! # $ % & ' * + - / = ?  ^ _ ` . { | } ~
+
+   period (".") may also appear, but may not be used to start or end the
+   local part, nor may two or more consecutive periods appear.  Stated
+   differently, any ASCII graphic (printing) character other than the
+   at-sign ("@"), backslash, double quote, comma, or square brackets may
+   appear without quoting.  If any of that list of excluded characters
+   are to appear, they must be quoted.
+"""
+
+from io import BufferedReader
+from itertools import chain, zip_longest
+from bottle import static_file, HTTPError, abort, LocalRequest
+from urllib.parse import urlparse
+from .hash_util import bytes_to_base32, blake
+
+MUST_ESCAPE = bytes([
+    x for x in chain(
+        range(int('0x1F', 0)+1),
+        range(int('0x7F', 0,), int('0xFF', 0)+1),
+        b'@\\",[]'
+    )
+])
+
+CLIP_SIZE_LIMIT = 65535
+def validate(filename: str) -> bytes:
+    ret = static_file('/'.join([filename,]*2) + '.file', root='app/rest/static')
+    if isinstance(ret, HTTPError):
+        return abort(404, f"No such paste: {filename}")
+
+    if ret.content_length > CLIP_SIZE_LIMIT:
+        return abort(418, f"Paste size exceeds {CLIP_SIZE_LIMIT}")
+
+    content: bytes = ret.body.read() if isinstance(ret.body, BufferedReader) else ret.body.encode('utf-8')
+
+    _bytes = blake(content, person='clip'.encode('utf-8'))
+    _b32 = bytes_to_base32(_bytes)
+    if _b32 != filename:
+        return abort(410, f"Paste content differs")
+    return content
+
+
+def validate_parameter(request: LocalRequest, name: str) -> bytes:
+    if name not in request.params:
+        return abort(400, f"Missing parameter: '{name}'")
+    
+    # TODO: what is correct overhead for form content?
+    OVERHEAD = 1024
+    content: bytes = request.query.get(name, None)
+    content_length = request.content_length
+    if content_length == -1:
+        return abort(418, f"Content-Length must be specified")
+    if content_length > CLIP_SIZE_LIMIT + OVERHEAD:
+        return abort(418, f"Content-Length can not exceed {CLIP_SIZE_LIMIT+OVERHEAD}")
+
+    # TODO: add test for both query/form param
+    content: bytes = (content or request.params[name]).encode('latin-1')
+    if len(content) > CLIP_SIZE_LIMIT:
+        return abort(418, f"Paste can not exceed {CLIP_SIZE_LIMIT}")
+    return content
+
+def validate_url(url: str) -> str:
+    scheme, netloc, path, params, query, fragment = urlparse(url)
+
+    if not scheme: return abort(400, "URL has no scheme")
+
+    if scheme == 'file' and not path: return abort(400, "File URL has no path")
+
+    if scheme in ('http', 'https') and not netloc: return abort(400, "HTTP(S) URL has no netloc")
+
+    encoded = url.encode('utf-8')
+    ret = []
+    for x in encoded:
+        if x in map(int, MUST_ESCAPE):
+            ret.append(f'%{hex(x)[2:].zfill(2)}'.upper())
+            continue
+        else:
+            ret.append(bytes([x]).decode('ascii'))
+    
+    for idx, (c, *n) in enumerate(zip_longest(ret, ret[1:], ret[2:])):
+        if c == '%':
+            if None not in n and all([i.lower() in '0123456789abcdef' for i in n]):
+                continue
+            ret[idx] = '%25'
+
+    return ''.join(ret)

+ 68 - 0
test/rest/test_url.py

@@ -0,0 +1,68 @@
+#
+# Copyright (c) Daniel Sheffield 2023
+#
+# All rights reserved
+#
+# THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
+
+from pytest import mark, raises
+from bottle import HTTPError
+from app.rest.validate import validate_url
+
+@mark.parametrize('url, expected', [
+    ['file:///',]*2,  # [x]*2 pairs a URL with itself: it must come back unchanged
+    ['file:///a/b/c',]*2,
+    ['https://shandan.one',]*2,
+    ['https://www.shandan.one',]*2,
+    ['https://www.shandan.one?',]*2,
+    ['https://www.shandan.one/clip?id=123',]*2,
+    ['https://www.shandan.one/clip?id=123#',]*2,
+
+    # no double slash
+    #['file:/a/b/c', (HTTPError, ""),],
+    #['file:/abc', (HTTPError, ""),],
+
+    # no scheme
+    ['/a/b/c', (HTTPError, 400, "URL has no scheme"),],
+
+    # file scheme with no path
+    ['file:', (HTTPError, 400, "File URL has no path"),],
+
+    # no HTTPS domain
+    #['https://abc?id=1:', (HTTPError, 400, "HTTP(S) URL has no netloc"),],
+
+    # consecutive dots
+    #['https://shandan.one/abc..id', 'https://shandan.one/abc..id',],
+
+    # unescaped char in reg_name
+    # TODO: should be invalid because netloc must be a domain name or ip ?
+    ['https://🌚.shandan.one', 'https://%F0%9F%8C%9A.shandan.one',],
+
+    # @ in user_info not allowed
+    # TODO: check this
+    ['https://user@mail@www.shandan.one','https://user%40mail%40www.shandan.one'],
+
+    # delimiters
+    # TODO: should < be translated to %3C ?
+    ['https://www.shandan.one?a<b', 'https://www.shandan.one?a<b'],
+
+    # more delimiters
+    ['https://www.shandan.one/clip?proportion=69%', 'https://www.shandan.one/clip?proportion=69%25'],
+
+    # fragment before end of reference URI
+    ['https://www.shandan.one/tiny#url?id=123', 'https://www.shandan.one/tiny#url?id=123'],
+])
+def test_validate_url_invalid(url: str, expected: str):  # despite the name, accepted URLs are covered too
+    if isinstance(expected, tuple):
+        exp_exception, *ex_args = expected
+    else:
+        exp_exception = None
+    # A non-tuple expectation is the exact value validate_url must return.
+    if not exp_exception:
+        assert validate_url(url) == expected
+        return
+    # A tuple is (exception type, *args): the call must raise with exactly those args.
+    with raises(exp_exception) as ex:
+        validate_url(url)
+    # ex_args is only bound on the tuple branch, which is the only way to reach this line.
+    assert list(ex.value.args) == ex_args