Refs #23766 -- Added tests for CursorWrapper.callproc().

Thanks Tim Graham for the review.
This commit is contained in:
Mariusz Felisiak 2017-08-08 22:13:02 +02:00 committed by GitHub
parent c754bdc45b
commit 3189a93ceb
7 changed files with 86 additions and 1 deletions

View File

@ -235,6 +235,11 @@ class BaseDatabaseFeatures:
# Does the backend support CAST with precision?
supports_cast_with_precision = True
# SQL to create a procedure for use by the Django test suite. The
# functionality of the procedure isn't important.
create_test_procedure_without_params_sql = None
create_test_procedure_with_int_param_sql = None
def __init__(self, connection):
self.connection = connection

View File

@ -66,6 +66,8 @@ class BaseDatabaseSchemaEditor:
sql_create_pk = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
sql_delete_pk = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
sql_delete_procedure = 'DROP PROCEDURE %(procedure)s'
def __init__(self, connection, collect_sql=False, atomic=True):
self.connection = connection
self.collect_sql = collect_sql
@ -1027,3 +1029,10 @@ class BaseDatabaseSchemaEditor:
))
for constraint_name in constraint_names:
self.execute(self._delete_constraint_sql(self.sql_delete_pk, model, constraint_name))
def remove_procedure(self, procedure_name, param_types=()):
sql = self.sql_delete_procedure % {
'procedure': self.quote_name(procedure_name),
'param_types': ','.join(param_types),
}
self.execute(sql)

View File

@ -33,6 +33,20 @@ class DatabaseFeatures(BaseDatabaseFeatures):
supports_slicing_ordering_in_compound = True
supports_index_on_text_field = False
has_case_insensitive_like = False
create_test_procedure_without_params_sql = """
CREATE PROCEDURE test_procedure ()
BEGIN
DECLARE V_I INTEGER;
SET V_I = 1;
END;
"""
create_test_procedure_with_int_param_sql = """
CREATE PROCEDURE test_procedure (P_I INTEGER)
BEGIN
DECLARE V_I INTEGER;
SET V_I = P_I;
END;
"""
@cached_property
def _mysql_storage_engine(self):

View File

@ -40,3 +40,17 @@ class DatabaseFeatures(BaseDatabaseFeatures):
ignores_table_name_case = True
supports_index_on_text_field = False
has_case_insensitive_like = False
create_test_procedure_without_params_sql = """
CREATE PROCEDURE "TEST_PROCEDURE" AS
V_I INTEGER;
BEGIN
V_I := 1;
END;
"""
create_test_procedure_with_int_param_sql = """
CREATE PROCEDURE "TEST_PROCEDURE" (P_I INTEGER) AS
V_I INTEGER;
BEGIN
V_I := P_I;
END;
"""

View File

@ -33,6 +33,22 @@ class DatabaseFeatures(BaseDatabaseFeatures):
can_clone_databases = True
supports_temporal_subtraction = True
supports_slicing_ordering_in_compound = True
create_test_procedure_without_params_sql = """
CREATE FUNCTION test_procedure () RETURNS void AS $$
DECLARE
V_I INTEGER;
BEGIN
V_I := 1;
END;
$$ LANGUAGE plpgsql;"""
create_test_procedure_with_int_param_sql = """
CREATE FUNCTION test_procedure (P_I INTEGER) RETURNS void AS $$
DECLARE
V_I INTEGER;
BEGIN
V_I := P_I;
END;
$$ LANGUAGE plpgsql;"""
@cached_property
def has_select_for_update_skip_locked(self):

View File

@ -20,6 +20,8 @@ class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
# dropping it in the same transaction.
sql_delete_fk = "SET CONSTRAINTS %(name)s IMMEDIATE; ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
sql_delete_procedure = 'DROP FUNCTION %(procedure)s(%(param_types)s)'
def quote_value(self, value):
return psycopg2.extensions.adapt(value)

View File

@ -1,8 +1,11 @@
"""Tests for django.db.backends.utils"""
from decimal import Decimal, Rounded
from django.db import connection
from django.db.backends.utils import format_number, truncate_name
from django.test import SimpleTestCase
from django.test import (
SimpleTestCase, TransactionTestCase, skipUnlessDBFeature,
)
class TestUtils(SimpleTestCase):
@ -45,3 +48,25 @@ class TestUtils(SimpleTestCase):
equal('0.1234567890', 5, None, '0.12346')
with self.assertRaises(Rounded):
equal('1234567890.1234', 5, None, '1234600000')
class CursorWrapperTests(TransactionTestCase):
available_apps = []
def _test_procedure(self, procedure_sql, params, param_types):
with connection.cursor() as cursor:
cursor.execute(procedure_sql)
# Use a new cursor because in MySQL a procedure can't be used in the
# same cursor in which it was created.
with connection.cursor() as cursor:
cursor.callproc('test_procedure', params)
with connection.schema_editor() as editor:
editor.remove_procedure('test_procedure', param_types)
@skipUnlessDBFeature('create_test_procedure_without_params_sql')
def test_callproc_without_params(self):
self._test_procedure(connection.features.create_test_procedure_without_params_sql, [], [])
@skipUnlessDBFeature('create_test_procedure_with_int_param_sql')
def test_callproc_with_int_params(self):
self._test_procedure(connection.features.create_test_procedure_with_int_param_sql, [1], ['INTEGER'])