1# pysqlite2/test/dbapi.py: tests for DB-API compliance
2#
3# Copyright (C) 2004-2010 Gerhard Häring <[email protected]>
4#
5# This file is part of pysqlite.
6#
7# This software is provided 'as-is', without any express or implied
8# warranty.  In no event will the authors be held liable for any damages
9# arising from the use of this software.
10#
11# Permission is granted to anyone to use this software for any purpose,
12# including commercial applications, and to alter it and redistribute it
13# freely, subject to the following restrictions:
14#
15# 1. The origin of this software must not be misrepresented; you must not
16#    claim that you wrote the original software. If you use this software
17#    in a product, an acknowledgment in the product documentation would be
18#    appreciated but is not required.
19# 2. Altered source versions must be plainly marked as such, and must not be
20#    misrepresented as being the original software.
21# 3. This notice may not be removed or altered from any source distribution.
22
23import contextlib
24import os
25import sqlite3 as sqlite
26import subprocess
27import sys
28import threading
29import unittest
30import urllib.parse
31
32from test.support import (
33    SHORT_TIMEOUT, bigmemtest, check_disallow_instantiation, requires_subprocess,
34    is_emscripten, is_wasi
35)
36from test.support import threading_helper
37from _testcapi import INT_MAX, ULLONG_MAX
38from os import SEEK_SET, SEEK_CUR, SEEK_END
39from test.support.os_helper import TESTFN, TESTFN_UNDECODABLE, unlink, temp_dir, FakePath
40
41
42# Helper for temporary memory databases
def memory_database(*args, **kwargs):
    """Return an in-memory connection wrapped in ``contextlib.closing``.

    All positional and keyword arguments are forwarded to
    ``sqlite.connect`` after the ``":memory:"`` database name.  Using the
    result in a ``with`` statement yields the connection and closes it on
    exit.
    """
    return contextlib.closing(sqlite.connect(":memory:", *args, **kwargs))
46
47
48# Temporarily limit a database connection parameter
49@contextlib.contextmanager
50def cx_limit(cx, category=sqlite.SQLITE_LIMIT_SQL_LENGTH, limit=128):
51    try:
52        _prev = cx.setlimit(category, limit)
53        yield limit
54    finally:
55        cx.setlimit(category, _prev)
56
57
class ModuleTests(unittest.TestCase):
    # Module-level DB-API checks: required globals, the exception
    # hierarchy, exported SQLite constants and error-code reporting.

    def test_api_level(self):
        self.assertEqual(sqlite.apilevel, "2.0",
                         "apilevel is %s, should be 2.0" % sqlite.apilevel)

    def test_thread_safety(self):
        self.assertIn(sqlite.threadsafety, {0, 1, 3},
                      "threadsafety is %d, should be 0, 1 or 3" %
                      sqlite.threadsafety)

    def test_param_style(self):
        self.assertEqual(sqlite.paramstyle, "qmark",
                         "paramstyle is '%s', should be 'qmark'" %
                         sqlite.paramstyle)

    def test_warning(self):
        self.assertTrue(issubclass(sqlite.Warning, Exception),
                        "Warning is not a subclass of Exception")

    def test_error(self):
        self.assertTrue(issubclass(sqlite.Error, Exception),
                        "Error is not a subclass of Exception")

    def test_interface_error(self):
        self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
                        "InterfaceError is not a subclass of Error")

    def test_database_error(self):
        self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
                        "DatabaseError is not a subclass of Error")

    def test_data_error(self):
        self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
                        "DataError is not a subclass of DatabaseError")

    def test_operational_error(self):
        self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
                        "OperationalError is not a subclass of DatabaseError")

    def test_integrity_error(self):
        self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
                        "IntegrityError is not a subclass of DatabaseError")

    def test_internal_error(self):
        self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
                        "InternalError is not a subclass of DatabaseError")

    def test_programming_error(self):
        self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
                        "ProgrammingError is not a subclass of DatabaseError")

    def test_not_supported_error(self):
        self.assertTrue(issubclass(sqlite.NotSupportedError,
                                   sqlite.DatabaseError),
                        "NotSupportedError is not a subclass of DatabaseError")

    def test_module_constants(self):
        # Names that must be exported unconditionally.
        consts = [
            "SQLITE_ABORT",
            "SQLITE_ALTER_TABLE",
            "SQLITE_ANALYZE",
            "SQLITE_ATTACH",
            "SQLITE_AUTH",
            "SQLITE_BUSY",
            "SQLITE_CANTOPEN",
            "SQLITE_CONSTRAINT",
            "SQLITE_CORRUPT",
            "SQLITE_CREATE_INDEX",
            "SQLITE_CREATE_TABLE",
            "SQLITE_CREATE_TEMP_INDEX",
            "SQLITE_CREATE_TEMP_TABLE",
            "SQLITE_CREATE_TEMP_TRIGGER",
            "SQLITE_CREATE_TEMP_VIEW",
            "SQLITE_CREATE_TRIGGER",
            "SQLITE_CREATE_VIEW",
            "SQLITE_CREATE_VTABLE",
            "SQLITE_DELETE",
            "SQLITE_DENY",
            "SQLITE_DETACH",
            "SQLITE_DONE",
            "SQLITE_DROP_INDEX",
            "SQLITE_DROP_TABLE",
            "SQLITE_DROP_TEMP_INDEX",
            "SQLITE_DROP_TEMP_TABLE",
            "SQLITE_DROP_TEMP_TRIGGER",
            "SQLITE_DROP_TEMP_VIEW",
            "SQLITE_DROP_TRIGGER",
            "SQLITE_DROP_VIEW",
            "SQLITE_DROP_VTABLE",
            "SQLITE_EMPTY",
            "SQLITE_ERROR",
            "SQLITE_FORMAT",
            "SQLITE_FULL",
            "SQLITE_FUNCTION",
            "SQLITE_IGNORE",
            "SQLITE_INSERT",
            "SQLITE_INTERNAL",
            "SQLITE_INTERRUPT",
            "SQLITE_IOERR",
            "SQLITE_LOCKED",
            "SQLITE_MISMATCH",
            "SQLITE_MISUSE",
            "SQLITE_NOLFS",
            "SQLITE_NOMEM",
            "SQLITE_NOTADB",
            "SQLITE_NOTFOUND",
            "SQLITE_OK",
            "SQLITE_PERM",
            "SQLITE_PRAGMA",
            "SQLITE_PROTOCOL",
            "SQLITE_RANGE",
            "SQLITE_READ",
            "SQLITE_READONLY",
            "SQLITE_REINDEX",
            "SQLITE_ROW",
            "SQLITE_SAVEPOINT",
            "SQLITE_SCHEMA",
            "SQLITE_SELECT",
            "SQLITE_TOOBIG",
            "SQLITE_TRANSACTION",
            "SQLITE_UPDATE",
            # Run-time limit categories
            "SQLITE_LIMIT_LENGTH",
            "SQLITE_LIMIT_SQL_LENGTH",
            "SQLITE_LIMIT_COLUMN",
            "SQLITE_LIMIT_EXPR_DEPTH",
            "SQLITE_LIMIT_COMPOUND_SELECT",
            "SQLITE_LIMIT_VDBE_OP",
            "SQLITE_LIMIT_FUNCTION_ARG",
            "SQLITE_LIMIT_ATTACHED",
            "SQLITE_LIMIT_LIKE_PATTERN_LENGTH",
            "SQLITE_LIMIT_VARIABLE_NUMBER",
            "SQLITE_LIMIT_TRIGGER_DEPTH",
        ]
        # Names below are only expected when the linked SQLite library is
        # at least the version given in each guard.
        if sqlite.sqlite_version_info >= (3, 7, 17):
            consts += ["SQLITE_NOTICE", "SQLITE_WARNING"]
        if sqlite.sqlite_version_info >= (3, 8, 3):
            consts.append("SQLITE_RECURSIVE")
        if sqlite.sqlite_version_info >= (3, 8, 7):
            consts.append("SQLITE_LIMIT_WORKER_THREADS")
        consts += ["PARSE_DECLTYPES", "PARSE_COLNAMES"]
        # Extended result codes
        consts += [
            "SQLITE_ABORT_ROLLBACK",
            "SQLITE_BUSY_RECOVERY",
            "SQLITE_CANTOPEN_FULLPATH",
            "SQLITE_CANTOPEN_ISDIR",
            "SQLITE_CANTOPEN_NOTEMPDIR",
            "SQLITE_CORRUPT_VTAB",
            "SQLITE_IOERR_ACCESS",
            "SQLITE_IOERR_BLOCKED",
            "SQLITE_IOERR_CHECKRESERVEDLOCK",
            "SQLITE_IOERR_CLOSE",
            "SQLITE_IOERR_DELETE",
            "SQLITE_IOERR_DELETE_NOENT",
            "SQLITE_IOERR_DIR_CLOSE",
            "SQLITE_IOERR_DIR_FSYNC",
            "SQLITE_IOERR_FSTAT",
            "SQLITE_IOERR_FSYNC",
            "SQLITE_IOERR_LOCK",
            "SQLITE_IOERR_NOMEM",
            "SQLITE_IOERR_RDLOCK",
            "SQLITE_IOERR_READ",
            "SQLITE_IOERR_SEEK",
            "SQLITE_IOERR_SHMLOCK",
            "SQLITE_IOERR_SHMMAP",
            "SQLITE_IOERR_SHMOPEN",
            "SQLITE_IOERR_SHMSIZE",
            "SQLITE_IOERR_SHORT_READ",
            "SQLITE_IOERR_TRUNCATE",
            "SQLITE_IOERR_UNLOCK",
            "SQLITE_IOERR_WRITE",
            "SQLITE_LOCKED_SHAREDCACHE",
            "SQLITE_READONLY_CANTLOCK",
            "SQLITE_READONLY_RECOVERY",
        ]
        if sqlite.sqlite_version_info >= (3, 7, 16):
            consts += [
                "SQLITE_CONSTRAINT_CHECK",
                "SQLITE_CONSTRAINT_COMMITHOOK",
                "SQLITE_CONSTRAINT_FOREIGNKEY",
                "SQLITE_CONSTRAINT_FUNCTION",
                "SQLITE_CONSTRAINT_NOTNULL",
                "SQLITE_CONSTRAINT_PRIMARYKEY",
                "SQLITE_CONSTRAINT_TRIGGER",
                "SQLITE_CONSTRAINT_UNIQUE",
                "SQLITE_CONSTRAINT_VTAB",
                "SQLITE_READONLY_ROLLBACK",
            ]
        if sqlite.sqlite_version_info >= (3, 7, 17):
            consts += [
                "SQLITE_IOERR_MMAP",
                "SQLITE_NOTICE_RECOVER_ROLLBACK",
                "SQLITE_NOTICE_RECOVER_WAL",
            ]
        if sqlite.sqlite_version_info >= (3, 8, 0):
            consts += [
                "SQLITE_BUSY_SNAPSHOT",
                "SQLITE_IOERR_GETTEMPPATH",
                "SQLITE_WARNING_AUTOINDEX",
            ]
        if sqlite.sqlite_version_info >= (3, 8, 1):
            consts += ["SQLITE_CANTOPEN_CONVPATH", "SQLITE_IOERR_CONVPATH"]
        if sqlite.sqlite_version_info >= (3, 8, 2):
            consts.append("SQLITE_CONSTRAINT_ROWID")
        if sqlite.sqlite_version_info >= (3, 8, 3):
            consts.append("SQLITE_READONLY_DBMOVED")
        if sqlite.sqlite_version_info >= (3, 8, 7):
            consts.append("SQLITE_AUTH_USER")
        if sqlite.sqlite_version_info >= (3, 9, 0):
            consts.append("SQLITE_IOERR_VNODE")
        if sqlite.sqlite_version_info >= (3, 10, 0):
            consts.append("SQLITE_IOERR_AUTH")
        if sqlite.sqlite_version_info >= (3, 14, 1):
            consts.append("SQLITE_OK_LOAD_PERMANENTLY")
        if sqlite.sqlite_version_info >= (3, 21, 0):
            consts += [
                "SQLITE_IOERR_BEGIN_ATOMIC",
                "SQLITE_IOERR_COMMIT_ATOMIC",
                "SQLITE_IOERR_ROLLBACK_ATOMIC",
            ]
        if sqlite.sqlite_version_info >= (3, 22, 0):
            consts += [
                "SQLITE_ERROR_MISSING_COLLSEQ",
                "SQLITE_ERROR_RETRY",
                "SQLITE_READONLY_CANTINIT",
                "SQLITE_READONLY_DIRECTORY",
            ]
        if sqlite.sqlite_version_info >= (3, 24, 0):
            consts += ["SQLITE_CORRUPT_SEQUENCE", "SQLITE_LOCKED_VTAB"]
        if sqlite.sqlite_version_info >= (3, 25, 0):
            consts += ["SQLITE_CANTOPEN_DIRTYWAL", "SQLITE_ERROR_SNAPSHOT"]
        if sqlite.sqlite_version_info >= (3, 31, 0):
            consts += [
                "SQLITE_CANTOPEN_SYMLINK",
                "SQLITE_CONSTRAINT_PINNED",
                "SQLITE_OK_SYMLINK",
            ]
        if sqlite.sqlite_version_info >= (3, 32, 0):
            consts += [
                "SQLITE_BUSY_TIMEOUT",
                "SQLITE_CORRUPT_INDEX",
                "SQLITE_IOERR_DATA",
            ]
        if sqlite.sqlite_version_info >= (3, 34, 0):
            consts.append("SQLITE_IOERR_CORRUPTFS")
        # One sub-test per name so every missing constant is reported.
        for const in consts:
            with self.subTest(const=const):
                self.assertTrue(hasattr(sqlite, const))

    def test_error_code_on_exception(self):
        # Connecting to a directory must fail; the expected code differs
        # between Windows and other platforms.
        err_msg = "unable to open database file"
        if sys.platform.startswith("win"):
            err_code = sqlite.SQLITE_CANTOPEN_ISDIR
        else:
            err_code = sqlite.SQLITE_CANTOPEN

        with temp_dir() as db:
            with self.assertRaisesRegex(sqlite.Error, err_msg) as cm:
                sqlite.connect(db)
            e = cm.exception
            self.assertEqual(e.sqlite_errorcode, err_code)
            self.assertTrue(e.sqlite_errorname.startswith("SQLITE_CANTOPEN"))

    # Skip only on versions strictly older than 3.7.16: the skip message
    # and the SQLITE_CONSTRAINT_CHECK guard in test_module_constants both
    # treat 3.7.16 itself as supported.
    @unittest.skipIf(sqlite.sqlite_version_info < (3, 7, 16),
                     "Requires SQLite 3.7.16 or newer")
    def test_extended_error_code_on_exception(self):
        with memory_database() as con:
            with con:
                con.execute("create table t(t integer check(t > 0))")
            errmsg = "constraint failed"
            with self.assertRaisesRegex(sqlite.IntegrityError, errmsg) as cm:
                con.execute("insert into t values(-1)")
            exc = cm.exception
            self.assertEqual(exc.sqlite_errorcode,
                             sqlite.SQLITE_CONSTRAINT_CHECK)
            self.assertEqual(exc.sqlite_errorname, "SQLITE_CONSTRAINT_CHECK")

    # sqlite3_enable_shared_cache() is deprecated on macOS and calling it may raise
    # OperationalError on some buildbots.
    @unittest.skipIf(sys.platform == "darwin", "shared cache is deprecated on macOS")
    def test_shared_cache_deprecated(self):
        for enable in (True, False):
            with self.assertWarns(DeprecationWarning) as cm:
                sqlite.enable_shared_cache(enable)
            # The warning must be attributed to this test file.
            self.assertIn("dbapi.py", cm.filename)

    def test_disallow_instantiation(self):
        # Use a managed connection so it is closed once the statement type
        # has been obtained, instead of being leaked.
        with memory_database() as cx:
            check_disallow_instantiation(self, type(cx("select 1")))
        check_disallow_instantiation(self, sqlite.Blob)

    def test_complete_statement(self):
        self.assertFalse(sqlite.complete_statement("select t"))
        self.assertTrue(sqlite.complete_statement("create table t(t);"))
