Kaydet (Commit) 05cea7fd authored tarafından Aymeric Augustin's avatar Aymeric Augustin

Changed database connection duplication technique.

This new technique is more straightforward and compatible with test
parallelization, where the effective database connection settings no
longer match settings.DATABASES.
üst e8b49d4c
import copy
import time
import warnings
from collections import deque
......@@ -622,3 +623,16 @@ class BaseDatabaseWrapper(object):
func()
finally:
self.run_on_commit = []
def copy(self, alias=None, allow_thread_sharing=None):
"""
Return a copy of this connection.
For tests that require two connections to the same database.
"""
settings_dict = copy.deepcopy(self.settings_dict)
if alias is None:
alias = self.alias
if allow_thread_sharing is None:
allow_thread_sharing = self.allow_thread_sharing
return type(self)(settings_dict, alias, allow_thread_sharing)
......@@ -2,7 +2,6 @@
# Unit and doctests for specific database backends.
from __future__ import unicode_literals
import copy
import datetime
import re
import threading
......@@ -10,7 +9,6 @@ import unittest
import warnings
from decimal import Decimal, Rounded
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.core.management.color import no_style
from django.db import (
......@@ -182,8 +180,8 @@ class PostgreSQLTests(TestCase):
nodb_conn = connection._nodb_connection
del connection._nodb_connection
self.assertIsNotNone(nodb_conn.settings_dict['NAME'])
self.assertEqual(nodb_conn.settings_dict['NAME'], settings.DATABASES[DEFAULT_DB_ALIAS]['NAME'])
# Check a RuntimeWarning nas been emitted
self.assertEqual(nodb_conn.settings_dict['NAME'], connection.settings_dict['NAME'])
# Check a RuntimeWarning has been emitted
self.assertEqual(len(w), 1)
self.assertEqual(w[0].message.__class__, RuntimeWarning)
......@@ -219,9 +217,7 @@ class PostgreSQLTests(TestCase):
PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
transaction is rolled back (#17062).
"""
databases = copy.deepcopy(settings.DATABASES)
new_connections = ConnectionHandler(databases)
new_connection = new_connections[DEFAULT_DB_ALIAS]
new_connection = connection.copy()
try:
# Ensure the database default time zone is different than
......@@ -258,10 +254,9 @@ class PostgreSQLTests(TestCase):
The connection wrapper shouldn't believe that autocommit is enabled
after setting the time zone when AUTOCOMMIT is False (#21452).
"""
databases = copy.deepcopy(settings.DATABASES)
databases[DEFAULT_DB_ALIAS]['AUTOCOMMIT'] = False
new_connections = ConnectionHandler(databases)
new_connection = new_connections[DEFAULT_DB_ALIAS]
new_connection = connection.copy()
new_connection.settings_dict['AUTOCOMMIT'] = False
try:
# Open a database connection.
new_connection.cursor()
......@@ -285,10 +280,8 @@ class PostgreSQLTests(TestCase):
# Check the level on the psycopg2 connection, not the Django wrapper.
self.assertEqual(connection.connection.isolation_level, read_committed)
databases = copy.deepcopy(settings.DATABASES)
databases[DEFAULT_DB_ALIAS]['OPTIONS']['isolation_level'] = serializable
new_connections = ConnectionHandler(databases)
new_connection = new_connections[DEFAULT_DB_ALIAS]
new_connection = connection.copy()
new_connection.settings_dict['OPTIONS']['isolation_level'] = serializable
try:
# Start a transaction so the isolation level isn't reported as 0.
new_connection.set_autocommit(False)
......@@ -748,8 +741,7 @@ class BackendTestCase(TransactionTestCase):
"""
old_queries_limit = BaseDatabaseWrapper.queries_limit
BaseDatabaseWrapper.queries_limit = 3
new_connections = ConnectionHandler(settings.DATABASES)
new_connection = new_connections[DEFAULT_DB_ALIAS]
new_connection = connection.copy()
# Initialize the connection and clear initialization statements.
with new_connection.cursor():
......
......@@ -2,9 +2,7 @@ from __future__ import unicode_literals
import datetime
from django.conf import settings
from django.db import DEFAULT_DB_ALIAS, models, transaction
from django.db.utils import ConnectionHandler
from django.db import connection, models, transaction
from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
from .models import (
......@@ -24,8 +22,7 @@ class DeleteLockingTest(TransactionTestCase):
def setUp(self):
# Create a second connection to the default database
new_connections = ConnectionHandler(settings.DATABASES)
self.conn2 = new_connections[DEFAULT_DB_ALIAS]
self.conn2 = connection.copy()
self.conn2.set_autocommit(False)
def tearDown(self):
......
......@@ -5,9 +5,7 @@ import time
from multiple_database.routers import TestRouter
from django.conf import settings
from django.db import connection, router, transaction
from django.db.utils import DEFAULT_DB_ALIAS, ConnectionHandler, DatabaseError
from django.db import DatabaseError, connection, router, transaction
from django.test import (
TransactionTestCase, override_settings, skipIfDBFeature,
skipUnlessDBFeature,
......@@ -30,8 +28,7 @@ class SelectForUpdateTests(TransactionTestCase):
# We need another database connection in transaction to test that one
# connection issuing a SELECT ... FOR UPDATE will block.
new_connections = ConnectionHandler(settings.DATABASES)
self.new_connection = new_connections[DEFAULT_DB_ALIAS]
self.new_connection = connection.copy()
def tearDown(self):
try:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment