600 lines
22 KiB
Python
600 lines
22 KiB
Python
import base64
|
|
from datetime import timedelta
|
|
import os
|
|
import shutil
|
|
import string
|
|
import tempfile
|
|
import unittest
|
|
import warnings
|
|
|
|
from django.conf import settings
|
|
from django.contrib.sessions.backends.db import SessionStore as DatabaseSession
|
|
from django.contrib.sessions.backends.cache import SessionStore as CacheSession
|
|
from django.contrib.sessions.backends.cached_db import SessionStore as CacheDBSession
|
|
from django.contrib.sessions.backends.file import SessionStore as FileSession
|
|
from django.contrib.sessions.backends.signed_cookies import SessionStore as CookieSession
|
|
from django.contrib.sessions.models import Session
|
|
from django.contrib.sessions.middleware import SessionMiddleware
|
|
from django.core.cache import get_cache
|
|
from django.core.cache.backends.base import InvalidCacheBackendError
|
|
from django.core import management
|
|
from django.core.exceptions import ImproperlyConfigured
|
|
from django.http import HttpResponse
|
|
from django.test import TestCase, RequestFactory
|
|
from django.test.utils import override_settings, patch_logger
|
|
from django.utils import six
|
|
from django.utils import timezone
|
|
|
|
from django.contrib.sessions.exceptions import InvalidSessionKey
|
|
|
|
|
|
class SessionTestsMixin(object):
|
|
# This does not inherit from TestCase to avoid any tests being run with this
|
|
# class, which wouldn't work, and to allow different TestCase subclasses to
|
|
# be used.
|
|
|
|
backend = None # subclasses must specify
|
|
|
|
def setUp(self):
|
|
self.session = self.backend()
|
|
|
|
def tearDown(self):
|
|
# NB: be careful to delete any sessions created; stale sessions fill up
|
|
# the /tmp (with some backends) and eventually overwhelm it after lots
|
|
# of runs (think buildbots)
|
|
self.session.delete()
|
|
|
|
def test_new_session(self):
|
|
self.assertFalse(self.session.modified)
|
|
self.assertFalse(self.session.accessed)
|
|
|
|
def test_get_empty(self):
|
|
self.assertEqual(self.session.get('cat'), None)
|
|
|
|
def test_store(self):
|
|
self.session['cat'] = "dog"
|
|
self.assertTrue(self.session.modified)
|
|
self.assertEqual(self.session.pop('cat'), 'dog')
|
|
|
|
def test_pop(self):
|
|
self.session['some key'] = 'exists'
|
|
# Need to reset these to pretend we haven't accessed it:
|
|
self.accessed = False
|
|
self.modified = False
|
|
|
|
self.assertEqual(self.session.pop('some key'), 'exists')
|
|
self.assertTrue(self.session.accessed)
|
|
self.assertTrue(self.session.modified)
|
|
self.assertEqual(self.session.get('some key'), None)
|
|
|
|
def test_pop_default(self):
|
|
self.assertEqual(self.session.pop('some key', 'does not exist'),
|
|
'does not exist')
|
|
self.assertTrue(self.session.accessed)
|
|
self.assertFalse(self.session.modified)
|
|
|
|
def test_setdefault(self):
|
|
self.assertEqual(self.session.setdefault('foo', 'bar'), 'bar')
|
|
self.assertEqual(self.session.setdefault('foo', 'baz'), 'bar')
|
|
self.assertTrue(self.session.accessed)
|
|
self.assertTrue(self.session.modified)
|
|
|
|
def test_update(self):
|
|
self.session.update({'update key': 1})
|
|
self.assertTrue(self.session.accessed)
|
|
self.assertTrue(self.session.modified)
|
|
self.assertEqual(self.session.get('update key', None), 1)
|
|
|
|
def test_has_key(self):
|
|
self.session['some key'] = 1
|
|
self.session.modified = False
|
|
self.session.accessed = False
|
|
self.assertIn('some key', self.session)
|
|
self.assertTrue(self.session.accessed)
|
|
self.assertFalse(self.session.modified)
|
|
|
|
def test_values(self):
|
|
self.assertEqual(list(self.session.values()), [])
|
|
self.assertTrue(self.session.accessed)
|
|
self.session['some key'] = 1
|
|
self.assertEqual(list(self.session.values()), [1])
|
|
|
|
def test_iterkeys(self):
|
|
self.session['x'] = 1
|
|
self.session.modified = False
|
|
self.session.accessed = False
|
|
i = six.iterkeys(self.session)
|
|
self.assertTrue(hasattr(i, '__iter__'))
|
|
self.assertTrue(self.session.accessed)
|
|
self.assertFalse(self.session.modified)
|
|
self.assertEqual(list(i), ['x'])
|
|
|
|
def test_itervalues(self):
|
|
self.session['x'] = 1
|
|
self.session.modified = False
|
|
self.session.accessed = False
|
|
i = six.itervalues(self.session)
|
|
self.assertTrue(hasattr(i, '__iter__'))
|
|
self.assertTrue(self.session.accessed)
|
|
self.assertFalse(self.session.modified)
|
|
self.assertEqual(list(i), [1])
|
|
|
|
def test_iteritems(self):
|
|
self.session['x'] = 1
|
|
self.session.modified = False
|
|
self.session.accessed = False
|
|
i = six.iteritems(self.session)
|
|
self.assertTrue(hasattr(i, '__iter__'))
|
|
self.assertTrue(self.session.accessed)
|
|
self.assertFalse(self.session.modified)
|
|
self.assertEqual(list(i), [('x', 1)])
|
|
|
|
def test_clear(self):
|
|
self.session['x'] = 1
|
|
self.session.modified = False
|
|
self.session.accessed = False
|
|
self.assertEqual(list(self.session.items()), [('x', 1)])
|
|
self.session.clear()
|
|
self.assertEqual(list(self.session.items()), [])
|
|
self.assertTrue(self.session.accessed)
|
|
self.assertTrue(self.session.modified)
|
|
|
|
def test_save(self):
|
|
if (hasattr(self.session, '_cache') and'DummyCache' in
|
|
settings.CACHES[settings.SESSION_CACHE_ALIAS]['BACKEND']):
|
|
raise unittest.SkipTest("Session saving tests require a real cache backend")
|
|
self.session.save()
|
|
self.assertTrue(self.session.exists(self.session.session_key))
|
|
|
|
def test_delete(self):
|
|
self.session.save()
|
|
self.session.delete(self.session.session_key)
|
|
self.assertFalse(self.session.exists(self.session.session_key))
|
|
|
|
def test_flush(self):
|
|
self.session['foo'] = 'bar'
|
|
self.session.save()
|
|
prev_key = self.session.session_key
|
|
self.session.flush()
|
|
self.assertFalse(self.session.exists(prev_key))
|
|
self.assertNotEqual(self.session.session_key, prev_key)
|
|
self.assertTrue(self.session.modified)
|
|
self.assertTrue(self.session.accessed)
|
|
|
|
def test_cycle(self):
|
|
self.session['a'], self.session['b'] = 'c', 'd'
|
|
self.session.save()
|
|
prev_key = self.session.session_key
|
|
prev_data = list(self.session.items())
|
|
self.session.cycle_key()
|
|
self.assertNotEqual(self.session.session_key, prev_key)
|
|
self.assertEqual(list(self.session.items()), prev_data)
|
|
|
|
def test_invalid_key(self):
|
|
# Submitting an invalid session key (either by guessing, or if the db has
|
|
# removed the key) results in a new key being generated.
|
|
try:
|
|
session = self.backend('1')
|
|
try:
|
|
session.save()
|
|
except AttributeError:
|
|
self.fail("The session object did not save properly. Middleware may be saving cache items without namespaces.")
|
|
self.assertNotEqual(session.session_key, '1')
|
|
self.assertEqual(session.get('cat'), None)
|
|
session.delete()
|
|
finally:
|
|
# Some backends leave a stale cache entry for the invalid
|
|
# session key; make sure that entry is manually deleted
|
|
session.delete('1')
|
|
|
|
def test_session_key_is_read_only(self):
|
|
def set_session_key(session):
|
|
session.session_key = session._get_new_session_key()
|
|
self.assertRaises(AttributeError, set_session_key, self.session)
|
|
|
|
# Custom session expiry
|
|
def test_default_expiry(self):
|
|
# A normal session has a max age equal to settings
|
|
self.assertEqual(self.session.get_expiry_age(), settings.SESSION_COOKIE_AGE)
|
|
|
|
# So does a custom session with an idle expiration time of 0 (but it'll
|
|
# expire at browser close)
|
|
self.session.set_expiry(0)
|
|
self.assertEqual(self.session.get_expiry_age(), settings.SESSION_COOKIE_AGE)
|
|
|
|
def test_custom_expiry_seconds(self):
|
|
modification = timezone.now()
|
|
|
|
self.session.set_expiry(10)
|
|
|
|
date = self.session.get_expiry_date(modification=modification)
|
|
self.assertEqual(date, modification + timedelta(seconds=10))
|
|
|
|
age = self.session.get_expiry_age(modification=modification)
|
|
self.assertEqual(age, 10)
|
|
|
|
def test_custom_expiry_timedelta(self):
|
|
modification = timezone.now()
|
|
|
|
# Mock timezone.now, because set_expiry calls it on this code path.
|
|
original_now = timezone.now
|
|
try:
|
|
timezone.now = lambda: modification
|
|
self.session.set_expiry(timedelta(seconds=10))
|
|
finally:
|
|
timezone.now = original_now
|
|
|
|
date = self.session.get_expiry_date(modification=modification)
|
|
self.assertEqual(date, modification + timedelta(seconds=10))
|
|
|
|
age = self.session.get_expiry_age(modification=modification)
|
|
self.assertEqual(age, 10)
|
|
|
|
def test_custom_expiry_datetime(self):
|
|
modification = timezone.now()
|
|
|
|
self.session.set_expiry(modification + timedelta(seconds=10))
|
|
|
|
date = self.session.get_expiry_date(modification=modification)
|
|
self.assertEqual(date, modification + timedelta(seconds=10))
|
|
|
|
age = self.session.get_expiry_age(modification=modification)
|
|
self.assertEqual(age, 10)
|
|
|
|
def test_custom_expiry_reset(self):
|
|
self.session.set_expiry(None)
|
|
self.session.set_expiry(10)
|
|
self.session.set_expiry(None)
|
|
self.assertEqual(self.session.get_expiry_age(), settings.SESSION_COOKIE_AGE)
|
|
|
|
def test_get_expire_at_browser_close(self):
|
|
# Tests get_expire_at_browser_close with different settings and different
|
|
# set_expiry calls
|
|
with override_settings(SESSION_EXPIRE_AT_BROWSER_CLOSE=False):
|
|
self.session.set_expiry(10)
|
|
self.assertFalse(self.session.get_expire_at_browser_close())
|
|
|
|
self.session.set_expiry(0)
|
|
self.assertTrue(self.session.get_expire_at_browser_close())
|
|
|
|
self.session.set_expiry(None)
|
|
self.assertFalse(self.session.get_expire_at_browser_close())
|
|
|
|
with override_settings(SESSION_EXPIRE_AT_BROWSER_CLOSE=True):
|
|
self.session.set_expiry(10)
|
|
self.assertFalse(self.session.get_expire_at_browser_close())
|
|
|
|
self.session.set_expiry(0)
|
|
self.assertTrue(self.session.get_expire_at_browser_close())
|
|
|
|
self.session.set_expiry(None)
|
|
self.assertTrue(self.session.get_expire_at_browser_close())
|
|
|
|
def test_decode(self):
|
|
# Ensure we can decode what we encode
|
|
data = {'a test key': 'a test value'}
|
|
encoded = self.session.encode(data)
|
|
self.assertEqual(self.session.decode(encoded), data)
|
|
|
|
def test_decode_failure_logged_to_security(self):
|
|
bad_encode = base64.b64encode(b'flaskdj:alkdjf')
|
|
with patch_logger('django.security.SuspiciousSession', 'warning') as calls:
|
|
self.assertEqual({}, self.session.decode(bad_encode))
|
|
# check that the failed decode is logged
|
|
self.assertEqual(len(calls), 1)
|
|
self.assertTrue('corrupted' in calls[0])
|
|
|
|
|
|
def test_actual_expiry(self):
|
|
# this doesn't work with JSONSerializer (serializing timedelta)
|
|
with override_settings(SESSION_SERIALIZER='django.contrib.sessions.serializers.PickleSerializer'):
|
|
self.session = self.backend() # reinitialize after overriding settings
|
|
|
|
# Regression test for #19200
|
|
old_session_key = None
|
|
new_session_key = None
|
|
try:
|
|
self.session['foo'] = 'bar'
|
|
self.session.set_expiry(-timedelta(seconds=10))
|
|
self.session.save()
|
|
old_session_key = self.session.session_key
|
|
# With an expiry date in the past, the session expires instantly.
|
|
new_session = self.backend(self.session.session_key)
|
|
new_session_key = new_session.session_key
|
|
self.assertNotIn('foo', new_session)
|
|
finally:
|
|
self.session.delete(old_session_key)
|
|
self.session.delete(new_session_key)
|
|
|
|
|
|
class DatabaseSessionTests(SessionTestsMixin, TestCase):
|
|
|
|
backend = DatabaseSession
|
|
|
|
def test_session_get_decoded(self):
|
|
"""
|
|
Test we can use Session.get_decoded to retrieve data stored
|
|
in normal way
|
|
"""
|
|
self.session['x'] = 1
|
|
self.session.save()
|
|
|
|
s = Session.objects.get(session_key=self.session.session_key)
|
|
|
|
self.assertEqual(s.get_decoded(), {'x': 1})
|
|
|
|
def test_sessionmanager_save(self):
|
|
"""
|
|
Test SessionManager.save method
|
|
"""
|
|
# Create a session
|
|
self.session['y'] = 1
|
|
self.session.save()
|
|
|
|
s = Session.objects.get(session_key=self.session.session_key)
|
|
# Change it
|
|
Session.objects.save(s.session_key, {'y': 2}, s.expire_date)
|
|
# Clear cache, so that it will be retrieved from DB
|
|
del self.session._session_cache
|
|
self.assertEqual(self.session['y'], 2)
|
|
|
|
@override_settings(SESSION_ENGINE="django.contrib.sessions.backends.db")
|
|
def test_clearsessions_command(self):
|
|
"""
|
|
Test clearsessions command for clearing expired sessions.
|
|
"""
|
|
self.assertEqual(0, Session.objects.count())
|
|
|
|
# One object in the future
|
|
self.session['foo'] = 'bar'
|
|
self.session.set_expiry(3600)
|
|
self.session.save()
|
|
|
|
# One object in the past
|
|
other_session = self.backend()
|
|
other_session['foo'] = 'bar'
|
|
other_session.set_expiry(-3600)
|
|
other_session.save()
|
|
|
|
# Two sessions are in the database before clearsessions...
|
|
self.assertEqual(2, Session.objects.count())
|
|
management.call_command('clearsessions')
|
|
# ... and one is deleted.
|
|
self.assertEqual(1, Session.objects.count())
|
|
|
|
|
|
@override_settings(USE_TZ=True)
|
|
class DatabaseSessionWithTimeZoneTests(DatabaseSessionTests):
|
|
pass
|
|
|
|
|
|
class CacheDBSessionTests(SessionTestsMixin, TestCase):
|
|
|
|
backend = CacheDBSession
|
|
|
|
@unittest.skipIf('DummyCache' in
|
|
settings.CACHES[settings.SESSION_CACHE_ALIAS]['BACKEND'],
|
|
"Session saving tests require a real cache backend")
|
|
def test_exists_searches_cache_first(self):
|
|
self.session.save()
|
|
with self.assertNumQueries(0):
|
|
self.assertTrue(self.session.exists(self.session.session_key))
|
|
|
|
def test_load_overlong_key(self):
|
|
# Some backends might issue a warning
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter("ignore")
|
|
self.session._session_key = (string.ascii_letters + string.digits) * 20
|
|
self.assertEqual(self.session.load(), {})
|
|
|
|
@override_settings(SESSION_CACHE_ALIAS='sessions')
|
|
def test_non_default_cache(self):
|
|
#21000 - CacheDB backend should respect SESSION_CACHE_ALIAS.
|
|
self.assertRaises(InvalidCacheBackendError, self.backend)
|
|
|
|
|
|
@override_settings(USE_TZ=True)
|
|
class CacheDBSessionWithTimeZoneTests(CacheDBSessionTests):
|
|
pass
|
|
|
|
|
|
# Don't need DB flushing for these tests, so can use unittest.TestCase as base class
|
|
class FileSessionTests(SessionTestsMixin, unittest.TestCase):
|
|
|
|
backend = FileSession
|
|
|
|
def setUp(self):
|
|
# Do file session tests in an isolated directory, and kill it after we're done.
|
|
self.original_session_file_path = settings.SESSION_FILE_PATH
|
|
self.temp_session_store = settings.SESSION_FILE_PATH = tempfile.mkdtemp()
|
|
# Reset the file session backend's internal caches
|
|
if hasattr(self.backend, '_storage_path'):
|
|
del self.backend._storage_path
|
|
super(FileSessionTests, self).setUp()
|
|
|
|
def tearDown(self):
|
|
super(FileSessionTests, self).tearDown()
|
|
settings.SESSION_FILE_PATH = self.original_session_file_path
|
|
shutil.rmtree(self.temp_session_store)
|
|
|
|
@override_settings(
|
|
SESSION_FILE_PATH="/if/this/directory/exists/you/have/a/weird/computer")
|
|
def test_configuration_check(self):
|
|
del self.backend._storage_path
|
|
# Make sure the file backend checks for a good storage dir
|
|
self.assertRaises(ImproperlyConfigured, self.backend)
|
|
|
|
def test_invalid_key_backslash(self):
|
|
# This key should be refused and a new session should be created
|
|
self.assertTrue(self.backend("a\\b\\c").load())
|
|
|
|
def test_invalid_key_backslash(self):
|
|
# Ensure we don't allow directory-traversal.
|
|
# This is tested directly on _key_to_file, as load() will swallow
|
|
# a SuspiciousOperation in the same way as an IOError - by creating
|
|
# a new session, making it unclear whether the slashes were detected.
|
|
self.assertRaises(InvalidSessionKey,
|
|
self.backend()._key_to_file, "a\\b\\c")
|
|
|
|
def test_invalid_key_forwardslash(self):
|
|
# Ensure we don't allow directory-traversal
|
|
self.assertRaises(InvalidSessionKey,
|
|
self.backend()._key_to_file, "a/b/c")
|
|
|
|
@override_settings(SESSION_ENGINE="django.contrib.sessions.backends.file")
|
|
def test_clearsessions_command(self):
|
|
"""
|
|
Test clearsessions command for clearing expired sessions.
|
|
"""
|
|
storage_path = self.backend._get_storage_path()
|
|
file_prefix = settings.SESSION_COOKIE_NAME
|
|
|
|
def count_sessions():
|
|
return len([session_file for session_file in os.listdir(storage_path)
|
|
if session_file.startswith(file_prefix)])
|
|
|
|
self.assertEqual(0, count_sessions())
|
|
|
|
# One object in the future
|
|
self.session['foo'] = 'bar'
|
|
self.session.set_expiry(3600)
|
|
self.session.save()
|
|
|
|
# One object in the past
|
|
other_session = self.backend()
|
|
other_session['foo'] = 'bar'
|
|
other_session.set_expiry(-3600)
|
|
other_session.save()
|
|
|
|
# Two sessions are in the filesystem before clearsessions...
|
|
self.assertEqual(2, count_sessions())
|
|
management.call_command('clearsessions')
|
|
# ... and one is deleted.
|
|
self.assertEqual(1, count_sessions())
|
|
|
|
|
|
class CacheSessionTests(SessionTestsMixin, unittest.TestCase):
|
|
|
|
backend = CacheSession
|
|
|
|
def test_load_overlong_key(self):
|
|
# Some backends might issue a warning
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter("ignore")
|
|
self.session._session_key = (string.ascii_letters + string.digits) * 20
|
|
self.assertEqual(self.session.load(), {})
|
|
|
|
def test_default_cache(self):
|
|
self.session.save()
|
|
self.assertNotEqual(get_cache('default').get(self.session.cache_key), None)
|
|
|
|
@override_settings(CACHES={
|
|
'default': {
|
|
'BACKEND': 'django.core.cache.backends.dummy.DummyCache',
|
|
},
|
|
'sessions': {
|
|
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
|
|
},
|
|
}, SESSION_CACHE_ALIAS='sessions')
|
|
def test_non_default_cache(self):
|
|
# Re-initalize the session backend to make use of overridden settings.
|
|
self.session = self.backend()
|
|
|
|
self.session.save()
|
|
self.assertEqual(get_cache('default').get(self.session.cache_key), None)
|
|
self.assertNotEqual(get_cache('sessions').get(self.session.cache_key), None)
|
|
|
|
|
|
class SessionMiddlewareTests(unittest.TestCase):
|
|
|
|
@override_settings(SESSION_COOKIE_SECURE=True)
|
|
def test_secure_session_cookie(self):
|
|
request = RequestFactory().get('/')
|
|
response = HttpResponse('Session test')
|
|
middleware = SessionMiddleware()
|
|
|
|
# Simulate a request the modifies the session
|
|
middleware.process_request(request)
|
|
request.session['hello'] = 'world'
|
|
|
|
# Handle the response through the middleware
|
|
response = middleware.process_response(request, response)
|
|
self.assertTrue(
|
|
response.cookies[settings.SESSION_COOKIE_NAME]['secure'])
|
|
|
|
@override_settings(SESSION_COOKIE_HTTPONLY=True)
|
|
def test_httponly_session_cookie(self):
|
|
request = RequestFactory().get('/')
|
|
response = HttpResponse('Session test')
|
|
middleware = SessionMiddleware()
|
|
|
|
# Simulate a request the modifies the session
|
|
middleware.process_request(request)
|
|
request.session['hello'] = 'world'
|
|
|
|
# Handle the response through the middleware
|
|
response = middleware.process_response(request, response)
|
|
self.assertTrue(
|
|
response.cookies[settings.SESSION_COOKIE_NAME]['httponly'])
|
|
self.assertIn('httponly',
|
|
str(response.cookies[settings.SESSION_COOKIE_NAME]))
|
|
|
|
@override_settings(SESSION_COOKIE_HTTPONLY=False)
|
|
def test_no_httponly_session_cookie(self):
|
|
request = RequestFactory().get('/')
|
|
response = HttpResponse('Session test')
|
|
middleware = SessionMiddleware()
|
|
|
|
# Simulate a request the modifies the session
|
|
middleware.process_request(request)
|
|
request.session['hello'] = 'world'
|
|
|
|
# Handle the response through the middleware
|
|
response = middleware.process_response(request, response)
|
|
self.assertFalse(response.cookies[settings.SESSION_COOKIE_NAME]['httponly'])
|
|
|
|
self.assertNotIn('httponly',
|
|
str(response.cookies[settings.SESSION_COOKIE_NAME]))
|
|
|
|
def test_session_save_on_500(self):
|
|
request = RequestFactory().get('/')
|
|
response = HttpResponse('Horrible error')
|
|
response.status_code = 500
|
|
middleware = SessionMiddleware()
|
|
|
|
# Simulate a request the modifies the session
|
|
middleware.process_request(request)
|
|
request.session['hello'] = 'world'
|
|
|
|
# Handle the response through the middleware
|
|
response = middleware.process_response(request, response)
|
|
|
|
# Check that the value wasn't saved above.
|
|
self.assertNotIn('hello', request.session.load())
|
|
|
|
|
|
class CookieSessionTests(SessionTestsMixin, TestCase):
|
|
|
|
backend = CookieSession
|
|
|
|
def test_save(self):
|
|
"""
|
|
This test tested exists() in the other session backends, but that
|
|
doesn't make sense for us.
|
|
"""
|
|
pass
|
|
|
|
def test_cycle(self):
|
|
"""
|
|
This test tested cycle_key() which would create a new session
|
|
key for the same session data. But we can't invalidate previously
|
|
signed cookies (other than letting them expire naturally) so
|
|
testing for this behavior is meaningless.
|
|
"""
|
|
pass
|
|
|
|
@unittest.expectedFailure
|
|
def test_actual_expiry(self):
|
|
# The cookie backend doesn't handle non-default expiry dates, see #19201
|
|
super(CookieSessionTests, self).test_actual_expiry()
|