353
354
class ConnectionTests(unittest.TestCase):
    # Connection-level behaviour.  Each test gets a fresh in-memory
    # database in self.cx holding a "test" table with one inserted row.

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        cu = self.cx.cursor()
        cu.execute("create table test(id integer primary key, name text)")
        cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cx.close()

    def test_commit(self):
        self.cx.commit()

    def test_commit_after_no_changes(self):
        """
        A commit should also work when no changes were made to the database.
        """
        self.cx.commit()
        self.cx.commit()

    def test_rollback(self):
        self.cx.rollback()

    def test_rollback_after_no_changes(self):
        """
        A rollback should also work when no changes were made to the database.
        """
        self.cx.rollback()
        self.cx.rollback()

    def test_cursor(self):
        # Check the factory result instead of binding an unused local.
        self.assertIsInstance(self.cx.cursor(), sqlite.Cursor)

    def test_failed_open(self):
        YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
        with self.assertRaises(sqlite.OperationalError):
            sqlite.connect(YOU_CANNOT_OPEN_THIS)

    def test_close(self):
        self.cx.close()

    def test_use_after_close(self):
        sql = "select 1"
        cu = self.cx.cursor()
        res = cu.execute(sql)
        self.cx.close()
        # Every operation on the closed connection, or on a cursor created
        # from it, must raise ProgrammingError.
        self.assertRaises(sqlite.ProgrammingError, res.fetchall)
        self.assertRaises(sqlite.ProgrammingError, cu.execute, sql)
        self.assertRaises(sqlite.ProgrammingError, cu.executemany, sql, [])
        self.assertRaises(sqlite.ProgrammingError, cu.executescript, sql)
        self.assertRaises(sqlite.ProgrammingError, self.cx.execute, sql)
        self.assertRaises(sqlite.ProgrammingError,
                          self.cx.executemany, sql, [])
        self.assertRaises(sqlite.ProgrammingError, self.cx.executescript, sql)
        self.assertRaises(sqlite.ProgrammingError,
                          self.cx.create_function, "t", 1, lambda x: x)
        self.assertRaises(sqlite.ProgrammingError, self.cx.cursor)
        with self.assertRaises(sqlite.ProgrammingError):
            with self.cx:
                pass

    def test_exceptions(self):
        # Optional DB-API extension.
        self.assertEqual(self.cx.Warning, sqlite.Warning)
        self.assertEqual(self.cx.Error, sqlite.Error)
        self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
        self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
        self.assertEqual(self.cx.DataError, sqlite.DataError)
        self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
        self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
        self.assertEqual(self.cx.InternalError, sqlite.InternalError)
        self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
        self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)

    def test_in_transaction(self):
        # Can't use db from setUp because we want to test initial state.
        # The dedicated connection is closed when the block exits.
        with memory_database() as cx:
            cu = cx.cursor()
            self.assertEqual(cx.in_transaction, False)
            cu.execute("create table transactiontest(id integer primary key, name text)")
            self.assertEqual(cx.in_transaction, False)
            cu.execute("insert into transactiontest(name) values (?)", ("foo",))
            self.assertEqual(cx.in_transaction, True)
            cu.execute("select name from transactiontest where name=?", ["foo"])
            # The row is fetched only to advance the cursor.
            cu.fetchone()
            self.assertEqual(cx.in_transaction, True)
            cx.commit()
            self.assertEqual(cx.in_transaction, False)
            cu.execute("select name from transactiontest where name=?", ["foo"])
            cu.fetchone()
            self.assertEqual(cx.in_transaction, False)

    def test_in_transaction_ro(self):
        with self.assertRaises(AttributeError):
            self.cx.in_transaction = True

    def test_connection_exceptions(self):
        # Same set of names as test_exceptions, compared by identity.
        exceptions = [
            "DataError",
            "DatabaseError",
            "Error",
            "IntegrityError",
            "InterfaceError",
            "InternalError",
            "NotSupportedError",
            "OperationalError",
            "ProgrammingError",
            "Warning",
        ]
        for exc in exceptions:
            with self.subTest(exc=exc):
                self.assertTrue(hasattr(self.cx, exc))
                self.assertIs(getattr(sqlite, exc), getattr(self.cx, exc))

    def test_interrupt_on_closed_db(self):
        cx = sqlite.connect(":memory:")
        cx.close()
        with self.assertRaises(sqlite.ProgrammingError):
            cx.interrupt()

    def test_interrupt(self):
        self.assertIsNone(self.cx.interrupt())

    def test_drop_unused_refs(self):
        for n in range(500):
            cu = self.cx.execute(f"select {n}")
            self.assertEqual(cu.fetchone()[0], n)

    def test_connection_limits(self):
        category = sqlite.SQLITE_LIMIT_SQL_LENGTH
        saved_limit = self.cx.getlimit(category)
        try:
            new_limit = 10
            prev_limit = self.cx.setlimit(category, new_limit)
            self.assertEqual(saved_limit, prev_limit)
            self.assertEqual(self.cx.getlimit(category), new_limit)
            msg = "query string is too large"
            # The statement below is longer than the 10-character limit.
            self.assertRaisesRegex(sqlite.DataError, msg,
                                   self.cx.execute, "select 1 as '16'")
        finally:  # restore saved limit
            self.cx.setlimit(category, saved_limit)

    def test_connection_bad_limit_category(self):
        msg = "'category' is out of bounds"
        cat = 1111
        self.assertRaisesRegex(sqlite.ProgrammingError, msg,
                               self.cx.getlimit, cat)
        self.assertRaisesRegex(sqlite.ProgrammingError, msg,
                               self.cx.setlimit, cat, 0)

    def test_connection_init_bad_isolation_level(self):
        msg = (
            "isolation_level string must be '', 'DEFERRED', 'IMMEDIATE', or "
            "'EXCLUSIVE'"
        )
        levels = (
            "BOGUS",
            " ",
            "DEFERRE",
            "IMMEDIAT",
            "EXCLUSIV",
            "DEFERREDS",
            "IMMEDIATES",
            "EXCLUSIVES",
        )
        for level in levels:
            with self.subTest(level=level):
                # Rejected at construction time ...
                with self.assertRaisesRegex(ValueError, msg):
                    memory_database(isolation_level=level)
                # ... and when assigned on an existing connection.
                with memory_database() as cx:
                    with self.assertRaisesRegex(ValueError, msg):
                        cx.isolation_level = level
                    # Check that the default level is not changed
                    self.assertEqual(cx.isolation_level, "")

    def test_connection_init_good_isolation_levels(self):
        for level in ("", "DEFERRED", "IMMEDIATE", "EXCLUSIVE", None):
            with self.subTest(level=level):
                with memory_database(isolation_level=level) as cx:
                    self.assertEqual(cx.isolation_level, level)
                with memory_database() as cx:
                    self.assertEqual(cx.isolation_level, "")
                    cx.isolation_level = level
                    self.assertEqual(cx.isolation_level, level)

    def test_connection_reinit(self):
        db = ":memory:"
        # The managed connection is closed on exit instead of being leaked.
        with memory_database() as cx:
            cx.text_factory = bytes
            cx.row_factory = sqlite.Row
            cu = cx.cursor()
            cu.execute("create table foo (bar)")
            cu.executemany("insert into foo (bar) values (?)",
                           ((str(v),) for v in range(4)))
            cu.execute("select bar from foo")

            rows = cu.fetchmany(2)
            self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows))
            self.assertEqual([r[0] for r in rows], [b"0", b"1"])

            # Re-run __init__ on the live connection object.
            cx.__init__(db)
            cx.execute("create table foo (bar)")
            cx.executemany("insert into foo (bar) values (?)",
                           ((v,) for v in ("a", "b", "c", "d")))

            # This uses the old database, old row factory, but new text factory
            rows = cu.fetchall()
            self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows))
            self.assertEqual([r[0] for r in rows], ["2", "3"])

    def test_connection_bad_reinit(self):
        # NOTE(review): cx is not closed explicitly.  After the failed
        # __init__ below it reports "Base Connection.__init__ not called",
        # so close() would presumably raise as well -- confirm.
        cx = sqlite.connect(":memory:")
        with cx:
            cx.execute("create table t(t)")
        with temp_dir() as db:
            self.assertRaisesRegex(sqlite.OperationalError,
                                   "unable to open database file",
                                   cx.__init__, db)
            self.assertRaisesRegex(sqlite.ProgrammingError,
                                   "Base Connection.__init__ not called",
                                   cx.executemany, "insert into t values(?)",
                                   ((v,) for v in range(3)))
