tests.py 33.1 KB
Newer Older
1
"""Tests related to django.db.backends that haven't been organized."""
2
import datetime
3
import threading
4
import unittest
5
import warnings
6 7

from django.core.management.color import no_style
8 9 10 11
from django.db import (
    DEFAULT_DB_ALIAS, DatabaseError, IntegrityError, connection, connections,
    reset_queries, transaction,
)
12
from django.db.backends.base.base import BaseDatabaseWrapper
13
from django.db.backends.signals import connection_created
14
from django.db.backends.utils import CursorWrapper
15
from django.db.models.sql.constants import CURSOR
16
from django.test import (
17 18
    TestCase, TransactionTestCase, override_settings, skipIfDBFeature,
    skipUnlessDBFeature,
19
)
20

21
from .models import (
22
    Article, Object, ObjectReference, Person, Post, RawData, Reporter,
23 24 25
    ReporterProxy, SchoolClass, Square,
    VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ,
)
26

27

28 29 30 31 32
class DateQuotingTest(TestCase):

    def test_django_date_trunc(self):
        """
        Test the custom ``django_date_trunc method``, in particular against
33
        fields which clash with strings passed to it (e.g. 'year') (#12818).
34 35
        """
        updated = datetime.datetime(2010, 2, 20)
36 37
        SchoolClass.objects.create(year=2009, last_updated=updated)
        years = SchoolClass.objects.dates('last_updated', 'year')
38
        self.assertEqual(list(years), [datetime.date(2010, 1, 1)])
39

40
    def test_django_date_extract(self):
41
        """
42
        Test the custom ``django_date_extract method``, in particular against fields
43
        which clash with strings passed to it (e.g. 'day') (#12818).
44 45
        """
        updated = datetime.datetime(2010, 2, 20)
46 47
        SchoolClass.objects.create(year=2009, last_updated=updated)
        classes = SchoolClass.objects.filter(last_updated__day=20)
48 49
        self.assertEqual(len(classes), 1)

50

51
@override_settings(DEBUG=True)
52
class LastExecutedQueryTest(TestCase):
53

54
    def test_last_executed_query_without_previous_query(self):
55 56 57 58
        """
        last_executed_query should not raise an exception even if no previous
        query has been run.
        """
59 60
        with connection.cursor() as cursor:
            connection.ops.last_executed_query(cursor, '', ())
61

62
    def test_debug_sql(self):
63
        list(Reporter.objects.filter(first_name="test"))
64
        sql = connection.queries[-1]['sql'].lower()
65
        self.assertIn("select", sql)
66
        self.assertIn(Reporter._meta.db_table, sql)
67

68
    def test_query_encoding(self):
69
        """last_executed_query() returns a string."""
70
        data = RawData.objects.filter(raw_data=b'\x00\x46  \xFE').extra(select={'föö': 1})
71 72
        sql, params = data.query.sql_with_params()
        cursor = data.query.get_compiler('default').execute_sql(CURSOR)
73
        last_sql = cursor.db.ops.last_executed_query(cursor, sql, params)
74
        self.assertIsInstance(last_sql, str)
75

76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
    def test_last_executed_query(self):
        # last_executed_query() interpolate all parameters, in most cases it is
        # not equal to QuerySet.query.
        for qs in (
            Article.objects.filter(pk=1),
            Article.objects.filter(pk__in=(1, 2), reporter__pk=3),
        ):
            sql, params = qs.query.sql_with_params()
            cursor = qs.query.get_compiler(DEFAULT_DB_ALIAS).execute_sql(CURSOR)
            self.assertEqual(
                cursor.db.ops.last_executed_query(cursor, sql, params),
                str(qs.query),
            )

    @skipUnlessDBFeature('supports_paramstyle_pyformat')
    def test_last_executed_query_dict(self):
        square_opts = Square._meta
        sql = 'INSERT INTO %s (%s, %s) VALUES (%%(root)s, %%(square)s)' % (
            connection.introspection.identifier_converter(square_opts.db_table),
            connection.ops.quote_name(square_opts.get_field('root').column),
            connection.ops.quote_name(square_opts.get_field('square').column),
        )
        with connection.cursor() as cursor:
            params = {'root': 2, 'square': 4}
            cursor.execute(sql, params)
            self.assertEqual(
                cursor.db.ops.last_executed_query(cursor, sql, params),
                sql % params,
            )

