validate.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. #
  2. # Copyright (c) Daniel Sheffield 2023
  3. # All rights reserved
  4. #
  5. # THIS SOFTWARE IS PROVIDED AS IS WITHOUT WARRANTY
  6. # https://www.ietf.org/rfc/rfc3696.txt
  7. """
Excerpt from RFC 3696, section 3 (restrictions on email addresses):

Without quotes, local-parts may consist of any combination of
alphabetic characters, digits, or any of the special characters

    ! # $ % & ' * + - / = ? ^ _ ` . { | } ~

A period (".") may also appear, but may not be used to start or end the
local part, nor may two or more consecutive periods appear. Stated
differently, any ASCII graphic (printing) character other than the
at-sign ("@"), backslash, double quote, comma, or square brackets may
appear without quoting. If any of that list of excluded characters
are to appear, they must be quoted.
  17. """
  18. from io import BufferedReader
  19. import mimetypes
  20. from itertools import chain
  21. import os
  22. from bottle import static_file, response, HTTPError, abort, LocalRequest, HTTPResponse
  23. from urllib.parse import urlparse, quote, quote_plus
  24. from .hash_util import blake_file, bytes_to_base32, blake
  25. # according to rfc3696
  26. URL_MUST_ESCAPE = bytes([
  27. x for x in chain(
  28. # control characters
  29. range(int('0x1F', 0)+1),
  30. # 0x7F and non 7bit-ASCII
  31. range(int('0x7F', 0,), int('0xFF', 0)+1),
  32. # specifically excluded
  33. b'@\\",[]'
  34. )
  35. ])
  36. # so give this list to urllib.parse.quote which follows rfc3986
  37. URL_SAFE = bytes(( i for i in range(int('0xff',0)+1) if i not in map(int, URL_MUST_ESCAPE) ))
  38. CLIP_SIZE_LIMIT = 65535
  39. def validate(filename: str, tool: str, root='rest/static/files') -> bytes:
  40. ret = static_file('/'.join([filename,]*2) + '.file', root=root)
  41. if isinstance(ret, HTTPError):
  42. return abort(404, f"No such `{tool.title()}`: {filename}")
  43. if ret.content_length > CLIP_SIZE_LIMIT:
  44. return abort(418, f"{tool.title()} size exceeds {CLIP_SIZE_LIMIT}")
  45. content: bytes = ret.body.read() if isinstance(ret.body, BufferedReader) else ret.body.encode('utf-8')
  46. _bytes = blake(content, person=tool.encode('utf-8'))
  47. _b32 = bytes_to_base32(_bytes)
  48. if _b32 != filename:
  49. return abort(410, f"{tool.title()} content differs")
  50. return content
  51. def get_filename(filename: str, root: str = 'rest/static/files'):
  52. path = '/'.join([filename,]*2)
  53. try:
  54. with open(f'{root}/{path}.name', "r") as f:
  55. name = f.read()
  56. return name
  57. except:
  58. pass
  59. def get_file_size(filename: str, root: str = 'rest/static/files'):
  60. path = '/'.join([filename,]*2)
  61. try:
  62. return os.stat(f'{root}/{path}.file').st_sizea
  63. except:
  64. pass
  65. def get_file_mimetype(name):
  66. mimetype = mimetypes.guess_type(name, strict=False)[0] if name else True
  67. return mimetype
  68. def validate_file(filename: str, root: str = 'rest/static/files', download=True, mimetype=True) -> HTTPResponse:
  69. path = '/'.join([filename,]*2)
  70. name = get_filename(filename)
  71. mimetype = mimetype if mimetype and mimetype is not True else get_file_mimetype(name)
  72. ret = static_file(
  73. f'{path}.file',
  74. root=root,
  75. download=name if name and download else download,
  76. mimetype='auto' if mimetype is True else mimetype
  77. )
  78. if isinstance(ret, HTTPError):
  79. return abort(404, f"No such `Upload`: {filename}")
  80. _bytes = blake_file(f'{path}.file', person='upload'.encode('utf-8'), root=root)
  81. _b32 = bytes_to_base32(_bytes)
  82. if _b32 != filename:
  83. return abort(410, f"Uploaded content differs")
  84. return ret
  85. def validate_parameter(request: LocalRequest, name: str) -> bytes:
  86. if name not in request.params:
  87. return abort(400, f"Missing parameter: '{name}'")
  88. # TODO: what is correct overhead for form content?
  89. OVERHEAD = 1024
  90. content: bytes = request.query.get(name, None)
  91. content_length = request.content_length
  92. if content_length == -1:
  93. return abort(418, f"Content-Length must be specified")
  94. if content_length > CLIP_SIZE_LIMIT + OVERHEAD:
  95. return abort(418, f"Content-Length can not exceed {CLIP_SIZE_LIMIT*3} bytes")
  96. # TODO: add test for both query/form param
  97. if 'multipart/form-data' in request.content_type:
  98. # TODO: what about binary data ?
  99. content: bytes = (content or request.params[name].encode('utf-8'))
  100. else:
  101. content: bytes = (content or request.params[name].encode('latin-1'))
  102. if len(content) > CLIP_SIZE_LIMIT:
  103. return abort(418, f"Paste can not exceed {CLIP_SIZE_LIMIT} bytes")
  104. return content
  105. def validate_url(url: str) -> str:
  106. scheme, netloc, path, params, query, fragment = urlparse(url)
  107. if not scheme: return abort(400, "URL has no scheme")
  108. if scheme == 'file' and not path: return abort(400, "File URL has no path")
  109. if scheme in ('http', 'https') and not netloc: return abort(400, "HTTP(S) URL has no netloc")
  110. if netloc:
  111. try:
  112. user_info, loc = netloc.rsplit('@', 1)
  113. except ValueError:
  114. user_info = ''
  115. loc = ''
  116. if user_info:
  117. user_info = quote(user_info, safe=URL_SAFE)
  118. netloc = f"{user_info}@{''.join(loc)}"
  119. else:
  120. # TODO: do this properly, ie, valid dns-name/ip/port etc
  121. netloc = quote(netloc, safe=URL_SAFE)
  122. path = quote(path, safe=URL_SAFE)
  123. params = quote_plus(params, safe=URL_SAFE)
  124. query = quote(query, safe=URL_SAFE)
  125. fragment = quote(fragment, safe=URL_SAFE)
  126. url = f'{scheme}://{netloc}{path}{params}'
  127. if query:
  128. url = f'{url}?{query}'
  129. if fragment:
  130. url = f'{url}#{fragment}'
  131. return url