1# pysqlite2/test/dbapi.py: tests for DB-API compliance 2# 3# Copyright (C) 2004-2010 Gerhard Häring <[email protected]> 4# 5# This file is part of pysqlite. 6# 7# This software is provided 'as-is', without any express or implied 8# warranty. In no event will the authors be held liable for any damages 9# arising from the use of this software. 10# 11# Permission is granted to anyone to use this software for any purpose, 12# including commercial applications, and to alter it and redistribute it 13# freely, subject to the following restrictions: 14# 15# 1. The origin of this software must not be misrepresented; you must not 16# claim that you wrote the original software. If you use this software 17# in a product, an acknowledgment in the product documentation would be 18# appreciated but is not required. 19# 2. Altered source versions must be plainly marked as such, and must not be 20# misrepresented as being the original software. 21# 3. This notice may not be removed or altered from any source distribution. 22 23import contextlib 24import os 25import sqlite3 as sqlite 26import subprocess 27import sys 28import threading 29import unittest 30import urllib.parse 31 32from test.support import ( 33 SHORT_TIMEOUT, bigmemtest, check_disallow_instantiation, requires_subprocess, 34 is_emscripten, is_wasi 35) 36from test.support import threading_helper 37from _testcapi import INT_MAX, ULLONG_MAX 38from os import SEEK_SET, SEEK_CUR, SEEK_END 39from test.support.os_helper import TESTFN, TESTFN_UNDECODABLE, unlink, temp_dir, FakePath 40 41 42# Helper for temporary memory databases 43def memory_database(*args, **kwargs): 44 cx = sqlite.connect(":memory:", *args, **kwargs) 45 return contextlib.closing(cx) 46 47 48# Temporarily limit a database connection parameter 49@contextlib.contextmanager 50def cx_limit(cx, category=sqlite.SQLITE_LIMIT_SQL_LENGTH, limit=128): 51 try: 52 _prev = cx.setlimit(category, limit) 53 yield limit 54 finally: 55 cx.setlimit(category, _prev) 56 57 58class ModuleTests(unittest.TestCase): 59 def test_api_level(self): 60 self.assertEqual(sqlite.apilevel, "2.0", 61 "apilevel is %s, should be 2.0" % sqlite.apilevel) 62 63 def test_thread_safety(self): 64 self.assertIn(sqlite.threadsafety, {0, 1, 3}, 65 "threadsafety is %d, should be 0, 1 or 3" % 66 sqlite.threadsafety) 67 68 def test_param_style(self): 69 self.assertEqual(sqlite.paramstyle, "qmark", 70 "paramstyle is '%s', should be 'qmark'" % 71 sqlite.paramstyle) 72 73 def test_warning(self): 74 self.assertTrue(issubclass(sqlite.Warning, Exception), 75 "Warning is not a subclass of Exception") 76 77 def test_error(self): 78 self.assertTrue(issubclass(sqlite.Error, Exception), 79 "Error is not a subclass of Exception") 80 81 def test_interface_error(self): 82 self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), 83 "InterfaceError is not a subclass of Error") 84 85 def test_database_error(self): 86 self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error), 87 "DatabaseError is not a subclass of Error") 88 89 def test_data_error(self): 90 self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError), 91 "DataError is not a subclass of DatabaseError") 92 93 def test_operational_error(self): 94 self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError), 95 "OperationalError is not a subclass of DatabaseError") 96 97 def test_integrity_error(self): 98 self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), 99 "IntegrityError is not a subclass of DatabaseError") 100 101 def test_internal_error(self): 102 self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError), 103 "InternalError is not a subclass of DatabaseError") 104 105 def test_programming_error(self): 106 self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), 107 "ProgrammingError is not a subclass of DatabaseError") 108 109 def test_not_supported_error(self): 110 self.assertTrue(issubclass(sqlite.NotSupportedError, 111 sqlite.DatabaseError), 112 "NotSupportedError is not a subclass of DatabaseError") 113 114 def test_module_constants(self): 115 consts = [ 116 "SQLITE_ABORT", 117 "SQLITE_ALTER_TABLE", 118 "SQLITE_ANALYZE", 119 "SQLITE_ATTACH", 120 "SQLITE_AUTH", 121 "SQLITE_BUSY", 122 "SQLITE_CANTOPEN", 123 "SQLITE_CONSTRAINT", 124 "SQLITE_CORRUPT", 125 "SQLITE_CREATE_INDEX", 126 "SQLITE_CREATE_TABLE", 127 "SQLITE_CREATE_TEMP_INDEX", 128 "SQLITE_CREATE_TEMP_TABLE", 129 "SQLITE_CREATE_TEMP_TRIGGER", 130 "SQLITE_CREATE_TEMP_VIEW", 131 "SQLITE_CREATE_TRIGGER", 132 "SQLITE_CREATE_VIEW", 133 "SQLITE_CREATE_VTABLE", 134 "SQLITE_DELETE", 135 "SQLITE_DENY", 136 "SQLITE_DETACH", 137 "SQLITE_DONE", 138 "SQLITE_DROP_INDEX", 139 "SQLITE_DROP_TABLE", 140 "SQLITE_DROP_TEMP_INDEX", 141 "SQLITE_DROP_TEMP_TABLE", 142 "SQLITE_DROP_TEMP_TRIGGER", 143 "SQLITE_DROP_TEMP_VIEW", 144 "SQLITE_DROP_TRIGGER", 145 "SQLITE_DROP_VIEW", 146 "SQLITE_DROP_VTABLE", 147 "SQLITE_EMPTY", 148 "SQLITE_ERROR", 149 "SQLITE_FORMAT", 150 "SQLITE_FULL", 151 "SQLITE_FUNCTION", 152 "SQLITE_IGNORE", 153 "SQLITE_INSERT", 154 "SQLITE_INTERNAL", 155 "SQLITE_INTERRUPT", 156 "SQLITE_IOERR", 157 "SQLITE_LOCKED", 158 "SQLITE_MISMATCH", 159 "SQLITE_MISUSE", 160 "SQLITE_NOLFS", 161 "SQLITE_NOMEM", 162 "SQLITE_NOTADB", 163 "SQLITE_NOTFOUND", 164 "SQLITE_OK", 165 "SQLITE_PERM", 166 "SQLITE_PRAGMA", 167 "SQLITE_PROTOCOL", 168 "SQLITE_RANGE", 169 "SQLITE_READ", 170 "SQLITE_READONLY", 171 "SQLITE_REINDEX", 172 "SQLITE_ROW", 173 "SQLITE_SAVEPOINT", 174 "SQLITE_SCHEMA", 175 "SQLITE_SELECT", 176 "SQLITE_TOOBIG", 177 "SQLITE_TRANSACTION", 178 "SQLITE_UPDATE", 179 # Run-time limit categories 180 "SQLITE_LIMIT_LENGTH", 181 "SQLITE_LIMIT_SQL_LENGTH", 182 "SQLITE_LIMIT_COLUMN", 183 "SQLITE_LIMIT_EXPR_DEPTH", 184 "SQLITE_LIMIT_COMPOUND_SELECT", 185 "SQLITE_LIMIT_VDBE_OP", 186 "SQLITE_LIMIT_FUNCTION_ARG", 187 "SQLITE_LIMIT_ATTACHED", 188 "SQLITE_LIMIT_LIKE_PATTERN_LENGTH", 189 "SQLITE_LIMIT_VARIABLE_NUMBER", 190 "SQLITE_LIMIT_TRIGGER_DEPTH", 191 ] 192 if sqlite.sqlite_version_info >= (3, 7, 17): 193 consts += ["SQLITE_NOTICE", "SQLITE_WARNING"] 194 if sqlite.sqlite_version_info >= (3, 8, 3): 195 consts.append("SQLITE_RECURSIVE") 196 if sqlite.sqlite_version_info >= (3, 8, 7): 197 consts.append("SQLITE_LIMIT_WORKER_THREADS") 198 consts += ["PARSE_DECLTYPES", "PARSE_COLNAMES"] 199 # Extended result codes 200 consts += [ 201 "SQLITE_ABORT_ROLLBACK", 202 "SQLITE_BUSY_RECOVERY", 203 "SQLITE_CANTOPEN_FULLPATH", 204 "SQLITE_CANTOPEN_ISDIR", 205 "SQLITE_CANTOPEN_NOTEMPDIR", 206 "SQLITE_CORRUPT_VTAB", 207 "SQLITE_IOERR_ACCESS", 208 "SQLITE_IOERR_BLOCKED", 209 "SQLITE_IOERR_CHECKRESERVEDLOCK", 210 "SQLITE_IOERR_CLOSE", 211 "SQLITE_IOERR_DELETE", 212 "SQLITE_IOERR_DELETE_NOENT", 213 "SQLITE_IOERR_DIR_CLOSE", 214 "SQLITE_IOERR_DIR_FSYNC", 215 "SQLITE_IOERR_FSTAT", 216 "SQLITE_IOERR_FSYNC", 217 "SQLITE_IOERR_LOCK", 218 "SQLITE_IOERR_NOMEM", 219 "SQLITE_IOERR_RDLOCK", 220 "SQLITE_IOERR_READ", 221 "SQLITE_IOERR_SEEK", 222 "SQLITE_IOERR_SHMLOCK", 223 "SQLITE_IOERR_SHMMAP", 224 "SQLITE_IOERR_SHMOPEN", 225 "SQLITE_IOERR_SHMSIZE", 226 "SQLITE_IOERR_SHORT_READ", 227 "SQLITE_IOERR_TRUNCATE", 228 "SQLITE_IOERR_UNLOCK", 229 "SQLITE_IOERR_WRITE", 230 "SQLITE_LOCKED_SHAREDCACHE", 231 "SQLITE_READONLY_CANTLOCK", 232 "SQLITE_READONLY_RECOVERY", 233 ] 234 if sqlite.sqlite_version_info >= (3, 7, 16): 235 consts += [ 236 "SQLITE_CONSTRAINT_CHECK", 237 "SQLITE_CONSTRAINT_COMMITHOOK", 238 "SQLITE_CONSTRAINT_FOREIGNKEY", 239 "SQLITE_CONSTRAINT_FUNCTION", 240 "SQLITE_CONSTRAINT_NOTNULL", 241 "SQLITE_CONSTRAINT_PRIMARYKEY", 242 "SQLITE_CONSTRAINT_TRIGGER", 243 "SQLITE_CONSTRAINT_UNIQUE", 244 "SQLITE_CONSTRAINT_VTAB", 245 "SQLITE_READONLY_ROLLBACK", 246 ] 247 if sqlite.sqlite_version_info >= (3, 7, 17): 248 consts += [ 249 "SQLITE_IOERR_MMAP", 250 "SQLITE_NOTICE_RECOVER_ROLLBACK", 251 "SQLITE_NOTICE_RECOVER_WAL", 252 ] 253 if sqlite.sqlite_version_info >= (3, 8, 0): 254 consts += [ 255 "SQLITE_BUSY_SNAPSHOT", 256 "SQLITE_IOERR_GETTEMPPATH", 257 "SQLITE_WARNING_AUTOINDEX", 258 ] 259 if sqlite.sqlite_version_info >= (3, 8, 1): 260 consts += ["SQLITE_CANTOPEN_CONVPATH", "SQLITE_IOERR_CONVPATH"] 261 if sqlite.sqlite_version_info >= (3, 8, 2): 262 consts.append("SQLITE_CONSTRAINT_ROWID") 263 if sqlite.sqlite_version_info >= (3, 8, 3): 264 consts.append("SQLITE_READONLY_DBMOVED") 265 if sqlite.sqlite_version_info >= (3, 8, 7): 266 consts.append("SQLITE_AUTH_USER") 267 if sqlite.sqlite_version_info >= (3, 9, 0): 268 consts.append("SQLITE_IOERR_VNODE") 269 if sqlite.sqlite_version_info >= (3, 10, 0): 270 consts.append("SQLITE_IOERR_AUTH") 271 if sqlite.sqlite_version_info >= (3, 14, 1): 272 consts.append("SQLITE_OK_LOAD_PERMANENTLY") 273 if sqlite.sqlite_version_info >= (3, 21, 0): 274 consts += [ 275 "SQLITE_IOERR_BEGIN_ATOMIC", 276 "SQLITE_IOERR_COMMIT_ATOMIC", 277 "SQLITE_IOERR_ROLLBACK_ATOMIC", 278 ] 279 if sqlite.sqlite_version_info >= (3, 22, 0): 280 consts += [ 281 "SQLITE_ERROR_MISSING_COLLSEQ", 282 "SQLITE_ERROR_RETRY", 283 "SQLITE_READONLY_CANTINIT", 284 "SQLITE_READONLY_DIRECTORY", 285 ] 286 if sqlite.sqlite_version_info >= (3, 24, 0): 287 consts += ["SQLITE_CORRUPT_SEQUENCE", "SQLITE_LOCKED_VTAB"] 288 if sqlite.sqlite_version_info >= (3, 25, 0): 289 consts += ["SQLITE_CANTOPEN_DIRTYWAL", "SQLITE_ERROR_SNAPSHOT"] 290 if sqlite.sqlite_version_info >= (3, 31, 0): 291 consts += [ 292 "SQLITE_CANTOPEN_SYMLINK", 293 "SQLITE_CONSTRAINT_PINNED", 294 "SQLITE_OK_SYMLINK", 295 ] 296 if sqlite.sqlite_version_info >= (3, 32, 0): 297 consts += [ 298 "SQLITE_BUSY_TIMEOUT", 299 "SQLITE_CORRUPT_INDEX", 300 "SQLITE_IOERR_DATA", 301 ] 302 if sqlite.sqlite_version_info >= (3, 34, 0): 303 consts.append("SQLITE_IOERR_CORRUPTFS") 304 for const in consts: 305 with self.subTest(const=const): 306 self.assertTrue(hasattr(sqlite, const)) 307 308 def test_error_code_on_exception(self): 309 err_msg = "unable to open database file" 310 if sys.platform.startswith("win"): 311 err_code = sqlite.SQLITE_CANTOPEN_ISDIR 312 else: 313 err_code = sqlite.SQLITE_CANTOPEN 314 315 with temp_dir() as db: 316 with self.assertRaisesRegex(sqlite.Error, err_msg) as cm: 317 sqlite.connect(db) 318 e = cm.exception 319 self.assertEqual(e.sqlite_errorcode, err_code) 320 self.assertTrue(e.sqlite_errorname.startswith("SQLITE_CANTOPEN")) 321 322 @unittest.skipIf(sqlite.sqlite_version_info <= (3, 7, 16), 323 "Requires SQLite 3.7.16 or newer") 324 def test_extended_error_code_on_exception(self): 325 with memory_database() as con: 326 with con: 327 con.execute("create table t(t integer check(t > 0))") 328 errmsg = "constraint failed" 329 with self.assertRaisesRegex(sqlite.IntegrityError, errmsg) as cm: 330 con.execute("insert into t values(-1)") 331 exc = cm.exception 332 self.assertEqual(exc.sqlite_errorcode, 333 sqlite.SQLITE_CONSTRAINT_CHECK) 334 self.assertEqual(exc.sqlite_errorname, "SQLITE_CONSTRAINT_CHECK") 335 336 # sqlite3_enable_shared_cache() is deprecated on macOS and calling it may raise 337 # OperationalError on some buildbots. 338 @unittest.skipIf(sys.platform == "darwin", "shared cache is deprecated on macOS") 339 def test_shared_cache_deprecated(self): 340 for enable in (True, False): 341 with self.assertWarns(DeprecationWarning) as cm: 342 sqlite.enable_shared_cache(enable) 343 self.assertIn("dbapi.py", cm.filename) 344 345 def test_disallow_instantiation(self): 346 cx = sqlite.connect(":memory:") 347 check_disallow_instantiation(self, type(cx("select 1"))) 348 check_disallow_instantiation(self, sqlite.Blob) 349 350 def test_complete_statement(self): 351 self.assertFalse(sqlite.complete_statement("select t")) 352 self.assertTrue(sqlite.complete_statement("create table t(t);")) 353 354 355class ConnectionTests(unittest.TestCase): 356 357 def setUp(self): 358 self.cx = sqlite.connect(":memory:") 359 cu = self.cx.cursor() 360 cu.execute("create table test(id integer primary key, name text)") 361 cu.execute("insert into test(name) values (?)", ("foo",)) 362 363 def tearDown(self): 364 self.cx.close() 365 366 def test_commit(self): 367 self.cx.commit() 368 369 def test_commit_after_no_changes(self): 370 """ 371 A commit should also work when no changes were made to the database. 372 """ 373 self.cx.commit() 374 self.cx.commit() 375 376 def test_rollback(self): 377 self.cx.rollback() 378 379 def test_rollback_after_no_changes(self): 380 """ 381 A rollback should also work when no changes were made to the database. 382 """ 383 self.cx.rollback() 384 self.cx.rollback() 385 386 def test_cursor(self): 387 cu = self.cx.cursor() 388 389 def test_failed_open(self): 390 YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" 391 with self.assertRaises(sqlite.OperationalError): 392 sqlite.connect(YOU_CANNOT_OPEN_THIS) 393 394 def test_close(self): 395 self.cx.close() 396 397 def test_use_after_close(self): 398 sql = "select 1" 399 cu = self.cx.cursor() 400 res = cu.execute(sql) 401 self.cx.close() 402 self.assertRaises(sqlite.ProgrammingError, res.fetchall) 403 self.assertRaises(sqlite.ProgrammingError, cu.execute, sql) 404 self.assertRaises(sqlite.ProgrammingError, cu.executemany, sql, []) 405 self.assertRaises(sqlite.ProgrammingError, cu.executescript, sql) 406 self.assertRaises(sqlite.ProgrammingError, self.cx.execute, sql) 407 self.assertRaises(sqlite.ProgrammingError, 408 self.cx.executemany, sql, []) 409 self.assertRaises(sqlite.ProgrammingError, self.cx.executescript, sql) 410 self.assertRaises(sqlite.ProgrammingError, 411 self.cx.create_function, "t", 1, lambda x: x) 412 self.assertRaises(sqlite.ProgrammingError, self.cx.cursor) 413 with self.assertRaises(sqlite.ProgrammingError): 414 with self.cx: 415 pass 416 417 def test_exceptions(self): 418 # Optional DB-API extension. 419 self.assertEqual(self.cx.Warning, sqlite.Warning) 420 self.assertEqual(self.cx.Error, sqlite.Error) 421 self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError) 422 self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError) 423 self.assertEqual(self.cx.DataError, sqlite.DataError) 424 self.assertEqual(self.cx.OperationalError, sqlite.OperationalError) 425 self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError) 426 self.assertEqual(self.cx.InternalError, sqlite.InternalError) 427 self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) 428 self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) 429 430 def test_in_transaction(self): 431 # Can't use db from setUp because we want to test initial state. 432 cx = sqlite.connect(":memory:") 433 cu = cx.cursor() 434 self.assertEqual(cx.in_transaction, False) 435 cu.execute("create table transactiontest(id integer primary key, name text)") 436 self.assertEqual(cx.in_transaction, False) 437 cu.execute("insert into transactiontest(name) values (?)", ("foo",)) 438 self.assertEqual(cx.in_transaction, True) 439 cu.execute("select name from transactiontest where name=?", ["foo"]) 440 row = cu.fetchone() 441 self.assertEqual(cx.in_transaction, True) 442 cx.commit() 443 self.assertEqual(cx.in_transaction, False) 444 cu.execute("select name from transactiontest where name=?", ["foo"]) 445 row = cu.fetchone() 446 self.assertEqual(cx.in_transaction, False) 447 448 def test_in_transaction_ro(self): 449 with self.assertRaises(AttributeError): 450 self.cx.in_transaction = True 451 452 def test_connection_exceptions(self): 453 exceptions = [ 454 "DataError", 455 "DatabaseError", 456 "Error", 457 "IntegrityError", 458 "InterfaceError", 459 "NotSupportedError", 460 "OperationalError", 461 "ProgrammingError", 462 "Warning", 463 ] 464 for exc in exceptions: 465 with self.subTest(exc=exc): 466 self.assertTrue(hasattr(self.cx, exc)) 467 self.assertIs(getattr(sqlite, exc), getattr(self.cx, exc)) 468 469 def test_interrupt_on_closed_db(self): 470 cx = sqlite.connect(":memory:") 471 cx.close() 472 with self.assertRaises(sqlite.ProgrammingError): 473 cx.interrupt() 474 475 def test_interrupt(self): 476 self.assertIsNone(self.cx.interrupt()) 477 478 def test_drop_unused_refs(self): 479 for n in range(500): 480 cu = self.cx.execute(f"select {n}") 481 self.assertEqual(cu.fetchone()[0], n) 482 483 def test_connection_limits(self): 484 category = sqlite.SQLITE_LIMIT_SQL_LENGTH 485 saved_limit = self.cx.getlimit(category) 486 try: 487 new_limit = 10 488 prev_limit = self.cx.setlimit(category, new_limit) 489 self.assertEqual(saved_limit, prev_limit) 490 self.assertEqual(self.cx.getlimit(category), new_limit) 491 msg = "query string is too large" 492 self.assertRaisesRegex(sqlite.DataError, msg, 493 self.cx.execute, "select 1 as '16'") 494 finally: # restore saved limit 495 self.cx.setlimit(category, saved_limit) 496 497 def test_connection_bad_limit_category(self): 498 msg = "'category' is out of bounds" 499 cat = 1111 500 self.assertRaisesRegex(sqlite.ProgrammingError, msg, 501 self.cx.getlimit, cat) 502 self.assertRaisesRegex(sqlite.ProgrammingError, msg, 503 self.cx.setlimit, cat, 0) 504 505 def test_connection_init_bad_isolation_level(self): 506 msg = ( 507 "isolation_level string must be '', 'DEFERRED', 'IMMEDIATE', or " 508 "'EXCLUSIVE'" 509 ) 510 levels = ( 511 "BOGUS", 512 " ", 513 "DEFERRE", 514 "IMMEDIAT", 515 "EXCLUSIV", 516 "DEFERREDS", 517 "IMMEDIATES", 518 "EXCLUSIVES", 519 ) 520 for level in levels: 521 with self.subTest(level=level): 522 with self.assertRaisesRegex(ValueError, msg): 523 memory_database(isolation_level=level) 524 with memory_database() as cx: 525 with self.assertRaisesRegex(ValueError, msg): 526 cx.isolation_level = level 527 # Check that the default level is not changed 528 self.assertEqual(cx.isolation_level, "") 529 530 def test_connection_init_good_isolation_levels(self): 531 for level in ("", "DEFERRED", "IMMEDIATE", "EXCLUSIVE", None): 532 with self.subTest(level=level): 533 with memory_database(isolation_level=level) as cx: 534 self.assertEqual(cx.isolation_level, level) 535 with memory_database() as cx: 536 self.assertEqual(cx.isolation_level, "") 537 cx.isolation_level = level 538 self.assertEqual(cx.isolation_level, level) 539 540 def test_connection_reinit(self): 541 db = ":memory:" 542 cx = sqlite.connect(db) 543 cx.text_factory = bytes 544 cx.row_factory = sqlite.Row 545 cu = cx.cursor() 546 cu.execute("create table foo (bar)") 547 cu.executemany("insert into foo (bar) values (?)", 548 ((str(v),) for v in range(4))) 549 cu.execute("select bar from foo") 550 551 rows = [r for r in cu.fetchmany(2)] 552 self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows)) 553 self.assertEqual([r[0] for r in rows], [b"0", b"1"]) 554 555 cx.__init__(db) 556 cx.execute("create table foo (bar)") 557 cx.executemany("insert into foo (bar) values (?)", 558 ((v,) for v in ("a", "b", "c", "d"))) 559 560 # This uses the old database, old row factory, but new text factory 561 rows = [r for r in cu.fetchall()] 562 self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows)) 563 self.assertEqual([r[0] for r in rows], ["2", "3"]) 564 565 def test_connection_bad_reinit(self): 566 cx = sqlite.connect(":memory:") 567 with cx: 568 cx.execute("create table t(t)") 569 with temp_dir() as db: 570 self.assertRaisesRegex(sqlite.OperationalError, 571 "unable to open database file", 572 cx.__init__, db) 573 self.assertRaisesRegex(sqlite.ProgrammingError, 574 "Base Connection.__init__ not called", 575 cx.executemany, "insert into t values(?)", 576 ((v,) for v in range(3))) 577 578 579class UninitialisedConnectionTests(unittest.TestCase): 580 def setUp(self): 581 self.cx = sqlite.Connection.__new__(sqlite.Connection) 582 583 def test_uninit_operations(self): 584 funcs = ( 585 lambda: self.cx.isolation_level, 586 lambda: self.cx.total_changes, 587 lambda: self.cx.in_transaction, 588 lambda: self.cx.iterdump(), 589 lambda: self.cx.cursor(), 590 lambda: self.cx.close(), 591 ) 592 for func in funcs: 593 with self.subTest(func=func): 594 self.assertRaisesRegex(sqlite.ProgrammingError, 595 "Base Connection.__init__ not called", 596 func) 597 598 599@unittest.skipUnless(hasattr(sqlite.Connection, "serialize"), 600 "Needs SQLite serialize API") 601class SerializeTests(unittest.TestCase): 602 def test_serialize_deserialize(self): 603 with memory_database() as cx: 604 with cx: 605 cx.execute("create table t(t)") 606 data = cx.serialize() 607 608 # Remove test table, verify that it was removed. 609 with cx: 610 cx.execute("drop table t") 611 regex = "no such table" 612 with self.assertRaisesRegex(sqlite.OperationalError, regex): 613 cx.execute("select t from t") 614 615 # Deserialize and verify that test table is restored. 616 cx.deserialize(data) 617 cx.execute("select t from t") 618 619 def test_deserialize_wrong_args(self): 620 dataset = ( 621 (BufferError, memoryview(b"blob")[::2]), 622 (TypeError, []), 623 (TypeError, 1), 624 (TypeError, None), 625 ) 626 for exc, arg in dataset: 627 with self.subTest(exc=exc, arg=arg): 628 with memory_database() as cx: 629 self.assertRaises(exc, cx.deserialize, arg) 630 631 def test_deserialize_corrupt_database(self): 632 with memory_database() as cx: 633 regex = "file is not a database" 634 with self.assertRaisesRegex(sqlite.DatabaseError, regex): 635 cx.deserialize(b"\0\1\3") 636 # SQLite does not generate an error until you try to query the 637 # deserialized database. 638 cx.execute("create table fail(f)") 639 640 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 641 @bigmemtest(size=2**63, memuse=3, dry_run=False) 642 def test_deserialize_too_much_data_64bit(self): 643 with memory_database() as cx: 644 with self.assertRaisesRegex(OverflowError, "'data' is too large"): 645 cx.deserialize(b"b" * size) 646 647 648class OpenTests(unittest.TestCase): 649 _sql = "create table test(id integer)" 650 651 def test_open_with_path_like_object(self): 652 """ Checks that we can successfully connect to a database using an object that 653 is PathLike, i.e. has __fspath__(). """ 654 path = FakePath(TESTFN) 655 self.addCleanup(unlink, path) 656 self.assertFalse(os.path.exists(path)) 657 with contextlib.closing(sqlite.connect(path)) as cx: 658 self.assertTrue(os.path.exists(path)) 659 cx.execute(self._sql) 660 661 @unittest.skipIf(sys.platform == "win32", "skipped on Windows") 662 @unittest.skipIf(sys.platform == "darwin", "skipped on macOS") 663 @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI") 664 @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths") 665 def test_open_with_undecodable_path(self): 666 path = TESTFN_UNDECODABLE 667 self.addCleanup(unlink, path) 668 self.assertFalse(os.path.exists(path)) 669 with contextlib.closing(sqlite.connect(path)) as cx: 670 self.assertTrue(os.path.exists(path)) 671 cx.execute(self._sql) 672 673 def test_open_uri(self): 674 path = TESTFN 675 self.addCleanup(unlink, path) 676 uri = "file:" + urllib.parse.quote(os.fsencode(path)) 677 self.assertFalse(os.path.exists(path)) 678 with contextlib.closing(sqlite.connect(uri, uri=True)) as cx: 679 self.assertTrue(os.path.exists(path)) 680 cx.execute(self._sql) 681 682 def test_open_unquoted_uri(self): 683 path = TESTFN 684 self.addCleanup(unlink, path) 685 uri = "file:" + path 686 self.assertFalse(os.path.exists(path)) 687 with contextlib.closing(sqlite.connect(uri, uri=True)) as cx: 688 self.assertTrue(os.path.exists(path)) 689 cx.execute(self._sql) 690 691 def test_open_uri_readonly(self): 692 path = TESTFN 693 self.addCleanup(unlink, path) 694 uri = "file:" + urllib.parse.quote(os.fsencode(path)) + "?mode=ro" 695 self.assertFalse(os.path.exists(path)) 696 # Cannot create new DB 697 with self.assertRaises(sqlite.OperationalError): 698 sqlite.connect(uri, uri=True) 699 self.assertFalse(os.path.exists(path)) 700 sqlite.connect(path).close() 701 self.assertTrue(os.path.exists(path)) 702 # Cannot modify new DB 703 with contextlib.closing(sqlite.connect(uri, uri=True)) as cx: 704 with self.assertRaises(sqlite.OperationalError): 705 cx.execute(self._sql) 706 707 @unittest.skipIf(sys.platform == "win32", "skipped on Windows") 708 @unittest.skipIf(sys.platform == "darwin", "skipped on macOS") 709 @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI") 710 @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths") 711 def test_open_undecodable_uri(self): 712 path = TESTFN_UNDECODABLE 713 self.addCleanup(unlink, path) 714 uri = "file:" + urllib.parse.quote(path) 715 self.assertFalse(os.path.exists(path)) 716 with contextlib.closing(sqlite.connect(uri, uri=True)) as cx: 717 self.assertTrue(os.path.exists(path)) 718 cx.execute(self._sql) 719 720 def test_factory_database_arg(self): 721 def factory(database, *args, **kwargs): 722 nonlocal database_arg 723 database_arg = database 724 return sqlite.Connection(":memory:", *args, **kwargs) 725 726 for database in (TESTFN, os.fsencode(TESTFN), 727 FakePath(TESTFN), FakePath(os.fsencode(TESTFN))): 728 database_arg = None 729 sqlite.connect(database, factory=factory).close() 730 self.assertEqual(database_arg, database) 731 732 def test_database_keyword(self): 733 with contextlib.closing(sqlite.connect(database=":memory:")) as cx: 734 self.assertEqual(type(cx), sqlite.Connection) 735 736 737class CursorTests(unittest.TestCase): 738 def setUp(self): 739 self.cx = sqlite.connect(":memory:") 740 self.cu = self.cx.cursor() 741 self.cu.execute( 742 "create table test(id integer primary key, name text, " 743 "income number, unique_test text unique)" 744 ) 745 self.cu.execute("insert into test(name) values (?)", ("foo",)) 746 747 def tearDown(self): 748 self.cu.close() 749 self.cx.close() 750 751 def test_execute_no_args(self): 752 self.cu.execute("delete from test") 753 754 def test_execute_illegal_sql(self): 755 with self.assertRaises(sqlite.OperationalError): 756 self.cu.execute("select asdf") 757 758 def test_execute_multiple_statements(self): 759 msg = "You can only execute one statement at a time" 760 dataset = ( 761 "select 1; select 2", 762 "select 1; // c++ comments are not allowed", 763 "select 1; *not a comment", 764 "select 1; -*not a comment", 765 "select 1; /* */ a", 766 "select 1; /**/a", 767 "select 1; -", 768 "select 1; /", 769 "select 1; -\n- select 2", 770 """select 1; 771 -- comment 772 select 2 773 """, 774 ) 775 for query in dataset: 776 with self.subTest(query=query): 777 with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 778 self.cu.execute(query) 779 780 def test_execute_with_appended_comments(self): 781 dataset = ( 782 "select 1; -- foo bar", 783 "select 1; --", 784 "select 1; /*", # Unclosed comments ending in \0 are skipped. 785 """ 786 select 5+4; 787 788 /* 789 foo 790 */ 791 """, 792 ) 793 for query in dataset: 794 with self.subTest(query=query): 795 self.cu.execute(query) 796 797 def test_execute_wrong_sql_arg(self): 798 with self.assertRaises(TypeError): 799 self.cu.execute(42) 800 801 def test_execute_arg_int(self): 802 self.cu.execute("insert into test(id) values (?)", (42,)) 803 804 def test_execute_arg_float(self): 805 self.cu.execute("insert into test(income) values (?)", (2500.32,)) 806 807 def test_execute_arg_string(self): 808 self.cu.execute("insert into test(name) values (?)", ("Hugo",)) 809 810 def test_execute_arg_string_with_zero_byte(self): 811 self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",)) 812 813 self.cu.execute("select name from test where id=?", (self.cu.lastrowid,)) 814 row = self.cu.fetchone() 815 self.assertEqual(row[0], "Hu\x00go") 816 817 def test_execute_non_iterable(self): 818 with self.assertRaises(sqlite.ProgrammingError) as cm: 819 self.cu.execute("insert into test(id) values (?)", 42) 820 self.assertEqual(str(cm.exception), 'parameters are of unsupported type') 821 822 def test_execute_wrong_no_of_args1(self): 823 # too many parameters 824 with self.assertRaises(sqlite.ProgrammingError): 825 self.cu.execute("insert into test(id) values (?)", (17, "Egon")) 826 827 def test_execute_wrong_no_of_args2(self): 828 # too little parameters 829 with self.assertRaises(sqlite.ProgrammingError): 830 self.cu.execute("insert into test(id) values (?)") 831 832 def test_execute_wrong_no_of_args3(self): 833 # no parameters, parameters are needed 834 with self.assertRaises(sqlite.ProgrammingError): 835 self.cu.execute("insert into test(id) values (?)") 836 837 def test_execute_param_list(self): 838 self.cu.execute("insert into test(name) values ('foo')") 839 self.cu.execute("select name from test where name=?", ["foo"]) 840 row = self.cu.fetchone() 841 self.assertEqual(row[0], "foo") 842 843 def test_execute_param_sequence(self): 844 class L: 845 def __len__(self): 846 return 1 847 def __getitem__(self, x): 848 assert x == 0 849 return "foo" 850 851 self.cu.execute("insert into test(name) values ('foo')") 852 self.cu.execute("select name from test where name=?", L()) 853 row = self.cu.fetchone() 854 self.assertEqual(row[0], "foo") 855 856 def test_execute_param_sequence_bad_len(self): 857 # Issue41662: Error in __len__() was overridden with ProgrammingError. 858 class L: 859 def __len__(self): 860 1/0 861 def __getitem__(slf, x): 862 raise AssertionError 863 864 self.cu.execute("insert into test(name) values ('foo')") 865 with self.assertRaises(ZeroDivisionError): 866 self.cu.execute("select name from test where name=?", L()) 867 868 def test_execute_too_many_params(self): 869 category = sqlite.SQLITE_LIMIT_VARIABLE_NUMBER 870 msg = "too many SQL variables" 871 with cx_limit(self.cx, category=category, limit=1): 872 self.cu.execute("select * from test where id=?", (1,)) 873 with self.assertRaisesRegex(sqlite.OperationalError, msg): 874 self.cu.execute("select * from test where id!=? and id!=?", 875 (1, 2)) 876 877 def test_execute_dict_mapping(self): 878 self.cu.execute("insert into test(name) values ('foo')") 879 self.cu.execute("select name from test where name=:name", {"name": "foo"}) 880 row = self.cu.fetchone() 881 self.assertEqual(row[0], "foo") 882 883 def test_execute_dict_mapping_mapping(self): 884 class D(dict): 885 def __missing__(self, key): 886 return "foo" 887 888 self.cu.execute("insert into test(name) values ('foo')") 889 self.cu.execute("select name from test where name=:name", D()) 890 row = self.cu.fetchone() 891 self.assertEqual(row[0], "foo") 892 893 def test_execute_dict_mapping_too_little_args(self): 894 self.cu.execute("insert into test(name) values ('foo')") 895 with self.assertRaises(sqlite.ProgrammingError): 896 self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) 897 898 def test_execute_dict_mapping_no_args(self): 899 self.cu.execute("insert into test(name) values ('foo')") 900 with self.assertRaises(sqlite.ProgrammingError): 901 self.cu.execute("select name from test where name=:name") 902 903 def test_execute_dict_mapping_unnamed(self): 904 self.cu.execute("insert into test(name) values ('foo')") 905 with self.assertRaises(sqlite.ProgrammingError): 906 self.cu.execute("select name from test where name=?", {"name": "foo"}) 907 908 def test_close(self): 909 self.cu.close() 910 911 def test_rowcount_execute(self): 912 self.cu.execute("delete from test") 913 self.cu.execute("insert into test(name) values ('foo')") 914 self.cu.execute("insert into test(name) values ('foo')") 915 self.cu.execute("update test set name='bar'") 916 self.assertEqual(self.cu.rowcount, 2) 917 918 def test_rowcount_select(self): 919 """ 920 pysqlite does not know the rowcount of SELECT statements, because we 921 don't fetch all rows after executing the select statement. The rowcount 922 has thus to be -1. 923 """ 924 self.cu.execute("select 5 union select 6") 925 self.assertEqual(self.cu.rowcount, -1) 926 927 def test_rowcount_executemany(self): 928 self.cu.execute("delete from test") 929 self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) 930 self.assertEqual(self.cu.rowcount, 3) 931 932 @unittest.skipIf(sqlite.sqlite_version_info < (3, 35, 0), 933 "Requires SQLite 3.35.0 or newer") 934 def test_rowcount_update_returning(self): 935 # gh-93421: rowcount is updated correctly for UPDATE...RETURNING queries 936 self.cu.execute("update test set name='bar' where name='foo' returning 1") 937 self.assertEqual(self.cu.fetchone()[0], 1) 938 self.assertEqual(self.cu.rowcount, 1) 939 940 def test_rowcount_prefixed_with_comment(self): 941 # gh-79579: rowcount is updated even if query is prefixed with comments 942 self.cu.execute(""" 943 -- foo 944 insert into test(name) values ('foo'), ('foo') 945 """) 946 self.assertEqual(self.cu.rowcount, 2) 947 self.cu.execute(""" 948 /* -- messy *r /* /* ** *- *-- 949 */ 950 /* one more */ insert into test(name) values ('messy') 951 """) 952 self.assertEqual(self.cu.rowcount, 1) 953 self.cu.execute("/* bar */ update test set name='bar' where name='foo'") 954 self.assertEqual(self.cu.rowcount, 3) 955 956 def test_rowcount_vaccuum(self): 957 data = ((1,), (2,), (3,)) 958 self.cu.executemany("insert into test(income) values(?)", data) 959 self.assertEqual(self.cu.rowcount, 3) 960 self.cx.commit() 961 self.cu.execute("vacuum") 962 self.assertEqual(self.cu.rowcount, -1) 963 964 def test_total_changes(self): 965 self.cu.execute("insert into test(name) values ('foo')") 966 self.cu.execute("insert into test(name) values ('foo')") 967 self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value') 968 969 # Checks for executemany: 970 # Sequences are required by the DB-API, iterators 971 # enhancements in pysqlite. 972 973 def test_execute_many_sequence(self): 974 self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)]) 975 976 def test_execute_many_iterator(self): 977 class MyIter: 978 def __init__(self): 979 self.value = 5 980 981 def __iter__(self): 982 return self 983 984 def __next__(self): 985 if self.value == 10: 986 raise StopIteration 987 else: 988 self.value += 1 989 return (self.value,) 990 991 self.cu.executemany("insert into test(income) values (?)", MyIter()) 992 993 def test_execute_many_generator(self): 994 def mygen(): 995 for i in range(5): 996 yield (i,) 997 998 self.cu.executemany("insert into test(income) values (?)", mygen()) 999 1000 def test_execute_many_wrong_sql_arg(self): 1001 with self.assertRaises(TypeError): 1002 self.cu.executemany(42, [(3,)]) 1003 1004 def test_execute_many_select(self): 1005 with self.assertRaises(sqlite.ProgrammingError): 1006 self.cu.executemany("select ?", [(3,)]) 1007 1008 def test_execute_many_not_iterable(self): 1009 with self.assertRaises(TypeError): 1010 self.cu.executemany("insert into test(income) values (?)", 42) 1011 1012 def test_fetch_iter(self): 1013 # Optional DB-API extension. 1014 self.cu.execute("delete from test") 1015 self.cu.execute("insert into test(id) values (?)", (5,)) 1016 self.cu.execute("insert into test(id) values (?)", (6,)) 1017 self.cu.execute("select id from test order by id") 1018 lst = [] 1019 for row in self.cu: 1020 lst.append(row[0]) 1021 self.assertEqual(lst[0], 5) 1022 self.assertEqual(lst[1], 6) 1023 1024 def test_fetchone(self): 1025 self.cu.execute("select name from test") 1026 row = self.cu.fetchone() 1027 self.assertEqual(row[0], "foo") 1028 row = self.cu.fetchone() 1029 self.assertEqual(row, None) 1030 1031 def test_fetchone_no_statement(self): 1032 cur = self.cx.cursor() 1033 row = cur.fetchone() 1034 self.assertEqual(row, None) 1035 1036 def test_array_size(self): 1037 # must default to 1 1038 self.assertEqual(self.cu.arraysize, 1) 1039 1040 # now set to 2 1041 self.cu.arraysize = 2 1042 1043 # now make the query return 3 rows 1044 self.cu.execute("delete from test") 1045 self.cu.execute("insert into test(name) values ('A')") 1046 self.cu.execute("insert into test(name) values ('B')") 1047 self.cu.execute("insert into test(name) values ('C')") 1048 self.cu.execute("select name from test") 1049 res = self.cu.fetchmany() 1050 1051 self.assertEqual(len(res), 2) 1052 1053 def test_fetchmany(self): 1054 self.cu.execute("select name from test") 1055 res = self.cu.fetchmany(100) 1056 self.assertEqual(len(res), 1) 1057 res = self.cu.fetchmany(100) 1058 self.assertEqual(res, []) 1059 1060 def test_fetchmany_kw_arg(self): 1061 """Checks if fetchmany works with keyword arguments""" 1062 self.cu.execute("select name from test") 1063 res = self.cu.fetchmany(size=100) 1064 self.assertEqual(len(res), 1) 1065 1066 def test_fetchall(self): 1067 self.cu.execute("select name from test") 1068 res = self.cu.fetchall() 1069 self.assertEqual(len(res), 1) 1070 res = self.cu.fetchall() 1071 self.assertEqual(res, []) 1072 1073 def test_setinputsizes(self): 1074 self.cu.setinputsizes([3, 4, 5]) 1075 1076 def test_setoutputsize(self): 1077 self.cu.setoutputsize(5, 0) 1078 1079 def test_setoutputsize_no_column(self): 1080 self.cu.setoutputsize(42) 1081 1082 def test_cursor_connection(self): 1083 # Optional DB-API extension. 1084 self.assertEqual(self.cu.connection, self.cx) 1085 1086 def test_wrong_cursor_callable(self): 1087 with self.assertRaises(TypeError): 1088 def f(): pass 1089 cur = self.cx.cursor(f) 1090 1091 def test_cursor_wrong_class(self): 1092 class Foo: pass 1093 foo = Foo() 1094 with self.assertRaises(TypeError): 1095 cur = sqlite.Cursor(foo) 1096 1097 def test_last_row_id_on_replace(self): 1098 """ 1099 INSERT OR REPLACE and REPLACE INTO should produce the same behavior. 1100 """ 1101 sql = '{} INTO test(id, unique_test) VALUES (?, ?)' 1102 for statement in ('INSERT OR REPLACE', 'REPLACE'): 1103 with self.subTest(statement=statement): 1104 self.cu.execute(sql.format(statement), (1, 'foo')) 1105 self.assertEqual(self.cu.lastrowid, 1) 1106 1107 def test_last_row_id_on_ignore(self): 1108 self.cu.execute( 1109 "insert or ignore into test(unique_test) values (?)", 1110 ('test',)) 1111 self.assertEqual(self.cu.lastrowid, 2) 1112 self.cu.execute( 1113 "insert or ignore into test(unique_test) values (?)", 1114 ('test',)) 1115 self.assertEqual(self.cu.lastrowid, 2) 1116 1117 def test_last_row_id_insert_o_r(self): 1118 results = [] 1119 for statement in ('FAIL', 'ABORT', 'ROLLBACK'): 1120 sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)' 1121 with self.subTest(statement='INSERT OR {}'.format(statement)): 1122 self.cu.execute(sql.format(statement), (statement,)) 1123 results.append((statement, self.cu.lastrowid)) 1124 with self.assertRaises(sqlite.IntegrityError): 1125 self.cu.execute(sql.format(statement), (statement,)) 1126 results.append((statement, self.cu.lastrowid)) 1127 expected = [ 1128 ('FAIL', 2), ('FAIL', 2), 1129 ('ABORT', 3), ('ABORT', 3), 1130 ('ROLLBACK', 4), ('ROLLBACK', 4), 1131 ] 1132 self.assertEqual(results, expected) 1133 1134 def test_column_count(self): 1135 # Check that column count is updated correctly for cached statements 1136 select = "select * from test" 1137 res = self.cu.execute(select) 1138 old_count = len(res.description) 1139 # Add a new column and execute the cached select query again 1140 self.cu.execute("alter table test add newcol") 1141 res = self.cu.execute(select) 1142 new_count = len(res.description) 1143 self.assertEqual(new_count - old_count, 1) 1144 1145 def test_same_query_in_multiple_cursors(self): 1146 cursors = [self.cx.execute("select 1") for _ in range(3)] 1147 for cu in cursors: 1148 self.assertEqual(cu.fetchall(), [(1,)]) 1149 1150 1151class BlobTests(unittest.TestCase): 1152 def setUp(self): 1153 self.cx = sqlite.connect(":memory:") 1154 self.cx.execute("create table test(b blob)") 1155 self.data = b"this blob data string is exactly fifty bytes long!" 1156 self.cx.execute("insert into test(b) values (?)", (self.data,)) 1157 self.blob = self.cx.blobopen("test", "b", 1) 1158 1159 def tearDown(self): 1160 self.blob.close() 1161 self.cx.close() 1162 1163 def test_blob_is_a_blob(self): 1164 self.assertIsInstance(self.blob, sqlite.Blob) 1165 1166 def test_blob_seek_and_tell(self): 1167 self.blob.seek(10) 1168 self.assertEqual(self.blob.tell(), 10) 1169 1170 self.blob.seek(10, SEEK_SET) 1171 self.assertEqual(self.blob.tell(), 10) 1172 1173 self.blob.seek(10, SEEK_CUR) 1174 self.assertEqual(self.blob.tell(), 20) 1175 1176 self.blob.seek(-10, SEEK_END) 1177 self.assertEqual(self.blob.tell(), 40) 1178 1179 def test_blob_seek_error(self): 1180 msg_oor = "offset out of blob range" 1181 msg_orig = "'origin' should be os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END" 1182 msg_of = "seek offset results in overflow" 1183 1184 dataset = ( 1185 (ValueError, msg_oor, lambda: self.blob.seek(1000)), 1186 (ValueError, msg_oor, lambda: self.blob.seek(-10)), 1187 (ValueError, msg_orig, lambda: self.blob.seek(10, -1)), 1188 (ValueError, msg_orig, lambda: self.blob.seek(10, 3)), 1189 ) 1190 for exc, msg, fn in dataset: 1191 with self.subTest(exc=exc, msg=msg, fn=fn): 1192 self.assertRaisesRegex(exc, msg, fn) 1193 1194 # Force overflow errors 1195 self.blob.seek(1, SEEK_SET) 1196 with self.assertRaisesRegex(OverflowError, msg_of): 1197 self.blob.seek(INT_MAX, SEEK_CUR) 1198 with self.assertRaisesRegex(OverflowError, msg_of): 1199 self.blob.seek(INT_MAX, SEEK_END) 1200 1201 def test_blob_read(self): 1202 buf = self.blob.read() 1203 self.assertEqual(buf, self.data) 1204 1205 def test_blob_read_oversized(self): 1206 buf = self.blob.read(len(self.data) * 2) 1207 self.assertEqual(buf, self.data) 1208 1209 def test_blob_read_advance_offset(self): 1210 n = 10 1211 buf = self.blob.read(n) 1212 self.assertEqual(buf, self.data[:n]) 1213 self.assertEqual(self.blob.tell(), n) 1214 1215 def test_blob_read_at_offset(self): 1216 self.blob.seek(10) 1217 self.assertEqual(self.blob.read(10), self.data[10:20]) 1218 1219 def test_blob_read_error_row_changed(self): 1220 self.cx.execute("update test set b='aaaa' where rowid=1") 1221 with self.assertRaises(sqlite.OperationalError): 1222 self.blob.read() 1223 1224 def test_blob_write(self): 1225 new_data = b"new data".ljust(50) 1226 self.blob.write(new_data) 1227 row = self.cx.execute("select b from test").fetchone() 1228 self.assertEqual(row[0], new_data) 1229 1230 def test_blob_write_at_offset(self): 1231 new_data = b"c" * 25 1232 self.blob.seek(25) 1233 self.blob.write(new_data) 1234 row = self.cx.execute("select b from test").fetchone() 1235 self.assertEqual(row[0], self.data[:25] + new_data) 1236 1237 def test_blob_write_advance_offset(self): 1238 self.blob.write(b"d"*10) 1239 self.assertEqual(self.blob.tell(), 10) 1240 1241 def test_blob_write_error_length(self): 1242 with self.assertRaisesRegex(ValueError, "data longer than blob"): 1243 self.blob.write(b"a" * 1000) 1244 1245 self.blob.seek(0, SEEK_SET) 1246 n = len(self.blob) 1247 self.blob.write(b"a" * (n-1)) 1248 self.blob.write(b"a") 1249 with self.assertRaisesRegex(ValueError, "data longer than blob"): 1250 self.blob.write(b"a") 1251 1252 def test_blob_write_error_row_changed(self): 1253 self.cx.execute("update test set b='aaaa' where rowid=1") 1254 with self.assertRaises(sqlite.OperationalError): 1255 self.blob.write(b"aaa") 1256 1257 def test_blob_write_error_readonly(self): 1258 ro_blob = self.cx.blobopen("test", "b", 1, readonly=True) 1259 with self.assertRaisesRegex(sqlite.OperationalError, "readonly"): 1260 ro_blob.write(b"aaa") 1261 ro_blob.close() 1262 1263 def test_blob_open_error(self): 1264 dataset = ( 1265 (("test", "b", 1), {"name": "notexisting"}), 1266 (("notexisting", "b", 1), {}), 1267 (("test", "notexisting", 1), {}), 1268 (("test", "b", 2), {}), 1269 ) 1270 regex = "no such" 1271 for args, kwds in dataset: 1272 with self.subTest(args=args, kwds=kwds): 1273 with self.assertRaisesRegex(sqlite.OperationalError, regex): 1274 self.cx.blobopen(*args, **kwds) 1275 1276 def test_blob_length(self): 1277 self.assertEqual(len(self.blob), 50) 1278 1279 def test_blob_get_item(self): 1280 self.assertEqual(self.blob[5], ord("b")) 1281 self.assertEqual(self.blob[6], ord("l")) 1282 self.assertEqual(self.blob[7], ord("o")) 1283 self.assertEqual(self.blob[8], ord("b")) 1284 self.assertEqual(self.blob[-1], ord("!")) 1285 1286 def test_blob_set_item(self): 1287 self.blob[0] = ord("b") 1288 expected = b"b" + self.data[1:] 1289 actual = self.cx.execute("select b from test").fetchone()[0] 1290 self.assertEqual(actual, expected) 1291 1292 def test_blob_set_item_with_offset(self): 1293 self.blob.seek(0, SEEK_END) 1294 self.assertEqual(self.blob.read(), b"") # verify that we're at EOB 1295 self.blob[0] = ord("T") 1296 self.blob[-1] = ord(".") 1297 self.blob.seek(0, SEEK_SET) 1298 expected = b"This blob data string is exactly fifty bytes long." 1299 self.assertEqual(self.blob.read(), expected) 1300 1301 def test_blob_set_slice_buffer_object(self): 1302 from array import array 1303 self.blob[0:5] = memoryview(b"12345") 1304 self.assertEqual(self.blob[0:5], b"12345") 1305 1306 self.blob[0:5] = bytearray(b"23456") 1307 self.assertEqual(self.blob[0:5], b"23456") 1308 1309 self.blob[0:5] = array("b", [1, 2, 3, 4, 5]) 1310 self.assertEqual(self.blob[0:5], b"\x01\x02\x03\x04\x05") 1311 1312 def test_blob_set_item_negative_index(self): 1313 self.blob[-1] = 255 1314 self.assertEqual(self.blob[-1], 255) 1315 1316 def test_blob_get_slice(self): 1317 self.assertEqual(self.blob[5:14], b"blob data") 1318 1319 def test_blob_get_empty_slice(self): 1320 self.assertEqual(self.blob[5:5], b"") 1321 1322 def test_blob_get_slice_negative_index(self): 1323 self.assertEqual(self.blob[5:-5], self.data[5:-5]) 1324 1325 def test_blob_get_slice_with_skip(self): 1326 self.assertEqual(self.blob[0:10:2], b"ti lb") 1327 1328 def test_blob_set_slice(self): 1329 self.blob[0:5] = b"12345" 1330 expected = b"12345" + self.data[5:] 1331 actual = self.cx.execute("select b from test").fetchone()[0] 1332 self.assertEqual(actual, expected) 1333 1334 def test_blob_set_empty_slice(self): 1335 self.blob[0:0] = b"" 1336 self.assertEqual(self.blob[:], self.data) 1337 1338 def test_blob_set_slice_with_skip(self): 1339 self.blob[0:10:2] = b"12345" 1340 actual = self.cx.execute("select b from test").fetchone()[0] 1341 expected = b"1h2s3b4o5 " + self.data[10:] 1342 self.assertEqual(actual, expected) 1343 1344 def test_blob_mapping_invalid_index_type(self): 1345 msg = "indices must be integers" 1346 with self.assertRaisesRegex(TypeError, msg): 1347 self.blob[5:5.5] 1348 with self.assertRaisesRegex(TypeError, msg): 1349 self.blob[1.5] 1350 with self.assertRaisesRegex(TypeError, msg): 1351 self.blob["a"] = b"b" 1352 1353 def test_blob_get_item_error(self): 1354 dataset = [len(self.blob), 105, -105] 1355 for idx in dataset: 1356 with self.subTest(idx=idx): 1357 with self.assertRaisesRegex(IndexError, "index out of range"): 1358 self.blob[idx] 1359 with self.assertRaisesRegex(IndexError, "cannot fit 'int'"): 1360 self.blob[ULLONG_MAX] 1361 1362 # Provoke read error 1363 self.cx.execute("update test set b='aaaa' where rowid=1") 1364 with self.assertRaises(sqlite.OperationalError): 1365 self.blob[0] 1366 1367 def test_blob_set_item_error(self): 1368 with self.assertRaisesRegex(TypeError, "cannot be interpreted"): 1369 self.blob[0] = b"multiple" 1370 with self.assertRaisesRegex(TypeError, "cannot be interpreted"): 1371 self.blob[0] = b"1" 1372 with self.assertRaisesRegex(TypeError, "cannot be interpreted"): 1373 self.blob[0] = bytearray(b"1") 1374 with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"): 1375 del self.blob[0] 1376 with self.assertRaisesRegex(IndexError, "Blob index out of range"): 1377 self.blob[1000] = 0 1378 with self.assertRaisesRegex(ValueError, "must be in range"): 1379 self.blob[0] = -1 1380 with self.assertRaisesRegex(ValueError, "must be in range"): 1381 self.blob[0] = 256 1382 # Overflow errors are overridden with ValueError 1383 with self.assertRaisesRegex(ValueError, "must be in range"): 1384 self.blob[0] = 2**65 1385 1386 def test_blob_set_slice_error(self): 1387 with self.assertRaisesRegex(IndexError, "wrong size"): 1388 self.blob[5:10] = b"a" 1389 with self.assertRaisesRegex(IndexError, "wrong size"): 1390 self.blob[5:10] = b"a" * 1000 1391 with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"): 1392 del self.blob[5:10] 1393 with self.assertRaisesRegex(ValueError, "step cannot be zero"): 1394 self.blob[5:10:0] = b"12345" 1395 with self.assertRaises(BufferError): 1396 self.blob[5:10] = memoryview(b"abcde")[::2] 1397 1398 def test_blob_sequence_not_supported(self): 1399 with self.assertRaisesRegex(TypeError, "unsupported operand"): 1400 self.blob + self.blob 1401 with self.assertRaisesRegex(TypeError, "unsupported operand"): 1402 self.blob * 5 1403 with self.assertRaisesRegex(TypeError, "is not iterable"): 1404 b"a" in self.blob 1405 1406 def test_blob_context_manager(self): 1407 data = b"a" * 50 1408 with self.cx.blobopen("test", "b", 1) as blob: 1409 blob.write(data) 1410 actual = self.cx.execute("select b from test").fetchone()[0] 1411 self.assertEqual(actual, data) 1412 1413 # Check that __exit__ closed the blob 1414 with self.assertRaisesRegex(sqlite.ProgrammingError, "closed blob"): 1415 blob.read() 1416 1417 def test_blob_context_manager_reraise_exceptions(self): 1418 class DummyException(Exception): 1419 pass 1420 with self.assertRaisesRegex(DummyException, "reraised"): 1421 with self.cx.blobopen("test", "b", 1) as blob: 1422 raise DummyException("reraised") 1423 1424 1425 def test_blob_closed(self): 1426 with memory_database() as cx: 1427 cx.execute("create table test(b blob)") 1428 cx.execute("insert into test values (zeroblob(100))") 1429 blob = cx.blobopen("test", "b", 1) 1430 blob.close() 1431 1432 msg = "Cannot operate on a closed blob" 1433 with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 1434 blob.read() 1435 with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 1436 blob.write(b"") 1437 with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 1438 blob.seek(0) 1439 with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 1440 blob.tell() 1441 with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 1442 blob.__enter__() 1443 with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 1444 blob.__exit__(None, None, None) 1445 with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 1446 len(blob) 1447 with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 1448 blob[0] 1449 with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 1450 blob[0:1] 1451 with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 1452 blob[0] = b"" 1453 1454 def test_blob_closed_db_read(self): 1455 with memory_database() as cx: 1456 cx.execute("create table test(b blob)") 1457 cx.execute("insert into test(b) values (zeroblob(100))") 1458 blob = cx.blobopen("test", "b", 1) 1459 cx.close() 1460 self.assertRaisesRegex(sqlite.ProgrammingError, 1461 "Cannot operate on a closed database", 1462 blob.read) 1463 1464 def test_blob_32bit_rowid(self): 1465 # gh-100370: we should not get an OverflowError for 32-bit rowids 1466 with memory_database() as cx: 1467 rowid = 2**32 1468 cx.execute("create table t(t blob)") 1469 cx.execute("insert into t(rowid, t) values (?, zeroblob(1))", (rowid,)) 1470 cx.blobopen('t', 't', rowid) 1471 1472 1473@threading_helper.requires_working_threading() 1474class ThreadTests(unittest.TestCase): 1475 def setUp(self): 1476 self.con = sqlite.connect(":memory:") 1477 self.cur = self.con.cursor() 1478 self.cur.execute("create table test(name text, b blob)") 1479 self.cur.execute("insert into test values('blob', zeroblob(1))") 1480 1481 def tearDown(self): 1482 self.cur.close() 1483 self.con.close() 1484 1485 @threading_helper.reap_threads 1486 def _run_test(self, fn, *args, **kwds): 1487 def run(err): 1488 try: 1489 fn(*args, **kwds) 1490 err.append("did not raise ProgrammingError") 1491 except sqlite.ProgrammingError: 1492 pass 1493 except: 1494 err.append("raised wrong exception") 1495 1496 err = [] 1497 t = threading.Thread(target=run, kwargs={"err": err}) 1498 t.start() 1499 t.join() 1500 if err: 1501 self.fail("\n".join(err)) 1502 1503 def test_check_connection_thread(self): 1504 fns = [ 1505 lambda: self.con.cursor(), 1506 lambda: self.con.commit(), 1507 lambda: self.con.rollback(), 1508 lambda: self.con.close(), 1509 lambda: self.con.set_trace_callback(None), 1510 lambda: self.con.set_authorizer(None), 1511 lambda: self.con.create_collation("foo", None), 1512 lambda: self.con.setlimit(sqlite.SQLITE_LIMIT_LENGTH, -1), 1513 lambda: self.con.getlimit(sqlite.SQLITE_LIMIT_LENGTH), 1514 lambda: self.con.blobopen("test", "b", 1), 1515 ] 1516 if hasattr(sqlite.Connection, "serialize"): 1517 fns.append(lambda: self.con.serialize()) 1518 fns.append(lambda: self.con.deserialize(b"")) 1519 if sqlite.sqlite_version_info >= (3, 25, 0): 1520 fns.append(lambda: self.con.create_window_function("foo", 0, None)) 1521 1522 for fn in fns: 1523 with self.subTest(fn=fn): 1524 self._run_test(fn) 1525 1526 def test_check_cursor_thread(self): 1527 fns = [ 1528 lambda: self.cur.execute("insert into test(name) values('a')"), 1529 lambda: self.cur.close(), 1530 lambda: self.cur.execute("select name from test"), 1531 lambda: self.cur.fetchone(), 1532 ] 1533 for fn in fns: 1534 with self.subTest(fn=fn): 1535 self._run_test(fn) 1536 1537 1538 @threading_helper.reap_threads 1539 def test_dont_check_same_thread(self): 1540 def run(con, err): 1541 try: 1542 con.execute("select 1") 1543 except sqlite.Error: 1544 err.append("multi-threading not allowed") 1545 1546 con = sqlite.connect(":memory:", check_same_thread=False) 1547 err = [] 1548 t = threading.Thread(target=run, kwargs={"con": con, "err": err}) 1549 t.start() 1550 t.join() 1551 self.assertEqual(len(err), 0, "\n".join(err)) 1552 1553 1554class ConstructorTests(unittest.TestCase): 1555 def test_date(self): 1556 d = sqlite.Date(2004, 10, 28) 1557 1558 def test_time(self): 1559 t = sqlite.Time(12, 39, 35) 1560 1561 def test_timestamp(self): 1562 ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35) 1563 1564 def test_date_from_ticks(self): 1565 d = sqlite.DateFromTicks(42) 1566 1567 def test_time_from_ticks(self): 1568 t = sqlite.TimeFromTicks(42) 1569 1570 def test_timestamp_from_ticks(self): 1571 ts = sqlite.TimestampFromTicks(42) 1572 1573 def test_binary(self): 1574 b = sqlite.Binary(b"\0'") 1575 1576class ExtensionTests(unittest.TestCase): 1577 def test_script_string_sql(self): 1578 con = sqlite.connect(":memory:") 1579 cur = con.cursor() 1580 cur.executescript(""" 1581 -- bla bla 1582 /* a stupid comment */ 1583 create table a(i); 1584 insert into a(i) values (5); 1585 """) 1586 cur.execute("select i from a") 1587 res = cur.fetchone()[0] 1588 self.assertEqual(res, 5) 1589 1590 def test_script_syntax_error(self): 1591 con = sqlite.connect(":memory:") 1592 cur = con.cursor() 1593 with self.assertRaises(sqlite.OperationalError): 1594 cur.executescript("create table test(x); asdf; create table test2(x)") 1595 1596 def test_script_error_normal(self): 1597 con = sqlite.connect(":memory:") 1598 cur = con.cursor() 1599 with self.assertRaises(sqlite.OperationalError): 1600 cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") 1601 1602 def test_cursor_executescript_as_bytes(self): 1603 con = sqlite.connect(":memory:") 1604 cur = con.cursor() 1605 with self.assertRaises(TypeError): 1606 cur.executescript(b"create table test(foo); insert into test(foo) values (5);") 1607 1608 def test_cursor_executescript_with_null_characters(self): 1609 con = sqlite.connect(":memory:") 1610 cur = con.cursor() 1611 with self.assertRaises(ValueError): 1612 cur.executescript(""" 1613 create table a(i);\0 1614 insert into a(i) values (5); 1615 """) 1616 1617 def test_cursor_executescript_with_surrogates(self): 1618 con = sqlite.connect(":memory:") 1619 cur = con.cursor() 1620 with self.assertRaises(UnicodeEncodeError): 1621 cur.executescript(""" 1622 create table a(s); 1623 insert into a(s) values ('\ud8ff'); 1624 """) 1625 1626 def test_cursor_executescript_too_large_script(self): 1627 msg = "query string is too large" 1628 with memory_database() as cx, cx_limit(cx) as lim: 1629 cx.executescript("select 'almost too large'".ljust(lim)) 1630 with self.assertRaisesRegex(sqlite.DataError, msg): 1631 cx.executescript("select 'too large'".ljust(lim+1)) 1632 1633 def test_cursor_executescript_tx_control(self): 1634 con = sqlite.connect(":memory:") 1635 con.execute("begin") 1636 self.assertTrue(con.in_transaction) 1637 con.executescript("select 1") 1638 self.assertFalse(con.in_transaction) 1639 1640 def test_connection_execute(self): 1641 con = sqlite.connect(":memory:") 1642 result = con.execute("select 5").fetchone()[0] 1643 self.assertEqual(result, 5, "Basic test of Connection.execute") 1644 1645 def test_connection_executemany(self): 1646 con = sqlite.connect(":memory:") 1647 con.execute("create table test(foo)") 1648 con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) 1649 result = con.execute("select foo from test order by foo").fetchall() 1650 self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany") 1651 self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany") 1652 1653 def test_connection_executescript(self): 1654 con = sqlite.connect(":memory:") 1655 con.executescript("create table test(foo); insert into test(foo) values (5);") 1656 result = con.execute("select foo from test").fetchone()[0] 1657 self.assertEqual(result, 5, "Basic test of Connection.executescript") 1658 1659class ClosedConTests(unittest.TestCase): 1660 def test_closed_con_cursor(self): 1661 con = sqlite.connect(":memory:") 1662 con.close() 1663 with self.assertRaises(sqlite.ProgrammingError): 1664 cur = con.cursor() 1665 1666 def test_closed_con_commit(self): 1667 con = sqlite.connect(":memory:") 1668 con.close() 1669 with self.assertRaises(sqlite.ProgrammingError): 1670 con.commit() 1671 1672 def test_closed_con_rollback(self): 1673 con = sqlite.connect(":memory:") 1674 con.close() 1675 with self.assertRaises(sqlite.ProgrammingError): 1676 con.rollback() 1677 1678 def test_closed_cur_execute(self): 1679 con = sqlite.connect(":memory:") 1680 cur = con.cursor() 1681 con.close() 1682 with self.assertRaises(sqlite.ProgrammingError): 1683 cur.execute("select 4") 1684 1685 def test_closed_create_function(self): 1686 con = sqlite.connect(":memory:") 1687 con.close() 1688 def f(x): return 17 1689 with self.assertRaises(sqlite.ProgrammingError): 1690 con.create_function("foo", 1, f) 1691 1692 def test_closed_create_aggregate(self): 1693 con = sqlite.connect(":memory:") 1694 con.close() 1695 class Agg: 1696 def __init__(self): 1697 pass 1698 def step(self, x): 1699 pass 1700 def finalize(self): 1701 return 17 1702 with self.assertRaises(sqlite.ProgrammingError): 1703 con.create_aggregate("foo", 1, Agg) 1704 1705 def test_closed_set_authorizer(self): 1706 con = sqlite.connect(":memory:") 1707 con.close() 1708 def authorizer(*args): 1709 return sqlite.DENY 1710 with self.assertRaises(sqlite.ProgrammingError): 1711 con.set_authorizer(authorizer) 1712 1713 def test_closed_set_progress_callback(self): 1714 con = sqlite.connect(":memory:") 1715 con.close() 1716 def progress(): pass 1717 with self.assertRaises(sqlite.ProgrammingError): 1718 con.set_progress_handler(progress, 100) 1719 1720 def test_closed_call(self): 1721 con = sqlite.connect(":memory:") 1722 con.close() 1723 with self.assertRaises(sqlite.ProgrammingError): 1724 con() 1725 1726class ClosedCurTests(unittest.TestCase): 1727 def test_closed(self): 1728 con = sqlite.connect(":memory:") 1729 cur = con.cursor() 1730 cur.close() 1731 1732 for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"): 1733 if method_name in ("execute", "executescript"): 1734 params = ("select 4 union select 5",) 1735 elif method_name == "executemany": 1736 params = ("insert into foo(bar) values (?)", [(3,), (4,)]) 1737 else: 1738 params = [] 1739 1740 with self.assertRaises(sqlite.ProgrammingError): 1741 method = getattr(cur, method_name) 1742 method(*params) 1743 1744 1745class SqliteOnConflictTests(unittest.TestCase): 1746 """ 1747 Tests for SQLite's "insert on conflict" feature. 1748 1749 See https://www.sqlite.org/lang_conflict.html for details. 1750 """ 1751 1752 def setUp(self): 1753 self.cx = sqlite.connect(":memory:") 1754 self.cu = self.cx.cursor() 1755 self.cu.execute(""" 1756 CREATE TABLE test( 1757 id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE 1758 ); 1759 """) 1760 1761 def tearDown(self): 1762 self.cu.close() 1763 self.cx.close() 1764 1765 def test_on_conflict_rollback_with_explicit_transaction(self): 1766 self.cx.isolation_level = None # autocommit mode 1767 self.cu = self.cx.cursor() 1768 # Start an explicit transaction. 1769 self.cu.execute("BEGIN") 1770 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 1771 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 1772 with self.assertRaises(sqlite.IntegrityError): 1773 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 1774 # Use connection to commit. 1775 self.cx.commit() 1776 self.cu.execute("SELECT name, unique_name from test") 1777 # Transaction should have rolled back and nothing should be in table. 1778 self.assertEqual(self.cu.fetchall(), []) 1779 1780 def test_on_conflict_abort_raises_with_explicit_transactions(self): 1781 # Abort cancels the current sql statement but doesn't change anything 1782 # about the current transaction. 1783 self.cx.isolation_level = None # autocommit mode 1784 self.cu = self.cx.cursor() 1785 # Start an explicit transaction. 1786 self.cu.execute("BEGIN") 1787 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 1788 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 1789 with self.assertRaises(sqlite.IntegrityError): 1790 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 1791 self.cx.commit() 1792 self.cu.execute("SELECT name, unique_name FROM test") 1793 # Expect the first two inserts to work, third to do nothing. 1794 self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) 1795 1796 def test_on_conflict_rollback_without_transaction(self): 1797 # Start of implicit transaction 1798 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 1799 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 1800 with self.assertRaises(sqlite.IntegrityError): 1801 self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 1802 self.cu.execute("SELECT name, unique_name FROM test") 1803 # Implicit transaction is rolled back on error. 1804 self.assertEqual(self.cu.fetchall(), []) 1805 1806 def test_on_conflict_abort_raises_without_transactions(self): 1807 # Abort cancels the current sql statement but doesn't change anything 1808 # about the current transaction. 1809 self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 1810 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 1811 with self.assertRaises(sqlite.IntegrityError): 1812 self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 1813 # Make sure all other values were inserted. 1814 self.cu.execute("SELECT name, unique_name FROM test") 1815 self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) 1816 1817 def test_on_conflict_fail(self): 1818 self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") 1819 with self.assertRaises(sqlite.IntegrityError): 1820 self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") 1821 self.assertEqual(self.cu.fetchall(), []) 1822 1823 def test_on_conflict_ignore(self): 1824 self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") 1825 # Nothing should happen. 1826 self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") 1827 self.cu.execute("SELECT unique_name FROM test") 1828 self.assertEqual(self.cu.fetchall(), [('foo',)]) 1829 1830 def test_on_conflict_replace(self): 1831 self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')") 1832 # There shouldn't be an IntegrityError exception. 1833 self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')") 1834 self.cu.execute("SELECT name, unique_name FROM test") 1835 self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')]) 1836 1837 1838@requires_subprocess() 1839class MultiprocessTests(unittest.TestCase): 1840 CONNECTION_TIMEOUT = SHORT_TIMEOUT / 1000. # Defaults to 30 ms 1841 1842 def tearDown(self): 1843 unlink(TESTFN) 1844 1845 def test_ctx_mgr_rollback_if_commit_failed(self): 1846 # bpo-27334: ctx manager does not rollback if commit fails 1847 SCRIPT = f"""if 1: 1848 import sqlite3 1849 def wait(): 1850 print("started") 1851 assert "database is locked" in input() 1852 1853 cx = sqlite3.connect("{TESTFN}", timeout={self.CONNECTION_TIMEOUT}) 1854 cx.create_function("wait", 0, wait) 1855 with cx: 1856 cx.execute("create table t(t)") 1857 try: 1858 # execute two transactions; both will try to lock the db 1859 cx.executescript(''' 1860 -- start a transaction and wait for parent 1861 begin transaction; 1862 select * from t; 1863 select wait(); 1864 rollback; 1865 1866 -- start a new transaction; would fail if parent holds lock 1867 begin transaction; 1868 select * from t; 1869 rollback; 1870 ''') 1871 finally: 1872 cx.close() 1873 """ 1874 1875 # spawn child process 1876 proc = subprocess.Popen( 1877 [sys.executable, "-c", SCRIPT], 1878 encoding="utf-8", 1879 bufsize=0, 1880 stdin=subprocess.PIPE, 1881 stdout=subprocess.PIPE, 1882 ) 1883 self.addCleanup(proc.communicate) 1884 1885 # wait for child process to start 1886 self.assertEqual("started", proc.stdout.readline().strip()) 1887 1888 cx = sqlite.connect(TESTFN, timeout=self.CONNECTION_TIMEOUT) 1889 try: # context manager should correctly release the db lock 1890 with cx: 1891 cx.execute("insert into t values('test')") 1892 except sqlite.OperationalError as exc: 1893 proc.stdin.write(str(exc)) 1894 else: 1895 proc.stdin.write("no error") 1896 finally: 1897 cx.close() 1898 1899 # terminate child process 1900 self.assertIsNone(proc.returncode) 1901 try: 1902 proc.communicate(input="end", timeout=SHORT_TIMEOUT) 1903 except subprocess.TimeoutExpired: 1904 proc.kill() 1905 proc.communicate() 1906 raise 1907 self.assertEqual(proc.returncode, 0) 1908 1909 1910if __name__ == "__main__": 1911 unittest.main() 1912