577
578
class UninitialisedConnectionTests(unittest.TestCase):
    # Behaviour of a Connection object whose __init__ was never run.

    def setUp(self):
        # Allocate the object through __new__ only, bypassing __init__.
        self.cx = sqlite.Connection.__new__(sqlite.Connection)

    def test_uninit_operations(self):
        expected = "Base Connection.__init__ not called"
        # Attribute reads and method calls that must all be rejected.
        funcs = (
            lambda: self.cx.isolation_level,
            lambda: self.cx.total_changes,
            lambda: self.cx.in_transaction,
            lambda: self.cx.iterdump(),
            lambda: self.cx.cursor(),
            lambda: self.cx.close(),
        )
        for func in funcs:
            with self.subTest(func=func):
                with self.assertRaisesRegex(sqlite.ProgrammingError, expected):
                    func()
597
598
@unittest.skipUnless(hasattr(sqlite.Connection, "serialize"),
                     "Needs SQLite serialize API")
class SerializeTests(unittest.TestCase):
    # Tests for Connection.serialize() / Connection.deserialize(); the
    # whole class is skipped when Connection has no "serialize" attribute.

    def test_serialize_deserialize(self):
        with memory_database() as cx:
            with cx:
                cx.execute("create table t(t)")
            data = cx.serialize()

            # Remove test table, verify that it was removed.
            with cx:
                cx.execute("drop table t")
            regex = "no such table"
            with self.assertRaisesRegex(sqlite.OperationalError, regex):
                cx.execute("select t from t")

            # Deserialize and verify that test table is restored.
            cx.deserialize(data)
            cx.execute("select t from t")

    def test_deserialize_wrong_args(self):
        # (expected exception, invalid argument) pairs.
        dataset = (
            (BufferError, memoryview(b"blob")[::2]),
            (TypeError, []),
            (TypeError, 1),
            (TypeError, None),
        )
        for exc, arg in dataset:
            with self.subTest(exc=exc, arg=arg):
                with memory_database() as cx:
                    self.assertRaises(exc, cx.deserialize, arg)

    def test_deserialize_corrupt_database(self):
        with memory_database() as cx:
            regex = "file is not a database"
            with self.assertRaisesRegex(sqlite.DatabaseError, regex):
                cx.deserialize(b"\0\1\3")
                # SQLite does not generate an error until you try to query the
                # deserialized database.
                cx.execute("create table fail(f)")

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=2**63, memuse=3, dry_run=False)
    def test_deserialize_too_much_data_64bit(self, size):
        # bigmemtest calls the decorated test with the size to use as an
        # extra positional argument, so the method must accept it.
        with memory_database() as cx:
            with self.assertRaisesRegex(OverflowError, "'data' is too large"):
                cx.deserialize(b"b" * size)
