Fixed #7658 -- Added some Windows-specific tempfile handling. The standard

stuff doesn't work with the way Django's file uploading code wants to operate.
Patch from Mike Axiak.


git-svn-id: http://code.djangoproject.com/svn/django/trunk@8096 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
Malcolm Tredinnick 2008-07-26 22:48:51 +00:00
parent e29aece743
commit 103d484807
3 changed files with 74 additions and 15 deletions

58
django/core/files/temp.py Normal file
View File

@ -0,0 +1,58 @@
"""
The temp module provides a NamedTemporaryFile that can be re-opened on any
platform. Most platforms use the standard Python tempfile.TemporaryFile class,
but MS Windows users are given a custom class.
This is needed because in Windows NT, the default implementation of
NamedTemporaryFile uses the O_TEMPORARY flag, and thus cannot be reopened [1].
1: http://mail.python.org/pipermail/python-list/2005-December/359474.html
"""
import os
import tempfile
__all__ = ('NamedTemporaryFile', 'gettempdir',)
if os.name == 'nt':
class TemporaryFile(object):
"""
Temporary file object constructor that works in Windows and supports
reopening of the temporary file in windows.
"""
def __init__(self, mode='w+b', bufsize=-1, suffix='', prefix='',
dir=None):
fd, name = tempfile.mkstemp(suffix=suffix, prefix=prefix,
dir=dir)
self.name = name
self._file = os.fdopen(fd, mode, bufsize)
def __del__(self):
try:
self._file.close()
except (OSError, IOError):
pass
try:
os.unlink(self.name)
except (OSError):
pass
try:
super(TemporaryFile, self).__del__()
except AttributeError:
pass
def read(self, *args): return self._file.read(*args)
def seek(self, offset): return self._file.seek(offset)
def write(self, s): return self._file.write(s)
def close(self): return self._file.close()
def __iter__(self): return iter(self._file)
def readlines(self, size=None): return self._file.readlines(size)
def xreadlines(self): return self._file.xreadlines()
NamedTemporaryFile = TemporaryFile
else:
NamedTemporaryFile = tempfile.NamedTemporaryFile
gettempdir = tempfile.gettempdir

View File

@ -3,7 +3,6 @@ Classes representing uploaded files.
""" """
import os import os
import tempfile
import warnings import warnings
try: try:
from cStringIO import StringIO from cStringIO import StringIO
@ -12,6 +11,8 @@ except ImportError:
from django.conf import settings from django.conf import settings
from django.core.files import temp as tempfile
__all__ = ('UploadedFile', 'TemporaryUploadedFile', 'InMemoryUploadedFile', 'SimpleUploadedFile') __all__ = ('UploadedFile', 'TemporaryUploadedFile', 'InMemoryUploadedFile', 'SimpleUploadedFile')
# Because we fooled around with it a bunch, UploadedFile has a bunch # Because we fooled around with it a bunch, UploadedFile has a bunch

View File

