[py3] Used BytesIO to test request streams
This commit is contained in:
parent
87e0a75c03
commit
97fe70d30b
|
@ -3,6 +3,7 @@ from __future__ import unicode_literals
|
||||||
import time
|
import time
|
||||||
import warnings
|
import warnings
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.core.handlers.wsgi import WSGIRequest, LimitedStream
|
from django.core.handlers.wsgi import WSGIRequest, LimitedStream
|
||||||
|
@ -10,17 +11,16 @@ from django.http import HttpRequest, HttpResponse, parse_cookie, build_request_r
|
||||||
from django.test.utils import str_prefix
|
from django.test.utils import str_prefix
|
||||||
from django.utils import unittest
|
from django.utils import unittest
|
||||||
from django.utils.http import cookie_date
|
from django.utils.http import cookie_date
|
||||||
from django.utils.six import StringIO
|
|
||||||
from django.utils.timezone import utc
|
from django.utils.timezone import utc
|
||||||
|
|
||||||
|
|
||||||
class RequestsTests(unittest.TestCase):
|
class RequestsTests(unittest.TestCase):
|
||||||
def test_httprequest(self):
|
def test_httprequest(self):
|
||||||
request = HttpRequest()
|
request = HttpRequest()
|
||||||
self.assertEqual(request.GET.keys(), [])
|
self.assertEqual(list(request.GET.keys()), [])
|
||||||
self.assertEqual(request.POST.keys(), [])
|
self.assertEqual(list(request.POST.keys()), [])
|
||||||
self.assertEqual(request.COOKIES.keys(), [])
|
self.assertEqual(list(request.COOKIES.keys()), [])
|
||||||
self.assertEqual(request.META.keys(), [])
|
self.assertEqual(list(request.META.keys()), [])
|
||||||
|
|
||||||
def test_httprequest_repr(self):
|
def test_httprequest_repr(self):
|
||||||
request = HttpRequest()
|
request = HttpRequest()
|
||||||
|
@ -35,17 +35,17 @@ class RequestsTests(unittest.TestCase):
|
||||||
str_prefix("<HttpRequest\npath:/otherpath/,\nGET:{%(_)s'a': %(_)s'b'},\nPOST:{%(_)s'c': %(_)s'd'},\nCOOKIES:{%(_)s'e': %(_)s'f'},\nMETA:{%(_)s'g': %(_)s'h'}>"))
|
str_prefix("<HttpRequest\npath:/otherpath/,\nGET:{%(_)s'a': %(_)s'b'},\nPOST:{%(_)s'c': %(_)s'd'},\nCOOKIES:{%(_)s'e': %(_)s'f'},\nMETA:{%(_)s'g': %(_)s'h'}>"))
|
||||||
|
|
||||||
def test_wsgirequest(self):
|
def test_wsgirequest(self):
|
||||||
request = WSGIRequest({'PATH_INFO': 'bogus', 'REQUEST_METHOD': 'bogus', 'wsgi.input': StringIO('')})
|
request = WSGIRequest({'PATH_INFO': 'bogus', 'REQUEST_METHOD': 'bogus', 'wsgi.input': BytesIO(b'')})
|
||||||
self.assertEqual(request.GET.keys(), [])
|
self.assertEqual(list(request.GET.keys()), [])
|
||||||
self.assertEqual(request.POST.keys(), [])
|
self.assertEqual(list(request.POST.keys()), [])
|
||||||
self.assertEqual(request.COOKIES.keys(), [])
|
self.assertEqual(list(request.COOKIES.keys()), [])
|
||||||
self.assertEqual(set(request.META.keys()), set(['PATH_INFO', 'REQUEST_METHOD', 'SCRIPT_NAME', 'wsgi.input']))
|
self.assertEqual(set(request.META.keys()), set(['PATH_INFO', 'REQUEST_METHOD', 'SCRIPT_NAME', 'wsgi.input']))
|
||||||
self.assertEqual(request.META['PATH_INFO'], 'bogus')
|
self.assertEqual(request.META['PATH_INFO'], 'bogus')
|
||||||
self.assertEqual(request.META['REQUEST_METHOD'], 'bogus')
|
self.assertEqual(request.META['REQUEST_METHOD'], 'bogus')
|
||||||
self.assertEqual(request.META['SCRIPT_NAME'], '')
|
self.assertEqual(request.META['SCRIPT_NAME'], '')
|
||||||
|
|
||||||
def test_wsgirequest_repr(self):
|
def test_wsgirequest_repr(self):
|
||||||
request = WSGIRequest({'PATH_INFO': '/somepath/', 'REQUEST_METHOD': 'get', 'wsgi.input': StringIO('')})
|
request = WSGIRequest({'PATH_INFO': '/somepath/', 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')})
|
||||||
request.GET = {'get-key': 'get-value'}
|
request.GET = {'get-key': 'get-value'}
|
||||||
request.POST = {'post-key': 'post-value'}
|
request.POST = {'post-key': 'post-value'}
|
||||||
request.COOKIES = {'post-key': 'post-value'}
|
request.COOKIES = {'post-key': 'post-value'}
|
||||||
|
@ -207,26 +207,26 @@ class RequestsTests(unittest.TestCase):
|
||||||
|
|
||||||
def test_limited_stream(self):
|
def test_limited_stream(self):
|
||||||
# Read all of a limited stream
|
# Read all of a limited stream
|
||||||
stream = LimitedStream(StringIO(b'test'), 2)
|
stream = LimitedStream(BytesIO(b'test'), 2)
|
||||||
self.assertEqual(stream.read(), b'te')
|
self.assertEqual(stream.read(), b'te')
|
||||||
# Reading again returns nothing.
|
# Reading again returns nothing.
|
||||||
self.assertEqual(stream.read(), b'')
|
self.assertEqual(stream.read(), b'')
|
||||||
|
|
||||||
# Read a number of characters greater than the stream has to offer
|
# Read a number of characters greater than the stream has to offer
|
||||||
stream = LimitedStream(StringIO(b'test'), 2)
|
stream = LimitedStream(BytesIO(b'test'), 2)
|
||||||
self.assertEqual(stream.read(5), b'te')
|
self.assertEqual(stream.read(5), b'te')
|
||||||
# Reading again returns nothing.
|
# Reading again returns nothing.
|
||||||
self.assertEqual(stream.readline(5), b'')
|
self.assertEqual(stream.readline(5), b'')
|
||||||
|
|
||||||
# Read sequentially from a stream
|
# Read sequentially from a stream
|
||||||
stream = LimitedStream(StringIO(b'12345678'), 8)
|
stream = LimitedStream(BytesIO(b'12345678'), 8)
|
||||||
self.assertEqual(stream.read(5), b'12345')
|
self.assertEqual(stream.read(5), b'12345')
|
||||||
self.assertEqual(stream.read(5), b'678')
|
self.assertEqual(stream.read(5), b'678')
|
||||||
# Reading again returns nothing.
|
# Reading again returns nothing.
|
||||||
self.assertEqual(stream.readline(5), b'')
|
self.assertEqual(stream.readline(5), b'')
|
||||||
|
|
||||||
# Read lines from a stream
|
# Read lines from a stream
|
||||||
stream = LimitedStream(StringIO(b'1234\n5678\nabcd\nefgh\nijkl'), 24)
|
stream = LimitedStream(BytesIO(b'1234\n5678\nabcd\nefgh\nijkl'), 24)
|
||||||
# Read a full line, unconditionally
|
# Read a full line, unconditionally
|
||||||
self.assertEqual(stream.readline(), b'1234\n')
|
self.assertEqual(stream.readline(), b'1234\n')
|
||||||
# Read a number of characters less than a line
|
# Read a number of characters less than a line
|
||||||
|
@ -246,7 +246,7 @@ class RequestsTests(unittest.TestCase):
|
||||||
# If a stream contains a newline, but the provided length
|
# If a stream contains a newline, but the provided length
|
||||||
# is less than the number of provided characters, the newline
|
# is less than the number of provided characters, the newline
|
||||||
# doesn't reset the available character count
|
# doesn't reset the available character count
|
||||||
stream = LimitedStream(StringIO(b'1234\nabcdef'), 9)
|
stream = LimitedStream(BytesIO(b'1234\nabcdef'), 9)
|
||||||
self.assertEqual(stream.readline(10), b'1234\n')
|
self.assertEqual(stream.readline(10), b'1234\n')
|
||||||
self.assertEqual(stream.readline(3), b'abc')
|
self.assertEqual(stream.readline(3), b'abc')
|
||||||
# Now expire the available characters
|
# Now expire the available characters
|
||||||
|
@ -255,7 +255,7 @@ class RequestsTests(unittest.TestCase):
|
||||||
self.assertEqual(stream.readline(2), b'')
|
self.assertEqual(stream.readline(2), b'')
|
||||||
|
|
||||||
# Same test, but with read, not readline.
|
# Same test, but with read, not readline.
|
||||||
stream = LimitedStream(StringIO(b'1234\nabcdef'), 9)
|
stream = LimitedStream(BytesIO(b'1234\nabcdef'), 9)
|
||||||
self.assertEqual(stream.read(6), b'1234\na')
|
self.assertEqual(stream.read(6), b'1234\na')
|
||||||
self.assertEqual(stream.read(2), b'bc')
|
self.assertEqual(stream.read(2), b'bc')
|
||||||
self.assertEqual(stream.read(2), b'd')
|
self.assertEqual(stream.read(2), b'd')
|
||||||
|
@ -266,7 +266,7 @@ class RequestsTests(unittest.TestCase):
|
||||||
payload = b'name=value'
|
payload = b'name=value'
|
||||||
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'wsgi.input': StringIO(payload)})
|
'wsgi.input': BytesIO(payload)})
|
||||||
self.assertEqual(request.read(), b'name=value')
|
self.assertEqual(request.read(), b'name=value')
|
||||||
|
|
||||||
def test_read_after_value(self):
|
def test_read_after_value(self):
|
||||||
|
@ -277,7 +277,7 @@ class RequestsTests(unittest.TestCase):
|
||||||
payload = b'name=value'
|
payload = b'name=value'
|
||||||
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'wsgi.input': StringIO(payload)})
|
'wsgi.input': BytesIO(payload)})
|
||||||
self.assertEqual(request.POST, {'name': ['value']})
|
self.assertEqual(request.POST, {'name': ['value']})
|
||||||
self.assertEqual(request.body, b'name=value')
|
self.assertEqual(request.body, b'name=value')
|
||||||
self.assertEqual(request.read(), b'name=value')
|
self.assertEqual(request.read(), b'name=value')
|
||||||
|
@ -290,7 +290,7 @@ class RequestsTests(unittest.TestCase):
|
||||||
payload = b'name=value'
|
payload = b'name=value'
|
||||||
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'wsgi.input': StringIO(payload)})
|
'wsgi.input': BytesIO(payload)})
|
||||||
self.assertEqual(request.read(2), b'na')
|
self.assertEqual(request.read(2), b'na')
|
||||||
self.assertRaises(Exception, lambda: request.body)
|
self.assertRaises(Exception, lambda: request.body)
|
||||||
self.assertEqual(request.POST, {})
|
self.assertEqual(request.POST, {})
|
||||||
|
@ -312,7 +312,7 @@ class RequestsTests(unittest.TestCase):
|
||||||
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
||||||
'CONTENT_TYPE': 'multipart/form-data; boundary=boundary',
|
'CONTENT_TYPE': 'multipart/form-data; boundary=boundary',
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'wsgi.input': StringIO(payload)})
|
'wsgi.input': BytesIO(payload)})
|
||||||
self.assertEqual(request.POST, {'name': ['value']})
|
self.assertEqual(request.POST, {'name': ['value']})
|
||||||
self.assertRaises(Exception, lambda: request.body)
|
self.assertRaises(Exception, lambda: request.body)
|
||||||
|
|
||||||
|
@ -334,14 +334,14 @@ class RequestsTests(unittest.TestCase):
|
||||||
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
||||||
'CONTENT_TYPE': 'multipart/form-data; boundary=boundary',
|
'CONTENT_TYPE': 'multipart/form-data; boundary=boundary',
|
||||||
'CONTENT_LENGTH': 0,
|
'CONTENT_LENGTH': 0,
|
||||||
'wsgi.input': StringIO(payload)})
|
'wsgi.input': BytesIO(payload)})
|
||||||
self.assertEqual(request.POST, {})
|
self.assertEqual(request.POST, {})
|
||||||
|
|
||||||
def test_read_by_lines(self):
|
def test_read_by_lines(self):
|
||||||
payload = b'name=value'
|
payload = b'name=value'
|
||||||
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'wsgi.input': StringIO(payload)})
|
'wsgi.input': BytesIO(payload)})
|
||||||
self.assertEqual(list(request), [b'name=value'])
|
self.assertEqual(list(request), [b'name=value'])
|
||||||
|
|
||||||
def test_POST_after_body_read(self):
|
def test_POST_after_body_read(self):
|
||||||
|
@ -351,7 +351,7 @@ class RequestsTests(unittest.TestCase):
|
||||||
payload = b'name=value'
|
payload = b'name=value'
|
||||||
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'wsgi.input': StringIO(payload)})
|
'wsgi.input': BytesIO(payload)})
|
||||||
raw_data = request.body
|
raw_data = request.body
|
||||||
self.assertEqual(request.POST, {'name': ['value']})
|
self.assertEqual(request.POST, {'name': ['value']})
|
||||||
|
|
||||||
|
@ -363,7 +363,7 @@ class RequestsTests(unittest.TestCase):
|
||||||
payload = b'name=value'
|
payload = b'name=value'
|
||||||
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'wsgi.input': StringIO(payload)})
|
'wsgi.input': BytesIO(payload)})
|
||||||
raw_data = request.body
|
raw_data = request.body
|
||||||
self.assertEqual(request.read(1), b'n')
|
self.assertEqual(request.read(1), b'n')
|
||||||
self.assertEqual(request.POST, {'name': ['value']})
|
self.assertEqual(request.POST, {'name': ['value']})
|
||||||
|
@ -383,7 +383,7 @@ class RequestsTests(unittest.TestCase):
|
||||||
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
||||||
'CONTENT_TYPE': 'multipart/form-data; boundary=boundary',
|
'CONTENT_TYPE': 'multipart/form-data; boundary=boundary',
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'wsgi.input': StringIO(payload)})
|
'wsgi.input': BytesIO(payload)})
|
||||||
raw_data = request.body
|
raw_data = request.body
|
||||||
# Consume enough data to mess up the parsing:
|
# Consume enough data to mess up the parsing:
|
||||||
self.assertEqual(request.read(13), b'--boundary\r\nC')
|
self.assertEqual(request.read(13), b'--boundary\r\nC')
|
||||||
|
@ -397,7 +397,7 @@ class RequestsTests(unittest.TestCase):
|
||||||
request = WSGIRequest({
|
request = WSGIRequest({
|
||||||
'REQUEST_METHOD': 'POST',
|
'REQUEST_METHOD': 'POST',
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'wsgi.input': StringIO(payload)
|
'wsgi.input': BytesIO(payload)
|
||||||
})
|
})
|
||||||
|
|
||||||
with warnings.catch_warnings(record=True):
|
with warnings.catch_warnings(record=True):
|
||||||
|
@ -408,14 +408,14 @@ class RequestsTests(unittest.TestCase):
|
||||||
If wsgi.input.read() raises an exception while trying to read() the
|
If wsgi.input.read() raises an exception while trying to read() the
|
||||||
POST, the exception should be identifiable (not a generic IOError).
|
POST, the exception should be identifiable (not a generic IOError).
|
||||||
"""
|
"""
|
||||||
class ExplodingStringIO(StringIO):
|
class ExplodingBytesIO(BytesIO):
|
||||||
def read(self, len=0):
|
def read(self, len=0):
|
||||||
raise IOError("kaboom!")
|
raise IOError("kaboom!")
|
||||||
|
|
||||||
payload = b'name=value'
|
payload = b'name=value'
|
||||||
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
request = WSGIRequest({'REQUEST_METHOD': 'POST',
|
||||||
'CONTENT_LENGTH': len(payload),
|
'CONTENT_LENGTH': len(payload),
|
||||||
'wsgi.input': ExplodingStringIO(payload)})
|
'wsgi.input': ExplodingBytesIO(payload)})
|
||||||
|
|
||||||
with warnings.catch_warnings(record=True) as w:
|
with warnings.catch_warnings(record=True) as w:
|
||||||
warnings.simplefilter("always")
|
warnings.simplefilter("always")
|
||||||
|
|
Loading…
Reference in New Issue