From dc724c5bf9d3b8d59c9571aa751c3cd001cdeced Mon Sep 17 00:00:00 2001 From: Piotr Kunicki Date: Thu, 14 Oct 2021 11:41:10 +0200 Subject: [PATCH] Fixed #30509 -- Made FileResponse better handle buffers and non-zero file offsets. --- django/http/response.py | 39 ++++++++----- tests/responses/test_fileresponse.py | 82 ++++++++++++++++++++++++++-- 2 files changed, 101 insertions(+), 20 deletions(-) diff --git a/django/http/response.py b/django/http/response.py index ab7158f7e0..68b7a8f285 100644 --- a/django/http/response.py +++ b/django/http/response.py @@ -1,4 +1,5 @@ import datetime +import io import json import mimetypes import os @@ -437,6 +438,7 @@ class FileResponse(StreamingHttpResponse): def __init__(self, *args, as_attachment=False, filename='', **kwargs): self.as_attachment = as_attachment self.filename = filename + self._no_default_content_type_set = 'content_type' not in kwargs or kwargs['content_type'] is None super().__init__(*args, **kwargs) def _set_streaming_content(self, value): @@ -456,29 +458,38 @@ class FileResponse(StreamingHttpResponse): Set some common response headers (Content-Length, Content-Type, and Content-Disposition) based on the `filelike` response content. """ - encoding_map = { - 'bzip2': 'application/x-bzip', - 'gzip': 'application/gzip', - 'xz': 'application/x-xz', - } - filename = getattr(filelike, 'name', None) - filename = filename if (isinstance(filename, str) and filename) else self.filename - if os.path.isabs(filename): - self.headers['Content-Length'] = os.path.getsize(filelike.name) - elif hasattr(filelike, 'getbuffer'): - self.headers['Content-Length'] = filelike.getbuffer().nbytes + filename = getattr(filelike, 'name', '') + filename = filename if isinstance(filename, str) else '' + seekable = hasattr(filelike, 'seek') and (not hasattr(filelike, 'seekable') or filelike.seekable()) + if hasattr(filelike, 'tell'): + if seekable: + initial_position = filelike.tell() + filelike.seek(0, io.SEEK_END) + self.headers['Content-Length'] = filelike.tell() - initial_position + filelike.seek(initial_position) + elif hasattr(filelike, 'getbuffer'): + self.headers['Content-Length'] = filelike.getbuffer().nbytes - filelike.tell() + elif os.path.exists(filename): + self.headers['Content-Length'] = os.path.getsize(filename) - filelike.tell() + elif seekable: + self.headers['Content-Length'] = sum(iter(lambda: len(filelike.read(self.block_size)), 0)) + filelike.seek(-int(self.headers['Content-Length']), io.SEEK_END) - if self.headers.get('Content-Type', '').startswith('text/html'): + filename = os.path.basename(self.filename or filename) + if self._no_default_content_type_set: if filename: content_type, encoding = mimetypes.guess_type(filename) # Encoding isn't set to prevent browsers from automatically # uncompressing files. - content_type = encoding_map.get(encoding, content_type) + content_type = { + 'bzip2': 'application/x-bzip', + 'gzip': 'application/gzip', + 'xz': 'application/x-xz', + }.get(encoding, content_type) self.headers['Content-Type'] = content_type or 'application/octet-stream' else: self.headers['Content-Type'] = 'application/octet-stream' - filename = self.filename or os.path.basename(filename) if filename: disposition = 'attachment' if self.as_attachment else 'inline' try: diff --git a/tests/responses/test_fileresponse.py b/tests/responses/test_fileresponse.py index 28e6f8e48d..85350cd9f1 100644 --- a/tests/responses/test_fileresponse.py +++ b/tests/responses/test_fileresponse.py @@ -18,17 +18,71 @@ class UnseekableBytesIO(io.BytesIO): class FileResponseTests(SimpleTestCase): def test_content_length_file(self): response = FileResponse(open(__file__, 'rb')) - self.assertEqual(response.headers['Content-Length'], str(os.path.getsize(__file__))) response.close() + self.assertEqual(response.headers['Content-Length'], str(os.path.getsize(__file__))) def test_content_length_buffer(self): response = FileResponse(io.BytesIO(b'binary content')) self.assertEqual(response.headers['Content-Length'], '14') + def test_content_length_nonzero_starting_position_file(self): + file = open(__file__, 'rb') + file.seek(10) + response = FileResponse(file) + response.close() + self.assertEqual(response.headers['Content-Length'], str(os.path.getsize(__file__) - 10)) + + def test_content_length_nonzero_starting_position_buffer(self): + test_tuples = ( + ('BytesIO', io.BytesIO), + ('UnseekableBytesIO', UnseekableBytesIO), + ) + for buffer_class_name, BufferClass in test_tuples: + with self.subTest(buffer_class_name=buffer_class_name): + buffer = BufferClass(b'binary content') + buffer.seek(10) + response = FileResponse(buffer) + self.assertEqual(response.headers['Content-Length'], '4') + + def test_content_length_nonzero_starting_position_file_seekable_no_tell(self): + class TestFile: + def __init__(self, path, *args, **kwargs): + self._file = open(path, *args, **kwargs) + + def read(self, n_bytes=-1): + return self._file.read(n_bytes) + + def seek(self, offset, whence=io.SEEK_SET): + return self._file.seek(offset, whence) + + def seekable(self): + return True + + @property + def name(self): + return self._file.name + + def close(self): + if self._file: + self._file.close() + self._file = None + + def __enter__(self): + return self + + def __exit__(self, e_type, e_val, e_tb): + self.close() + + file = TestFile(__file__, 'rb') + file.seek(10) + response = FileResponse(file) + response.close() + self.assertEqual(response.headers['Content-Length'], str(os.path.getsize(__file__) - 10)) + def test_content_type_file(self): response = FileResponse(open(__file__, 'rb')) - self.assertIn(response.headers['Content-Type'], ['text/x-python', 'text/plain']) response.close() + self.assertIn(response.headers['Content-Type'], ['text/x-python', 'text/plain']) def test_content_type_buffer(self): response = FileResponse(io.BytesIO(b'binary content')) @@ -38,8 +92,14 @@ class FileResponseTests(SimpleTestCase): response = FileResponse(io.BytesIO(b'binary content'), content_type='video/webm') self.assertEqual(response.headers['Content-Type'], 'video/webm') + def test_content_type_buffer_explicit_default(self): + response = FileResponse(io.BytesIO(b'binary content'), content_type='text/html') + self.assertEqual(response.headers['Content-Type'], 'text/html') + def test_content_type_buffer_named(self): test_tuples = ( + (__file__, ['text/x-python', 'text/plain']), + (__file__ + 'nosuchfile', ['application/octet-stream']), ('test_fileresponse.py', ['text/x-python', 'text/plain']), ('test_fileresponse.pynosuchfile', ['application/octet-stream']), ) @@ -59,15 +119,16 @@ class FileResponseTests(SimpleTestCase): (False, 'inline'), (True, 'attachment'), ) - for (filename, header_filename), (as_attachment, header_disposition)\ - in itertools.product(filenames, dispositions): + for (filename, header_filename), (as_attachment, header_disposition) in itertools.product( + filenames, dispositions + ): with self.subTest(filename=filename, disposition=header_disposition): response = FileResponse(open(__file__, 'rb'), filename=filename, as_attachment=as_attachment) + response.close() self.assertEqual( response.headers['Content-Disposition'], '%s; filename="%s"' % (header_disposition, header_filename), ) - response.close() def test_content_disposition_buffer(self): response = FileResponse(io.BytesIO(b'binary content')) @@ -89,7 +150,7 @@ class FileResponseTests(SimpleTestCase): filename='custom_name.py', ) self.assertEqual( - response.headers['Content-Disposition'], '%s; filename="custom_name.py"' % header_disposition + response.headers['Content-Disposition'], '%s; filename="custom_name.py"' % header_disposition, ) def test_response_buffer(self): @@ -108,6 +169,15 @@ class FileResponseTests(SimpleTestCase): response = FileResponse(buffer) self.assertEqual(list(response), [b'tent']) + def test_buffer_explicit_absolute_filename(self): + """ + Headers are set correctly with a buffer when an absolute filename is + provided. + """ + response = FileResponse(io.BytesIO(b'binary content'), filename=__file__) + self.assertEqual(response.headers['Content-Length'], '14') + self.assertEqual(response.headers['Content-Disposition'], 'inline; filename="test_fileresponse.py"') + @skipIf(sys.platform == 'win32', "Named pipes are Unix-only.") def test_file_from_named_pipe_response(self): with tempfile.TemporaryDirectory() as temp_dir: