validate.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. #
  2. # Copyright (c) Daniel Sheffield 2023
  3. # All rights reserved
  4. #
  5. # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
  6. # https://www.ietf.org/rfc/rfc3696.txt
  7. """
  8. Without quotes, local-parts may consist of any combination of
  9. alphabetic characters, digits, or any of the special characters
  10. ! # $ % & ' * + - / = ? ^ _ ` . { | } ~
  11. period (".") may also appear, but may not be used to start or end the
  12. local part, nor may two or more consecutive periods appear. Stated
  13. differently, any ASCII graphic (printing) character other than the
  14. at-sign ("@"), backslash, double quote, comma, or square brackets may
  15. appear without quoting. If any of that list of excluded characters
  16. are to appear, they must be quoted.
  17. """
  18. from io import BufferedReader
  19. import mimetypes
  20. from itertools import chain
  21. import os
  22. from bottle import static_file, response, HTTPError, abort, LocalRequest, HTTPResponse
  23. from urllib.parse import urlparse, quote, quote_plus
  24. from .hash_util import blake_file, bytes_to_base32, blake
  25. # according to rfc3696
  26. URL_MUST_ESCAPE = bytes([
  27. x for x in chain(
  28. # control characters
  29. range(int('0x1F', 0)+1),
  30. # 0x7F and non 7bit-ASCII
  31. range(int('0x7F', 0,), int('0xFF', 0)+1),
  32. # specifically excluded
  33. b'@\\",[]'
  34. )
  35. ])
  36. # so give this list to urllib.parse.quote which follows rfc3986
  37. URL_SAFE = bytes(( i for i in range(int('0xff',0)+1) if i not in map(int, URL_MUST_ESCAPE) ))
  38. CLIP_SIZE_LIMIT = 65535
  39. def validate(filename: str) -> bytes:
  40. ret = static_file('/'.join([filename,]*2) + '.file', root='rest/static')
  41. if isinstance(ret, HTTPError):
  42. return abort(404, f"No such paste: {filename}")
  43. if ret.content_length > CLIP_SIZE_LIMIT:
  44. return abort(418, f"Paste size exceeds {CLIP_SIZE_LIMIT}")
  45. content: bytes = ret.body.read() if isinstance(ret.body, BufferedReader) else ret.body.encode('utf-8')
  46. _bytes = blake(content, person='clip'.encode('utf-8'))
  47. _b32 = bytes_to_base32(_bytes)
  48. if _b32 != filename:
  49. return abort(410, f"Paste content differs")
  50. return content
  51. def get_filename(filename: str, root: str = 'rest/static'):
  52. path = '/'.join([filename,]*2)
  53. try:
  54. with open(f'{root}/{path}.name', "r") as f:
  55. name = f.read()
  56. return name
  57. except:
  58. pass
  59. def get_file_size(filename: str, root: str = 'rest/static'):
  60. path = '/'.join([filename,]*2)
  61. return os.stat(f'{root}/{path}.file').st_size
  62. def get_file_mimetype(name):
  63. mimetype = mimetypes.guess_type(name, strict=False)[0] if name else True
  64. return mimetype
  65. def validate_file(filename: str, root: str = 'rest/static', download=True, mimetype=True) -> HTTPResponse:
  66. path = '/'.join([filename,]*2)
  67. name = get_filename(filename)
  68. mimetype = mimetype if mimetype and mimetype is not True else get_file_mimetype(name)
  69. ret = static_file(f'{path}.file', root=root, download=name if name and download else download, mimetype=mimetype)
  70. if isinstance(ret, HTTPError):
  71. return abort(404, f"No such upload: {filename}")
  72. _bytes = blake_file(f'{path}.file', person='upload'.encode('utf-8'), root=root)
  73. _b32 = bytes_to_base32(_bytes)
  74. if _b32 != filename:
  75. return abort(410, f"Uploaded content differs")
  76. return ret
  77. def validate_parameter(request: LocalRequest, name: str) -> bytes:
  78. if name not in request.params:
  79. return abort(400, f"Missing parameter: '{name}'")
  80. # TODO: what is correct overhead for form content?
  81. OVERHEAD = 1024
  82. content: bytes = request.query.get(name, None)
  83. content_length = request.content_length
  84. if content_length == -1:
  85. return abort(418, f"Content-Length must be specified")
  86. if content_length > CLIP_SIZE_LIMIT + OVERHEAD:
  87. return abort(418, f"Content-Length can not exceed {CLIP_SIZE_LIMIT*3} bytes")
  88. # TODO: add test for both query/form param
  89. if 'multipart/form-data' in request.content_type:
  90. # TODO: what about binary data ?
  91. content: bytes = (content or request.params[name].encode('utf-8'))
  92. else:
  93. content: bytes = (content or request.params[name].encode('latin-1'))
  94. if len(content) > CLIP_SIZE_LIMIT:
  95. return abort(418, f"Paste can not exceed {CLIP_SIZE_LIMIT} bytes")
  96. return content
  97. def validate_url(url: str) -> str:
  98. scheme, netloc, path, params, query, fragment = urlparse(url)
  99. if not scheme: return abort(400, "URL has no scheme")
  100. if scheme == 'file' and not path: return abort(400, "File URL has no path")
  101. if scheme in ('http', 'https') and not netloc: return abort(400, "HTTP(S) URL has no netloc")
  102. if netloc:
  103. try:
  104. user_info, loc = netloc.rsplit('@', 1)
  105. except ValueError:
  106. user_info = ''
  107. loc = ''
  108. if user_info:
  109. user_info = quote(user_info, safe=URL_SAFE)
  110. netloc = f"{user_info}@{''.join(loc)}"
  111. else:
  112. # TODO: do this properly, ie, valid dns-name/ip/port etc
  113. netloc = quote(netloc, safe=URL_SAFE)
  114. path = quote(path, safe=URL_SAFE)
  115. params = quote_plus(params, safe=URL_SAFE)
  116. query = quote(query, safe=URL_SAFE)
  117. fragment = quote(fragment, safe=URL_SAFE)
  118. url = f'{scheme}://{netloc}{path}{params}'
  119. if query:
  120. url = f'{url}?{query}'
  121. if fragment:
  122. url = f'{url}#{fragment}'
  123. return url