Fixed #19094 -- Improved FakePayload to support write, len and string input
Thanks Ondrej Slinták for the suggestion.
This commit is contained in:
parent
dfd4a71751
commit
dcbf08cce5
|
@ -43,11 +43,20 @@ class FakePayload(object):
|
||||||
length. This makes sure that views can't do anything under the test client
|
length. This makes sure that views can't do anything under the test client
|
||||||
that wouldn't work in Real Life.
|
that wouldn't work in Real Life.
|
||||||
"""
|
"""
|
||||||
def __init__(self, content):
|
def __init__(self, content=None):
|
||||||
self.__content = BytesIO(content)
|
self.__content = BytesIO()
|
||||||
self.__len = len(content)
|
self.__len = 0
|
||||||
|
self.read_started = False
|
||||||
|
if content is not None:
|
||||||
|
self.write(content)
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return self.__len
|
||||||
|
|
||||||
def read(self, num_bytes=None):
|
def read(self, num_bytes=None):
|
||||||
|
if not self.read_started:
|
||||||
|
self.__content.seek(0)
|
||||||
|
self.read_started = True
|
||||||
if num_bytes is None:
|
if num_bytes is None:
|
||||||
num_bytes = self.__len or 0
|
num_bytes = self.__len or 0
|
||||||
assert self.__len >= num_bytes, "Cannot read more than the available bytes from the HTTP incoming data."
|
assert self.__len >= num_bytes, "Cannot read more than the available bytes from the HTTP incoming data."
|
||||||
|
@ -55,6 +64,13 @@ class FakePayload(object):
|
||||||
self.__len -= num_bytes
|
self.__len -= num_bytes
|
||||||
return content
|
return content
|
||||||
|
|
||||||
|
def write(self, content):
|
||||||
|
if self.read_started:
|
||||||
|
raise ValueError("Unable to write a payload after he's been read")
|
||||||
|
content = force_bytes(content)
|
||||||
|
self.__content.write(content)
|
||||||
|
self.__len += len(content)
|
||||||
|
|
||||||
|
|
||||||
class ClientHandler(BaseHandler):
|
class ClientHandler(BaseHandler):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -62,22 +62,20 @@ class FileUploadTests(TestCase):
|
||||||
|
|
||||||
def test_base64_upload(self):
|
def test_base64_upload(self):
|
||||||
test_string = "This data will be transmitted base64-encoded."
|
test_string = "This data will be transmitted base64-encoded."
|
||||||
payload = "\r\n".join([
|
payload = client.FakePayload("\r\n".join([
|
||||||
'--' + client.BOUNDARY,
|
'--' + client.BOUNDARY,
|
||||||
'Content-Disposition: form-data; name="file"; filename="test.txt"',
|
'Content-Disposition: form-data; name="file"; filename="test.txt"',
|
||||||
'Content-Type: application/octet-stream',
|
'Content-Type: application/octet-stream',
|
||||||
'Content-Transfer-Encoding: base64',
|
'Content-Transfer-Encoding: base64',
|
||||||
'',
|
'',]))
|
||||||
base64.b64encode(force_bytes(test_string)).decode('ascii'),
|
payload.write(b"\r\n" + base64.b64encode(force_bytes(test_string)) + b"\r\n")
|
||||||
'--' + client.BOUNDARY + '--',
|
payload.write('--' + client.BOUNDARY + '--\r\n')
|
||||||
'',
|
|
||||||
]).encode('utf-8')
|
|
||||||
r = {
|
r = {
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
||||||
'PATH_INFO': "/file_uploads/echo_content/",
|
'PATH_INFO': "/file_uploads/echo_content/",
|
||||||
'REQUEST_METHOD': 'POST',
|
'REQUEST_METHOD': 'POST',
|
||||||
'wsgi.input': client.FakePayload(payload),
|
'wsgi.input': payload,
|
||||||
}
|
}
|
||||||
response = self.client.request(**r)
|
response = self.client.request(**r)
|
||||||
received = json.loads(response.content.decode('utf-8'))
|
received = json.loads(response.content.decode('utf-8'))
|
||||||
|
@ -126,27 +124,23 @@ class FileUploadTests(TestCase):
|
||||||
"../..\\hax0rd.txt" # Relative path, mixed.
|
"../..\\hax0rd.txt" # Relative path, mixed.
|
||||||
]
|
]
|
||||||
|
|
||||||
payload = []
|
payload = client.FakePayload()
|
||||||
for i, name in enumerate(scary_file_names):
|
for i, name in enumerate(scary_file_names):
|
||||||
payload.extend([
|
payload.write('\r\n'.join([
|
||||||
'--' + client.BOUNDARY,
|
'--' + client.BOUNDARY,
|
||||||
'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name),
|
'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name),
|
||||||
'Content-Type: application/octet-stream',
|
'Content-Type: application/octet-stream',
|
||||||
'',
|
'',
|
||||||
'You got pwnd.'
|
'You got pwnd.\r\n'
|
||||||
])
|
]))
|
||||||
payload.extend([
|
payload.write('\r\n--' + client.BOUNDARY + '--\r\n')
|
||||||
'--' + client.BOUNDARY + '--',
|
|
||||||
'',
|
|
||||||
])
|
|
||||||
|
|
||||||
payload = "\r\n".join(payload).encode('utf-8')
|
|
||||||
r = {
|
r = {
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
||||||
'PATH_INFO': "/file_uploads/echo/",
|
'PATH_INFO': "/file_uploads/echo/",
|
||||||
'REQUEST_METHOD': 'POST',
|
'REQUEST_METHOD': 'POST',
|
||||||
'wsgi.input': client.FakePayload(payload),
|
'wsgi.input': payload,
|
||||||
}
|
}
|
||||||
response = self.client.request(**r)
|
response = self.client.request(**r)
|
||||||
|
|
||||||
|
@ -159,7 +153,7 @@ class FileUploadTests(TestCase):
|
||||||
def test_filename_overflow(self):
|
def test_filename_overflow(self):
|
||||||
"""File names over 256 characters (dangerous on some platforms) get fixed up."""
|
"""File names over 256 characters (dangerous on some platforms) get fixed up."""
|
||||||
name = "%s.txt" % ("f"*500)
|
name = "%s.txt" % ("f"*500)
|
||||||
payload = "\r\n".join([
|
payload = client.FakePayload("\r\n".join([
|
||||||
'--' + client.BOUNDARY,
|
'--' + client.BOUNDARY,
|
||||||
'Content-Disposition: form-data; name="file"; filename="%s"' % name,
|
'Content-Disposition: form-data; name="file"; filename="%s"' % name,
|
||||||
'Content-Type: application/octet-stream',
|
'Content-Type: application/octet-stream',
|
||||||
|
@ -167,13 +161,13 @@ class FileUploadTests(TestCase):
|
||||||
'Oops.'
|
'Oops.'
|
||||||
'--' + client.BOUNDARY + '--',
|
'--' + client.BOUNDARY + '--',
|
||||||
'',
|
'',
|
||||||
]).encode('utf-8')
|
]))
|
||||||
r = {
|
r = {
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
||||||
'PATH_INFO': "/file_uploads/echo/",
|
'PATH_INFO': "/file_uploads/echo/",
|
||||||
'REQUEST_METHOD': 'POST',
|
'REQUEST_METHOD': 'POST',
|
||||||
'wsgi.input': client.FakePayload(payload),
|
'wsgi.input': payload,
|
||||||
}
|
}
|
||||||
got = json.loads(self.client.request(**r).content.decode('utf-8'))
|
got = json.loads(self.client.request(**r).content.decode('utf-8'))
|
||||||
self.assertTrue(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file']))
|
self.assertTrue(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file']))
|
||||||
|
@ -184,7 +178,7 @@ class FileUploadTests(TestCase):
|
||||||
attempt to read beyond the end of the stream, and simply will handle
|
attempt to read beyond the end of the stream, and simply will handle
|
||||||
the part that can be parsed gracefully.
|
the part that can be parsed gracefully.
|
||||||
"""
|
"""
|
||||||
payload = "\r\n".join([
|
payload_str = "\r\n".join([
|
||||||
'--' + client.BOUNDARY,
|
'--' + client.BOUNDARY,
|
||||||
'Content-Disposition: form-data; name="file"; filename="foo.txt"',
|
'Content-Disposition: form-data; name="file"; filename="foo.txt"',
|
||||||
'Content-Type: application/octet-stream',
|
'Content-Type: application/octet-stream',
|
||||||
|
@ -192,14 +186,14 @@ class FileUploadTests(TestCase):
|
||||||
'file contents'
|
'file contents'
|
||||||
'--' + client.BOUNDARY + '--',
|
'--' + client.BOUNDARY + '--',
|
||||||
'',
|
'',
|
||||||
]).encode('utf-8')
|
])
|
||||||
payload = payload[:-10]
|
payload = client.FakePayload(payload_str[:-10])
|
||||||
r = {
|
r = {
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
'CONTENT_TYPE': client.MULTIPART_CONTENT,
|
||||||
'PATH_INFO': '/file_uploads/echo/',
|
'PATH_INFO': '/file_uploads/echo/',
|
||||||
'REQUEST_METHOD': 'POST',
|
'REQUEST_METHOD': 'POST',
|
||||||
'wsgi.input': client.FakePayload(payload),
|
'wsgi.input': payload,
|
||||||
}
|
}
|
||||||
got = json.loads(self.client.request(**r).content.decode('utf-8'))
|
got = json.loads(self.client.request(**r).content.decode('utf-8'))
|
||||||
self.assertEqual(got, {})
|
self.assertEqual(got, {})
|
||||||
|
|
Loading…
Reference in New Issue