1# pysqlite2/test/types.py: tests for type conversion and detection
2#
3# Copyright (C) 2005 Gerhard Häring <[email protected]>
4#
5# This file is part of pysqlite.
6#
7# This software is provided 'as-is', without any express or implied
8# warranty.  In no event will the authors be held liable for any damages
9# arising from the use of this software.
10#
11# Permission is granted to anyone to use this software for any purpose,
12# including commercial applications, and to alter it and redistribute it
13# freely, subject to the following restrictions:
14#
15# 1. The origin of this software must not be misrepresented; you must not
16#    claim that you wrote the original software. If you use this software
17#    in a product, an acknowledgment in the product documentation would be
18#    appreciated but is not required.
19# 2. Altered source versions must be plainly marked as such, and must not be
20#    misrepresented as being the original software.
21# 3. This notice may not be removed or altered from any source distribution.
22
23import datetime
24import unittest
25import sqlite3 as sqlite
26import sys
27try:
28    import zlib
29except ImportError:
30    zlib = None
31
32from test import support
33
34
class SqliteTypeTests(unittest.TestCase):
    """Round-trip checks for values stored without any type detection.

    Every test starts from a fresh in-memory database containing one table,
    ``test``, with an integer, a varchar, a number and a blob column.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute(
            "create table test(i integer, s varchar, f number, b blob)"
        )

    def tearDown(self):
        # Release the cursor before the connection that owns it.
        self.cur.close()
        self.con.close()

    def _store_and_fetch(self, insert_sql, value, select_sql):
        # Bind ``value`` as the only parameter of ``insert_sql`` and hand back
        # the first row produced by ``select_sql`` (None if there is none).
        self.cur.execute(insert_sql, (value,))
        self.cur.execute(select_sql)
        return self.cur.fetchone()

    def test_string(self):
        # Non-ASCII text must come back unchanged.
        text = "Österreich"
        fetched = self._store_and_fetch(
            "insert into test(s) values (?)", text, "select s from test")
        self.assertEqual(fetched[0], text)

    def test_string_with_null_character(self):
        # An embedded NUL must not truncate the stored string.
        text = "a\0b"
        fetched = self._store_and_fetch(
            "insert into test(s) values (?)", text, "select s from test")
        self.assertEqual(fetched[0], text)

    def test_small_int(self):
        number = 42
        fetched = self._store_and_fetch(
            "insert into test(i) values (?)", number, "select i from test")
        self.assertEqual(fetched[0], number)

    def test_large_int(self):
        number = 123456789123456789
        fetched = self._store_and_fetch(
            "insert into test(i) values (?)", number, "select i from test")
        self.assertEqual(fetched[0], number)

    def test_float(self):
        number = 3.14
        fetched = self._store_and_fetch(
            "insert into test(f) values (?)", number, "select f from test")
        self.assertEqual(fetched[0], number)

    def test_blob(self):
        # A memoryview is bound; the fetched value is compared to the bytes
        # it wraps.
        payload = b"Guglhupf"
        fetched = self._store_and_fetch(
            "insert into test(b) values (?)", memoryview(payload),
            "select b from test")
        self.assertEqual(fetched[0], payload)

    def test_unicode_execute(self):
        # Non-ASCII text inside the SQL statement itself.
        self.cur.execute("select 'Österreich'")
        fetched = self.cur.fetchone()
        self.assertEqual(fetched[0], "Österreich")

    def test_too_large_int(self):
        # Values just outside the signed 64-bit range must be rejected ...
        out_of_range = (2**63, -2**63-1, 2**64)
        for candidate in out_of_range:
            with self.assertRaises(OverflowError):
                self.cur.execute("insert into test(i) values (?)", (candidate,))
        # ... and none of the rejected inserts may have left a row behind.
        self.cur.execute("select i from test")
        self.assertIsNone(self.cur.fetchone())

    def test_string_with_surrogates(self):
        # One lone surrogate from each half of the surrogate range.
        for lone_surrogate in ("\ud8ff", "\udcff"):
            with self.assertRaises(UnicodeEncodeError):
                self.cur.execute("insert into test(s) values (?)", (lone_surrogate,))
        self.cur.execute("select s from test")
        self.assertIsNone(self.cur.fetchone())

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @support.bigmemtest(size=2**31, memuse=4, dry_run=False)
    def test_too_large_string(self, maxsize):
        # ``maxsize`` is supplied by the bigmemtest decorator and is unused.
        for length in (2**31-1, 2**31):
            with self.assertRaises(sqlite.DataError):
                self.cur.execute("insert into test(s) values (?)", ('x'*length,))
        self.cur.execute("select 1 from test")
        self.assertIsNone(self.cur.fetchone())

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @support.bigmemtest(size=2**31, memuse=3, dry_run=False)
    def test_too_large_blob(self, maxsize):
        # ``maxsize`` is supplied by the bigmemtest decorator and is unused.
        # NOTE(review): the oversized blob is bound to the varchar column "s",
        # not the blob column "b" -- confirm whether that is intended.
        for length in (2**31-1, 2**31):
            with self.assertRaises(sqlite.DataError):
                self.cur.execute("insert into test(s) values (?)", (b'x'*length,))
        self.cur.execute("select 1 from test")
        self.assertIsNone(self.cur.fetchone())
127
128
129class DeclTypesTests(unittest.TestCase):
130    class Foo:
131        def __init__(self, _val):
132            if isinstance(_val, bytes):
133                # sqlite3 always calls __init__ with a bytes created from a
134                # UTF-8 string when __conform__ was used to store the object.
135                _val = _val.decode('utf-8')
136            self.val = _val
137
138        def __eq__(self, other):
139            if not isinstance(other, DeclTypesTests.Foo):
140                return NotImplemented
141            return self.val == other.val
142
143        def __conform__(self, protocol):
144            if protocol is sqlite.PrepareProtocol:
145                return self.val
146            else:
147                return None
148
149        def __str__(self):
150            return "<%s>" % self.val
151
152    class BadConform:
153        def __init__(self, exc):
154            self.exc = exc
155        def __conform__(self, protocol):
156            raise self.exc
157
158    def setUp(self):
159        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
160        self.cur = self.con.cursor()
161        self.cur.execute("""
162            create table test(
163                i int,
164                s str,
165                f float,
166                b bool,
167                u unicode,
168                foo foo,
169                bin blob,
170                n1 number,
171                n2 number(5),
172                bad bad,
173                cbin cblob)
174        """)
175
176        # override float, make them always return the same number
177        sqlite.converters["FLOAT"] = lambda x: 47.2
178
179        # and implement two custom ones
180        sqlite.converters["BOOL"] = lambda x: bool(int(x))
181        sqlite.converters["FOO"] = DeclTypesTests.Foo
182        sqlite.converters["BAD"] = DeclTypesTests.BadConform
183        sqlite.converters["WRONG"] = lambda x: "WRONG"
184        sqlite.converters["NUMBER"] = float
185        sqlite.converters["CBLOB"] = lambda x: b"blobish"
186
187    def tearDown(self):
188        del sqlite.converters["FLOAT"]
189        del sqlite.converters["BOOL"]
190        del sqlite.converters["FOO"]
191        del sqlite.converters["BAD"]
192        del sqlite.converters["WRONG"]
193        del sqlite.converters["NUMBER"]
194        del sqlite.converters["CBLOB"]
195        self.cur.close()
196        self.con.close()
197
198    def test_string(self):
199        # default
200        self.cur.execute("insert into test(s) values (?)", ("foo",))
201        self.cur.execute('select s as "s [WRONG]" from test')
202        row = self.cur.fetchone()
203        self.assertEqual(row[0], "foo")
204
205    def test_small_int(self):
206        # default
207        self.cur.execute("insert into test(i) values (?)", (42,))
208        self.cur.execute("select i from test")
209        row = self.cur.fetchone()
210        self.assertEqual(row[0], 42)
211
212    def test_large_int(self):
213        # default
214        num = 123456789123456789
215        self.cur.execute("insert into test(i) values (?)", (num,))
216        self.cur.execute("select i from test")
217        row = self.cur.fetchone()
218        self.assertEqual(row[0], num)
219
220    def test_float(self):
221        # custom
222        val = 3.14
223        self.cur.execute("insert into test(f) values (?)", (val,))
224        self.cur.execute("select f from test")
225        row = self.cur.fetchone()
226        self.assertEqual(row[0], 47.2)
227
228    def test_bool(self):
229        # custom
230        self.cur.execute("insert into test(b) values (?)", (False,))
231        self.cur.execute("select b from test")
232        row = self.cur.fetchone()
233        self.assertIs(row[0], False)
234
235        self.cur.execute("delete from test")
236        self.cur.execute("insert into test(b) values (?)", (True,))
237        self.cur.execute("select b from test")
238        row = self.cur.fetchone()
239        self.assertIs(row[0], True)
240
241    def test_unicode(self):
242        # default
243        val = "\xd6sterreich"
244        self.cur.execute("insert into test(u) values (?)", (val,))
245        self.cur.execute("select u from test")
246        row = self.cur.fetchone()
247        self.assertEqual(row[0], val)
248
249    def test_foo(self):
250        val = DeclTypesTests.Foo("bla")
251        self.cur.execute("insert into test(foo) values (?)", (val,))
252        self.cur.execute("select foo from test")
253        row = self.cur.fetchone()
254        self.assertEqual(row[0], val)
255
256    def test_error_in_conform(self):
257        val = DeclTypesTests.BadConform(TypeError)
258        with self.assertRaises(sqlite.ProgrammingError):
259            self.cur.execute("insert into test(bad) values (?)", (val,))
260        with self.assertRaises(sqlite.ProgrammingError):
261            self.cur.execute("insert into test(bad) values (:val)", {"val": val})
262
263        val = DeclTypesTests.BadConform(KeyboardInterrupt)
264        with self.assertRaises(KeyboardInterrupt):
265            self.cur.execute("insert into test(bad) values (?)", (val,))
266        with self.assertRaises(KeyboardInterrupt):
267            self.cur.execute("insert into test(bad) values (:val)", {"val": val})
268
269    def test_unsupported_seq(self):
270        class Bar: pass
271        val = Bar()
272        with self.assertRaises(sqlite.ProgrammingError):
273            self.cur.execute("insert into test(f) values (?)", (val,))
274
275    def test_unsupported_dict(self):
276        class Bar: pass
277        val = Bar()
278        with self.assertRaises(sqlite.ProgrammingError):
279            self.cur.execute("insert into test(f) values (:val)", {"val": val})
280
281    def test_blob(self):
282        # default
283        sample = b"Guglhupf"
284        val = memoryview(sample)
285        self.cur.execute("insert into test(bin) values (?)", (val,))
286        self.cur.execute("select bin from test")
287        row = self.cur.fetchone()
288        self.assertEqual(row[0], sample)
289
290    def test_number1(self):
291        self.cur.execute("insert into test(n1) values (5)")
292        value = self.cur.execute("select n1 from test").fetchone()[0]
293        # if the converter is not used, it's an int instead of a float
294        self.assertEqual(type(value), float)
295
296    def test_number2(self):
297        """Checks whether converter names are cut off at '(' characters"""
298        self.cur.execute("insert into test(n2) values (5)")
299        value = self.cur.execute("select n2 from test").fetchone()[0]
300        # if the converter is not used, it's an int instead of a float
301        self.assertEqual(type(value), float)
302
303    def test_convert_zero_sized_blob(self):
304        self.con.execute("insert into test(cbin) values (?)", (b"",))
305        cur = self.con.execute("select cbin from test")
306        # Zero-sized blobs with converters returns None.  This differs from
307        # blobs without a converter, where b"" is returned.
308        self.assertIsNone(cur.fetchone()[0])
309
310
class ColNamesTests(unittest.TestCase):
    """Checks converters that are chosen from ``[name]`` column-name tags.

    The connection is opened with ``detect_types=sqlite.PARSE_COLNAMES`` only,
    so the declared column type ("foo") must never trigger a converter.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        # Converters receive the raw bytes of the column value.
        sqlite.converters["FOO"] = lambda x: "[%s]" % x.decode("ascii")
        sqlite.converters["BAR"] = lambda x: "<%s>" % x.decode("ascii")
        sqlite.converters["EXC"] = lambda x: 5/0
        sqlite.converters["B1B1"] = lambda x: "MARKER"

    def tearDown(self):
        # Undo the global converter registrations made in setUp.
        del sqlite.converters["FOO"]
        del sqlite.converters["BAR"]
        del sqlite.converters["EXC"]
        del sqlite.converters["B1B1"]
        self.cur.close()
        self.con.close()

    def test_decl_type_not_used(self):
        """
        Assures that the declared type is not used when PARSE_DECLTYPES
        is not set.
        """
        self.cur.execute("insert into test(x) values (?)", ("xxx",))
        self.cur.execute("select x from test")
        val = self.cur.fetchone()[0]
        self.assertEqual(val, "xxx")

    def test_none(self):
        self.cur.execute("insert into test(x) values (?)", (None,))
        self.cur.execute("select x from test")
        val = self.cur.fetchone()[0]
        # Identity check: a NULL must come back as the None singleton, not
        # merely as something that compares equal to it.
        self.assertIsNone(val)

    def test_col_name(self):
        self.cur.execute("insert into test(x) values (?)", ("xxx",))
        self.cur.execute('select x as "x y [bar]" from test')
        val = self.cur.fetchone()[0]
        self.assertEqual(val, "<xxx>")

        # Check if the stripping of colnames works. Everything after the first
        # '[' (and the preceding space) should be stripped.
        self.assertEqual(self.cur.description[0][0], "x y")

    def test_case_in_converter_name(self):
        # The tag is lower case while the converter is registered as "B1B1".
        self.cur.execute("select 'other' as \"x [b1b1]\"")
        val = self.cur.fetchone()[0]
        self.assertEqual(val, "MARKER")

    def test_cursor_description_no_row(self):
        """
        cursor.description should at least provide the column name(s), even if
        no row returned.
        """
        self.cur.execute("select * from test where 0 = 1")
        self.assertEqual(self.cur.description[0][0], "x")

    def test_cursor_description_insert(self):
        # A statement that returns no rows has no description at all.
        self.cur.execute("insert into test values (1)")
        self.assertIsNone(self.cur.description)
