1# pysqlite2/test/types.py: tests for type conversion and detection 2# 3# Copyright (C) 2005 Gerhard Häring <[email protected]> 4# 5# This file is part of pysqlite. 6# 7# This software is provided 'as-is', without any express or implied 8# warranty. In no event will the authors be held liable for any damages 9# arising from the use of this software. 10# 11# Permission is granted to anyone to use this software for any purpose, 12# including commercial applications, and to alter it and redistribute it 13# freely, subject to the following restrictions: 14# 15# 1. The origin of this software must not be misrepresented; you must not 16# claim that you wrote the original software. If you use this software 17# in a product, an acknowledgment in the product documentation would be 18# appreciated but is not required. 19# 2. Altered source versions must be plainly marked as such, and must not be 20# misrepresented as being the original software. 21# 3. This notice may not be removed or altered from any source distribution. 22 23import datetime 24import unittest 25import sqlite3 as sqlite 26import sys 27try: 28 import zlib 29except ImportError: 30 zlib = None 31 32from test import support 33 34 35class SqliteTypeTests(unittest.TestCase): 36 def setUp(self): 37 self.con = sqlite.connect(":memory:") 38 self.cur = self.con.cursor() 39 self.cur.execute("create table test(i integer, s varchar, f number, b blob)") 40 41 def tearDown(self): 42 self.cur.close() 43 self.con.close() 44 45 def test_string(self): 46 self.cur.execute("insert into test(s) values (?)", ("Österreich",)) 47 self.cur.execute("select s from test") 48 row = self.cur.fetchone() 49 self.assertEqual(row[0], "Österreich") 50 51 def test_string_with_null_character(self): 52 self.cur.execute("insert into test(s) values (?)", ("a\0b",)) 53 self.cur.execute("select s from test") 54 row = self.cur.fetchone() 55 self.assertEqual(row[0], "a\0b") 56 57 def test_small_int(self): 58 self.cur.execute("insert into test(i) values (?)", (42,)) 59 self.cur.execute("select i from test") 60 row = self.cur.fetchone() 61 self.assertEqual(row[0], 42) 62 63 def test_large_int(self): 64 num = 123456789123456789 65 self.cur.execute("insert into test(i) values (?)", (num,)) 66 self.cur.execute("select i from test") 67 row = self.cur.fetchone() 68 self.assertEqual(row[0], num) 69 70 def test_float(self): 71 val = 3.14 72 self.cur.execute("insert into test(f) values (?)", (val,)) 73 self.cur.execute("select f from test") 74 row = self.cur.fetchone() 75 self.assertEqual(row[0], val) 76 77 def test_blob(self): 78 sample = b"Guglhupf" 79 val = memoryview(sample) 80 self.cur.execute("insert into test(b) values (?)", (val,)) 81 self.cur.execute("select b from test") 82 row = self.cur.fetchone() 83 self.assertEqual(row[0], sample) 84 85 def test_unicode_execute(self): 86 self.cur.execute("select 'Österreich'") 87 row = self.cur.fetchone() 88 self.assertEqual(row[0], "Österreich") 89 90 def test_too_large_int(self): 91 for value in 2**63, -2**63-1, 2**64: 92 with self.assertRaises(OverflowError): 93 self.cur.execute("insert into test(i) values (?)", (value,)) 94 self.cur.execute("select i from test") 95 row = self.cur.fetchone() 96 self.assertIsNone(row) 97 98 def test_string_with_surrogates(self): 99 for value in 0xd8ff, 0xdcff: 100 with self.assertRaises(UnicodeEncodeError): 101 self.cur.execute("insert into test(s) values (?)", (chr(value),)) 102 self.cur.execute("select s from test") 103 row = self.cur.fetchone() 104 self.assertIsNone(row) 105 106 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 107 @support.bigmemtest(size=2**31, memuse=4, dry_run=False) 108 def test_too_large_string(self, maxsize): 109 with self.assertRaises(sqlite.DataError): 110 self.cur.execute("insert into test(s) values (?)", ('x'*(2**31-1),)) 111 with self.assertRaises(sqlite.DataError): 112 self.cur.execute("insert into test(s) values (?)", ('x'*(2**31),)) 113 self.cur.execute("select 1 from test") 114 row = self.cur.fetchone() 115 self.assertIsNone(row) 116 117 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 118 @support.bigmemtest(size=2**31, memuse=3, dry_run=False) 119 def test_too_large_blob(self, maxsize): 120 with self.assertRaises(sqlite.DataError): 121 self.cur.execute("insert into test(s) values (?)", (b'x'*(2**31-1),)) 122 with self.assertRaises(sqlite.DataError): 123 self.cur.execute("insert into test(s) values (?)", (b'x'*(2**31),)) 124 self.cur.execute("select 1 from test") 125 row = self.cur.fetchone() 126 self.assertIsNone(row) 127 128 129class DeclTypesTests(unittest.TestCase): 130 class Foo: 131 def __init__(self, _val): 132 if isinstance(_val, bytes): 133 # sqlite3 always calls __init__ with a bytes created from a 134 # UTF-8 string when __conform__ was used to store the object. 135 _val = _val.decode('utf-8') 136 self.val = _val 137 138 def __eq__(self, other): 139 if not isinstance(other, DeclTypesTests.Foo): 140 return NotImplemented 141 return self.val == other.val 142 143 def __conform__(self, protocol): 144 if protocol is sqlite.PrepareProtocol: 145 return self.val 146 else: 147 return None 148 149 def __str__(self): 150 return "<%s>" % self.val 151 152 class BadConform: 153 def __init__(self, exc): 154 self.exc = exc 155 def __conform__(self, protocol): 156 raise self.exc 157 158 def setUp(self): 159 self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) 160 self.cur = self.con.cursor() 161 self.cur.execute(""" 162 create table test( 163 i int, 164 s str, 165 f float, 166 b bool, 167 u unicode, 168 foo foo, 169 bin blob, 170 n1 number, 171 n2 number(5), 172 bad bad, 173 cbin cblob) 174 """) 175 176 # override float, make them always return the same number 177 sqlite.converters["FLOAT"] = lambda x: 47.2 178 179 # and implement two custom ones 180 sqlite.converters["BOOL"] = lambda x: bool(int(x)) 181 sqlite.converters["FOO"] = DeclTypesTests.Foo 182 sqlite.converters["BAD"] = DeclTypesTests.BadConform 183 sqlite.converters["WRONG"] = lambda x: "WRONG" 184 sqlite.converters["NUMBER"] = float 185 sqlite.converters["CBLOB"] = lambda x: b"blobish" 186 187 def tearDown(self): 188 del sqlite.converters["FLOAT"] 189 del sqlite.converters["BOOL"] 190 del sqlite.converters["FOO"] 191 del sqlite.converters["BAD"] 192 del sqlite.converters["WRONG"] 193 del sqlite.converters["NUMBER"] 194 del sqlite.converters["CBLOB"] 195 self.cur.close() 196 self.con.close() 197 198 def test_string(self): 199 # default 200 self.cur.execute("insert into test(s) values (?)", ("foo",)) 201 self.cur.execute('select s as "s [WRONG]" from test') 202 row = self.cur.fetchone() 203 self.assertEqual(row[0], "foo") 204 205 def test_small_int(self): 206 # default 207 self.cur.execute("insert into test(i) values (?)", (42,)) 208 self.cur.execute("select i from test") 209 row = self.cur.fetchone() 210 self.assertEqual(row[0], 42) 211 212 def test_large_int(self): 213 # default 214 num = 123456789123456789 215 self.cur.execute("insert into test(i) values (?)", (num,)) 216 self.cur.execute("select i from test") 217 row = self.cur.fetchone() 218 self.assertEqual(row[0], num) 219 220 def test_float(self): 221 # custom 222 val = 3.14 223 self.cur.execute("insert into test(f) values (?)", (val,)) 224 self.cur.execute("select f from test") 225 row = self.cur.fetchone() 226 self.assertEqual(row[0], 47.2) 227 228 def test_bool(self): 229 # custom 230 self.cur.execute("insert into test(b) values (?)", (False,)) 231 self.cur.execute("select b from test") 232 row = self.cur.fetchone() 233 self.assertIs(row[0], False) 234 235 self.cur.execute("delete from test") 236 self.cur.execute("insert into test(b) values (?)", (True,)) 237 self.cur.execute("select b from test") 238 row = self.cur.fetchone() 239 self.assertIs(row[0], True) 240 241 def test_unicode(self): 242 # default 243 val = "\xd6sterreich" 244 self.cur.execute("insert into test(u) values (?)", (val,)) 245 self.cur.execute("select u from test") 246 row = self.cur.fetchone() 247 self.assertEqual(row[0], val) 248 249 def test_foo(self): 250 val = DeclTypesTests.Foo("bla") 251 self.cur.execute("insert into test(foo) values (?)", (val,)) 252 self.cur.execute("select foo from test") 253 row = self.cur.fetchone() 254 self.assertEqual(row[0], val) 255 256 def test_error_in_conform(self): 257 val = DeclTypesTests.BadConform(TypeError) 258 with self.assertRaises(sqlite.ProgrammingError): 259 self.cur.execute("insert into test(bad) values (?)", (val,)) 260 with self.assertRaises(sqlite.ProgrammingError): 261 self.cur.execute("insert into test(bad) values (:val)", {"val": val}) 262 263 val = DeclTypesTests.BadConform(KeyboardInterrupt) 264 with self.assertRaises(KeyboardInterrupt): 265 self.cur.execute("insert into test(bad) values (?)", (val,)) 266 with self.assertRaises(KeyboardInterrupt): 267 self.cur.execute("insert into test(bad) values (:val)", {"val": val}) 268 269 def test_unsupported_seq(self): 270 class Bar: pass 271 val = Bar() 272 with self.assertRaises(sqlite.ProgrammingError): 273 self.cur.execute("insert into test(f) values (?)", (val,)) 274 275 def test_unsupported_dict(self): 276 class Bar: pass 277 val = Bar() 278 with self.assertRaises(sqlite.ProgrammingError): 279 self.cur.execute("insert into test(f) values (:val)", {"val": val}) 280 281 def test_blob(self): 282 # default 283 sample = b"Guglhupf" 284 val = memoryview(sample) 285 self.cur.execute("insert into test(bin) values (?)", (val,)) 286 self.cur.execute("select bin from test") 287 row = self.cur.fetchone() 288 self.assertEqual(row[0], sample) 289 290 def test_number1(self): 291 self.cur.execute("insert into test(n1) values (5)") 292 value = self.cur.execute("select n1 from test").fetchone()[0] 293 # if the converter is not used, it's an int instead of a float 294 self.assertEqual(type(value), float) 295 296 def test_number2(self): 297 """Checks whether converter names are cut off at '(' characters""" 298 self.cur.execute("insert into test(n2) values (5)") 299 value = self.cur.execute("select n2 from test").fetchone()[0] 300 # if the converter is not used, it's an int instead of a float 301 self.assertEqual(type(value), float) 302 303 def test_convert_zero_sized_blob(self): 304 self.con.execute("insert into test(cbin) values (?)", (b"",)) 305 cur = self.con.execute("select cbin from test") 306 # Zero-sized blobs with converters returns None. This differs from 307 # blobs without a converter, where b"" is returned. 308 self.assertIsNone(cur.fetchone()[0]) 309 310 311class ColNamesTests(unittest.TestCase): 312 def setUp(self): 313 self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) 314 self.cur = self.con.cursor() 315 self.cur.execute("create table test(x foo)") 316 317 sqlite.converters["FOO"] = lambda x: "[%s]" % x.decode("ascii") 318 sqlite.converters["BAR"] = lambda x: "<%s>" % x.decode("ascii") 319 sqlite.converters["EXC"] = lambda x: 5/0 320 sqlite.converters["B1B1"] = lambda x: "MARKER" 321 322 def tearDown(self): 323 del sqlite.converters["FOO"] 324 del sqlite.converters["BAR"] 325 del sqlite.converters["EXC"] 326 del sqlite.converters["B1B1"] 327 self.cur.close() 328 self.con.close() 329 330 def test_decl_type_not_used(self): 331 """ 332 Assures that the declared type is not used when PARSE_DECLTYPES 333 is not set. 334 """ 335 self.cur.execute("insert into test(x) values (?)", ("xxx",)) 336 self.cur.execute("select x from test") 337 val = self.cur.fetchone()[0] 338 self.assertEqual(val, "xxx") 339 340 def test_none(self): 341 self.cur.execute("insert into test(x) values (?)", (None,)) 342 self.cur.execute("select x from test") 343 val = self.cur.fetchone()[0] 344 self.assertEqual(val, None) 345 346 def test_col_name(self): 347 self.cur.execute("insert into test(x) values (?)", ("xxx",)) 348 self.cur.execute('select x as "x y [bar]" from test') 349 val = self.cur.fetchone()[0] 350 self.assertEqual(val, "<xxx>") 351 352 # Check if the stripping of colnames works. Everything after the first 353 # '[' (and the preceding space) should be stripped. 354 self.assertEqual(self.cur.description[0][0], "x y") 355 356 def test_case_in_converter_name(self): 357 self.cur.execute("select 'other' as \"x [b1b1]\"") 358 val = self.cur.fetchone()[0] 359 self.assertEqual(val, "MARKER") 360 361 def test_cursor_description_no_row(self): 362 """ 363 cursor.description should at least provide the column name(s), even if 364 no row returned. 365 """ 366 self.cur.execute("select * from test where 0 = 1") 367 self.assertEqual(self.cur.description[0][0], "x") 368 369 def test_cursor_description_insert(self): 370 self.cur.execute("insert into test values (1)") 371 self.assertIsNone(self.cur.description) 372 373 374@unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "CTEs not supported") 375class CommonTableExpressionTests(unittest.TestCase): 376 377 def setUp(self): 378 self.con = sqlite.connect(":memory:") 379 self.cur = self.con.cursor() 380 self.cur.execute("create table test(x foo)") 381 382 def tearDown(self): 383 self.cur.close() 384 self.con.close() 385 386 def test_cursor_description_cte_simple(self): 387 self.cur.execute("with one as (select 1) select * from one") 388 self.assertIsNotNone(self.cur.description) 389 self.assertEqual(self.cur.description[0][0], "1") 390 391 def test_cursor_description_cte_multiple_columns(self): 392 self.cur.execute("insert into test values(1)") 393 self.cur.execute("insert into test values(2)") 394 self.cur.execute("with testCTE as (select * from test) select * from testCTE") 395 self.assertIsNotNone(self.cur.description) 396 self.assertEqual(self.cur.description[0][0], "x") 397 398 def test_cursor_description_cte(self): 399 self.cur.execute("insert into test values (1)") 400 self.cur.execute("with bar as (select * from test) select * from test where x = 1") 401 self.assertIsNotNone(self.cur.description) 402 self.assertEqual(self.cur.description[0][0], "x") 403 self.cur.execute("with bar as (select * from test) select * from test where x = 2") 404 self.assertIsNotNone(self.cur.description) 405 self.assertEqual(self.cur.description[0][0], "x") 406 407 408class ObjectAdaptationTests(unittest.TestCase): 409 def cast(obj): 410 return float(obj) 411 cast = staticmethod(cast) 412 413 def setUp(self): 414 self.con = sqlite.connect(":memory:") 415 try: 416 del sqlite.adapters[int] 417 except: 418 pass 419 sqlite.register_adapter(int, ObjectAdaptationTests.cast) 420 self.cur = self.con.cursor() 421 422 def tearDown(self): 423 del sqlite.adapters[(int, sqlite.PrepareProtocol)] 424 self.cur.close() 425 self.con.close() 426 427 def test_caster_is_used(self): 428 self.cur.execute("select ?", (4,)) 429 val = self.cur.fetchone()[0] 430 self.assertEqual(type(val), float) 431 432 def test_missing_adapter(self): 433 with self.assertRaises(sqlite.ProgrammingError): 434 sqlite.adapt(1.) # No float adapter registered 435 436 def test_missing_protocol(self): 437 with self.assertRaises(sqlite.ProgrammingError): 438 sqlite.adapt(1, None) 439 440 def test_defect_proto(self): 441 class DefectProto(): 442 def __adapt__(self): 443 return None 444 with self.assertRaises(sqlite.ProgrammingError): 445 sqlite.adapt(1., DefectProto) 446 447 def test_defect_self_adapt(self): 448 class DefectSelfAdapt(float): 449 def __conform__(self, _): 450 return None 451 with self.assertRaises(sqlite.ProgrammingError): 452 sqlite.adapt(DefectSelfAdapt(1.)) 453 454 def test_custom_proto(self): 455 class CustomProto(): 456 def __adapt__(self): 457 return "adapted" 458 self.assertEqual(sqlite.adapt(1., CustomProto), "adapted") 459 460 def test_adapt(self): 461 val = 42 462 self.assertEqual(float(val), sqlite.adapt(val)) 463 464 def test_adapt_alt(self): 465 alt = "other" 466 self.assertEqual(alt, sqlite.adapt(1., None, alt)) 467 468 469@unittest.skipUnless(zlib, "requires zlib") 470class BinaryConverterTests(unittest.TestCase): 471 def convert(s): 472 return zlib.decompress(s) 473 convert = staticmethod(convert) 474 475 def setUp(self): 476 self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) 477 sqlite.register_converter("bin", BinaryConverterTests.convert) 478 479 def tearDown(self): 480 self.con.close() 481 482 def test_binary_input_for_converter(self): 483 testdata = b"abcdefg" * 10 484 result = self.con.execute('select ? as "x [bin]"', (memoryview(zlib.compress(testdata)),)).fetchone()[0] 485 self.assertEqual(testdata, result) 486 487class DateTimeTests(unittest.TestCase): 488 def setUp(self): 489 self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) 490 self.cur = self.con.cursor() 491 self.cur.execute("create table test(d date, ts timestamp)") 492 493 def tearDown(self): 494 self.cur.close() 495 self.con.close() 496 497 def test_sqlite_date(self): 498 d = sqlite.Date(2004, 2, 14) 499 self.cur.execute("insert into test(d) values (?)", (d,)) 500 self.cur.execute("select d from test") 501 d2 = self.cur.fetchone()[0] 502 self.assertEqual(d, d2) 503 504 def test_sqlite_timestamp(self): 505 ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0) 506 self.cur.execute("insert into test(ts) values (?)", (ts,)) 507 self.cur.execute("select ts from test") 508 ts2 = self.cur.fetchone()[0] 509 self.assertEqual(ts, ts2) 510 511 def test_sql_timestamp(self): 512 now = datetime.datetime.utcnow() 513 self.cur.execute("insert into test(ts) values (current_timestamp)") 514 self.cur.execute("select ts from test") 515 ts = self.cur.fetchone()[0] 516 self.assertEqual(type(ts), datetime.datetime) 517 self.assertEqual(ts.year, now.year) 518 519 def test_date_time_sub_seconds(self): 520 ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000) 521 self.cur.execute("insert into test(ts) values (?)", (ts,)) 522 self.cur.execute("select ts from test") 523 ts2 = self.cur.fetchone()[0] 524 self.assertEqual(ts, ts2) 525 526 def test_date_time_sub_seconds_floating_point(self): 527 ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241) 528 self.cur.execute("insert into test(ts) values (?)", (ts,)) 529 self.cur.execute("select ts from test") 530 ts2 = self.cur.fetchone()[0] 531 self.assertEqual(ts, ts2) 532 533 534if __name__ == "__main__": 535 unittest.main() 536