1# pysqlite2/test/regression.py: pysqlite regression tests
2#
3# Copyright (C) 2006-2010 Gerhard Häring <[email protected]>
4#
5# This file is part of pysqlite.
6#
7# This software is provided 'as-is', without any express or implied
8# warranty.  In no event will the authors be held liable for any damages
9# arising from the use of this software.
10#
11# Permission is granted to anyone to use this software for any purpose,
12# including commercial applications, and to alter it and redistribute it
13# freely, subject to the following restrictions:
14#
15# 1. The origin of this software must not be misrepresented; you must not
16#    claim that you wrote the original software. If you use this software
17#    in a product, an acknowledgment in the product documentation would be
18#    appreciated but is not required.
19# 2. Altered source versions must be plainly marked as such, and must not be
20#    misrepresented as being the original software.
21# 3. This notice may not be removed or altered from any source distribution.
22
23import datetime
24import unittest
25import sqlite3 as sqlite
26import weakref
27import functools
28
29from test import support
30from unittest.mock import patch
31from test.test_sqlite3.test_dbapi import memory_database, cx_limit
32
33
34class RegressionTests(unittest.TestCase):
35    def setUp(self):
36        self.con = sqlite.connect(":memory:")
37
38    def tearDown(self):
39        self.con.close()
40
41    def test_pragma_user_version(self):
42        # This used to crash pysqlite because this pragma command returns NULL for the column name
43        cur = self.con.cursor()
44        cur.execute("pragma user_version")
45
46    def test_pragma_schema_version(self):
47        # This still crashed pysqlite <= 2.2.1
48        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
49        try:
50            cur = self.con.cursor()
51            cur.execute("pragma schema_version")
52        finally:
53            cur.close()
54            con.close()
55
56    def test_statement_reset(self):
57        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
58        # reset before a rollback, but only those that are still in the
59        # statement cache. The others are not accessible from the connection object.
60        con = sqlite.connect(":memory:", cached_statements=5)
61        cursors = [con.cursor() for x in range(5)]
62        cursors[0].execute("create table test(x)")
63        for i in range(10):
64            cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
65
66        for i in range(5):
67            cursors[i].execute(" " * i + "select x from test")
68
69        con.rollback()
70
71    def test_column_name_with_spaces(self):
72        cur = self.con.cursor()
73        cur.execute('select 1 as "foo bar [datetime]"')
74        self.assertEqual(cur.description[0][0], "foo bar [datetime]")
75
76        cur.execute('select 1 as "foo baz"')
77        self.assertEqual(cur.description[0][0], "foo baz")
78
79    def test_statement_finalization_on_close_db(self):
80        # pysqlite versions <= 2.3.3 only finalized statements in the statement
81        # cache when closing the database. statements that were still
82        # referenced in cursors weren't closed and could provoke "
83        # "OperationalError: Unable to close due to unfinalised statements".
84        con = sqlite.connect(":memory:")
85        cursors = []
86        # default statement cache size is 100
87        for i in range(105):
88            cur = con.cursor()
89            cursors.append(cur)
90            cur.execute("select 1 x union select " + str(i))
91        con.close()
92
93    def test_on_conflict_rollback(self):
94        con = sqlite.connect(":memory:")
95        con.execute("create table foo(x, unique(x) on conflict rollback)")
96        con.execute("insert into foo(x) values (1)")
97        try:
98            con.execute("insert into foo(x) values (1)")
99        except sqlite.DatabaseError:
100            pass
101        con.execute("insert into foo(x) values (2)")
102        try:
103            con.commit()
104        except sqlite.OperationalError:
105            self.fail("pysqlite knew nothing about the implicit ROLLBACK")
106
107    def test_workaround_for_buggy_sqlite_transfer_bindings(self):
108        """
109        pysqlite would crash with older SQLite versions unless
110        a workaround is implemented.
111        """
112        self.con.execute("create table foo(bar)")
113        self.con.execute("drop table foo")
114        self.con.execute("create table foo(bar)")
115
116    def test_empty_statement(self):
117        """
118        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
119        for "no-operation" statements
120        """
121        self.con.execute("")
122
123    def test_type_map_usage(self):
124        """
125        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
126        a statement. This test exhibits the problem.
127        """
128        SELECT = "select * from foo"
129        con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
130        cur = con.cursor()
131        cur.execute("create table foo(bar timestamp)")
132        cur.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
133        cur.execute(SELECT)
134        cur.execute("drop table foo")
135        cur.execute("create table foo(bar integer)")
136        cur.execute("insert into foo(bar) values (5)")
137        cur.execute(SELECT)
138
139    def test_bind_mutating_list(self):
140        # Issue41662: Crash when mutate a list of parameters during iteration.
141        class X:
142            def __conform__(self, protocol):
143                parameters.clear()
144                return "..."
145        parameters = [X(), 0]
146        con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
147        con.execute("create table foo(bar X, baz integer)")
148        # Should not crash
149        with self.assertRaises(IndexError):
150            con.execute("insert into foo(bar, baz) values (?, ?)", parameters)
151
152    def test_error_msg_decode_error(self):
153        # When porting the module to Python 3.0, the error message about
154        # decoding errors disappeared. This verifies they're back again.
155        with self.assertRaises(sqlite.OperationalError) as cm:
156            self.con.execute("select 'xxx' || ? || 'yyy' colname",
157                             (bytes(bytearray([250])),)).fetchone()
158        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
159        self.assertIn(msg, str(cm.exception))
160
161    def test_register_adapter(self):
162        """
163        See issue 3312.
164        """
165        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
166
167    def test_set_isolation_level(self):
168        # See issue 27881.
169        class CustomStr(str):
170            def upper(self):
171                return None
172            def __del__(self):
173                con.isolation_level = ""
174
175        con = sqlite.connect(":memory:")
176        con.isolation_level = None
177        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
178            with self.subTest(level=level):
179                con.isolation_level = level
180                con.isolation_level = level.lower()
181                con.isolation_level = level.capitalize()
182                con.isolation_level = CustomStr(level)
183
184        # setting isolation_level failure should not alter previous state
185        con.isolation_level = None
186        con.isolation_level = "DEFERRED"
187        pairs = [
188            (1, TypeError), (b'', TypeError), ("abc", ValueError),
189            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
190        ]
191        for value, exc in pairs:
192            with self.subTest(level=value):
193                with self.assertRaises(exc):
194                    con.isolation_level = value
195                self.assertEqual(con.isolation_level, "DEFERRED")
196
197    def test_cursor_constructor_call_check(self):
198        """
199        Verifies that cursor methods check whether base class __init__ was
200        called.
201        """
202        class Cursor(sqlite.Cursor):
203            def __init__(self, con):
204                pass
205
206        con = sqlite.connect(":memory:")
207        cur = Cursor(con)
208        with self.assertRaises(sqlite.ProgrammingError):
209            cur.execute("select 4+5").fetchall()
210        with self.assertRaisesRegex(sqlite.ProgrammingError,
211                                    r'^Base Cursor\.__init__ not called\.$'):
212            cur.close()
213
214    def test_str_subclass(self):
215        """
216        The Python 3.0 port of the module didn't cope with values of subclasses of str.
217        """
218        class MyStr(str): pass
219        self.con.execute("select ?", (MyStr("abc"),))
220
221    def test_connection_constructor_call_check(self):
222        """
223        Verifies that connection methods check whether base class __init__ was
224        called.
225        """
226        class Connection(sqlite.Connection):
227            def __init__(self, name):
228                pass
229
230        con = Connection(":memory:")
231        with self.assertRaises(sqlite.ProgrammingError):
232            cur = con.cursor()
233
234    def test_auto_commit(self):
235        """
236        Verifies that creating a connection in autocommit mode works.
237        2.5.3 introduced a regression so that these could no longer
238        be created.
239        """
240        con = sqlite.connect(":memory:", isolation_level=None)
241
242    def test_pragma_autocommit(self):
243        """
244        Verifies that running a PRAGMA statement that does an autocommit does
245        work. This did not work in 2.5.3/2.5.4.
246        """
247        cur = self.con.cursor()
248        cur.execute("create table foo(bar)")
249        cur.execute("insert into foo(bar) values (5)")
250
251        cur.execute("pragma page_size")
252        row = cur.fetchone()
253
254    def test_connection_call(self):
255        """
256        Call a connection with a non-string SQL request: check error handling
257        of the statement constructor.
258        """
259        self.assertRaises(TypeError, self.con, b"select 1")
260
261    def test_collation(self):
262        def collation_cb(a, b):
263            return 1
264        self.assertRaises(UnicodeEncodeError, self.con.create_collation,
265            # Lone surrogate cannot be encoded to the default encoding (utf8)
266            "\uDC80", collation_cb)
267
268    def test_recursive_cursor_use(self):
269        """
270        http://bugs.python.org/issue10811
271
272        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
273        Now we catch recursive cursor usage and raise a ProgrammingError.
274        """
275        con = sqlite.connect(":memory:")
276
277        cur = con.cursor()
278        cur.execute("create table a (bar)")
279        cur.execute("create table b (baz)")
280
281        def foo():
282            cur.execute("insert into a (bar) values (?)", (1,))
283            yield 1
284
285        with self.assertRaises(sqlite.ProgrammingError):
286            cur.executemany("insert into b (baz) values (?)",
287                            ((i,) for i in foo()))
288
289    def test_convert_timestamp_microsecond_padding(self):
290        """
291        http://bugs.python.org/issue14720
292
293        The microsecond parsing of convert_timestamp() should pad with zeros,
294        since the microsecond string "456" actually represents "456000".
295        """
296
297        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
298        cur = con.cursor()
299        cur.execute("CREATE TABLE t (x TIMESTAMP)")
300
301        # Microseconds should be 456000
302        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
303
304        # Microseconds should be truncated to 123456
305        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
306
307        cur.execute("SELECT * FROM t")
308        values = [x[0] for x in cur.fetchall()]
309
310        self.assertEqual(values, [
311            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
312            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
313        ])
314
315    def test_invalid_isolation_level_type(self):
316        # isolation level is a string, not an integer
317        self.assertRaises(TypeError,
318                          sqlite.connect, ":memory:", isolation_level=123)
319
320
321    def test_null_character(self):
322        # Issue #21147
323        cur = self.con.cursor()
324        queries = ["\0select 1", "select 1\0"]
325        for query in queries:
326            with self.subTest(query=query):
327                self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
328                                       self.con.execute, query)
329            with self.subTest(query=query):
330                self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
331                                       cur.execute, query)
332
333    def test_surrogates(self):
334        con = sqlite.connect(":memory:")
335        self.assertRaises(UnicodeEncodeError, con, "select '\ud8ff'")
336        self.assertRaises(UnicodeEncodeError, con, "select '\udcff'")
337        cur = con.cursor()
338        self.assertRaises(UnicodeEncodeError, cur.execute, "select '\ud8ff'")
339        self.assertRaises(UnicodeEncodeError, cur.execute, "select '\udcff'")
340
341    def test_large_sql(self):
342        msg = "query string is too large"
343        with memory_database() as cx, cx_limit(cx) as lim:
344            cu = cx.cursor()
345
346            cx("select 1".ljust(lim))
347            # use a different SQL statement; don't reuse from the LRU cache
348            cu.execute("select 2".ljust(lim))
349
350            sql = "select 3".ljust(lim+1)
351            self.assertRaisesRegex(sqlite.DataError, msg, cx, sql)
352            self.assertRaisesRegex(sqlite.DataError, msg, cu.execute, sql)
353
354    def test_commit_cursor_reset(self):
355        """
356        Connection.commit() did reset cursors, which made sqlite3
357        to return rows multiple times when fetched from cursors
358        after commit. See issues 10513 and 23129 for details.
359        """
360        con = sqlite.connect(":memory:")
361        con.executescript("""
362        create table t(c);
363        create table t2(c);
364        insert into t values(0);
365        insert into t values(1);
366        insert into t values(2);
367        """)
368
369        self.assertEqual(con.isolation_level, "")
370
371        counter = 0
372        for i, row in enumerate(con.execute("select c from t")):
373            with self.subTest(i=i, row=row):
374                con.execute("insert into t2(c) values (?)", (i,))
375                con.commit()
376                if counter == 0:
377                    self.assertEqual(row[0], 0)
378                elif counter == 1:
379                    self.assertEqual(row[0], 1)
380                elif counter == 2:
381                    self.assertEqual(row[0], 2)
382                counter += 1
383        self.assertEqual(counter, 3, "should have returned exactly three rows")
384
385    def test_bpo31770(self):
386        """
387        The interpreter shouldn't crash in case Cursor.__init__() is called
388        more than once.
389        """
390        def callback(*args):
391            pass
392        con = sqlite.connect(":memory:")
393        cur = sqlite.Cursor(con)
394        ref = weakref.ref(cur, callback)
395        cur.__init__(con)
396        del cur
397        # The interpreter shouldn't crash when ref is collected.
398        del ref
399        support.gc_collect()
400
401    def test_del_isolation_level_segfault(self):
402        with self.assertRaises(AttributeError):
403            del self.con.isolation_level
404
405    def test_bpo37347(self):
406        class Printer:
407            def log(self, *args):
408                return sqlite.SQLITE_OK
409
410        for method in [self.con.set_trace_callback,
411                       functools.partial(self.con.set_progress_handler, n=1),
412                       self.con.set_authorizer]:
413            printer_instance = Printer()
414            method(printer_instance.log)
415            method(printer_instance.log)
416            self.con.execute("select 1")  # trigger seg fault
417            method(None)
418
419    def test_return_empty_bytestring(self):
420        cur = self.con.execute("select X''")
421        val = cur.fetchone()[0]
422        self.assertEqual(val, b'')
423
424    def test_table_lock_cursor_replace_stmt(self):
425        with memory_database() as con:
426            cur = con.cursor()
427            cur.execute("create table t(t)")
428            cur.executemany("insert into t values(?)",
429                            ((v,) for v in range(5)))
430            con.commit()
431            cur.execute("select t from t")
432            cur.execute("drop table t")
433            con.commit()
434
435    def test_table_lock_cursor_dealloc(self):
436        with memory_database() as con:
437            con.execute("create table t(t)")
438            con.executemany("insert into t values(?)",
439                            ((v,) for v in range(5)))
440            con.commit()
441            cur = con.execute("select t from t")
442            del cur
443            con.execute("drop table t")
444            con.commit()
445
446    def test_table_lock_cursor_non_readonly_select(self):
447        with memory_database() as con:
448            con.execute("create table t(t)")
449            con.executemany("insert into t values(?)",
450                            ((v,) for v in range(5)))
451            con.commit()
452            def dup(v):
453                con.execute("insert into t values(?)", (v,))
454                return
455            con.create_function("dup", 1, dup)
456            cur = con.execute("select dup(t) from t")
457            del cur
458            con.execute("drop table t")
459            con.commit()
460
461    def test_executescript_step_through_select(self):
462        with memory_database() as con:
463            values = [(v,) for v in range(5)]
464            with con:
465                con.execute("create table t(t)")
466                con.executemany("insert into t values(?)", values)
467            steps = []
468            con.create_function("step", 1, lambda x: steps.append((x,)))
469            con.executescript("select step(t) from t")
470            self.assertEqual(steps, values)
471
472    def test_custom_cursor_object_crash_gh_99886(self):
473        # This test segfaults on GH-99886
474        class MyCursor(sqlite.Cursor):
475            def __init__(self, *args, **kwargs):
476                super().__init__(*args, **kwargs)
477                # this can go before or after the super call; doesn't matter
478                self.some_attr = None
479
480        with memory_database() as con:
481            cur = con.cursor(MyCursor)
482            cur.close()
483            del cur
484
485class RecursiveUseOfCursors(unittest.TestCase):
486    # GH-80254: sqlite3 should not segfault for recursive use of cursors.
487    msg = "Recursive use of cursors not allowed"
488
489    def setUp(self):
490        self.con = sqlite.connect(":memory:",
491                                  detect_types=sqlite.PARSE_COLNAMES)
492        self.cur = self.con.cursor()
493        self.cur.execute("create table test(x foo)")
494        self.cur.executemany("insert into test(x) values (?)",
495                             [("foo",), ("bar",)])
496
497    def tearDown(self):
498        self.cur.close()
499        self.con.close()
500
501    def test_recursive_cursor_init(self):
502        conv = lambda x: self.cur.__init__(self.con)
503        with patch.dict(sqlite.converters, {"INIT": conv}):
504            self.cur.execute(f'select x as "x [INIT]", x from test')
505            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
506                                   self.cur.fetchall)
507
508    def test_recursive_cursor_close(self):
509        conv = lambda x: self.cur.close()
510        with patch.dict(sqlite.converters, {"CLOSE": conv}):
511            self.cur.execute(f'select x as "x [CLOSE]", x from test')
512            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
513                                   self.cur.fetchall)
514
515    def test_recursive_cursor_iter(self):
516        conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None)
517        with patch.dict(sqlite.converters, {"ITER": conv}):
518            self.cur.execute(f'select x as "x [ITER]", x from test')
519            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
520                                   self.cur.fetchall)
521
522
# Run every test case defined in this module when the file is executed
# directly as a script.
if __name__ == "__main__":
    unittest.main()
525