106

107
class ParameterHandlingTest(TestCase):
108

109 110
    def test_bad_parameter_count(self):
        "An executemany call with too many/not enough parameters will raise an exception (Refs #12612)"
111 112
        with connection.cursor() as cursor:
            query = ('INSERT INTO %s (%s, %s) VALUES (%%s, %%s)' % (
113
                connection.introspection.identifier_converter('backends_square'),
114 115 116 117 118 119 120
                connection.ops.quote_name('root'),
                connection.ops.quote_name('square')
            ))
            with self.assertRaises(Exception):
                cursor.executemany(query, [(1, 2, 3)])
            with self.assertRaises(Exception):
                cursor.executemany(query, [(1,)])
121

122

123
class LongNameTest(TransactionTestCase):
124 125 126 127 128 129
    """Long primary keys and model names can result in a sequence name
    that exceeds the database limits, which will result in truncation
    on certain databases (e.g., Postgres). The backend needs to use
    the correct sequence name in last_insert_id and other places, so
    check it is. Refs #8901.
    """
130
    available_apps = ['backends']
131 132 133

    def test_sequence_name_length_limits_create(self):
        """Test creation of model with long name and long pk name doesn't error. Ref #8901"""
134
        VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ.objects.create()
135 136

    def test_sequence_name_length_limits_m2m(self):
137 138 139 140
        """
        An m2m save of a model with a long name and a long m2m field name
        doesn't error (#8901).
        """
141 142
        obj = VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ.objects.create()
        rel_obj = Person.objects.create(first_name='Django', last_name='Reinhardt')
143 144 145
        obj.m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz.add(rel_obj)

    def test_sequence_name_length_limits_flush(self):
146 147 148 149
        """
        Sequence resetting as part of a flush with model with long name and
        long pk name doesn't error (#8901).
        """
150 151 152 153
        # A full flush is expensive to the full test, so we dig into the
        # internals to generate the likely offending SQL and run it manually

        # Some convenience aliases
154
        VLM = VeryLongModelNameZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZZ
155 156 157 158 159 160 161 162 163 164 165
        VLM_m2m = VLM.m2m_also_quite_long_zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz.through
        tables = [
            VLM._meta.db_table,
            VLM_m2m._meta.db_table,
        ]
        sequences = [
            {
                'column': VLM._meta.pk.column,
                'table': VLM._meta.db_table
            },
        ]
166 167 168 169
        sql_list = connection.ops.sql_flush(no_style(), tables, sequences)
        with connection.cursor() as cursor:
            for statement in sql_list:
                cursor.execute(statement)
170

171

172
class SequenceResetTest(TestCase):
173

174 175 176
    def test_generic_relation(self):
        "Sequence names are correct when resetting generic relations (Ref #13941)"
        # Create an object with a manually specified PK
177
        Post.objects.create(id=10, name='1st post', text='hello world')
178 179

        # Reset the sequences for the database
180
        commands = connections[DEFAULT_DB_ALIAS].ops.sequence_reset_sql(no_style(), [Post])
181 182 183
        with connection.cursor() as cursor:
            for sql in commands:
                cursor.execute(sql)
184 185 186

        # If we create a new object now, it should have a PK greater
        # than the PK we specified manually.
187
        obj = Post.objects.create(name='New post', text='goodbye world')
188
        self.assertGreater(obj.pk, 10)
189

190

191 192 193 194
# This test needs to run outside of a transaction, otherwise closing the
# connection would implicitly rollback and cause problems during teardown.
class ConnectionCreatedSignalTest(TransactionTestCase):

195 196
    available_apps = []

197 198
    # Unfortunately with sqlite3 the in-memory test database cannot be closed,
    # and so it cannot be re-opened during testing.
199 200 201
    @skipUnlessDBFeature('test_db_allows_multiple_connections')
    def test_signal(self):
        data = {}
202

203 204 205 206 207
        def receiver(sender, connection, **kwargs):
            data["connection"] = connection

        connection_created.connect(receiver)
        connection.close()
208
        connection.cursor()
209
        self.assertIs(data["connection"].connection, connection.connection)
210

211 212
        connection_created.disconnect(receiver)
        data.clear()
213
        connection.cursor()
214
        self.assertEqual(data, {})
215 216


217
class EscapingChecks(TestCase):
218 219 220 221
    """
    All tests in this test case are also run with settings.DEBUG=True in
    EscapingChecksDebug test case, to also test CursorDebugWrapper.
    """
222

223
    bare_select_suffix = connection.features.bare_select_suffix
224

225
    def test_paramless_no_escaping(self):
226 227 228
        with connection.cursor() as cursor:
            cursor.execute("SELECT '%s'" + self.bare_select_suffix)
            self.assertEqual(cursor.fetchall()[0][0], '%s')
229 230

    def test_parameter_escaping(self):
231 232 233
        with connection.cursor() as cursor:
            cursor.execute("SELECT '%%', %s" + self.bare_select_suffix, ('%d',))
            self.assertEqual(cursor.fetchall()[0], ('%', '%d'))
234

Jason Myers's avatar
Jason Myers committed
235

236 237 238 239
@override_settings(DEBUG=True)
class EscapingChecksDebug(EscapingChecks):
    pass

240

241 242 243
class BackendTestCase(TransactionTestCase):

    available_apps = ['backends']
244 245

    def create_squares_with_executemany(self, args):
246 247
        self.create_squares(args, 'format', True)

248
    def create_squares(self, args, paramstyle, multiple):
249
        opts = Square._meta
250
        tbl = connection.introspection.identifier_converter(opts.db_table)
251 252
        f1 = connection.ops.quote_name(opts.get_field('root').column)
        f2 = connection.ops.quote_name(opts.get_field('square').column)
Tim Graham's avatar
Tim Graham committed
253
        if paramstyle == 'format':
254
            query = 'INSERT INTO %s (%s, %s) VALUES (%%s, %%s)' % (tbl, f1, f2)
Tim Graham's avatar
Tim Graham committed
255
        elif paramstyle == 'pyformat':
256 257 258
            query = 'INSERT INTO %s (%s, %s) VALUES (%%(root)s, %%(square)s)' % (tbl, f1, f2)
        else:
            raise ValueError("unsupported paramstyle in test")
259 260 261 262 263
        with connection.cursor() as cursor:
            if multiple:
                cursor.executemany(query, args)
            else:
                cursor.execute(query, args)
264 265

    def test_cursor_executemany(self):
266
        # Test cursor.executemany #4896
Jason Myers's avatar
Jason Myers committed
267
        args = [(i, i ** 2) for i in range(-5, 6)]
268
        self.create_squares_with_executemany(args)
269
        self.assertEqual(Square.objects.count(), 11)
270
        for i in range(-5, 6):
271
            square = Square.objects.get(root=i)
Jason Myers's avatar
Jason Myers committed
272
            self.assertEqual(square.square, i ** 2)
273

274
    def test_cursor_executemany_with_empty_params_list(self):
275
        # Test executemany with params=[] does nothing #4765
276 277
        args = []
        self.create_squares_with_executemany(args)
278
        self.assertEqual(Square.objects.count(), 0)
279 280

    def test_cursor_executemany_with_iterator(self):
281
        # Test executemany accepts iterators #10320
282
        args = ((i, i ** 2) for i in range(-3, 2))
283
        self.create_squares_with_executemany(args)
284
        self.assertEqual(Square.objects.count(), 5)
285

286
        args = ((i, i ** 2) for i in range(3, 7))
287 288 289
        with override_settings(DEBUG=True):
            # same test for DebugCursorWrapper
            self.create_squares_with_executemany(args)
290
        self.assertEqual(Square.objects.count(), 9)
291

292 293
    @skipUnlessDBFeature('supports_paramstyle_pyformat')
    def test_cursor_execute_with_pyformat(self):
294
        # Support pyformat style passing of parameters #10070
295 296
        args = {'root': 3, 'square': 9}
        self.create_squares(args, 'pyformat', multiple=False)
297
        self.assertEqual(Square.objects.count(), 1)
298 299 300

    @skipUnlessDBFeature('supports_paramstyle_pyformat')
    def test_cursor_executemany_with_pyformat(self):
301
        # Support pyformat style passing of parameters #10070
Jason Myers's avatar
Jason Myers committed
302
        args = [{'root': i, 'square': i ** 2} for i in range(-5, 6)]
303
        self.create_squares(args, 'pyformat', multiple=True)
304
        self.assertEqual(Square.objects.count(), 11)
305
        for i in range(-5, 6):
306
            square = Square.objects.get(root=i)
Jason Myers's avatar
Jason Myers committed
307
            self.assertEqual(square.square, i ** 2)
308 309 310

    @skipUnlessDBFeature('supports_paramstyle_pyformat')
    def test_cursor_executemany_with_pyformat_iterator(self):
311
        args = ({'root': i, 'square': i ** 2} for i in range(-3, 2))
312
        self.create_squares(args, 'pyformat', multiple=True)
313
        self.assertEqual(Square.objects.count(), 5)
314

315
        args = ({'root': i, 'square': i ** 2} for i in range(3, 7))
316 317 318
        with override_settings(DEBUG=True):
            # same test for DebugCursorWrapper
            self.create_squares(args, 'pyformat', multiple=True)
319
        self.assertEqual(Square.objects.count(), 9)
320

321
    def test_unicode_fetches(self):
322
        # fetchone, fetchmany, fetchall return strings as unicode objects #6254
323
        qn = connection.ops.quote_name
324 325 326 327 328 329
        Person(first_name="John", last_name="Doe").save()
        Person(first_name="Jane", last_name="Doe").save()
        Person(first_name="Mary", last_name="Agnelline").save()
        Person(first_name="Peter", last_name="Parker").save()
        Person(first_name="Clark", last_name="Kent").save()
        opts2 = Person._meta
330
        f3, f4 = opts2.get_field('first_name'), opts2.get_field('last_name')
331 332 333 334 335
        with connection.cursor() as cursor:
            cursor.execute(
                'SELECT %s, %s FROM %s ORDER BY %s' % (
                    qn(f3.column),
                    qn(f4.column),
336
                    connection.introspection.identifier_converter(opts2.db_table),
337 338
                    qn(f3.column),
                )
339
            )
340 341 342
            self.assertEqual(cursor.fetchone(), ('Clark', 'Kent'))
            self.assertEqual(list(cursor.fetchmany(2)), [('Jane', 'Doe'), ('John', 'Doe')])
            self.assertEqual(list(cursor.fetchall()), [('Mary', 'Agnelline'), ('Peter', 'Parker')])
343

344 345 346 347
    def test_unicode_password(self):
        old_password = connection.settings_dict['PASSWORD']
        connection.settings_dict['PASSWORD'] = "françois"
        try:
348
            connection.cursor()
349
        except DatabaseError:
350 351 352 353 354 355 356
            # As password is probably wrong, a database exception is expected
            pass
        except Exception as e:
            self.fail("Unexpected error raised with unicode password: %s" % e)
        finally:
            connection.settings_dict['PASSWORD'] = old_password

357 358 359 360 361 362
    def test_database_operations_helper_class(self):
        # Ticket #13630
        self.assertTrue(hasattr(connection, 'ops'))
        self.assertTrue(hasattr(connection.ops, 'connection'))
        self.assertEqual(connection, connection.ops.connection)

363 364
    def test_database_operations_init(self):
        """
365
        DatabaseOperations initialization doesn't query the database.
366 367 368 369 370
        See #17656.
        """
        with self.assertNumQueries(0):
            connection.ops.__class__(connection)

371
    def test_cached_db_features(self):
372 373 374
        self.assertIn(connection.features.supports_transactions, (True, False))
        self.assertIn(connection.features.can_introspect_foreign_keys, (True, False))

375
    def test_duplicate_table_error(self):
376
        """ Creating an existing table returns a DatabaseError """
377
        query = 'CREATE TABLE %s (id INTEGER);' % Article._meta.db_table
378 379 380
        with connection.cursor() as cursor:
            with self.assertRaises(DatabaseError):
                cursor.execute(query)
381

382 383
    def test_cursor_contextmanager(self):
        """
384
        Cursors can be used as a context manager
385 386
        """
        with connection.cursor() as cursor:
387
            self.assertIsInstance(cursor, CursorWrapper)
388 389 390 391 392
        # Both InterfaceError and ProgrammingError seem to be used when
        # accessing closed cursor (psycopg2 has InterfaceError, rest seem
        # to use ProgrammingError).
        with self.assertRaises(connection.features.closed_cursor_error_class):
            # cursor should be closed, so no queries should be possible.