646
647
648class OpenTests(unittest.TestCase):
649    _sql = "create table test(id integer)"
650
651    def test_open_with_path_like_object(self):
652        """ Checks that we can successfully connect to a database using an object that
653            is PathLike, i.e. has __fspath__(). """
654        path = FakePath(TESTFN)
655        self.addCleanup(unlink, path)
656        self.assertFalse(os.path.exists(path))
657        with contextlib.closing(sqlite.connect(path)) as cx:
658            self.assertTrue(os.path.exists(path))
659            cx.execute(self._sql)
660
661    @unittest.skipIf(sys.platform == "win32", "skipped on Windows")
662    @unittest.skipIf(sys.platform == "darwin", "skipped on macOS")
663    @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI")
664    @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths")
665    def test_open_with_undecodable_path(self):
666        path = TESTFN_UNDECODABLE
667        self.addCleanup(unlink, path)
668        self.assertFalse(os.path.exists(path))
669        with contextlib.closing(sqlite.connect(path)) as cx:
670            self.assertTrue(os.path.exists(path))
671            cx.execute(self._sql)
672
673    def test_open_uri(self):
674        path = TESTFN
675        self.addCleanup(unlink, path)
676        uri = "file:" + urllib.parse.quote(os.fsencode(path))
677        self.assertFalse(os.path.exists(path))
678        with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
679            self.assertTrue(os.path.exists(path))
680            cx.execute(self._sql)
681
682    def test_open_unquoted_uri(self):
683        path = TESTFN
684        self.addCleanup(unlink, path)
685        uri = "file:" + path
686        self.assertFalse(os.path.exists(path))
687        with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
688            self.assertTrue(os.path.exists(path))
689            cx.execute(self._sql)
690
691    def test_open_uri_readonly(self):
692        path = TESTFN
693        self.addCleanup(unlink, path)
694        uri = "file:" + urllib.parse.quote(os.fsencode(path)) + "?mode=ro"
695        self.assertFalse(os.path.exists(path))
696        # Cannot create new DB
697        with self.assertRaises(sqlite.OperationalError):
698            sqlite.connect(uri, uri=True)
699        self.assertFalse(os.path.exists(path))
700        sqlite.connect(path).close()
701        self.assertTrue(os.path.exists(path))
702        # Cannot modify new DB
703        with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
704            with self.assertRaises(sqlite.OperationalError):
705                cx.execute(self._sql)
706
707    @unittest.skipIf(sys.platform == "win32", "skipped on Windows")
708    @unittest.skipIf(sys.platform == "darwin", "skipped on macOS")
709    @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI")
710    @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths")
711    def test_open_undecodable_uri(self):
712        path = TESTFN_UNDECODABLE
713        self.addCleanup(unlink, path)
714        uri = "file:" + urllib.parse.quote(path)
715        self.assertFalse(os.path.exists(path))
716        with contextlib.closing(sqlite.connect(uri, uri=True)) as cx:
717            self.assertTrue(os.path.exists(path))
718            cx.execute(self._sql)
719
720    def test_factory_database_arg(self):
721        def factory(database, *args, **kwargs):
722            nonlocal database_arg
723            database_arg = database
724            return sqlite.Connection(":memory:", *args, **kwargs)
725
726        for database in (TESTFN, os.fsencode(TESTFN),
727                         FakePath(TESTFN), FakePath(os.fsencode(TESTFN))):
728            database_arg = None
729            sqlite.connect(database, factory=factory).close()
730            self.assertEqual(database_arg, database)
731
732    def test_database_keyword(self):
733        with contextlib.closing(sqlite.connect(database=":memory:")) as cx:
734            self.assertEqual(type(cx), sqlite.Connection)
735
736
class CursorTests(unittest.TestCase):
    """Tests for Cursor: execute(), executemany(), fetch*, rowcount, lastrowid.

    Each test runs against a fresh in-memory database whose ``test`` table
    already contains the single row inserted by setUp().
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute(
            "create table test(id integer primary key, name text, "
            "income number, unique_test text unique)"
        )
        # Seed row; several tests below rely on it being present.
        self.cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def test_execute_no_args(self):
        self.cu.execute("delete from test")

    def test_execute_illegal_sql(self):
        with self.assertRaises(sqlite.OperationalError):
            self.cu.execute("select asdf")

    def test_execute_multiple_statements(self):
        # Every query carries something other than whitespace or a valid
        # SQL comment after the first statement, so execute() must refuse it.
        msg = "You can only execute one statement at a time"
        dataset = (
            "select 1; select 2",
            "select 1; // c++ comments are not allowed",
            "select 1; *not a comment",
            "select 1; -*not a comment",
            "select 1; /* */ a",
            "select 1; /**/a",
            "select 1; -",
            "select 1; /",
            "select 1; -\n- select 2",
            """select 1;
               -- comment
               select 2
            """,
        )
        for query in dataset:
            with self.subTest(query=query):
                with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
                    self.cu.execute(query)

    def test_execute_with_appended_comments(self):
        # A single statement followed only by comments must be accepted.
        dataset = (
            "select 1; -- foo bar",
            "select 1; --",
            "select 1; /*",  # Unclosed comments ending in \0 are skipped.
            """
            select 5+4;

            /*
            foo
            */
            """,
        )
        for query in dataset:
            with self.subTest(query=query):
                self.cu.execute(query)

    def test_execute_wrong_sql_arg(self):
        # The SQL argument must be a string.
        with self.assertRaises(TypeError):
            self.cu.execute(42)

    def test_execute_arg_int(self):
        self.cu.execute("insert into test(id) values (?)", (42,))

    def test_execute_arg_float(self):
        self.cu.execute("insert into test(income) values (?)", (2500.32,))

    def test_execute_arg_string(self):
        self.cu.execute("insert into test(name) values (?)", ("Hugo",))

    def test_execute_arg_string_with_zero_byte(self):
        # An embedded NUL must survive the round trip through the database.
        self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))

        self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
        row = self.cu.fetchone()
        self.assertEqual(row[0], "Hu\x00go")

    def test_execute_non_iterable(self):
        # A bare int is neither a sequence nor a mapping of parameters.
        with self.assertRaises(sqlite.ProgrammingError) as cm:
            self.cu.execute("insert into test(id) values (?)", 42)
        self.assertEqual(str(cm.exception), 'parameters are of unsupported type')

    def test_execute_wrong_no_of_args1(self):
        # too many parameters
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)", (17, "Egon"))

    def test_execute_wrong_no_of_args2(self):
        # too few parameters: two placeholders, but only one value supplied
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id, name) values (?, ?)", (17,))

    def test_execute_wrong_no_of_args3(self):
        # no parameters, parameters are needed
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)")

    def test_execute_param_list(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", ["foo"])
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def test_execute_param_sequence(self):
        # Minimal sequence: provides only __len__ and __getitem__.
        class L:
            def __len__(self):
                return 1
            def __getitem__(self, x):
                assert x == 0
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", L())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def test_execute_param_sequence_bad_len(self):
        # Issue41662: Error in __len__() was overridden with ProgrammingError.
        class L:
            def __len__(self):
                1/0
            def __getitem__(self, x):
                raise AssertionError

        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(ZeroDivisionError):
            self.cu.execute("select name from test where name=?", L())

    def test_execute_too_many_params(self):
        # With the variable limit lowered to 1, one placeholder still works
        # but two placeholders must be rejected.
        category = sqlite.SQLITE_LIMIT_VARIABLE_NUMBER
        msg = "too many SQL variables"
        with cx_limit(self.cx, category=category, limit=1):
            self.cu.execute("select * from test where id=?", (1,))
            with self.assertRaisesRegex(sqlite.OperationalError, msg):
                self.cu.execute("select * from test where id!=? and id!=?",
                                (1, 2))

    def test_execute_dict_mapping(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", {"name": "foo"})
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def test_execute_dict_mapping_mapping(self):
        # The named parameter is supplied through __missing__ of a dict
        # subclass rather than through a stored key.
        class D(dict):
            def __missing__(self, key):
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", D())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def test_execute_dict_mapping_too_little_args(self):
        # The mapping lacks a value for ":id".
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})

    def test_execute_dict_mapping_no_args(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name")

    def test_execute_dict_mapping_unnamed(self):
        # A mapping cannot satisfy an unnamed ("?") placeholder.
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=?", {"name": "foo"})

    def test_close(self):
        # tearDown() closes the cursor again, so this also covers closing
        # an already closed cursor.
        self.cu.close()

    def test_rowcount_execute(self):
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("update test set name='bar'")
        self.assertEqual(self.cu.rowcount, 2)

    def test_rowcount_select(self):
        """
        pysqlite does not know the rowcount of SELECT statements, because we
        don't fetch all rows after executing the select statement. The rowcount
        has thus to be -1.
        """
        self.cu.execute("select 5 union select 6")
        self.assertEqual(self.cu.rowcount, -1)

    def test_rowcount_executemany(self):
        self.cu.execute("delete from test")
        self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
        self.assertEqual(self.cu.rowcount, 3)

    @unittest.skipIf(sqlite.sqlite_version_info < (3, 35, 0),
                     "Requires SQLite 3.35.0 or newer")
    def test_rowcount_update_returning(self):
        # gh-93421: rowcount is updated correctly for UPDATE...RETURNING queries
        self.cu.execute("update test set name='bar' where name='foo' returning 1")
        self.assertEqual(self.cu.fetchone()[0], 1)
        self.assertEqual(self.cu.rowcount, 1)

    def test_rowcount_prefixed_with_comment(self):
        # gh-79579: rowcount is updated even if query is prefixed with comments
        self.cu.execute("""
            -- foo
            insert into test(name) values ('foo'), ('foo')
        """)
        self.assertEqual(self.cu.rowcount, 2)
        self.cu.execute("""
            /* -- messy *r /* /* ** *- *--
            */
            /* one more */ insert into test(name) values ('messy')
        """)
        self.assertEqual(self.cu.rowcount, 1)
        # Three 'foo' rows exist now: one from setUp() plus the two above.
        self.cu.execute("/* bar */ update test set name='bar' where name='foo'")
        self.assertEqual(self.cu.rowcount, 3)

    def test_rowcount_vaccuum(self):
        data = ((1,), (2,), (3,))
        self.cu.executemany("insert into test(income) values(?)", data)
        self.assertEqual(self.cu.rowcount, 3)
        self.cx.commit()
        # The rowcount left by executemany() must not leak into VACUUM.
        self.cu.execute("vacuum")
        self.assertEqual(self.cu.rowcount, -1)

    def test_total_changes(self):
        # Together with the insert in setUp() there have been three inserts,
        # so total_changes must be greater than 2.
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value')

    # Checks for executemany:
    # Sequences are required by the DB-API; support for iterators is a
    # pysqlite enhancement.

    def test_execute_many_sequence(self):
        self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])

    def test_execute_many_iterator(self):
        # Hand-written iterator yielding the tuples (6,) through (10,).
        class MyIter:
            def __init__(self):
                self.value = 5

            def __iter__(self):
                return self

            def __next__(self):
                if self.value == 10:
                    raise StopIteration
                else:
                    self.value += 1
                    return (self.value,)

        self.cu.executemany("insert into test(income) values (?)", MyIter())

    def test_execute_many_generator(self):
        def mygen():
            for i in range(5):
                yield (i,)

        self.cu.executemany("insert into test(income) values (?)", mygen())

    def test_execute_many_wrong_sql_arg(self):
        with self.assertRaises(TypeError):
            self.cu.executemany(42, [(3,)])

    def test_execute_many_select(self):
        # executemany() is expected to reject a SELECT statement.
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.executemany("select ?", [(3,)])

    def test_execute_many_not_iterable(self):
        with self.assertRaises(TypeError):
            self.cu.executemany("insert into test(income) values (?)", 42)

    def test_fetch_iter(self):
        # Optional DB-API extension.
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(id) values (?)", (5,))
        self.cu.execute("insert into test(id) values (?)", (6,))
        self.cu.execute("select id from test order by id")
        lst = []
        for row in self.cu:
            lst.append(row[0])
        self.assertEqual(lst[0], 5)
        self.assertEqual(lst[1], 6)

    def test_fetchone(self):
        self.cu.execute("select name from test")
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")
        # The result set is exhausted after the single seeded row.
        row = self.cu.fetchone()
        self.assertEqual(row, None)

    def test_fetchone_no_statement(self):
        # fetchone() on a cursor that never executed anything returns None.
        cur = self.cx.cursor()
        row = cur.fetchone()
        self.assertEqual(row, None)

    def test_array_size(self):
        # must default to 1
        self.assertEqual(self.cu.arraysize, 1)

        # now set to 2
        self.cu.arraysize = 2

        # now make the query return 3 rows
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('A')")
        self.cu.execute("insert into test(name) values ('B')")
        self.cu.execute("insert into test(name) values ('C')")
        self.cu.execute("select name from test")
        # fetchmany() without a size must fall back to arraysize.
        res = self.cu.fetchmany()

        self.assertEqual(len(res), 2)

    def test_fetchmany(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(100)
        self.assertEqual(len(res), 1)
        res = self.cu.fetchmany(100)
        self.assertEqual(res, [])

    def test_fetchmany_kw_arg(self):
        """Checks if fetchmany works with keyword arguments"""
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(size=100)
        self.assertEqual(len(res), 1)

    def test_fetchall(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchall()
        self.assertEqual(len(res), 1)
        res = self.cu.fetchall()
        self.assertEqual(res, [])

    def test_setinputsizes(self):
        # Only checks that the call is accepted.
        self.cu.setinputsizes([3, 4, 5])

    def test_setoutputsize(self):
        # Only checks that the call is accepted.
        self.cu.setoutputsize(5, 0)

    def test_setoutputsize_no_column(self):
        self.cu.setoutputsize(42)

    def test_cursor_connection(self):
        # Optional DB-API extension.
        self.assertEqual(self.cu.connection, self.cx)

    def test_wrong_cursor_callable(self):
        # f accepts no arguments and returns None, so it cannot serve as a
        # cursor factory.
        def f(): pass
        with self.assertRaises(TypeError):
            self.cx.cursor(f)

    def test_cursor_wrong_class(self):
        # Cursor() requires a real Connection object.
        class Foo: pass
        foo = Foo()
        with self.assertRaises(TypeError):
            sqlite.Cursor(foo)

    def test_last_row_id_on_replace(self):
        """
        INSERT OR REPLACE and REPLACE INTO should produce the same behavior.
        """
        sql = '{} INTO test(id, unique_test) VALUES (?, ?)'
        for statement in ('INSERT OR REPLACE', 'REPLACE'):
            with self.subTest(statement=statement):
                self.cu.execute(sql.format(statement), (1, 'foo'))
                self.assertEqual(self.cu.lastrowid, 1)

    def test_last_row_id_on_ignore(self):
        self.cu.execute(
            "insert or ignore into test(unique_test) values (?)",
            ('test',))
        self.assertEqual(self.cu.lastrowid, 2)
        # The second insert is ignored (duplicate unique_test value), so
        # lastrowid must stay unchanged.
        self.cu.execute(
            "insert or ignore into test(unique_test) values (?)",
            ('test',))
        self.assertEqual(self.cu.lastrowid, 2)

    def test_last_row_id_insert_o_r(self):
        # For each conflict clause: the first insert succeeds, the second one
        # violates the unique constraint; lastrowid is recorded after both.
        results = []
        for statement in ('FAIL', 'ABORT', 'ROLLBACK'):
            sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)'
            with self.subTest(statement='INSERT OR {}'.format(statement)):
                self.cu.execute(sql.format(statement), (statement,))
                results.append((statement, self.cu.lastrowid))
                with self.assertRaises(sqlite.IntegrityError):
                    self.cu.execute(sql.format(statement), (statement,))
                results.append((statement, self.cu.lastrowid))
        # A failed insert must leave lastrowid at the last successful value.
        expected = [
            ('FAIL', 2), ('FAIL', 2),
            ('ABORT', 3), ('ABORT', 3),
            ('ROLLBACK', 4), ('ROLLBACK', 4),
        ]
        self.assertEqual(results, expected)

    def test_column_count(self):
        # Check that column count is updated correctly for cached statements
        select = "select * from test"
        res = self.cu.execute(select)
        old_count = len(res.description)
        # Add a new column and execute the cached select query again
        self.cu.execute("alter table test add newcol")
        res = self.cu.execute(select)
        new_count = len(res.description)
        self.assertEqual(new_count - old_count, 1)

    def test_same_query_in_multiple_cursors(self):
        # All three cursors are created before any of them is fetched from.
        cursors = [self.cx.execute("select 1") for _ in range(3)]
        for cu in cursors:
            self.assertEqual(cu.fetchall(), [(1,)])
1149
1150
class BlobTests(unittest.TestCase):
    """Tests for the Blob objects returned by Connection.blobopen().

    setUp() stores one 50-byte value in table ``test`` and opens a writable
    blob handle on it as ``self.blob``; tearDown() closes both the handle
    and the connection.
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cx.execute("create table test(b blob)")
        self.data = b"this blob data string is exactly fifty bytes long!"
        self.cx.execute("insert into test(b) values (?)", (self.data,))
        self.blob = self.cx.blobopen("test", "b", 1)

    def tearDown(self):
        self.blob.close()
        self.cx.close()

    def test_blob_is_a_blob(self):
        self.assertIsInstance(self.blob, sqlite.Blob)

    def test_blob_seek_and_tell(self):
        # Default origin: absolute offset.
        self.blob.seek(10)
        self.assertEqual(self.blob.tell(), 10)

        self.blob.seek(10, SEEK_SET)
        self.assertEqual(self.blob.tell(), 10)

        # Relative to the current offset (10 + 10).
        self.blob.seek(10, SEEK_CUR)
        self.assertEqual(self.blob.tell(), 20)

        # Relative to the end of the 50-byte blob.
        self.blob.seek(-10, SEEK_END)
        self.assertEqual(self.blob.tell(), 40)

    def test_blob_seek_error(self):
        msg_oor = "offset out of blob range"
        msg_orig = "'origin' should be os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END"
        msg_of = "seek offset results in overflow"

        # (expected exception, expected message, offending call)
        dataset = (
            (ValueError, msg_oor, lambda: self.blob.seek(1000)),
            (ValueError, msg_oor, lambda: self.blob.seek(-10)),
            (ValueError, msg_orig, lambda: self.blob.seek(10, -1)),
            (ValueError, msg_orig, lambda: self.blob.seek(10, 3)),
        )
        for exc, msg, fn in dataset:
            with self.subTest(exc=exc, msg=msg, fn=fn):
                self.assertRaisesRegex(exc, msg, fn)

        # Force overflow errors
        self.blob.seek(1, SEEK_SET)
        with self.assertRaisesRegex(OverflowError, msg_of):
            self.blob.seek(INT_MAX, SEEK_CUR)
        with self.assertRaisesRegex(OverflowError, msg_of):
            self.blob.seek(INT_MAX, SEEK_END)

    def test_blob_read(self):
        buf = self.blob.read()
        self.assertEqual(buf, self.data)

    def test_blob_read_oversized(self):
        # Asking for more than is available returns just what is there.
        buf = self.blob.read(len(self.data) * 2)
        self.assertEqual(buf, self.data)

    def test_blob_read_advance_offset(self):
        n = 10
        buf = self.blob.read(n)
        self.assertEqual(buf, self.data[:n])
        self.assertEqual(self.blob.tell(), n)

    def test_blob_read_at_offset(self):
        self.blob.seek(10)
        self.assertEqual(self.blob.read(10), self.data[10:20])

    def test_blob_read_error_row_changed(self):
        # Updating the row behind the open handle makes later reads fail.
        self.cx.execute("update test set b='aaaa' where rowid=1")
        with self.assertRaises(sqlite.OperationalError):
            self.blob.read()

    def test_blob_write(self):
        # Pad to 50 bytes so that the whole blob is overwritten.
        new_data = b"new data".ljust(50)
        self.blob.write(new_data)
        row = self.cx.execute("select b from test").fetchone()
        self.assertEqual(row[0], new_data)

    def test_blob_write_at_offset(self):
        new_data = b"c" * 25
        self.blob.seek(25)
        self.blob.write(new_data)
        row = self.cx.execute("select b from test").fetchone()
        self.assertEqual(row[0], self.data[:25] + new_data)

    def test_blob_write_advance_offset(self):
        self.blob.write(b"d"*10)
        self.assertEqual(self.blob.tell(), 10)

    def test_blob_write_error_length(self):
        with self.assertRaisesRegex(ValueError, "data longer than blob"):
            self.blob.write(b"a" * 1000)

        # Fill the blob exactly up to its end; one more byte must fail.
        self.blob.seek(0, SEEK_SET)
        n = len(self.blob)
        self.blob.write(b"a" * (n-1))
        self.blob.write(b"a")
        with self.assertRaisesRegex(ValueError, "data longer than blob"):
            self.blob.write(b"a")

    def test_blob_write_error_row_changed(self):
        # Updating the row behind the open handle makes later writes fail.
        self.cx.execute("update test set b='aaaa' where rowid=1")
        with self.assertRaises(sqlite.OperationalError):
            self.blob.write(b"aaa")

    def test_blob_write_error_readonly(self):
        # The context manager closes the handle even if the assertion fails.
        with self.cx.blobopen("test", "b", 1, readonly=True) as ro_blob:
            with self.assertRaisesRegex(sqlite.OperationalError, "readonly"):
                ro_blob.write(b"aaa")

    def test_blob_open_error(self):
        # Each entry names a schema, table, column or row that does not exist.
        dataset = (
            (("test", "b", 1), {"name": "notexisting"}),
            (("notexisting", "b", 1), {}),
            (("test", "notexisting", 1), {}),
            (("test", "b", 2), {}),
        )
        regex = "no such"
        for args, kwds in dataset:
            with self.subTest(args=args, kwds=kwds):
                with self.assertRaisesRegex(sqlite.OperationalError, regex):
                    self.cx.blobopen(*args, **kwds)

    def test_blob_length(self):
        self.assertEqual(len(self.blob), 50)

    def test_blob_get_item(self):
        # Indexing yields integers; bytes 5..8 of the data spell "blob".
        self.assertEqual(self.blob[5], ord("b"))
        self.assertEqual(self.blob[6], ord("l"))
        self.assertEqual(self.blob[7], ord("o"))
        self.assertEqual(self.blob[8], ord("b"))
        self.assertEqual(self.blob[-1], ord("!"))

    def test_blob_set_item(self):
        self.blob[0] = ord("b")
        expected = b"b" + self.data[1:]
        actual = self.cx.execute("select b from test").fetchone()[0]
        self.assertEqual(actual, expected)

    def test_blob_set_item_with_offset(self):
        # Item assignment is addressed by index, independent of the current
        # read/write offset.
        self.blob.seek(0, SEEK_END)
        self.assertEqual(self.blob.read(), b"")  # verify that we're at EOB
        self.blob[0] = ord("T")
        self.blob[-1] = ord(".")
        self.blob.seek(0, SEEK_SET)
        expected = b"This blob data string is exactly fifty bytes long."
        self.assertEqual(self.blob.read(), expected)

    def test_blob_set_slice_buffer_object(self):
        # Slice assignment accepts any object exporting a contiguous buffer.
        from array import array
        self.blob[0:5] = memoryview(b"12345")
        self.assertEqual(self.blob[0:5], b"12345")

        self.blob[0:5] = bytearray(b"23456")
        self.assertEqual(self.blob[0:5], b"23456")

        self.blob[0:5] = array("b", [1, 2, 3, 4, 5])
        self.assertEqual(self.blob[0:5], b"\x01\x02\x03\x04\x05")

    def test_blob_set_item_negative_index(self):
        self.blob[-1] = 255
        self.assertEqual(self.blob[-1], 255)

    def test_blob_get_slice(self):
        self.assertEqual(self.blob[5:14], b"blob data")

    def test_blob_get_empty_slice(self):
        self.assertEqual(self.blob[5:5], b"")

    def test_blob_get_slice_negative_index(self):
        self.assertEqual(self.blob[5:-5], self.data[5:-5])

    def test_blob_get_slice_with_skip(self):
        self.assertEqual(self.blob[0:10:2], b"ti lb")

    def test_blob_set_slice(self):
        self.blob[0:5] = b"12345"
        expected = b"12345" + self.data[5:]
        actual = self.cx.execute("select b from test").fetchone()[0]
        self.assertEqual(actual, expected)

    def test_blob_set_empty_slice(self):
        self.blob[0:0] = b""
        self.assertEqual(self.blob[:], self.data)

    def test_blob_set_slice_with_skip(self):
        # Only every second byte of the first ten is replaced.
        self.blob[0:10:2] = b"12345"
        actual = self.cx.execute("select b from test").fetchone()[0]
        expected = b"1h2s3b4o5 " + self.data[10:]
        self.assertEqual(actual, expected)

    def test_blob_mapping_invalid_index_type(self):
        msg = "indices must be integers"
        with self.assertRaisesRegex(TypeError, msg):
            self.blob[5:5.5]
        with self.assertRaisesRegex(TypeError, msg):
            self.blob[1.5]
        with self.assertRaisesRegex(TypeError, msg):
            self.blob["a"] = b"b"

    def test_blob_get_item_error(self):
        # One past the end, and far outside on either side.
        dataset = [len(self.blob), 105, -105]
        for idx in dataset:
            with self.subTest(idx=idx):
                with self.assertRaisesRegex(IndexError, "index out of range"):
                    self.blob[idx]
        with self.assertRaisesRegex(IndexError, "cannot fit 'int'"):
            self.blob[ULLONG_MAX]

        # Provoke read error
        self.cx.execute("update test set b='aaaa' where rowid=1")
        with self.assertRaises(sqlite.OperationalError):
            self.blob[0]

    def test_blob_set_item_error(self):
        # A single item must be assigned an int, not a bytes-like object.
        with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
            self.blob[0] = b"multiple"
        with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
            self.blob[0] = b"1"
        with self.assertRaisesRegex(TypeError, "cannot be interpreted"):
            self.blob[0] = bytearray(b"1")
        with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"):
            del self.blob[0]
        with self.assertRaisesRegex(IndexError, "Blob index out of range"):
            self.blob[1000] = 0
        with self.assertRaisesRegex(ValueError, "must be in range"):
            self.blob[0] = -1
        with self.assertRaisesRegex(ValueError, "must be in range"):
            self.blob[0] = 256
        # Overflow errors are overridden with ValueError
        with self.assertRaisesRegex(ValueError, "must be in range"):
            self.blob[0] = 2**65

    def test_blob_set_slice_error(self):
        # The assigned data must be exactly as long as the slice.
        with self.assertRaisesRegex(IndexError, "wrong size"):
            self.blob[5:10] = b"a"
        with self.assertRaisesRegex(IndexError, "wrong size"):
            self.blob[5:10] = b"a" * 1000
        with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"):
            del self.blob[5:10]
        with self.assertRaisesRegex(ValueError, "step cannot be zero"):
            self.blob[5:10:0] = b"12345"
        # A strided memoryview is not a contiguous buffer.
        with self.assertRaises(BufferError):
            self.blob[5:10] = memoryview(b"abcde")[::2]

    def test_blob_sequence_not_supported(self):
        # Blob supports indexing but not the other sequence operations.
        with self.assertRaisesRegex(TypeError, "unsupported operand"):
            self.blob + self.blob
        with self.assertRaisesRegex(TypeError, "unsupported operand"):
            self.blob * 5
        with self.assertRaisesRegex(TypeError, "is not iterable"):
            b"a" in self.blob

    def test_blob_context_manager(self):
        data = b"a" * 50
        with self.cx.blobopen("test", "b", 1) as blob:
            blob.write(data)
        actual = self.cx.execute("select b from test").fetchone()[0]
        self.assertEqual(actual, data)

        # Check that __exit__ closed the blob
        with self.assertRaisesRegex(sqlite.ProgrammingError, "closed blob"):
            blob.read()

    def test_blob_context_manager_reraise_exceptions(self):
        # An exception raised inside the with-block must propagate.
        class DummyException(Exception):
            pass
        with self.assertRaisesRegex(DummyException, "reraised"):
            with self.cx.blobopen("test", "b", 1):
                raise DummyException("reraised")

    def test_blob_closed(self):
        # Every operation on a closed blob must raise ProgrammingError.
        with memory_database() as cx:
            cx.execute("create table test(b blob)")
            cx.execute("insert into test values (zeroblob(100))")
            blob = cx.blobopen("test", "b", 1)
            blob.close()

            msg = "Cannot operate on a closed blob"
            with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
                blob.read()
            with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
                blob.write(b"")
            with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
                blob.seek(0)
            with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
                blob.tell()
            with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
                blob.__enter__()
            with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
                blob.__exit__(None, None, None)
            with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
                len(blob)
            with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
                blob[0]
            with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
                blob[0:1]
            with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
                blob[0] = b""

    def test_blob_closed_db_read(self):
        # Reading through a blob whose connection was closed must fail.
        with memory_database() as cx:
            cx.execute("create table test(b blob)")
            cx.execute("insert into test(b) values (zeroblob(100))")
            blob = cx.blobopen("test", "b", 1)
            cx.close()
            self.assertRaisesRegex(sqlite.ProgrammingError,
                                   "Cannot operate on a closed database",
                                   blob.read)

    def test_blob_32bit_rowid(self):
        # gh-100370: we should not get an OverflowError for 32-bit rowids
        with memory_database() as cx:
            rowid = 2**32
            cx.execute("create table t(t blob)")
            cx.execute("insert into t(rowid, t) values (?, zeroblob(1))", (rowid,))
            # Only opening matters here; close the handle right away.
            cx.blobopen('t', 't', rowid).close()