372
373
@unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "CTEs not supported")
class CommonTableExpressionTests(unittest.TestCase):
    """Checks ``cursor.description`` for queries that start with ``with``."""

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

    def tearDown(self):
        # Release the cursor before the connection that owns it.
        self.cur.close()
        self.con.close()

    def _check_first_column_name(self, expected):
        # The description must exist and name its first column ``expected``.
        self.assertIsNotNone(self.cur.description)
        self.assertEqual(self.cur.description[0][0], expected)

    def test_cursor_description_cte_simple(self):
        self.cur.execute("with one as (select 1) select * from one")
        self._check_first_column_name("1")

    def test_cursor_description_cte_multiple_columns(self):
        for statement in ("insert into test values(1)",
                          "insert into test values(2)"):
            self.cur.execute(statement)
        self.cur.execute("with testCTE as (select * from test) select * from testCTE")
        self._check_first_column_name("x")

    def test_cursor_description_cte(self):
        self.cur.execute("insert into test values (1)")
        # First query matches the inserted row, second one matches nothing;
        # the description has to be available either way.
        queries = (
            "with bar as (select * from test) select * from test where x = 1",
            "with bar as (select * from test) select * from test where x = 2",
        )
        for query in queries:
            self.cur.execute(query)
            self._check_first_column_name("x")
406
407
class ObjectAdaptationTests(unittest.TestCase):
    """Checks adapter registration and ``sqlite.adapt``.

    While a test runs, ``int`` values are adapted to ``float`` through
    ``cast``; the registration is removed again in ``tearDown``.
    """

    @staticmethod
    def cast(obj):
        """Adapter used for ``int``: return ``obj`` converted to ``float``."""
        return float(obj)

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        # Start from a clean slate.  Adapters are keyed by (type, protocol),
        # the same key tearDown removes; only a missing entry is tolerated.
        try:
            del sqlite.adapters[(int, sqlite.PrepareProtocol)]
        except KeyError:
            pass
        sqlite.register_adapter(int, ObjectAdaptationTests.cast)
        self.cur = self.con.cursor()

    def tearDown(self):
        # Undo the global adapter registration made in setUp.
        del sqlite.adapters[(int, sqlite.PrepareProtocol)]
        self.cur.close()
        self.con.close()

    def test_caster_is_used(self):
        # The bound int goes through ``cast`` and therefore returns as float.
        self.cur.execute("select ?", (4,))
        val = self.cur.fetchone()[0]
        self.assertEqual(type(val), float)

    def test_missing_adapter(self):
        with self.assertRaises(sqlite.ProgrammingError):
            sqlite.adapt(1.)  # No float adapter registered

    def test_missing_protocol(self):
        with self.assertRaises(sqlite.ProgrammingError):
            sqlite.adapt(1, None)

    def test_defect_proto(self):
        # A protocol whose __adapt__ yields None counts as "cannot adapt".
        class DefectProto():
            def __adapt__(self):
                return None
        with self.assertRaises(sqlite.ProgrammingError):
            sqlite.adapt(1., DefectProto)

    def test_defect_self_adapt(self):
        # An object whose __conform__ yields None counts as "cannot adapt".
        class DefectSelfAdapt(float):
            def __conform__(self, _):
                return None
        with self.assertRaises(sqlite.ProgrammingError):
            sqlite.adapt(DefectSelfAdapt(1.))

    def test_custom_proto(self):
        class CustomProto():
            def __adapt__(self):
                return "adapted"
        self.assertEqual(sqlite.adapt(1., CustomProto), "adapted")

    def test_adapt(self):
        val = 42
        self.assertEqual(float(val), sqlite.adapt(val))

    def test_adapt_alt(self):
        # The third argument is returned when no adaptation is possible.
        alt = "other"
        self.assertEqual(alt, sqlite.adapt(1., None, alt))