393
            cursor.execute("SELECT 1" + connection.features.bare_select_suffix)
394 395 396 397 398 399 400 401

    @unittest.skipUnless(connection.vendor == 'postgresql',
                         "Psycopg2 specific cursor.closed attribute needed")
    def test_cursor_contextmanager_closing(self):
        # There isn't a generic way to test that cursors are closed, but
        # psycopg2 offers us a way to check that by closed attribute.
        # So, run only on psycopg2 for that reason.
        with connection.cursor() as cursor:
402
            self.assertIsInstance(cursor, CursorWrapper)
403 404
        self.assertTrue(cursor.closed)

405 406 407 408
    # Unfortunately with sqlite3 the in-memory test database cannot be closed.
    @skipUnlessDBFeature('test_db_allows_multiple_connections')
    def test_is_usable_after_database_disconnects(self):
        """
409
        is_usable() doesn't crash when the database disconnects (#21553).
410 411 412 413 414 415 416 417 418 419 420 421
        """
        # Open a connection to the database.
        with connection.cursor():
            pass
        # Emulate a connection close by the database.
        connection._close()
        # Even then is_usable() should not raise an exception.
        try:
            self.assertFalse(connection.is_usable())
        finally:
            # Clean up the mess created by connection._close(). Since the
            # connection is already closed, this crashes on some backends.
422
            try:
423
                connection.close()
424 425
            except Exception:
                pass
426

427 428 429 430 431
    @override_settings(DEBUG=True)
    def test_queries(self):
        """
        Test the documented API of connection.queries.
        """
432
        sql = 'SELECT 1' + connection.features.bare_select_suffix
433
        with connection.cursor() as cursor:
434
            reset_queries()
435
            cursor.execute(sql)
436 437 438
        self.assertEqual(1, len(connection.queries))
        self.assertIsInstance(connection.queries, list)
        self.assertIsInstance(connection.queries[0], dict)
439 440
        self.assertEqual(list(connection.queries[0]), ['sql', 'time'])
        self.assertEqual(connection.queries[0]['sql'], sql)
441 442 443 444

        reset_queries()
        self.assertEqual(0, len(connection.queries))

445 446 447 448 449 450 451 452 453 454 455 456 457
        sql = ('INSERT INTO %s (%s, %s) VALUES (%%s, %%s)' % (
            connection.introspection.identifier_converter('backends_square'),
            connection.ops.quote_name('root'),
            connection.ops.quote_name('square'),
        ))
        with connection.cursor() as cursor:
            cursor.executemany(sql, [(1, 1), (2, 4)])
        self.assertEqual(1, len(connection.queries))
        self.assertIsInstance(connection.queries, list)
        self.assertIsInstance(connection.queries[0], dict)
        self.assertEqual(list(connection.queries[0]), ['sql', 'time'])
        self.assertEqual(connection.queries[0]['sql'], '2 times: %s' % sql)

458 459 460 461 462
    # Unfortunately with sqlite3 the in-memory test database cannot be closed.
    @skipUnlessDBFeature('test_db_allows_multiple_connections')
    @override_settings(DEBUG=True)
    def test_queries_limit(self):
        """
463
        The backend doesn't store an unlimited number of queries (#12581).
464 465 466
        """
        old_queries_limit = BaseDatabaseWrapper.queries_limit
        BaseDatabaseWrapper.queries_limit = 3
467
        new_connection = connection.copy()
468

469 470 471 472 473
        # Initialize the connection and clear initialization statements.
        with new_connection.cursor():
            pass
        new_connection.queries_log.clear()

474 475 476 477 478 479 480 481 482 483 484 485 486
        try:
            with new_connection.cursor() as cursor:
                cursor.execute("SELECT 1" + new_connection.features.bare_select_suffix)
                cursor.execute("SELECT 2" + new_connection.features.bare_select_suffix)

            with warnings.catch_warnings(record=True) as w:
                self.assertEqual(2, len(new_connection.queries))
                self.assertEqual(0, len(w))

            with new_connection.cursor() as cursor:
                cursor.execute("SELECT 3" + new_connection.features.bare_select_suffix)
                cursor.execute("SELECT 4" + new_connection.features.bare_select_suffix)

487 488
            msg = "Limit for query logging exceeded, only the last 3 queries will be returned."
            with self.assertWarnsMessage(UserWarning, msg):