1471
1472
1473@threading_helper.requires_working_threading()
1474class ThreadTests(unittest.TestCase):
1475    def setUp(self):
1476        self.con = sqlite.connect(":memory:")
1477        self.cur = self.con.cursor()
1478        self.cur.execute("create table test(name text, b blob)")
1479        self.cur.execute("insert into test values('blob', zeroblob(1))")
1480
1481    def tearDown(self):
1482        self.cur.close()
1483        self.con.close()
1484
1485    @threading_helper.reap_threads
1486    def _run_test(self, fn, *args, **kwds):
1487        def run(err):
1488            try:
1489                fn(*args, **kwds)
1490                err.append("did not raise ProgrammingError")
1491            except sqlite.ProgrammingError:
1492                pass
1493            except:
1494                err.append("raised wrong exception")
1495
1496        err = []
1497        t = threading.Thread(target=run, kwargs={"err": err})
1498        t.start()
1499        t.join()
1500        if err:
1501            self.fail("\n".join(err))
1502
1503    def test_check_connection_thread(self):
1504        fns = [
1505            lambda: self.con.cursor(),
1506            lambda: self.con.commit(),
1507            lambda: self.con.rollback(),
1508            lambda: self.con.close(),
1509            lambda: self.con.set_trace_callback(None),
1510            lambda: self.con.set_authorizer(None),
1511            lambda: self.con.create_collation("foo", None),
1512            lambda: self.con.setlimit(sqlite.SQLITE_LIMIT_LENGTH, -1),
1513            lambda: self.con.getlimit(sqlite.SQLITE_LIMIT_LENGTH),
1514            lambda: self.con.blobopen("test", "b", 1),
1515        ]
1516        if hasattr(sqlite.Connection, "serialize"):
1517            fns.append(lambda: self.con.serialize())
1518            fns.append(lambda: self.con.deserialize(b""))
1519        if sqlite.sqlite_version_info >= (3, 25, 0):
1520            fns.append(lambda: self.con.create_window_function("foo", 0, None))
1521
1522        for fn in fns:
1523            with self.subTest(fn=fn):
1524                self._run_test(fn)
1525
1526    def test_check_cursor_thread(self):
1527        fns = [
1528            lambda: self.cur.execute("insert into test(name) values('a')"),
1529            lambda: self.cur.close(),
1530            lambda: self.cur.execute("select name from test"),
1531            lambda: self.cur.fetchone(),
1532        ]
1533        for fn in fns:
1534            with self.subTest(fn=fn):
1535                self._run_test(fn)
1536
1537
1538    @threading_helper.reap_threads
1539    def test_dont_check_same_thread(self):
1540        def run(con, err):
1541            try:
1542                con.execute("select 1")
1543            except sqlite.Error:
1544                err.append("multi-threading not allowed")
1545
1546        con = sqlite.connect(":memory:", check_same_thread=False)
1547        err = []
1548        t = threading.Thread(target=run, kwargs={"con": con, "err": err})
1549        t.start()
1550        t.join()
1551        self.assertEqual(len(err), 0, "\n".join(err))
1552
1553
class ConstructorTests(unittest.TestCase):
    """Tests for the DB-API type constructors exported by the module."""

    def test_date(self):
        d = sqlite.Date(2004, 10, 28)
        self.assertEqual((d.year, d.month, d.day), (2004, 10, 28))

    def test_time(self):
        t = sqlite.Time(12, 39, 35)
        self.assertEqual((t.hour, t.minute, t.second), (12, 39, 35))

    def test_timestamp(self):
        ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
        self.assertEqual(
            (ts.year, ts.month, ts.day, ts.hour, ts.minute, ts.second),
            (2004, 10, 28, 12, 39, 35),
        )

    # The *FromTicks constructors convert using local time, so the exact
    # value depends on the time zone of the machine; only the type is checked.

    def test_date_from_ticks(self):
        self.assertIsInstance(sqlite.DateFromTicks(42), sqlite.Date)

    def test_time_from_ticks(self):
        self.assertIsInstance(sqlite.TimeFromTicks(42), sqlite.Time)

    def test_timestamp_from_ticks(self):
        self.assertIsInstance(sqlite.TimestampFromTicks(42), sqlite.Timestamp)

    def test_binary(self):
        b = sqlite.Binary(b"\0'")
        # The payload, including the NUL byte and the quote, is preserved.
        self.assertEqual(bytes(b), b"\0'")
