# pysqlite2/test/regression.py: pysqlite regression tests
#
# Copyright (C) 2006-2010 Gerhard Häring <[email protected]>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
import datetime
import unittest
import sqlite3 as sqlite
import weakref
import functools

from test import support
from unittest.mock import patch
from test.test_sqlite3.test_dbapi import memory_database, cx_limit


class RegressionTests(unittest.TestCase):
    """Regression tests for bugs fixed in pysqlite / the sqlite3 module.

    ``setUp`` opens a plain in-memory connection as ``self.con`` and
    ``tearDown`` closes it.  Tests that need their own connection (special
    options, or a connection that is closed as part of the test) open one
    locally and make sure it is closed again.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def test_pragma_user_version(self):
        # This used to crash pysqlite because this pragma command returns NULL for the column name
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def test_pragma_schema_version(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            # Use the PARSE_COLNAMES connection created above (not self.con),
            # otherwise the column-name parsing path is never exercised.
            cur = con.cursor()
            try:
                cur.execute("pragma schema_version")
            finally:
                cur.close()
        finally:
            con.close()

    def test_statement_reset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
        # reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection object.
        con = sqlite.connect(":memory:", cached_statements=5)
        self.addCleanup(con.close)
        cursors = [con.cursor() for _ in range(5)]
        cursors[0].execute("create table test(x)")
        for _ in range(10):
            cursors[0].executemany("insert into test(x) values (?)",
                                   [(x,) for x in range(10)])

        # Leading spaces make every SQL string distinct, so each cursor
        # holds its own statement.
        for i, cur in enumerate(cursors):
            cur.execute(" " * i + "select x from test")

        con.rollback()

    def test_column_name_with_spaces(self):
        # Column names containing spaces (and a bracketed suffix) must be
        # reported verbatim in cursor.description.
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar [datetime]")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def test_statement_finalization_on_close_db(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the statement
        # cache when closing the database. Statements that were still
        # referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        # Closing is the operation under test, so it is called explicitly.
        con.close()

    def test_on_conflict_rollback(self):
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        try:
            con.execute("insert into foo(x) values (1)")
        except sqlite.DatabaseError:
            # Deliberately ignored: the duplicate insert only serves to
            # trigger the "on conflict rollback" clause.
            pass
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def test_workaround_for_buggy_sqlite_transfer_bindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def test_empty_statement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def test_type_map_usage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        SELECT = "select * from foo"
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table foo(bar timestamp)")
        cur.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
        cur.execute(SELECT)
        # Same SQL text, but the table now has a different declared type.
        cur.execute("drop table foo")
        cur.execute("create table foo(bar integer)")
        cur.execute("insert into foo(bar) values (5)")
        cur.execute(SELECT)

    def test_bind_mutating_list(self):
        # Issue41662: Crash when mutate a list of parameters during iteration.
        class X:
            def __conform__(self, protocol):
                # Empties the parameter list while it is being bound.
                parameters.clear()
                return "..."
        parameters = [X(), 0]
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        con.execute("create table foo(bar X, baz integer)")
        # Should not crash
        with self.assertRaises(IndexError):
            con.execute("insert into foo(bar, baz) values (?, ?)", parameters)

    def test_error_msg_decode_error(self):
        # When porting the module to Python 3.0, the error message about
        # decoding errors disappeared. This verifies they're back again.
        with self.assertRaises(sqlite.OperationalError) as cm:
            self.con.execute("select 'xxx' || ? || 'yyy' colname",
                             (bytes(bytearray([250])),)).fetchone()
        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
        self.assertIn(msg, str(cm.exception))

    def test_register_adapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def test_set_isolation_level(self):
        # See issue 27881.
        class CustomStr(str):
            def upper(self):
                return None
            def __del__(self):
                # Re-enters the isolation_level setter on the connection
                # defined below while the instance is being destroyed.
                con.isolation_level = ""

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.isolation_level = None
        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
            with self.subTest(level=level):
                con.isolation_level = level
                con.isolation_level = level.lower()
                con.isolation_level = level.capitalize()
                con.isolation_level = CustomStr(level)

        # setting isolation_level failure should not alter previous state
        con.isolation_level = None
        con.isolation_level = "DEFERRED"
        pairs = [
            (1, TypeError), (b'', TypeError), ("abc", ValueError),
            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
        ]
        for value, exc in pairs:
            with self.subTest(level=value):
                with self.assertRaises(exc):
                    con.isolation_level = value
                self.assertEqual(con.isolation_level, "DEFERRED")

    def test_cursor_constructor_call_check(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                pass

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = Cursor(con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()
        with self.assertRaisesRegex(sqlite.ProgrammingError,
                                    r'^Base Cursor\.__init__ not called\.$'):
            cur.close()

    def test_str_subclass(self):
        """
        The Python 3.0 port of the module didn't cope with values of subclasses of str.
        """
        class MyStr(str): pass
        self.con.execute("select ?", (MyStr("abc"),))

    def test_connection_constructor_call_check(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                pass

        # No cleanup is registered for this object: it was never initialised
        # by the base class.
        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def test_auto_commit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        con = sqlite.connect(":memory:", isolation_level=None)
        con.close()

    def test_pragma_autocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        cur.fetchone()

    def test_connection_call(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(TypeError, self.con, b"select 1")

    def test_collation(self):
        # A collation name that cannot be encoded must be rejected.
        def collation_cb(a, b):
            return 1
        self.assertRaises(UnicodeEncodeError, self.con.create_collation,
            # Lone surrogate cannot be encoded to the default encoding (utf8)
            "\uDC80", collation_cb)

    def test_recursive_cursor_use(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
        Now we catch recursive cursor usage and raise a ProgrammingError.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)

        cur = con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Re-uses the cursor that is currently running executemany().
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def test_convert_timestamp_microsecond_padding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """

        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("CREATE TABLE t (x TIMESTAMP)")

        # Microseconds should be 456000
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

        # Microseconds should be truncated to 123456
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

        cur.execute("SELECT * FROM t")
        values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def test_invalid_isolation_level_type(self):
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)


    def test_null_character(self):
        # Issue #21147
        cur = self.con.cursor()
        queries = ["\0select 1", "select 1\0"]
        for query in queries:
            # Checked once through the connection and once through a cursor.
            with self.subTest(query=query):
                self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
                                       self.con.execute, query)
            with self.subTest(query=query):
                self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
                                       cur.execute, query)

    def test_surrogates(self):
        # SQL text containing lone surrogates cannot be encoded and must
        # raise, both when calling the connection and via cursor.execute().
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        self.assertRaises(UnicodeEncodeError, con, "select '\ud8ff'")
        self.assertRaises(UnicodeEncodeError, con, "select '\udcff'")
        cur = con.cursor()
        self.assertRaises(UnicodeEncodeError, cur.execute, "select '\ud8ff'")
        self.assertRaises(UnicodeEncodeError, cur.execute, "select '\udcff'")

    def test_large_sql(self):
        # SQL padded to exactly the limit is accepted; one character more
        # is rejected with DataError.
        msg = "query string is too large"
        with memory_database() as cx, cx_limit(cx) as lim:
            cu = cx.cursor()

            cx("select 1".ljust(lim))
            # use a different SQL statement; don't reuse from the LRU cache
            cu.execute("select 2".ljust(lim))

            sql = "select 3".ljust(lim+1)
            self.assertRaisesRegex(sqlite.DataError, msg, cx, sql)
            self.assertRaisesRegex(sqlite.DataError, msg, cu.execute, sql)

    def test_commit_cursor_reset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        to return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        counter = 0
        for i, row in enumerate(con.execute("select c from t")):
            with self.subTest(i=i, row=row):
                con.execute("insert into t2(c) values (?)", (i,))
                con.commit()
                # Only the three inserted rows (0, 1, 2) have an expected
                # value; any surplus row is caught by the final count check.
                if counter < 3:
                    self.assertEqual(row[0], counter)
                counter += 1
        self.assertEqual(counter, 3, "should have returned exactly three rows")

    def test_bpo31770(self):
        """
        The interpreter shouldn't crash in case Cursor.__init__() is called
        more than once.
        """
        def callback(*args):
            pass
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = sqlite.Cursor(con)
        ref = weakref.ref(cur, callback)
        cur.__init__(con)
        del cur
        # The interpreter shouldn't crash when ref is collected.
        del ref
        support.gc_collect()

    def test_del_isolation_level_segfault(self):
        # Deleting the attribute must raise instead of crashing.
        with self.assertRaises(AttributeError):
            del self.con.isolation_level

    def test_bpo37347(self):
        class Printer:
            def log(self, *args):
                return sqlite.SQLITE_OK

        # For each callback setter: install the same bound method twice,
        # run a statement, then remove the callback again.
        for method in [self.con.set_trace_callback,
                       functools.partial(self.con.set_progress_handler, n=1),
                       self.con.set_authorizer]:
            printer_instance = Printer()
            method(printer_instance.log)
            method(printer_instance.log)
            self.con.execute("select 1")  # trigger seg fault
            method(None)

    def test_return_empty_bytestring(self):
        # An empty blob literal must come back as b''.
        cur = self.con.execute("select X''")
        val = cur.fetchone()[0]
        self.assertEqual(val, b'')

    def test_table_lock_cursor_replace_stmt(self):
        # The pending select is replaced by the next execute() on the same
        # cursor; dropping the table must then succeed.
        with memory_database() as con:
            cur = con.cursor()
            cur.execute("create table t(t)")
            cur.executemany("insert into t values(?)",
                            ((v,) for v in range(5)))
            con.commit()
            cur.execute("select t from t")
            cur.execute("drop table t")
            con.commit()

    def test_table_lock_cursor_dealloc(self):
        # The cursor holding the pending select is deleted before the table
        # is dropped.
        with memory_database() as con:
            con.execute("create table t(t)")
            con.executemany("insert into t values(?)",
                            ((v,) for v in range(5)))
            con.commit()
            cur = con.execute("select t from t")
            del cur
            con.execute("drop table t")
            con.commit()

    def test_table_lock_cursor_non_readonly_select(self):
        # Same as above, but the select calls a user function that itself
        # inserts into the table.
        with memory_database() as con:
            con.execute("create table t(t)")
            con.executemany("insert into t values(?)",
                            ((v,) for v in range(5)))
            con.commit()
            def dup(v):
                con.execute("insert into t values(?)", (v,))
                return
            con.create_function("dup", 1, dup)
            cur = con.execute("select dup(t) from t")
            del cur
            con.execute("drop table t")
            con.commit()

    def test_executescript_step_through_select(self):
        # executescript() must step through every row of a select, which is
        # observed through the side effect of the user function.
        with memory_database() as con:
            values = [(v,) for v in range(5)]
            with con:
                con.execute("create table t(t)")
                con.executemany("insert into t values(?)", values)
            steps = []
            con.create_function("step", 1, lambda x: steps.append((x,)))
            con.executescript("select step(t) from t")
            self.assertEqual(steps, values)

    def test_custom_cursor_object_crash_gh_99886(self):
        # This test segfaults on GH-99886
        class MyCursor(sqlite.Cursor):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                # this can go before or after the super call; doesn't matter
                self.some_attr = None

        with memory_database() as con:
            cur = con.cursor(MyCursor)
            cur.close()
            del cur


