[py3] Fixed file_uploads tests
This commit is contained in:
parent
34ac145796
commit
0120985095
|
@ -6,7 +6,9 @@ file upload handlers for processing.
|
||||||
"""
|
"""
|
||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
import base64
|
||||||
import cgi
|
import cgi
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.exceptions import SuspiciousOperation
|
from django.core.exceptions import SuspiciousOperation
|
||||||
from django.utils.datastructures import MultiValueDict
|
from django.utils.datastructures import MultiValueDict
|
||||||
|
@ -199,7 +201,7 @@ class MultiPartParser(object):
|
||||||
if transfer_encoding == 'base64':
|
if transfer_encoding == 'base64':
|
||||||
# We only special-case base64 transfer encoding
|
# We only special-case base64 transfer encoding
|
||||||
try:
|
try:
|
||||||
chunk = str(chunk).decode('base64')
|
chunk = base64.b64decode(chunk)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Since this is only a chunk, any error is an unfixable error.
|
# Since this is only a chunk, any error is an unfixable error.
|
||||||
raise MultiPartParserError("Could not decode base64 data: %r" % e)
|
raise MultiPartParserError("Could not decode base64 data: %r" % e)
|
||||||
|
|
|
@ -12,6 +12,7 @@ from django.core.files import temp as tempfile
|
||||||
from django.core.files.uploadedfile import SimpleUploadedFile
|
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||||
from django.http.multipartparser import MultiPartParser
|
from django.http.multipartparser import MultiPartParser
|
||||||
from django.test import TestCase, client
|
from django.test import TestCase, client
|
||||||
|
from django.utils.encoding import smart_bytes
|
||||||
from django.utils.six import StringIO
|
from django.utils.six import StringIO
|
||||||
from django.utils import unittest
|
from django.utils import unittest
|
||||||
|
|
||||||
|
@ -48,12 +49,12 @@ class FileUploadTests(TestCase):
|
||||||
'file_field2': file2,
|
'file_field2': file2,
|
||||||
}
|
}
|
||||||
|
|
||||||
for key in post_data.keys():
|
for key in list(post_data):
|
||||||
try:
|
try:
|
||||||
post_data[key + '_hash'] = hashlib.sha1(post_data[key].read()).hexdigest()
|
post_data[key + '_hash'] = hashlib.sha1(post_data[key].read()).hexdigest()
|
||||||
post_data[key].seek(0)
|
post_data[key].seek(0)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
post_data[key + '_hash'] = hashlib.sha1(post_data[key]).hexdigest()
|
post_data[key + '_hash'] = hashlib.sha1(smart_bytes(post_data[key])).hexdigest()
|
||||||
|
|
||||||
response = self.client.post('/file_uploads/verify/', post_data)
|
response = self.client.post('/file_uploads/verify/', post_data)
|
||||||
|
|
||||||
|
@ -67,7 +68,7 @@ class FileUploadTests(TestCase):
|
||||||
'Content-Type: application/octet-stream',
|
'Content-Type: application/octet-stream',
|
||||||
'Content-Transfer-Encoding: base64',
|
'Content-Transfer-Encoding: base64',
|
||||||
'',
|
'',
|
||||||
base64.b64encode(test_string),
|
base64.b64encode(smart_bytes(test_string)).decode('ascii'),
|
||||||
'--' + client.BOUNDARY + '--',
|
'--' + client.BOUNDARY + '--',
|
||||||
'',
|
'',
|
||||||
]).encode('utf-8')
|
]).encode('utf-8')
|
||||||
|
@ -79,7 +80,7 @@ class FileUploadTests(TestCase):
|
||||||
'wsgi.input': client.FakePayload(payload),
|
'wsgi.input': client.FakePayload(payload),
|
||||||
}
|
}
|
||||||
response = self.client.request(**r)
|
response = self.client.request(**r)
|
||||||
received = json.loads(response.content)
|
received = json.loads(response.content.decode('utf-8'))
|
||||||
|
|
||||||
self.assertEqual(received['file'], test_string)
|
self.assertEqual(received['file'], test_string)
|
||||||
|
|
||||||
|
@ -150,7 +151,7 @@ class FileUploadTests(TestCase):
|
||||||
response = self.client.request(**r)
|
response = self.client.request(**r)
|
||||||
|
|
||||||
# The filenames should have been sanitized by the time it got to the view.
|
# The filenames should have been sanitized by the time it got to the view.
|
||||||
recieved = json.loads(response.content)
|
recieved = json.loads(response.content.decode('utf-8'))
|
||||||
for i, name in enumerate(scary_file_names):
|
for i, name in enumerate(scary_file_names):
|
||||||
got = recieved["file%s" % i]
|
got = recieved["file%s" % i]
|
||||||
self.assertEqual(got, "hax0rd.txt")
|
self.assertEqual(got, "hax0rd.txt")
|
||||||
|
@ -174,7 +175,7 @@ class FileUploadTests(TestCase):
|
||||||
'REQUEST_METHOD': 'POST',
|
'REQUEST_METHOD': 'POST',
|
||||||
'wsgi.input': client.FakePayload(payload),
|
'wsgi.input': client.FakePayload(payload),
|
||||||
}
|
}
|
||||||
got = json.loads(self.client.request(**r).content)
|
got = json.loads(self.client.request(**r).content.decode('utf-8'))
|
||||||
self.assertTrue(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file']))
|
self.assertTrue(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file']))
|
||||||
|
|
||||||
def test_truncated_multipart_handled_gracefully(self):
|
def test_truncated_multipart_handled_gracefully(self):
|
||||||
|
@ -200,7 +201,7 @@ class FileUploadTests(TestCase):
|
||||||
'REQUEST_METHOD': 'POST',
|
'REQUEST_METHOD': 'POST',
|
||||||
'wsgi.input': client.FakePayload(payload),
|
'wsgi.input': client.FakePayload(payload),
|
||||||
}
|
}
|
||||||
got = json.loads(self.client.request(**r).content)
|
got = json.loads(self.client.request(**r).content.decode('utf-8'))
|
||||||
self.assertEqual(got, {})
|
self.assertEqual(got, {})
|
||||||
|
|
||||||
def test_empty_multipart_handled_gracefully(self):
|
def test_empty_multipart_handled_gracefully(self):
|
||||||
|
@ -215,7 +216,7 @@ class FileUploadTests(TestCase):
|
||||||
'REQUEST_METHOD': 'POST',
|
'REQUEST_METHOD': 'POST',
|
||||||
'wsgi.input': client.FakePayload(b''),
|
'wsgi.input': client.FakePayload(b''),
|
||||||
}
|
}
|
||||||
got = json.loads(self.client.request(**r).content)
|
got = json.loads(self.client.request(**r).content.decode('utf-8'))
|
||||||
self.assertEqual(got, {})
|
self.assertEqual(got, {})
|
||||||
|
|
||||||
def test_custom_upload_handler(self):
|
def test_custom_upload_handler(self):
|
||||||
|
@ -231,12 +232,12 @@ class FileUploadTests(TestCase):
|
||||||
|
|
||||||
# Small file posting should work.
|
# Small file posting should work.
|
||||||
response = self.client.post('/file_uploads/quota/', {'f': smallfile})
|
response = self.client.post('/file_uploads/quota/', {'f': smallfile})
|
||||||
got = json.loads(response.content)
|
got = json.loads(response.content.decode('utf-8'))
|
||||||
self.assertTrue('f' in got)
|
self.assertTrue('f' in got)
|
||||||
|
|
||||||
# Large files don't go through.
|
# Large files don't go through.
|
||||||
response = self.client.post("/file_uploads/quota/", {'f': bigfile})
|
response = self.client.post("/file_uploads/quota/", {'f': bigfile})
|
||||||
got = json.loads(response.content)
|
got = json.loads(response.content.decode('utf-8'))
|
||||||
self.assertTrue('f' not in got)
|
self.assertTrue('f' not in got)
|
||||||
|
|
||||||
def test_broken_custom_upload_handler(self):
|
def test_broken_custom_upload_handler(self):
|
||||||
|
@ -274,7 +275,7 @@ class FileUploadTests(TestCase):
|
||||||
'field5': 'test7',
|
'field5': 'test7',
|
||||||
'file2': (file2, file2a)
|
'file2': (file2, file2a)
|
||||||
})
|
})
|
||||||
got = json.loads(response.content)
|
got = json.loads(response.content.decode('utf-8'))
|
||||||
|
|
||||||
self.assertEqual(got.get('file1'), 1)
|
self.assertEqual(got.get('file1'), 1)
|
||||||
self.assertEqual(got.get('file2'), 2)
|
self.assertEqual(got.get('file2'), 2)
|
||||||
|
@ -299,8 +300,8 @@ class FileUploadTests(TestCase):
|
||||||
# it raises when there is an attempt to read more than the available bytes:
|
# it raises when there is an attempt to read more than the available bytes:
|
||||||
try:
|
try:
|
||||||
client.FakePayload(b'a').read(2)
|
client.FakePayload(b'a').read(2)
|
||||||
except Exception as reference_error:
|
except Exception as err:
|
||||||
pass
|
reference_error = err
|
||||||
|
|
||||||
# install the custom handler that tries to access request.POST
|
# install the custom handler that tries to access request.POST
|
||||||
self.client.handler = POSTAccessingHandler()
|
self.client.handler = POSTAccessingHandler()
|
||||||
|
|
|
@ -7,6 +7,7 @@ import os
|
||||||
from django.core.files.uploadedfile import UploadedFile
|
from django.core.files.uploadedfile import UploadedFile
|
||||||
from django.http import HttpResponse, HttpResponseServerError
|
from django.http import HttpResponse, HttpResponseServerError
|
||||||
from django.utils import six
|
from django.utils import six
|
||||||
|
from django.utils.encoding import smart_bytes
|
||||||
|
|
||||||
from .models import FileModel, UPLOAD_TO
|
from .models import FileModel, UPLOAD_TO
|
||||||
from .tests import UNICODE_FILENAME
|
from .tests import UNICODE_FILENAME
|
||||||
|
@ -45,7 +46,7 @@ def file_upload_view_verify(request):
|
||||||
if isinstance(value, UploadedFile):
|
if isinstance(value, UploadedFile):
|
||||||
new_hash = hashlib.sha1(value.read()).hexdigest()
|
new_hash = hashlib.sha1(value.read()).hexdigest()
|
||||||
else:
|
else:
|
||||||
new_hash = hashlib.sha1(value).hexdigest()
|
new_hash = hashlib.sha1(smart_bytes(value)).hexdigest()
|
||||||
if new_hash != submitted_hash:
|
if new_hash != submitted_hash:
|
||||||
return HttpResponseServerError()
|
return HttpResponseServerError()
|
||||||
|
|
||||||
|
@ -95,7 +96,7 @@ def file_upload_echo_content(request):
|
||||||
"""
|
"""
|
||||||
Simple view to echo back the content of uploaded files for tests.
|
Simple view to echo back the content of uploaded files for tests.
|
||||||
"""
|
"""
|
||||||
r = dict([(k, f.read()) for k, f in request.FILES.items()])
|
r = dict([(k, f.read().decode('utf-8')) for k, f in request.FILES.items()])
|
||||||
return HttpResponse(json.dumps(r))
|
return HttpResponse(json.dumps(r))
|
||||||
|
|
||||||
def file_upload_quota(request):
|
def file_upload_quota(request):
|
||||||
|
|
Loading…
Reference in New Issue