1575
class ExtensionTests(unittest.TestCase):
    """
    Tests for sqlite3 extensions to the DB-API: executescript() and the
    execute(), executemany() and executescript() shortcuts on Connection.
    """

    def setUp(self):
        # One private in-memory database per test.  addCleanup() closes it
        # even when the test fails, so no connection is left open.
        self.con = sqlite.connect(":memory:")
        self.addCleanup(self.con.close)

    def test_script_string_sql(self):
        cur = self.con.cursor()
        cur.executescript("""
            -- bla bla
            /* a stupid comment */
            create table a(i);
            insert into a(i) values (5);
            """)
        cur.execute("select i from a")
        res = cur.fetchone()[0]
        self.assertEqual(res, 5)

    def test_script_syntax_error(self):
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(x); asdf; create table test2(x)")

    def test_script_error_normal(self):
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")

    def test_cursor_executescript_as_bytes(self):
        # Scripts must be str; bytes are rejected.
        cur = self.con.cursor()
        with self.assertRaises(TypeError):
            cur.executescript(b"create table test(foo); insert into test(foo) values (5);")

    def test_cursor_executescript_with_null_characters(self):
        cur = self.con.cursor()
        with self.assertRaises(ValueError):
            cur.executescript("""
                create table a(i);\0
                insert into a(i) values (5);
                """)

    def test_cursor_executescript_with_surrogates(self):
        cur = self.con.cursor()
        with self.assertRaises(UnicodeEncodeError):
            cur.executescript("""
                create table a(s);
                insert into a(s) values ('\ud8ff');
                """)

    def test_cursor_executescript_too_large_script(self):
        # Uses its own connection because cx_limit() changes a
        # per-connection limit for the duration of the block.
        msg = "query string is too large"
        with memory_database() as cx, cx_limit(cx) as lim:
            # A script of exactly `lim` characters is still accepted ...
            cx.executescript("select 'almost too large'".ljust(lim))
            # ... one more character is not.
            with self.assertRaisesRegex(sqlite.DataError, msg):
                cx.executescript("select 'too large'".ljust(lim+1))

    def test_cursor_executescript_tx_control(self):
        # executescript() ends a transaction that is open when it is called.
        con = self.con
        con.execute("begin")
        self.assertTrue(con.in_transaction)
        con.executescript("select 1")
        self.assertFalse(con.in_transaction)

    def test_connection_execute(self):
        result = self.con.execute("select 5").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.execute")

    def test_connection_executemany(self):
        con = self.con
        con.execute("create table test(foo)")
        con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
        result = con.execute("select foo from test order by foo").fetchall()
        self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
        self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")

    def test_connection_executescript(self):
        con = self.con
        con.executescript("create table test(foo); insert into test(foo) values (5);")
        result = con.execute("select foo from test").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.executescript")
