Fixed #8817: get_image_dimensions correctly closes the files it opens, and leaves open the ones it doesn't. Thanks, mitsuhiko.
While I was at it, I converted the file_storage doctests to unit tests. git-svn-id: http://code.djangoproject.com/svn/django/trunk@10707 bcc190cf-cafb-0310-a4f2-bffc1f526a37
This commit is contained in:
parent
614d881450
commit
a7faf6424a
|
@ -28,15 +28,21 @@ def get_image_dimensions(file_or_path):
|
||||||
"""Returns the (width, height) of an image, given an open file or a path."""
|
"""Returns the (width, height) of an image, given an open file or a path."""
|
||||||
from PIL import ImageFile as PILImageFile
|
from PIL import ImageFile as PILImageFile
|
||||||
p = PILImageFile.Parser()
|
p = PILImageFile.Parser()
|
||||||
|
close = False
|
||||||
if hasattr(file_or_path, 'read'):
|
if hasattr(file_or_path, 'read'):
|
||||||
file = file_or_path
|
file = file_or_path
|
||||||
else:
|
else:
|
||||||
file = open(file_or_path, 'rb')
|
file = open(file_or_path, 'rb')
|
||||||
while 1:
|
close = True
|
||||||
data = file.read(1024)
|
try:
|
||||||
if not data:
|
while 1:
|
||||||
break
|
data = file.read(1024)
|
||||||
p.feed(data)
|
if not data:
|
||||||
if p.image:
|
break
|
||||||
return p.image.size
|
p.feed(data)
|
||||||
return None
|
if p.image:
|
||||||
|
return p.image.size
|
||||||
|
return None
|
||||||
|
finally:
|
||||||
|
if close:
|
||||||
|
file.close()
|
||||||
|
|
|
@ -1,105 +1,105 @@
|
||||||
# coding: utf-8
|
# -*- coding: utf-8 -*-
|
||||||
"""
|
|
||||||
Tests for the file storage mechanism
|
|
||||||
|
|
||||||
>>> import tempfile
|
|
||||||
>>> from django.core.files.storage import FileSystemStorage
|
|
||||||
>>> from django.core.files.base import ContentFile
|
|
||||||
|
|
||||||
# Set up a unique temporary directory
|
|
||||||
>>> import os
|
|
||||||
>>> temp_dir = tempfile.mktemp()
|
|
||||||
>>> os.makedirs(temp_dir)
|
|
||||||
|
|
||||||
>>> temp_storage = FileSystemStorage(location=temp_dir)
|
|
||||||
|
|
||||||
# Standard file access options are available, and work as expected.
|
|
||||||
|
|
||||||
>>> temp_storage.exists('storage_test')
|
|
||||||
False
|
|
||||||
>>> file = temp_storage.open('storage_test', 'w')
|
|
||||||
>>> file.write('storage contents')
|
|
||||||
>>> file.close()
|
|
||||||
|
|
||||||
>>> temp_storage.exists('storage_test')
|
|
||||||
True
|
|
||||||
>>> file = temp_storage.open('storage_test', 'r')
|
|
||||||
>>> file.read()
|
|
||||||
'storage contents'
|
|
||||||
>>> file.close()
|
|
||||||
|
|
||||||
>>> temp_storage.delete('storage_test')
|
|
||||||
>>> temp_storage.exists('storage_test')
|
|
||||||
False
|
|
||||||
|
|
||||||
# Files can only be accessed if they're below the specified location.
|
|
||||||
|
|
||||||
>>> temp_storage.exists('..')
|
|
||||||
Traceback (most recent call last):
|
|
||||||
...
|
|
||||||
SuspiciousOperation: Attempted access to '..' denied.
|
|
||||||
>>> temp_storage.open('/etc/passwd')
|
|
||||||
Traceback (most recent call last):
|
|
||||||
...
|
|
||||||
SuspiciousOperation: Attempted access to '/etc/passwd' denied.
|
|
||||||
|
|
||||||
# Custom storage systems can be created to customize behavior
|
|
||||||
|
|
||||||
>>> class CustomStorage(FileSystemStorage):
|
|
||||||
... def get_available_name(self, name):
|
|
||||||
... # Append numbers to duplicate files rather than underscores, like Trac
|
|
||||||
...
|
|
||||||
... parts = name.split('.')
|
|
||||||
... basename, ext = parts[0], parts[1:]
|
|
||||||
... number = 2
|
|
||||||
...
|
|
||||||
... while self.exists(name):
|
|
||||||
... name = '.'.join([basename, str(number)] + ext)
|
|
||||||
... number += 1
|
|
||||||
...
|
|
||||||
... return name
|
|
||||||
>>> custom_storage = CustomStorage(temp_dir)
|
|
||||||
|
|
||||||
>>> first = custom_storage.save('custom_storage', ContentFile('custom contents'))
|
|
||||||
>>> first
|
|
||||||
u'custom_storage'
|
|
||||||
>>> second = custom_storage.save('custom_storage', ContentFile('more contents'))
|
|
||||||
>>> second
|
|
||||||
u'custom_storage.2'
|
|
||||||
|
|
||||||
>>> custom_storage.delete(first)
|
|
||||||
>>> custom_storage.delete(second)
|
|
||||||
|
|
||||||
# Cleanup the temp dir
|
|
||||||
>>> os.rmdir(temp_dir)
|
|
||||||
|
|
||||||
|
|
||||||
# Regression test for #8156: files with unicode names I can't quite figure out the
|
|
||||||
# encoding situation between doctest and this file, but the actual repr doesn't
|
|
||||||
# matter; it just shouldn't return a unicode object.
|
|
||||||
>>> from django.core.files.uploadedfile import UploadedFile
|
|
||||||
>>> uf = UploadedFile(name=u'¿Cómo?',content_type='text')
|
|
||||||
>>> uf.__repr__()
|
|
||||||
'<UploadedFile: ... (text)>'
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Tests for a race condition on file saving (#4948).
|
|
||||||
# This is written in such a way that it'll always pass on platforms
|
|
||||||
# without threading.
|
|
||||||
import os
|
import os
|
||||||
import time
|
|
||||||
import shutil
|
import shutil
|
||||||
import sys
|
import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
from unittest import TestCase
|
import time
|
||||||
|
import unittest
|
||||||
|
from cStringIO import StringIO
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
from django.core.exceptions import SuspiciousOperation
|
||||||
from django.core.files.base import ContentFile
|
from django.core.files.base import ContentFile
|
||||||
|
from django.core.files.images import get_image_dimensions
|
||||||
from django.core.files.storage import FileSystemStorage
|
from django.core.files.storage import FileSystemStorage
|
||||||
|
from django.core.files.uploadedfile import UploadedFile
|
||||||
|
from unittest import TestCase
|
||||||
try:
|
try:
|
||||||
import threading
|
import threading
|
||||||
except ImportError:
|
except ImportError:
|
||||||
import dummy_threading as threading
|
import dummy_threading as threading
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Checking for the existence of Image is enough for CPython, but
|
||||||
|
# for PyPy, you need to check for the underlying modules
|
||||||
|
from PIL import Image, _imaging
|
||||||
|
except ImportError:
|
||||||
|
Image = None
|
||||||
|
|
||||||
|
class FileStorageTests(unittest.TestCase):
|
||||||
|
storage_class = FileSystemStorage
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.temp_dir = tempfile.mktemp()
|
||||||
|
os.makedirs(self.temp_dir)
|
||||||
|
self.storage = self.storage_class(location=self.temp_dir)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
os.rmdir(self.temp_dir)
|
||||||
|
|
||||||
|
def test_file_access_options(self):
|
||||||
|
"""
|
||||||
|
Standard file access options are available, and work as expected.
|
||||||
|
"""
|
||||||
|
self.failIf(self.storage.exists('storage_test'))
|
||||||
|
f = self.storage.open('storage_test', 'w')
|
||||||
|
f.write('storage contents')
|
||||||
|
f.close()
|
||||||
|
self.assert_(self.storage.exists('storage_test'))
|
||||||
|
|
||||||
|
f = self.storage.open('storage_test', 'r')
|
||||||
|
self.assertEqual(f.read(), 'storage contents')
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
self.storage.delete('storage_test')
|
||||||
|
self.failIf(self.storage.exists('storage_test'))
|
||||||
|
|
||||||
|
def test_file_storage_prevents_directory_traversal(self):
|
||||||
|
"""
|
||||||
|
File storage prevents directory traversal (files can only be accessed if
|
||||||
|
they're below the storage location).
|
||||||
|
"""
|
||||||
|
self.assertRaises(SuspiciousOperation, self.storage.exists, '..')
|
||||||
|
self.assertRaises(SuspiciousOperation, self.storage.exists, '/etc/passwd')
|
||||||
|
|
||||||
|
class CustomStorage(FileSystemStorage):
|
||||||
|
def get_available_name(self, name):
|
||||||
|
"""
|
||||||
|
Append numbers to duplicate files rather than underscores, like Trac.
|
||||||
|
"""
|
||||||
|
parts = name.split('.')
|
||||||
|
basename, ext = parts[0], parts[1:]
|
||||||
|
number = 2
|
||||||
|
while self.exists(name):
|
||||||
|
name = '.'.join([basename, str(number)] + ext)
|
||||||
|
number += 1
|
||||||
|
|
||||||
|
return name
|
||||||
|
|
||||||
|
class CustomStorageTests(FileStorageTests):
|
||||||
|
storage_class = CustomStorage
|
||||||
|
|
||||||
|
def test_custom_get_available_name(self):
|
||||||
|
first = self.storage.save('custom_storage', ContentFile('custom contents'))
|
||||||
|
self.assertEqual(first, 'custom_storage')
|
||||||
|
second = self.storage.save('custom_storage', ContentFile('more contents'))
|
||||||
|
self.assertEqual(second, 'custom_storage.2')
|
||||||
|
self.storage.delete(first)
|
||||||
|
self.storage.delete(second)
|
||||||
|
|
||||||
|
class UnicodeFileNameTests(unittest.TestCase):
|
||||||
|
def test_unicode_file_names(self):
|
||||||
|
"""
|
||||||
|
Regression test for #8156: files with unicode names I can't quite figure
|
||||||
|
out the encoding situation between doctest and this file, but the actual
|
||||||
|
repr doesn't matter; it just shouldn't return a unicode object.
|
||||||
|
"""
|
||||||
|
uf = UploadedFile(name=u'¿Cómo?',content_type='text')
|
||||||
|
self.assertEqual(type(uf.__repr__()), str)
|
||||||
|
|
||||||
|
# Tests for a race condition on file saving (#4948).
|
||||||
|
# This is written in such a way that it'll always pass on platforms
|
||||||
|
# without threading.
|
||||||
|
|
||||||
class SlowFile(ContentFile):
|
class SlowFile(ContentFile):
|
||||||
def chunks(self):
|
def chunks(self):
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
@ -180,3 +180,49 @@ class FileStoragePathParsing(TestCase):
|
||||||
self.assertTrue(os.path.exists(os.path.join(self.storage_dir, 'dotted.path/_.test')))
|
self.assertTrue(os.path.exists(os.path.join(self.storage_dir, 'dotted.path/_.test')))
|
||||||
else:
|
else:
|
||||||
self.assertTrue(os.path.exists(os.path.join(self.storage_dir, 'dotted.path/.test_')))
|
self.assertTrue(os.path.exists(os.path.join(self.storage_dir, 'dotted.path/.test_')))
|
||||||
|
|
||||||
|
if Image is not None:
|
||||||
|
class DimensionClosingBug(TestCase):
|
||||||
|
"""
|
||||||
|
Test that get_image_dimensions() properly closes files (#8817)
|
||||||
|
"""
|
||||||
|
def test_not_closing_of_files(self):
|
||||||
|
"""
|
||||||
|
Open files passed into get_image_dimensions() should stay opened.
|
||||||
|
"""
|
||||||
|
empty_io = StringIO()
|
||||||
|
try:
|
||||||
|
get_image_dimensions(empty_io)
|
||||||
|
finally:
|
||||||
|
self.assert_(not empty_io.closed)
|
||||||
|
|
||||||
|
def test_closing_of_filenames(self):
|
||||||
|
"""
|
||||||
|
get_image_dimensions() called with a filename should closed the file.
|
||||||
|
"""
|
||||||
|
# We need to inject a modified open() builtin into the images module
|
||||||
|
# that checks if the file was closed properly if the function is
|
||||||
|
# called with a filename instead of an file object.
|
||||||
|
# get_image_dimensions will call our catching_open instead of the
|
||||||
|
# regular builtin one.
|
||||||
|
|
||||||
|
class FileWrapper(object):
|
||||||
|
_closed = []
|
||||||
|
def __init__(self, f):
|
||||||
|
self.f = f
|
||||||
|
def __getattr__(self, name):
|
||||||
|
return getattr(self.f, name)
|
||||||
|
def close(self):
|
||||||
|
self._closed.append(True)
|
||||||
|
self.f.close()
|
||||||
|
|
||||||
|
def catching_open(*args):
|
||||||
|
return FileWrapper(open(*args))
|
||||||
|
|
||||||
|
from django.core.files import images
|
||||||
|
images.open = catching_open
|
||||||
|
try:
|
||||||
|
get_image_dimensions(os.path.join(os.path.dirname(__file__), "test1.png"))
|
||||||
|
finally:
|
||||||
|
del images.open
|
||||||
|
self.assert_(FileWrapper._closed)
|
||||||
|
|
Loading…
Reference in New Issue