489
                self.assertEqual(3, len(new_connection.queries))
490

491 492 493 494
        finally:
            BaseDatabaseWrapper.queries_limit = old_queries_limit
            new_connection.close()

495 496 497 498 499
    def test_timezone_none_use_tz_false(self):
        connection.ensure_connection()
        with self.settings(TIME_ZONE=None, USE_TZ=False):
            connection.init_connection_state()

500

501 502
# These tests aren't conditional because it would require differentiating
# between MySQL+InnoDB and MySQL+MYISAM (something we currently can't do).
503 504
class FkConstraintsTests(TransactionTestCase):

505 506
    available_apps = ['backends']

507 508
    def setUp(self):
        # Create a Reporter.
509
        self.r = Reporter.objects.create(first_name='John', last_name='Smith')
510 511 512 513 514 515

    def test_integrity_checks_on_creation(self):
        """
        Try to create a model instance that violates a FK constraint. If it
        fails it should fail with IntegrityError.
        """
516
        a1 = Article(headline="This is a test", pub_date=datetime.datetime(2005, 7, 27), reporter_id=30)
517
        try:
518
            a1.save()
519
        except IntegrityError:
520 521 522 523
            pass
        else:
            self.skipTest("This backend does not support integrity checks.")
        # Now that we know this backend supports integrity checks we make sure
524 525 526 527 528 529
        # constraints are also enforced for proxy  Refs #17519
        a2 = Article(
            headline='This is another test', reporter=self.r,
            pub_date=datetime.datetime(2012, 8, 3),
            reporter_proxy_id=30,
        )
530 531
        with self.assertRaises(IntegrityError):
            a2.save()
532 533 534 535 536 537 538

    def test_integrity_checks_on_update(self):
        """
        Try to update a model instance introducing a FK constraint violation.
        If it fails it should fail with IntegrityError.
        """
        # Create an Article.
539
        Article.objects.create(headline="Test article", pub_date=datetime.datetime(2010, 9, 4), reporter=self.r)
540
        # Retrieve it from the DB
541
        a1 = Article.objects.get(headline="Test article")
542
        a1.reporter_id = 30
543
        try:
544
            a1.save()
545
        except IntegrityError:
546 547 548 549
            pass
        else:
            self.skipTest("This backend does not support integrity checks.")
        # Now that we know this backend supports integrity checks we make sure
550
        # constraints are also enforced for proxy  Refs #17519
551
        # Create another article
552 553 554 555 556 557
        r_proxy = ReporterProxy.objects.get(pk=self.r.pk)
        Article.objects.create(
            headline='Another article',
            pub_date=datetime.datetime(1988, 5, 15),
            reporter=self.r, reporter_proxy=r_proxy,
        )
558
        # Retrieve the second article from the DB
559
        a2 = Article.objects.get(headline='Another article')
560
        a2.reporter_proxy_id = 30
561 562
        with self.assertRaises(IntegrityError):
            a2.save()
563 564 565

    def test_disable_constraint_checks_manually(self):
        """
566 567
        When constraint checks are disabled, should be able to write bad data
        without IntegrityErrors.
568
        """
569
        with transaction.atomic():
570
            # Create an Article.
571
            Article.objects.create(
572 573 574 575
                headline="Test article",
                pub_date=datetime.datetime(2010, 9, 4),
                reporter=self.r,
            )
576
            # Retrieve it from the DB
577
            a = Article.objects.get(headline="Test article")
578 579 580 581 582 583 584
            a.reporter_id = 30
            try:
                connection.disable_constraint_checking()
                a.save()
                connection.enable_constraint_checking()
            except IntegrityError:
                self.fail("IntegrityError should not have occurred.")
585
            transaction.set_rollback(True)
586 587 588

    def test_disable_constraint_checks_context_manager(self):
        """
589 590
        When constraint checks are disabled (using context manager), should be
        able to write bad data without IntegrityErrors.
591
        """
592
        with transaction.atomic():
593
            # Create an Article.
594
            Article.objects.create(
595 596 597 598
                headline="Test article",
                pub_date=datetime.datetime(2010, 9, 4),
                reporter=self.r,
            )
599
            # Retrieve it from the DB
600
            a = Article.objects.get(headline="Test article")