1658
class ClosedConTests(unittest.TestCase):
    """Every operation on a closed connection must raise ProgrammingError."""

    @staticmethod
    def _closed_connection():
        """Return an in-memory connection that has already been closed."""
        con = sqlite.connect(":memory:")
        con.close()
        return con

    def test_closed_con_cursor(self):
        con = self._closed_connection()
        self.assertRaises(sqlite.ProgrammingError, con.cursor)

    def test_closed_con_commit(self):
        con = self._closed_connection()
        self.assertRaises(sqlite.ProgrammingError, con.commit)

    def test_closed_con_rollback(self):
        con = self._closed_connection()
        self.assertRaises(sqlite.ProgrammingError, con.rollback)

    def test_closed_cur_execute(self):
        # The cursor is obtained while the connection is still open.
        con = sqlite.connect(":memory:")
        cur = con.cursor()
        con.close()
        self.assertRaises(sqlite.ProgrammingError, cur.execute, "select 4")

    def test_closed_create_function(self):
        con = self._closed_connection()
        constant = lambda x: 17
        self.assertRaises(
            sqlite.ProgrammingError, con.create_function, "foo", 1, constant)

    def test_closed_create_aggregate(self):
        class Agg:
            def __init__(self):
                pass

            def step(self, x):
                pass

            def finalize(self):
                return 17

        con = self._closed_connection()
        self.assertRaises(
            sqlite.ProgrammingError, con.create_aggregate, "foo", 1, Agg)

    def test_closed_set_authorizer(self):
        def authorizer(*args):
            return sqlite.DENY

        con = self._closed_connection()
        self.assertRaises(
            sqlite.ProgrammingError, con.set_authorizer, authorizer)

    def test_closed_set_progress_callback(self):
        def progress():
            pass

        con = self._closed_connection()
        self.assertRaises(
            sqlite.ProgrammingError, con.set_progress_handler, progress, 100)

    def test_closed_call(self):
        # Calling the connection object itself is rejected as well.
        con = self._closed_connection()
        self.assertRaises(sqlite.ProgrammingError, con)