@ -2,9 +2,9 @@ import os
import errno import errno
import sha import sha
import shutil import shutil
import tempfile
import unittest import unittest
from django.core.files import temp as tempfile
from django.core.files.uploadedfile import SimpleUploadedFile from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase, client from django.test import TestCase, client
from django.utils import simplejson from django.utils import simplejson
@ -22,7 +22,7 @@ class FileUploadTests(TestCase):
def test_large_upload(self): def test_large_upload(self):
tdir = tempfile.gettempdir() tdir = tempfile.gettempdir()
file1 = tempfile.NamedTemporaryFile(suffix=".file1", dir=tdir) file1 = tempfile.NamedTemporaryFile(suffix=".file1", dir=tdir)
file1.write('a' * (2 ** 21)) file1.write('a' * (2 ** 21))
file1.seek(0) file1.seek(0)
@ -58,11 +58,11 @@ class FileUploadTests(TestCase):
pass pass
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
def test_dangerous_file_names(self): def test_dangerous_file_names(self):
"""Uploaded file names should be sanitized before ever reaching the view.""" """Uploaded file names should be sanitized before ever reaching the view."""
# This test simulates possible directory traversal attacks by a # This test simulates possible directory traversal attacks by a
# malicious uploader We have to do some monkeybusiness here to construct # malicious uploader We have to do some monkeybusiness here to construct
# a malicious payload with an invalid file name (containing os.sep or # a malicious payload with an invalid file name (containing os.sep or
# os.pardir). This similar to what an attacker would need to do when # os.pardir). This similar to what an attacker would need to do when
# trying such an attack. # trying such an attack.
@ -79,7 +79,7 @@ class FileUploadTests(TestCase):
"..\\..\\hax0rd.txt", # Relative path, win-style. "..\\..\\hax0rd.txt", # Relative path, win-style.
"../..\\hax0rd.txt" # Relative path, mixed. "../..\\hax0rd.txt" # Relative path, mixed.
] ]
payload = [] payload = []
for i, name in enumerate(scary_file_names): for i, name in enumerate(scary_file_names):
payload.extend([ payload.extend([
@ -93,7 +93,7 @@ class FileUploadTests(TestCase):
'--' + client.BOUNDARY + '--', '--' + client.BOUNDARY + '--',
'', '',
]) ])
payload = "\r\n".join(payload) payload = "\r\n".join(payload)
r = { r = {
'CONTENT_LENGTH': len(payload), 'CONTENT_LENGTH': len(payload),
@ -109,7 +109,7 @@ class FileUploadTests(TestCase):
for i, name in enumerate(scary_file_names): for i, name in enumerate(scary_file_names):
got = recieved["file%s" % i] got = recieved["file%s" % i]
self.assertEqual(got, "hax0rd.txt") self.assertEqual(got, "hax0rd.txt")
def test_filename_overflow(self): def test_filename_overflow(self):
"""File names over 256 characters (dangerous on some platforms) get fixed up.""" """File names over 256 characters (dangerous on some platforms) get fixed up."""
name = "%s.txt" % ("f"*500) name = "%s.txt" % ("f"*500)
@ -131,26 +131,26 @@ class FileUploadTests(TestCase):
} }
got = simplejson.loads(self.client.request(**r).content) got = simplejson.loads(self.client.request(**r).content)
self.assert_(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file'])) self.assert_(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file']))
def test_custom_upload_handler(self): def test_custom_upload_handler(self):
# A small file (under the 5M quota) # A small file (under the 5M quota)
smallfile = tempfile.NamedTemporaryFile() smallfile = tempfile.NamedTemporaryFile()
smallfile.write('a' * (2 ** 21)) smallfile.write('a' * (2 ** 21))
# A big file (over the quota) # A big file (over the quota)
bigfile = tempfile.NamedTemporaryFile() bigfile = tempfile.NamedTemporaryFile()
bigfile.write('a' * (10 * 2 ** 20)) bigfile.write('a' * (10 * 2 ** 20))
# Small file posting should work. # Small file posting should work.
response = self.client.post('/file_uploads/quota/', {'f': open(smallfile.name)}) response = self.client.post('/file_uploads/quota/', {'f': open(smallfile.name)})
got = simplejson.loads(response.content) got = simplejson.loads(response.content)
self.assert_('f' in got) self.assert_('f' in got)
# Large files don't go through. # Large files don't go through.
response = self.client.post("/file_uploads/quota/", {'f': open(bigfile.name)}) response = self.client.post("/file_uploads/quota/", {'f': open(bigfile.name)})
got = simplejson.loads(response.content) got = simplejson.loads(response.content)
self.assert_('f' not in got) self.assert_('f' not in got)
def test_broken_custom_upload_handler(self): def test_broken_custom_upload_handler(self):
f = tempfile.NamedTemporaryFile() f = tempfile.NamedTemporaryFile()
f.write('a' * (2 ** 21)) f.write('a' * (2 ** 21))
@ -189,7 +189,7 @@ class FileUploadTests(TestCase):
class DirectoryCreationTests(unittest.TestCase): class DirectoryCreationTests(unittest.TestCase):
""" """
Tests for error handling during directory creation Tests for error handling during directory creation
via _save_FIELD_file (ticket #6450) via _save_FIELD_file (ticket #6450)
""" """
def setUp(self): def setUp(self):
@ -221,7 +221,7 @@ class DirectoryCreationTests(unittest.TestCase):
except IOError, err: except IOError, err:
# The test needs to be done on a specific string as IOError # The test needs to be done on a specific string as IOError
# is raised even without the patch (just not early enough) # is raised even without the patch (just not early enough)
self.assertEquals(err.args[0], self.assertEquals(err.args[0],
"%s exists and is not a directory" % UPLOAD_TO) "%s exists and is not a directory" % UPLOAD_TO)
except: except:
self.fail("IOError not raised") self.fail("IOError not raised")