class RecursiveUseOfCursors(unittest.TestCase):
    """Converters that re-enter the cursor currently fetching rows.

    Each test registers a converter (selected through the ``[NAME]`` column
    suffix, enabled by PARSE_COLNAMES) that calls back into ``self.cur``
    and expects ``fetchall()`` to raise ProgrammingError.
    """
    # GH-80254: sqlite3 should not segfault for recursive use of cursors.
    msg = "Recursive use of cursors not allowed"

    def setUp(self):
        self.con = sqlite.connect(":memory:",
                                  detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")
        self.cur.executemany("insert into test(x) values (?)",
                             [("foo",), ("bar",)])

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def test_recursive_cursor_init(self):
        # The converter re-initialises the cursor that is fetching.
        conv = lambda x: self.cur.__init__(self.con)
        with patch.dict(sqlite.converters, {"INIT": conv}):
            self.cur.execute('select x as "x [INIT]", x from test')
            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
                                   self.cur.fetchall)

    def test_recursive_cursor_close(self):
        # The converter closes the cursor that is fetching.
        conv = lambda x: self.cur.close()
        with patch.dict(sqlite.converters, {"CLOSE": conv}):
            self.cur.execute('select x as "x [CLOSE]", x from test')
            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
                                   self.cur.fetchall)

    def test_recursive_cursor_iter(self):
        # The mutable default ``l`` is intentional state: the first call
        # only records that it ran, every later call fetches from the
        # cursor that is already fetching.
        conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None)
        with patch.dict(sqlite.converters, {"ITER": conv}):
            self.cur.execute('select x as "x [ITER]", x from test')
            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
                                   self.cur.fetchall)


if __name__ == "__main__":
    unittest.main()