1# pysqlite2/test/transactions.py: tests transactions
2#
3# Copyright (C) 2005-2007 Gerhard Häring <[email protected]>
4#
5# This file is part of pysqlite.
6#
7# This software is provided 'as-is', without any express or implied
8# warranty.  In no event will the authors be held liable for any damages
9# arising from the use of this software.
10#
11# Permission is granted to anyone to use this software for any purpose,
12# including commercial applications, and to alter it and redistribute it
13# freely, subject to the following restrictions:
14#
15# 1. The origin of this software must not be misrepresented; you must not
16#    claim that you wrote the original software. If you use this software
17#    in a product, an acknowledgment in the product documentation would be
18#    appreciated but is not required.
19# 2. Altered source versions must be plainly marked as such, and must not be
20#    misrepresented as being the original software.
21# 3. This notice may not be removed or altered from any source distribution.
22
23import os, unittest
24import sqlite3 as sqlite
25
26from test.support import LOOPBACK_TIMEOUT
27from test.support.os_helper import TESTFN, unlink
28
29from test.test_sqlite3.test_dbapi import memory_database
30
31
# Timeout passed to sqlite.connect() by TransactionTests.setUp(): one tenth
# of test.support's LOOPBACK_TIMEOUT.
# NOTE(review): presumably kept short so that the tests which expect a
# lock-contention OperationalError do not wait long -- confirm.
TIMEOUT = LOOPBACK_TIMEOUT / 10
33
34
class TransactionTests(unittest.TestCase):
    """Transaction behaviour observed through two connections to one file.

    setUp() opens two connections, con1 and con2, to the on-disk database
    TESTFN and a cursor on each (cur1, cur2).  Most tests change data through
    con1 and then check what con2 is able to see.
    """

    def setUp(self):
        # Register every cleanup as soon as its resource exists.  Cleanups
        # run in reverse registration order (cursors and connections are
        # closed before the file is removed), each one runs even if another
        # raises, and they also run when setUp() itself fails part-way.
        self.addCleanup(unlink, TESTFN)

        self.con1 = sqlite.connect(TESTFN, timeout=TIMEOUT)
        self.addCleanup(self.con1.close)
        self.cur1 = self.con1.cursor()
        self.addCleanup(self.cur1.close)

        self.con2 = sqlite.connect(TESTFN, timeout=TIMEOUT)
        self.addCleanup(self.con2.close)
        self.cur2 = self.con2.cursor()
        self.addCleanup(self.cur2.close)

    def test_dml_does_not_auto_commit_before(self):
        # A CREATE TABLE issued after an uncommitted INSERT must not commit
        # that INSERT: con2 still sees an empty table.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.cur1.execute("create table test2(j)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 0)

    def test_insert_starts_transaction(self):
        # The inserted row is not committed, so con2 must not see it.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 0)

    def test_update_starts_transaction(self):
        # The UPDATE is not committed, so con2 still reads the old value.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.con1.commit()
        self.cur1.execute("update test set i=6")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchone()[0]
        self.assertEqual(res, 5)

    def test_delete_starts_transaction(self):
        # The DELETE is not committed, so con2 still sees the row.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.con1.commit()
        self.cur1.execute("delete from test")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

    def test_replace_starts_transaction(self):
        # The REPLACE is not committed, so con2 sees only the original row.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.con1.commit()
        self.cur1.execute("replace into test(i) values (6)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)
        self.assertEqual(res[0][0], 5)

    def test_toggle_auto_commit(self):
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        # Switching to isolation_level=None must make the pending insert
        # visible to the other connection.
        self.con1.isolation_level = None
        self.assertIsNone(self.con1.isolation_level)
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

        # Back under "DEFERRED", a new insert stays invisible to con2, so
        # the row count it observes is unchanged.
        self.con1.isolation_level = "DEFERRED"
        self.assertEqual(self.con1.isolation_level, "DEFERRED")
        self.cur1.execute("insert into test(i) values (5)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

    def test_raise_timeout(self):
        # While con1's insert is uncommitted, con2's attempt to write must
        # raise OperationalError.
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        with self.assertRaises(sqlite.OperationalError):
            self.cur2.execute("insert into test(i) values (5)")

    def test_locking(self):
        """
        This tests the improved concurrency with pysqlite 2.3.4. You needed
        to roll back con2 before you could commit con1.
        """
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        with self.assertRaises(sqlite.OperationalError):
            self.cur2.execute("insert into test(i) values (5)")
        # NO self.con2.rollback() HERE!!!
        self.con1.commit()

    def test_rollback_cursor_consistency(self):
        """Check that cursors behave correctly after rollback."""
        con = sqlite.connect(":memory:")
        # This private connection is not covered by setUp(); close it
        # explicitly so it is not leaked.
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table test(x)")
        cur.execute("insert into test(x) values (5)")
        cur.execute("select 1 union select 2 union select 3")

        con.rollback()
        self.assertEqual(cur.fetchall(), [(1,), (2,), (3,)])

    def test_multiple_cursors_and_iternext(self):
        # gh-94028: statements are cleared and reset in cursor iternext.

        # Provoke the gh-94028 by using a cursor cache.
        CURSORS = {}
        def sql(cx, sql, *args):
            # Run the statement on a fresh cursor and keep that cursor
            # referenced in the cache, keyed by the id() of the SQL string
            # (the 'sql' parameter shadows this helper inside its own body).
            cu = cx.cursor()
            cu.execute(sql, args)
            CURSORS[id(sql)] = cu
            return cu

        self.con1.execute("create table t(t)")
        sql(self.con1, "insert into t values (?), (?), (?)", "u1", "u2", "u3")
        self.con1.commit()

        # On second connection, verify rows are visible, then delete them.
        count = sql(self.con2, "select count(*) from t").fetchone()[0]
        self.assertEqual(count, 3)
        changes = sql(self.con2, "delete from t").rowcount
        self.assertEqual(changes, 3)
        self.con2.commit()

        # Back in original connection, create 2 new users.
        sql(self.con1, "insert into t values (?)", "u4")
        sql(self.con1, "insert into t values (?)", "u5")

        # The second connection cannot see uncommitted changes.
        count = sql(self.con2, "select count(*) from t").fetchone()[0]
        self.assertEqual(count, 0)

        # First connection can see its own changes.
        count = sql(self.con1, "select count(*) from t").fetchone()[0]
        self.assertEqual(count, 2)

        # The second connection can now see the changes.
        self.con1.commit()
        count = sql(self.con2, "select count(*) from t").fetchone()[0]
        self.assertEqual(count, 2)
180
181
182class RollbackTests(unittest.TestCase):
183    """bpo-44092: sqlite3 now leaves it to SQLite to resolve rollback issues"""
184
185    def setUp(self):
186        self.con = sqlite.connect(":memory:")
187        self.cur1 = self.con.cursor()
188        self.cur2 = self.con.cursor()
189        with self.con:
190            self.con.execute("create table t(c)");
191            self.con.executemany("insert into t values(?)", [(0,), (1,), (2,)])
192        self.cur1.execute("begin transaction")
193        select = "select c from t"
194        self.cur1.execute(select)
195        self.con.rollback()
196        self.res = self.cur2.execute(select)  # Reusing stmt from cache
197
198    def tearDown(self):
199        self.con.close()
200
201    def _check_rows(self):
202        for i, row in enumerate(self.res):
203            self.assertEqual(row[0], i)
204
205    def test_no_duplicate_rows_after_rollback_del_cursor(self):
206        del self.cur1
207        self._check_rows()
208
209    def test_no_duplicate_rows_after_rollback_close_cursor(self):
210        self.cur1.close()
211        self._check_rows()
212
213    def test_no_duplicate_rows_after_rollback_new_query(self):
214        self.cur1.execute("select c from t where c = 1")
215        self._check_rows()
216
217
218
class SpecialCommandTests(unittest.TestCase):
    """Run DROP TABLE and PRAGMA right after an INSERT that was not committed.

    The tests make no assertions; they pass when no statement raises.
    """

    def setUp(self):
        connection = sqlite.connect(":memory:")
        self.con, self.cur = connection, connection.cursor()

    def tearDown(self):
        # Close the cursor before the connection it belongs to.
        for resource in (self.cur, self.con):
            resource.close()

    def _insert_then(self, statement):
        # Create and populate the table, then run the statement under test,
        # all on the same cursor and without an intervening commit.
        script = (
            "create table test(i)",
            "insert into test(i) values (5)",
            statement,
        )
        for step in script:
            self.cur.execute(step)

    def test_drop_table(self):
        self._insert_then("drop table test")

    def test_pragma(self):
        self._insert_then("pragma count_changes=1")
237
238
class TransactionalDDL(unittest.TestCase):
    """CREATE TABLE combined with rollback, with and without an explicit BEGIN."""

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def _assert_create_table_rolled_back(self, begin_stmt):
        # Open a transaction by hand, create the table inside it and roll
        # back; the table must be gone, so selecting from it has to fail.
        for stmt in (begin_stmt, "create table test(i)"):
            self.con.execute(stmt)
        self.con.rollback()
        self.assertRaises(sqlite.OperationalError,
                          self.con.execute, "select * from test")

    def test_ddl_does_not_autostart_transaction(self):
        # For backwards compatibility reasons, DDL statements should not
        # implicitly start a transaction.
        self.con.execute("create table test(i)")
        self.con.rollback()
        rows = self.con.execute("select * from test").fetchall()
        self.assertEqual(rows, [])

    def test_immediate_transactional_ddl(self):
        # You can achieve transactional DDL by issuing a BEGIN
        # statement manually.
        self._assert_create_table_rolled_back("begin immediate")

    def test_transactional_ddl(self):
        # You can achieve transactional DDL by issuing a BEGIN
        # statement manually.
        self._assert_create_table_rolled_back("begin")
271
272
class IsolationLevelFromInit(unittest.TestCase):
    """Check the statements traced for each isolation_level given at connect time."""

    CREATE = "create table t(t)"
    INSERT = "insert into t values(1)"

    def setUp(self):
        self.traced = []

    def _run_test(self, cx):
        cx.execute(self.CREATE)
        # Tracing starts only after the table exists, so CREATE itself is
        # never recorded.
        cx.set_trace_callback(self.traced.append)
        with cx:
            cx.execute(self.INSERT)

    def _assert_trace(self, expected, **connect_kwargs):
        # Open a database with the given connect arguments, run the shared
        # scenario and compare the recorded statements with *expected*.
        with memory_database(**connect_kwargs) as cx:
            self._run_test(cx)
            self.assertEqual(self.traced, expected)

    def test_isolation_level_default(self):
        self._assert_trace(["BEGIN ", self.INSERT, "COMMIT"])

    def test_isolation_level_begin(self):
        self._assert_trace(["BEGIN ", self.INSERT, "COMMIT"],
                           isolation_level="")

    def test_isolation_level_deferred(self):
        self._assert_trace(["BEGIN DEFERRED", self.INSERT, "COMMIT"],
                           isolation_level="DEFERRED")

    def test_isolation_level_immediate(self):
        self._assert_trace(["BEGIN IMMEDIATE", self.INSERT, "COMMIT"],
                           isolation_level="IMMEDIATE")

    def test_isolation_level_exclusive(self):
        self._assert_trace(["BEGIN EXCLUSIVE", self.INSERT, "COMMIT"],
                           isolation_level="EXCLUSIVE")

    def test_isolation_level_none(self):
        self._assert_trace([self.INSERT], isolation_level=None)
317
318
class IsolationLevelPostInit(unittest.TestCase):
    """Check the statements traced after isolation_level is set on an open connection.

    setUp() creates the table before installing the trace callback, so only
    the statements run by each test end up in self.traced.
    """

    QUERY = "insert into t values(1)"

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        # Registered right after connecting so the connection is closed
        # even if one of the remaining setUp() steps raises.
        self.addCleanup(self.cx.close)
        self.cx.execute("create table t(t)")
        self.traced = []
        self.cx.set_trace_callback(lambda stmt: self.traced.append(stmt))

    def test_isolation_level_default(self):
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced, ["BEGIN ", self.QUERY, "COMMIT"])

    def test_isolation_level_begin(self):
        self.cx.isolation_level = ""
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced, ["BEGIN ", self.QUERY, "COMMIT"])

    def test_isolation_level_deferred(self):
        self.cx.isolation_level = "DEFERRED"
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced, ["BEGIN DEFERRED", self.QUERY, "COMMIT"])

    def test_isolation_level_immediate(self):
        self.cx.isolation_level = "IMMEDIATE"
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced,
                         ["BEGIN IMMEDIATE", self.QUERY, "COMMIT"])

    def test_isolation_level_exclusive(self):
        self.cx.isolation_level = "EXCLUSIVE"
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced,
                         ["BEGIN EXCLUSIVE", self.QUERY, "COMMIT"])

    def test_isolation_level_none(self):
        # With isolation_level=None only the INSERT itself is traced.
        self.cx.isolation_level = None
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced, [self.QUERY])
367
368
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
371