601 602 603 604 605 606
            a.reporter_id = 30
            try:
                with connection.constraint_checks_disabled():
                    a.save()
            except IntegrityError:
                self.fail("IntegrityError should not have occurred.")
607
            transaction.set_rollback(True)
608 609 610 611 612

    def test_check_constraints(self):
        """
        Constraint checks should raise an IntegrityError when bad data is in the DB.
        """
613
        with transaction.atomic():
614
            # Create an Article.
615
            Article.objects.create(
616 617 618 619
                headline="Test article",
                pub_date=datetime.datetime(2010, 9, 4),
                reporter=self.r,
            )
620
            # Retrieve it from the DB
621
            a = Article.objects.get(headline="Test article")
622
            a.reporter_id = 30
623 624 625 626 627
            with connection.constraint_checks_disabled():
                a.save()
                with self.assertRaises(IntegrityError):
                    connection.check_constraints()
            transaction.set_rollback(True)
628 629


630 631 632
class ThreadTests(TransactionTestCase):

    available_apps = ['backends']
633 634 635

    def test_default_connection_thread_local(self):
        """
636 637
        The default connection (i.e. django.db.connection) is different for
        each thread (#17258).
638
        """
639 640 641
        # Map connections by id because connections with identical aliases
        # have the same hash.
        connections_dict = {}
642
        connection.cursor()
643
        connections_dict[id(connection)] = connection
644

645
        def runner():
646 647 648 649
            # Passing django.db.connection between threads doesn't work while
            # connections[DEFAULT_DB_ALIAS] does.
            from django.db import connections
            connection = connections[DEFAULT_DB_ALIAS]
650 651
            # Allow thread sharing so the connection can be closed by the
            # main thread.
652
            connection.inc_thread_sharing()
653
            connection.cursor()
654
            connections_dict[id(connection)] = connection
655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670
        try:
            for x in range(2):
                t = threading.Thread(target=runner)
                t.start()
                t.join()
            # Each created connection got different inner connection.
            self.assertEqual(len({conn.connection for conn in connections_dict.values()}), 3)
        finally:
            # Finish by closing the connections opened by the other threads
            # (the connection opened in the main thread will automatically be
            # closed on teardown).
            for conn in connections_dict.values():
                if conn is not connection:
                    if conn.allow_thread_sharing:
                        conn.close()
                        conn.dec_thread_sharing()
671 672 673

    def test_connections_thread_local(self):
        """
674
        The connections are different for each thread (#17258).
675
        """
676 677 678
        # Map connections by id because connections with identical aliases
        # have the same hash.
        connections_dict = {}
679
        for conn in connections.all():
680
            connections_dict[id(conn)] = conn
681

682 683 684
        def runner():
            from django.db import connections
            for conn in connections.all():
685 686
                # Allow thread sharing so the connection can be closed by the
                # main thread.
687
                conn.inc_thread_sharing()
688
                connections_dict[id(conn)] = conn
689 690 691 692 693 694 695 696 697 698 699 700 701 702 703
        try:
            for x in range(2):
                t = threading.Thread(target=runner)
                t.start()
                t.join()
            self.assertEqual(len(connections_dict), 6)
        finally:
            # Finish by closing the connections opened by the other threads
            # (the connection opened in the main thread will automatically be
            # closed on teardown).
            for conn in connections_dict.values():
                if conn is not connection:
                    if conn.allow_thread_sharing:
                        conn.close()
                        conn.dec_thread_sharing()
704 705 706

    def test_pass_connection_between_threads(self):
        """
707
        A connection can be passed from one thread to the other (#17258).
708
        """
709
        Person.objects.create(first_name="John", last_name="Doe")
710 711 712 713 714 715

        def do_thread():
            def runner(main_thread_connection):
                from django.db import connections
                connections['default'] = main_thread_connection
                try:
716
                    Person.objects.get(first_name="John", last_name="Doe")
717
                except Exception as e:
718 719 720 721 722
                    exceptions.append(e)
            t = threading.Thread(target=runner, args=[connections['default']])
            t.start()
            t.join()

723
        # Without touching thread sharing, which should be False by default.
724 725 726
        exceptions = []
        do_thread()
        # Forbidden!
727
        self.assertIsInstance(exceptions[0], DatabaseError)
728

