# pysqlite2/test/transactions.py: tests transactions
#
# Copyright (C) 2005-2007 Gerhard Häring <[email protected]>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
import os
import sqlite3 as sqlite
import unittest

from test.support import LOOPBACK_TIMEOUT
from test.support.os_helper import TESTFN, unlink

from test.test_sqlite3.test_dbapi import memory_database


# Timeout passed to every on-disk connection opened by TransactionTests.
# It is a tenth of LOOPBACK_TIMEOUT, presumably so that the tests which
# expect a lock failure do not have to wait long.
TIMEOUT = LOOPBACK_TIMEOUT / 10


class TransactionTests(unittest.TestCase):
    """Check what one connection can see of another's pending changes.

    Every test gets two independent connections (with one cursor each) to
    the same database file, TESTFN.
    """

    def setUp(self):
        # Cleanups run in reverse order of registration, and they also run
        # when setUp() itself fails part-way.  Registering the unlink first
        # means the file is removed only after everything else was closed,
        # and a failure in one cleanup does not prevent the others.
        self.addCleanup(unlink, TESTFN)

        self.con1 = sqlite.connect(TESTFN, timeout=TIMEOUT)
        self.addCleanup(self.con1.close)
        self.cur1 = self.con1.cursor()
        self.addCleanup(self.cur1.close)

        self.con2 = sqlite.connect(TESTFN, timeout=TIMEOUT)
        self.addCleanup(self.con2.close)
        self.cur2 = self.con2.cursor()
        self.addCleanup(self.cur2.close)

    def test_dml_does_not_auto_commit_before(self):
        """A DDL statement after an insert must not commit that insert."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.cur1.execute("create table test2(j)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 0)

    def test_insert_starts_transaction(self):
        """An uncommitted insert is invisible to the second connection."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 0)

    def test_update_starts_transaction(self):
        """An uncommitted update leaves the old value visible to con2."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.con1.commit()
        self.cur1.execute("update test set i=6")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchone()[0]
        self.assertEqual(res, 5)

    def test_delete_starts_transaction(self):
        """An uncommitted delete leaves the row visible to con2."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.con1.commit()
        self.cur1.execute("delete from test")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

    def test_replace_starts_transaction(self):
        """An uncommitted replace leaves only the committed row visible."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.con1.commit()
        self.cur1.execute("replace into test(i) values (6)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)
        self.assertEqual(res[0][0], 5)

    def test_toggle_auto_commit(self):
        """Switch isolation_level back and forth with an insert pending."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        # After switching to isolation_level=None the pending insert is
        # expected to be visible to the second connection.
        self.con1.isolation_level = None
        self.assertIsNone(self.con1.isolation_level)
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

        # Back in DEFERRED mode a new insert stays pending, so the second
        # connection still sees just the first row.
        self.con1.isolation_level = "DEFERRED"
        self.assertEqual(self.con1.isolation_level, "DEFERRED")
        self.cur1.execute("insert into test(i) values (5)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

    def test_raise_timeout(self):
        """Writing while con1 has a pending insert raises OperationalError."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        with self.assertRaises(sqlite.OperationalError):
            self.cur2.execute("insert into test(i) values (5)")

    def test_locking(self):
        """
        This tests the improved concurrency with pysqlite 2.3.4. You needed
        to roll back con2 before you could commit con1.
        """
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        with self.assertRaises(sqlite.OperationalError):
            self.cur2.execute("insert into test(i) values (5)")
        # NO self.con2.rollback() HERE!!!
        self.con1.commit()

    def test_rollback_cursor_consistency(self):
        """Check that cursors behave correctly after rollback."""
        con = sqlite.connect(":memory:")
        # Close this private connection even if an assertion below fails.
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table test(x)")
        cur.execute("insert into test(x) values (5)")
        cur.execute("select 1 union select 2 union select 3")

        con.rollback()
        self.assertEqual(cur.fetchall(), [(1,), (2,), (3,)])

    def test_multiple_cursors_and_iternext(self):
        # gh-94028: statements are cleared and reset in cursor iternext.

        # Provoke the gh-94028 by using a cursor cache.
        CURSORS = {}
        def sql(cx, sql, *args):
            cu = cx.cursor()
            cu.execute(sql, args)
            # Keep the cursor alive; keyed by the id of the SQL string.
            CURSORS[id(sql)] = cu
            return cu

        self.con1.execute("create table t(t)")
        sql(self.con1, "insert into t values (?), (?), (?)", "u1", "u2", "u3")
        self.con1.commit()

        # On second connection, verify rows are visible, then delete them.
        count = sql(self.con2, "select count(*) from t").fetchone()[0]
        self.assertEqual(count, 3)
        changes = sql(self.con2, "delete from t").rowcount
        self.assertEqual(changes, 3)
        self.con2.commit()

        # Back in original connection, create 2 new users.
        sql(self.con1, "insert into t values (?)", "u4")
        sql(self.con1, "insert into t values (?)", "u5")

        # The second connection cannot see uncommitted changes.
        count = sql(self.con2, "select count(*) from t").fetchone()[0]
        self.assertEqual(count, 0)

        # First connection can see its own changes.
        count = sql(self.con1, "select count(*) from t").fetchone()[0]
        self.assertEqual(count, 2)

        # The second connection can now see the changes.
        self.con1.commit()
        count = sql(self.con2, "select count(*) from t").fetchone()[0]
        self.assertEqual(count, 2)


class RollbackTests(unittest.TestCase):
    """bpo-44092: sqlite3 now leaves it to SQLite to resolve rollback issues"""

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur1 = self.con.cursor()
        self.cur2 = self.con.cursor()
        # Create and fill the table inside a connection context manager
        # block, before the explicit transaction below is opened.
        with self.con:
            self.con.execute("create table t(c)")
            self.con.executemany("insert into t values(?)", [(0,), (1,), (2,)])
        # Start a select on cur1 inside an explicit transaction, roll the
        # transaction back, then run the same SQL text on cur2.
        self.cur1.execute("begin transaction")
        select = "select c from t"
        self.cur1.execute(select)
        self.con.rollback()
        self.res = self.cur2.execute(select)  # Reusing stmt from cache

    def tearDown(self):
        self.con.close()

    def _check_rows(self):
        """Assert that the n-th row yielded by self.res holds the value n."""
        for i, row in enumerate(self.res):
            self.assertEqual(row[0], i)

    def test_no_duplicate_rows_after_rollback_del_cursor(self):
        del self.cur1
        self._check_rows()

    def test_no_duplicate_rows_after_rollback_close_cursor(self):
        self.cur1.close()
        self._check_rows()

    def test_no_duplicate_rows_after_rollback_new_query(self):
        self.cur1.execute("select c from t where c = 1")
        self._check_rows()


class SpecialCommandTests(unittest.TestCase):
    """Run DROP TABLE and PRAGMA right after an insert on one connection.

    The tests contain no assertions; they pass when no exception is raised.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def test_drop_table(self):
        self.cur.execute("create table test(i)")
        self.cur.execute("insert into test(i) values (5)")
        self.cur.execute("drop table test")

    def test_pragma(self):
        self.cur.execute("create table test(i)")
        self.cur.execute("insert into test(i) values (5)")
        self.cur.execute("pragma count_changes=1")


class TransactionalDDL(unittest.TestCase):
    """Check how DDL statements interact with rollback()."""

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def test_ddl_does_not_autostart_transaction(self):
        # For backwards compatibility reasons, DDL statements should not
        # implicitly start a transaction.
        self.con.execute("create table test(i)")
        self.con.rollback()
        result = self.con.execute("select * from test").fetchall()
        self.assertEqual(result, [])

    def test_immediate_transactional_ddl(self):
        # You can achieve transactional DDL by issuing a BEGIN
        # statement manually.
        self.con.execute("begin immediate")
        self.con.execute("create table test(i)")
        self.con.rollback()
        with self.assertRaises(sqlite.OperationalError):
            self.con.execute("select * from test")

    def test_transactional_ddl(self):
        # You can achieve transactional DDL by issuing a BEGIN
        # statement manually.
        self.con.execute("begin")
        self.con.execute("create table test(i)")
        self.con.rollback()
        with self.assertRaises(sqlite.OperationalError):
            self.con.execute("select * from test")


class IsolationLevelFromInit(unittest.TestCase):
    """Check the traced statements when isolation_level is set at creation.

    Each test opens a database through memory_database(), optionally
    passing isolation_level, and compares the statements recorded by a
    trace callback with the expected sequence.
    """

    CREATE = "create table t(t)"
    INSERT = "insert into t values(1)"

    def setUp(self):
        # Statements reported to the trace callback, in order.
        self.traced = []

    def _run_test(self, cx):
        """Create the table untraced, then trace one insert in `with cx`."""
        cx.execute(self.CREATE)
        # Install the callback only now so that CREATE is not recorded.
        cx.set_trace_callback(self.traced.append)
        with cx:
            cx.execute(self.INSERT)

    def test_isolation_level_default(self):
        with memory_database() as cx:
            self._run_test(cx)
            self.assertEqual(self.traced, ["BEGIN ", self.INSERT, "COMMIT"])

    def test_isolation_level_begin(self):
        with memory_database(isolation_level="") as cx:
            self._run_test(cx)
            self.assertEqual(self.traced, ["BEGIN ", self.INSERT, "COMMIT"])

    def test_isolation_level_deferred(self):
        with memory_database(isolation_level="DEFERRED") as cx:
            self._run_test(cx)
            self.assertEqual(self.traced,
                             ["BEGIN DEFERRED", self.INSERT, "COMMIT"])

    def test_isolation_level_immediate(self):
        with memory_database(isolation_level="IMMEDIATE") as cx:
            self._run_test(cx)
            self.assertEqual(self.traced,
                             ["BEGIN IMMEDIATE", self.INSERT, "COMMIT"])

    def test_isolation_level_exclusive(self):
        with memory_database(isolation_level="EXCLUSIVE") as cx:
            self._run_test(cx)
            self.assertEqual(self.traced,
                             ["BEGIN EXCLUSIVE", self.INSERT, "COMMIT"])

    def test_isolation_level_none(self):
        with memory_database(isolation_level=None) as cx:
            self._run_test(cx)
            self.assertEqual(self.traced, [self.INSERT])


class IsolationLevelPostInit(unittest.TestCase):
    """Check the traced statements when isolation_level is set afterwards.

    setUp() opens a connection with default arguments and installs the
    trace callback; each test then assigns isolation_level (or leaves it
    alone) before running one insert inside `with self.cx`.
    """

    QUERY = "insert into t values(1)"

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cx.execute("create table t(t)")
        # Statements reported to the trace callback, in order.  The
        # callback is installed after the table was created, so the CREATE
        # statement is not recorded.
        self.traced = []
        self.cx.set_trace_callback(self.traced.append)

    def tearDown(self):
        self.cx.close()

    def test_isolation_level_default(self):
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced, ["BEGIN ", self.QUERY, "COMMIT"])

    def test_isolation_level_begin(self):
        self.cx.isolation_level = ""
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced, ["BEGIN ", self.QUERY, "COMMIT"])

    # NOTE: the misspelling ("deferrred") is kept on purpose so that
    # selecting this test by its existing name keeps working.
    def test_isolation_level_deferrred(self):
        self.cx.isolation_level = "DEFERRED"
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced,
                         ["BEGIN DEFERRED", self.QUERY, "COMMIT"])

    def test_isolation_level_immediate(self):
        self.cx.isolation_level = "IMMEDIATE"
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced,
                         ["BEGIN IMMEDIATE", self.QUERY, "COMMIT"])

    def test_isolation_level_exclusive(self):
        self.cx.isolation_level = "EXCLUSIVE"
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced,
                         ["BEGIN EXCLUSIVE", self.QUERY, "COMMIT"])

    def test_isolation_level_none(self):
        self.cx.isolation_level = None
        with self.cx:
            self.cx.execute(self.QUERY)
        self.assertEqual(self.traced, [self.QUERY])


if __name__ == "__main__":
    unittest.main()