467
468
@unittest.skipUnless(zlib, "requires zlib")
class BinaryConverterTests(unittest.TestCase):
    """Checks that a converter receives the raw binary column value."""

    @staticmethod
    def convert(s):
        """Converter for the "bin" tag: return ``s`` zlib-decompressed."""
        return zlib.decompress(s)

    def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)

    def tearDown(self):
        # Remove the globally registered converter again so it cannot leak
        # into other tests.  Converter names are stored upper-cased by
        # register_converter (per the sqlite3 implementation), hence "BIN";
        # the lower-case spelling is dropped too in case it was kept as given.
        sqlite.converters.pop("BIN", None)
        sqlite.converters.pop("bin", None)
        self.con.close()

    def test_binary_input_for_converter(self):
        testdata = b"abcdefg" * 10
        compressed = memoryview(zlib.compress(testdata))
        result = self.con.execute('select ? as "x [bin]"', (compressed,)).fetchone()[0]
        self.assertEqual(testdata, result)
486
class DateTimeTests(unittest.TestCase):
    """Round-trip checks for the ``date`` and ``timestamp`` declared types.

    The connection is opened with ``detect_types=sqlite.PARSE_DECLTYPES`` so
    the values read back from the ``d`` and ``ts`` columns are converted.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(d date, ts timestamp)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def test_sqlite_date(self):
        d = sqlite.Date(2004, 2, 14)
        self.cur.execute("insert into test(d) values (?)", (d,))
        self.cur.execute("select d from test")
        d2 = self.cur.fetchone()[0]
        self.assertEqual(d, d2)

    def test_sqlite_timestamp(self):
        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0)
        self.cur.execute("insert into test(ts) values (?)", (ts,))
        self.cur.execute("select ts from test")
        ts2 = self.cur.fetchone()[0]
        self.assertEqual(ts, ts2)

    def test_sql_timestamp(self):
        # current_timestamp is evaluated by SQLite in UTC.  Sample the UTC
        # clock on both sides of the insert so that a year rollover between
        # the two moments cannot make the comparison fail spuriously.
        before = datetime.datetime.now(datetime.timezone.utc)
        self.cur.execute("insert into test(ts) values (current_timestamp)")
        self.cur.execute("select ts from test")
        ts = self.cur.fetchone()[0]
        after = datetime.datetime.now(datetime.timezone.utc)
        self.assertEqual(type(ts), datetime.datetime)
        self.assertIn(ts.year, (before.year, after.year))

    def test_date_time_sub_seconds(self):
        # Microseconds that are a "round" fraction of a second.
        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000)
        self.cur.execute("insert into test(ts) values (?)", (ts,))
        self.cur.execute("select ts from test")
        ts2 = self.cur.fetchone()[0]
        self.assertEqual(ts, ts2)

    def test_date_time_sub_seconds_floating_point(self):
        # Microseconds that are not exactly representable as a binary float.
        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241)
        self.cur.execute("insert into test(ts) values (?)", (ts,))
        self.cur.execute("select ts from test")
        ts2 = self.cur.fetchone()[0]
        self.assertEqual(ts, ts2)
532
533
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
536