729 730 731 732 733 734 735 736 737
        # After calling inc_thread_sharing() on the connection.
        connections['default'].inc_thread_sharing()
        try:
            exceptions = []
            do_thread()
            # All good
            self.assertEqual(exceptions, [])
        finally:
            connections['default'].dec_thread_sharing()
738 739 740

    def test_closing_non_shared_connections(self):
        """
741 742
        A connection that is not explicitly shareable cannot be closed by
        another thread (#17258).
743 744 745
        """
        # First, without explicitly enabling the connection for sharing.
        exceptions = set()
746

747 748 749 750
        def runner1():
            def runner2(other_thread_connection):
                try:
                    other_thread_connection.close()
751
                except DatabaseError as e:
752 753 754 755 756 757 758 759 760 761 762 763
                    exceptions.add(e)
            t2 = threading.Thread(target=runner2, args=[connections['default']])
            t2.start()
            t2.join()
        t1 = threading.Thread(target=runner1)
        t1.start()
        t1.join()
        # The exception was raised
        self.assertEqual(len(exceptions), 1)

        # Then, with explicitly enabling the connection for sharing.
        exceptions = set()
764

765 766 767 768
        def runner1():
            def runner2(other_thread_connection):
                try:
                    other_thread_connection.close()
769
                except DatabaseError as e:
770 771
                    exceptions.add(e)
            # Enable thread sharing
772 773 774 775 776 777 778
            connections['default'].inc_thread_sharing()
            try:
                t2 = threading.Thread(target=runner2, args=[connections['default']])
                t2.start()
                t2.join()
            finally:
                connections['default'].dec_thread_sharing()
779 780 781 782
        t1 = threading.Thread(target=runner1)
        t1.start()
        t1.join()
        # No exception was raised
783 784
        self.assertEqual(len(exceptions), 0)

785 786 787 788 789 790 791 792 793 794 795 796 797 798
    def test_thread_sharing_count(self):
        self.assertIs(connection.allow_thread_sharing, False)
        connection.inc_thread_sharing()
        self.assertIs(connection.allow_thread_sharing, True)
        connection.inc_thread_sharing()
        self.assertIs(connection.allow_thread_sharing, True)
        connection.dec_thread_sharing()
        self.assertIs(connection.allow_thread_sharing, True)
        connection.dec_thread_sharing()
        self.assertIs(connection.allow_thread_sharing, False)
        msg = 'Cannot decrement the thread sharing count below zero.'
        with self.assertRaisesMessage(RuntimeError, msg):
            connection.dec_thread_sharing()

799

800 801 802
class MySQLPKZeroTests(TestCase):
    """
    Zero as id for AutoField should raise exception in MySQL, because MySQL
803
    does not allow zero for autoincrement primary key.
804
    """
805
    @skipIfDBFeature('allows_auto_pk_0')
806 807
    def test_zero_as_autoval(self):
        with self.assertRaises(ValueError):
808
            Square.objects.create(id=0, root=0, square=1)
809 810


811
class DBConstraintTestCase(TestCase):
812

813
    def test_can_reference_existent(self):
814 815
        obj = Object.objects.create()
        ref = ObjectReference.objects.create(obj=obj)
816 817
        self.assertEqual(ref.obj, obj)

818
        ref = ObjectReference.objects.get(obj=obj)
819 820
        self.assertEqual(ref.obj, obj)

821
    def test_can_reference_non_existent(self):
822 823 824
        self.assertFalse(Object.objects.filter(id=12345).exists())
        ref = ObjectReference.objects.create(obj_id=12345)
        ref_new = ObjectReference.objects.get(obj_id=12345)
825 826
        self.assertEqual(ref, ref_new)

827
        with self.assertRaises(Object.DoesNotExist):
828
            ref.obj
829 830

    def test_many_to_many(self):
831
        obj = Object.objects.create()
832
        obj.related_objects.create()
833
        self.assertEqual(Object.objects.count(), 2)
834 835
        self.assertEqual(obj.related_objects.count(), 1)

836
        intermediary_model = Object._meta.get_field("related_objects").remote_field.through
837 838 839
        intermediary_model.objects.create(from_object_id=obj.id, to_object_id=12345)
        self.assertEqual(obj.related_objects.count(), 1)
        self.assertEqual(intermediary_model.objects.count(), 2)