144 lines
5.4 KiB
Python
144 lines
5.4 KiB
Python
from __future__ import unicode_literals
|
|
|
|
import pickle
|
|
import datetime
|
|
import warnings
|
|
|
|
from django.test import TestCase
|
|
from django.utils.encoding import force_text
|
|
from django.utils.version import get_major_version, get_version
|
|
|
|
from .models import Group, Event, Happening, Container, M2MModel
|
|
|
|
|
|
class PickleabilityTestCase(TestCase):
|
|
def setUp(self):
|
|
Happening.objects.create() # make sure the defaults are working (#20158)
|
|
|
|
def assert_pickles(self, qs):
|
|
self.assertEqual(list(pickle.loads(pickle.dumps(qs))), list(qs))
|
|
|
|
def test_related_field(self):
|
|
g = Group.objects.create(name="Ponies Who Own Maybachs")
|
|
self.assert_pickles(Event.objects.filter(group=g.id))
|
|
|
|
def test_datetime_callable_default_all(self):
|
|
self.assert_pickles(Happening.objects.all())
|
|
|
|
def test_datetime_callable_default_filter(self):
|
|
self.assert_pickles(Happening.objects.filter(when=datetime.datetime.now()))
|
|
|
|
def test_lambda_as_default(self):
|
|
self.assert_pickles(Happening.objects.filter(name="test"))
|
|
|
|
def test_standalone_method_as_default(self):
|
|
self.assert_pickles(Happening.objects.filter(number1=1))
|
|
|
|
def test_staticmethod_as_default(self):
|
|
self.assert_pickles(Happening.objects.filter(number2=1))
|
|
|
|
def test_classmethod_as_default(self):
|
|
self.assert_pickles(Happening.objects.filter(number3=1))
|
|
|
|
def test_membermethod_as_default(self):
|
|
self.assert_pickles(Happening.objects.filter(number4=1))
|
|
|
|
def test_doesnotexist_exception(self):
|
|
# Ticket #17776
|
|
original = Event.DoesNotExist("Doesn't exist")
|
|
unpickled = pickle.loads(pickle.dumps(original))
|
|
|
|
# Exceptions are not equal to equivalent instances of themselves, so
|
|
# can't just use assertEqual(original, unpickled)
|
|
self.assertEqual(original.__class__, unpickled.__class__)
|
|
self.assertEqual(original.args, unpickled.args)
|
|
|
|
def test_manager_pickle(self):
|
|
pickle.loads(pickle.dumps(Happening.objects))
|
|
|
|
def test_model_pickle(self):
|
|
"""
|
|
Test that a model not defined on module level is pickleable.
|
|
"""
|
|
original = Container.SomeModel(pk=1)
|
|
dumped = pickle.dumps(original)
|
|
reloaded = pickle.loads(dumped)
|
|
self.assertEqual(original, reloaded)
|
|
# Also, deferred dynamic model works
|
|
Container.SomeModel.objects.create(somefield=1)
|
|
original = Container.SomeModel.objects.defer('somefield')[0]
|
|
dumped = pickle.dumps(original)
|
|
reloaded = pickle.loads(dumped)
|
|
self.assertEqual(original, reloaded)
|
|
self.assertEqual(original.somefield, reloaded.somefield)
|
|
|
|
def test_model_pickle_m2m(self):
|
|
"""
|
|
Test intentionally the automatically created through model.
|
|
"""
|
|
m1 = M2MModel.objects.create()
|
|
g1 = Group.objects.create(name='foof')
|
|
m1.groups.add(g1)
|
|
m2m_through = M2MModel._meta.get_field('groups').rel.through
|
|
original = m2m_through.objects.get()
|
|
dumped = pickle.dumps(original)
|
|
reloaded = pickle.loads(dumped)
|
|
self.assertEqual(original, reloaded)
|
|
|
|
def test_model_pickle_dynamic(self):
|
|
class Meta:
|
|
proxy = True
|
|
dynclass = type(str("DynamicEventSubclass"), (Event, ),
|
|
{'Meta': Meta, '__module__': Event.__module__})
|
|
original = dynclass(pk=1)
|
|
dumped = pickle.dumps(original)
|
|
reloaded = pickle.loads(dumped)
|
|
self.assertEqual(original, reloaded)
|
|
self.assertIs(reloaded.__class__, dynclass)
|
|
|
|
def test_specialized_queryset(self):
|
|
self.assert_pickles(Happening.objects.values('name'))
|
|
self.assert_pickles(Happening.objects.values('name').dates('when', 'year'))
|
|
# With related field (#14515)
|
|
self.assert_pickles(
|
|
Event.objects.select_related('group').order_by('title').values_list('title', 'group__name')
|
|
)
|
|
|
|
def test_pickle_prefetch_related_idempotence(self):
|
|
g = Group.objects.create(name='foo')
|
|
groups = Group.objects.prefetch_related('event_set')
|
|
|
|
# First pickling
|
|
groups = pickle.loads(pickle.dumps(groups))
|
|
self.assertQuerysetEqual(groups, [g], lambda x: x)
|
|
|
|
# Second pickling
|
|
groups = pickle.loads(pickle.dumps(groups))
|
|
self.assertQuerysetEqual(groups, [g], lambda x: x)
|
|
|
|
def test_missing_django_version_unpickling(self):
|
|
"""
|
|
#21430 -- Verifies a warning is raised for querysets that are
|
|
unpickled without a Django version
|
|
"""
|
|
qs = Group.missing_django_version_objects.all()
|
|
with warnings.catch_warnings(record=True) as recorded:
|
|
pickle.loads(pickle.dumps(qs))
|
|
msg = force_text(recorded.pop().message)
|
|
self.assertEqual(msg,
|
|
"Pickled queryset instance's Django version is not specified.")
|
|
|
|
def test_unsupported_unpickle(self):
|
|
"""
|
|
#21430 -- Verifies a warning is raised for querysets that are
|
|
unpickled with a different Django version than the current
|
|
"""
|
|
qs = Group.previous_django_version_objects.all()
|
|
with warnings.catch_warnings(record=True) as recorded:
|
|
pickle.loads(pickle.dumps(qs))
|
|
msg = force_text(recorded.pop().message)
|
|
self.assertEqual(msg,
|
|
"Pickled queryset instance's Django version %s does not "
|
|
"match the current version %s."
|
|
% (str(float(get_major_version()) - 0.1), get_version()))
|