1725
class ClosedCurTests(unittest.TestCase):
    """Every operation on a closed cursor must raise ProgrammingError."""

    def test_closed(self):
        con = sqlite.connect(":memory:")
        # Close the connection once the test is over, pass or fail.
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.close()

        query = ("select 4 union select 5",)
        # Method name -> positional arguments to call it with.
        calls = {
            "execute": query,
            "executemany": ("insert into foo(bar) values (?)", [(3,), (4,)]),
            "executescript": query,
            "fetchall": (),
            "fetchmany": (),
            "fetchone": (),
        }
        for method_name, params in calls.items():
            # One subtest per method, so a failure names the method and the
            # remaining methods are still checked.
            with self.subTest(method_name=method_name):
                method = getattr(cur, method_name)
                with self.assertRaises(sqlite.ProgrammingError):
                    method(*params)
1743
1744
class SqliteOnConflictTests(unittest.TestCase):
    """
    Tests for SQLite's "insert on conflict" feature.

    See https://www.sqlite.org/lang_conflict.html for details.
    """

    def setUp(self):
        # Fresh in-memory database with one table; the UNIQUE column is
        # what the tests collide on.
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute("""
          CREATE TABLE test(
            id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE
          );
        """)

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def test_on_conflict_rollback_with_explicit_transaction(self):
        self.cx.isolation_level = None  # autocommit mode
        self.cu = self.cx.cursor()
        # Start an explicit transaction.
        self.cu.execute("BEGIN")
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        # Use connection to commit.
        self.cx.commit()
        self.cu.execute("SELECT name, unique_name from test")
        # Transaction should have rolled back and nothing should be in table.
        self.assertEqual(self.cu.fetchall(), [])

    def test_on_conflict_abort_raises_with_explicit_transactions(self):
        # Abort cancels the current sql statement but doesn't change anything
        # about the current transaction.
        self.cx.isolation_level = None  # autocommit mode
        self.cu = self.cx.cursor()
        # Start an explicit transaction.
        self.cu.execute("BEGIN")
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        self.cx.commit()
        self.cu.execute("SELECT name, unique_name FROM test")
        # Expect the first two inserts to work, third to do nothing.
        self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])

    def test_on_conflict_rollback_without_transaction(self):
        # Start of implicit transaction
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')")
        self.cu.execute("SELECT name, unique_name FROM test")
        # Implicit transaction is rolled back on error.
        self.assertEqual(self.cu.fetchall(), [])

    def test_on_conflict_abort_raises_without_transactions(self):
        # Abort cancels the current sql statement but doesn't change anything
        # about the current transaction.
        self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')")
        self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')")
        # Make sure all other values were inserted.
        self.cu.execute("SELECT name, unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)])

    def test_on_conflict_fail(self):
        self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
        with self.assertRaises(sqlite.IntegrityError):
            self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')")
        # The failed statement leaves no rows to fetch on the cursor.
        self.assertEqual(self.cu.fetchall(), [])
        # Fail stops the conflicting statement but keeps what was written
        # before it: the row from the first insert must still be there.
        self.cu.execute("SELECT unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('foo',)])

    def test_on_conflict_ignore(self):
        self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
        # Nothing should happen.
        self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')")
        self.cu.execute("SELECT unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('foo',)])

    def test_on_conflict_replace(self):
        self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')")
        # There shouldn't be an IntegrityError exception.
        self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')")
        self.cu.execute("SELECT name, unique_name FROM test")
        self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')])
1836
1837
@requires_subprocess()
class MultiprocessTests(unittest.TestCase):
    """Tests in which a child process uses the same database file (TESTFN)."""

    CONNECTION_TIMEOUT = SHORT_TIMEOUT / 1000.  # Defaults to 30 ms

    def tearDown(self):
        # Remove the database file created by the test.
        unlink(TESTFN)

    def test_ctx_mgr_rollback_if_commit_failed(self):
        # bpo-27334: ctx manager does not rollback if commit fails
        #
        # Child side of the test.  The script creates table t, then runs a
        # two-part SQL script.  In the first part the SQL function wait()
        # prints "started" and blocks on input() until the parent answers;
        # the answer must contain "database is locked".  The second part
        # begins another transaction on the same file.
        SCRIPT = f"""if 1:
            import sqlite3
            def wait():
                print("started")
                assert "database is locked" in input()

            cx = sqlite3.connect("{TESTFN}", timeout={self.CONNECTION_TIMEOUT})
            cx.create_function("wait", 0, wait)
            with cx:
                cx.execute("create table t(t)")
            try:
                # execute two transactions; both will try to lock the db
                cx.executescript('''
                    -- start a transaction and wait for parent
                    begin transaction;
                    select * from t;
                    select wait();
                    rollback;

                    -- start a new transaction; would fail if parent holds lock
                    begin transaction;
                    select * from t;
                    rollback;
                ''')
            finally:
                cx.close()
        """

        # spawn child process
        proc = subprocess.Popen(
            [sys.executable, "-c", SCRIPT],
            encoding="utf-8",
            bufsize=0,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
        )
        # Whatever happens below, wait for the child before the test ends.
        self.addCleanup(proc.communicate)

        # wait for child process to start
        self.assertEqual("started", proc.stdout.readline().strip())

        # The child is now blocked inside its first transaction.  Try to
        # write to the same file and tell the child how that went.
        cx = sqlite.connect(TESTFN, timeout=self.CONNECTION_TIMEOUT)
        try:  # context manager should correctly release the db lock
            with cx:
                cx.execute("insert into t values('test')")
        except sqlite.OperationalError as exc:
            # The child asserts that this text contains "database is locked".
            proc.stdin.write(str(exc))
        else:
            proc.stdin.write("no error")
        finally:
            cx.close()

        # terminate child process
        # The child must still be running at this point.
        self.assertIsNone(proc.returncode)
        try:
            proc.communicate(input="end", timeout=SHORT_TIMEOUT)
        except subprocess.TimeoutExpired:
            # Do not leave a hung child behind; kill it, reap it, and let
            # the timeout propagate as the test error.
            proc.kill()
            proc.communicate()
            raise
        # A zero exit status means the child's assertion held and its
        # second transaction did not fail.
        self.assertEqual(proc.returncode, 0)
1908
1909
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1912