1#-*- coding: ISO-8859-1 -*-
2# pysqlite2/test/hooks.py: tests for various SQLite-specific hooks
3#
# Copyright (C) 2006-2007 Gerhard Häring <[email protected]>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty.  In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17#    claim that you wrote the original software. If you use this software
18#    in a product, an acknowledgment in the product documentation would be
19#    appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21#    misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import os, unittest
25import sqlite3 as sqlite
26
class CollationTests(unittest.TestCase):
    """Tests for registering, using and removing custom collations.

    Collation callables take two strings and return a negative, zero or
    positive integer.  The three-way comparison is spelled
    ``(x > y) - (x < y)`` throughout because the ``cmp`` builtin does not
    exist on Python 3.
    """

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def CheckCreateCollationNotString(self):
        """
        Passing None as the collation name must raise a TypeError.
        """
        con = sqlite.connect(":memory:")
        with self.assertRaises(TypeError):
            con.create_collation(None, lambda x, y: (x > y) - (x < y))

    def CheckCreateCollationNotCallable(self):
        """
        Passing a non-callable as the collation must raise a TypeError
        carrying the message "parameter must be callable".
        """
        con = sqlite.connect(":memory:")
        with self.assertRaises(TypeError) as cm:
            con.create_collation("X", 42)
        self.assertEqual(cm.exception.args[0], "parameter must be callable")

    def CheckCreateCollationNotAscii(self):
        """
        A collation name containing a non-ASCII character is expected to be
        rejected with a ProgrammingError.
        """
        # NOTE(review): the sqlite3 documentation says Python 3.11+ accepts
        # any Unicode collation name; confirm which runtimes this test must
        # support before relying on it there.
        con = sqlite.connect(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            # "\xe4" (a-umlaut) is written as an escape so the test does not
            # depend on the encoding of this source file.
            con.create_collation("coll\xe4", lambda x, y: (x > y) - (x < y))

    def CheckCreateCollationBadUpper(self):
        """
        Register a collation under a str subclass whose upper() returns None
        and verify the collation is still usable under its plain name.
        """
        class BadUpperStr(str):
            def upper(self):
                return None
        con = sqlite.connect(":memory:")
        mycoll = lambda x, y: -((x > y) - (x < y))
        con.create_collation(BadUpperStr("mycoll"), mycoll)
        result = con.execute("""
            select x from (
            select 'a' as x
            union
            select 'b' as x
            ) order by x collate mycoll
            """).fetchall()
        self.assertEqual(result[0][0], 'b')
        self.assertEqual(result[1][0], 'a')

    def CheckCollationIsUsed(self):
        """
        Verify that a registered collation determines the sort order, and
        that the collation is unavailable once it has been set to None.
        """
        # Compare against the SQLite *library* version.  ``version_info`` is
        # the version of the Python module, so guarding on it would test the
        # wrong thing.
        if sqlite.sqlite_version_info < (3, 2, 1):
            self.skipTest("old SQLite versions crash on this test")

        def mycoll(x, y):
            # reverse order
            return -((x > y) - (x < y))

        con = sqlite.connect(":memory:")
        con.create_collation("mycoll", mycoll)
        sql = """
            select x from (
            select 'a' as x
            union
            select 'b' as x
            union
            select 'c' as x
            ) order by x collate mycoll
            """
        result = con.execute(sql).fetchall()
        self.assertEqual(result, [('c',), ('b',), ('a',)],
                         msg="the expected order was not returned")

        # Passing None removes the collation again.
        con.create_collation("mycoll", None)
        with self.assertRaises(sqlite.OperationalError) as cm:
            con.execute(sql).fetchall()
        self.assertEqual(cm.exception.args[0].lower(),
                         "no such collation sequence: mycoll")

    def CheckCollationReturnsLargeInteger(self):
        """
        Verify that a collation returning values of magnitude 2**32 still
        produces the expected (reversed) order.
        """
        def mycoll(x, y):
            # reverse order
            return -((x > y) - (x < y)) * 2**32
        con = sqlite.connect(":memory:")
        con.create_collation("mycoll", mycoll)
        sql = """
            select x from (
            select 'a' as x
            union
            select 'b' as x
            union
            select 'c' as x
            ) order by x collate mycoll
            """
        result = con.execute(sql).fetchall()
        self.assertEqual(result, [('c',), ('b',), ('a',)],
                         msg="the expected order was not returned")

    def CheckCollationRegisterTwice(self):
        """
        Register two different collation functions under the same name.
        Verify that the last one is actually used.
        """
        con = sqlite.connect(":memory:")
        con.create_collation("mycoll", lambda x, y: (x > y) - (x < y))
        con.create_collation("mycoll", lambda x, y: -((x > y) - (x < y)))
        result = con.execute("""
            select x from (select 'a' as x union select 'b' as x) order by x collate mycoll
            """).fetchall()
        self.assertEqual([row[0] for row in result], ['b', 'a'],
                         msg="wrong collation function is used")

    def CheckDeregisterCollation(self):
        """
        Register a collation, then deregister it. Make sure an error is raised if we try
        to use it.
        """
        con = sqlite.connect(":memory:")
        con.create_collation("mycoll", lambda x, y: (x > y) - (x < y))
        con.create_collation("mycoll", None)
        with self.assertRaises(sqlite.OperationalError) as cm:
            con.execute("select 'a' as x union select 'b' as x order by x collate mycoll")
        self.assertTrue(
            cm.exception.args[0].startswith("no such collation sequence"),
            "wrong OperationalError raised")
148
class ProgressTests(unittest.TestCase):
    """Tests for ``Connection.set_progress_handler``."""

    def CheckProgressHandlerUsed(self):
        """
        An installed progress handler is called while a statement runs.
        """
        seen = []

        def on_progress():
            seen.append(None)
            return 0

        db = sqlite.connect(":memory:")
        db.set_progress_handler(on_progress, 1)
        db.execute("""
            create table foo(a, b)
            """)
        self.assertTrue(seen)

    def CheckOpcodeCount(self):
        """
        A handler installed with a larger opcode interval is not called more
        often than one installed with a smaller interval.
        """
        seen = []

        def on_progress():
            seen.append(None)
            return 0

        db = sqlite.connect(":memory:")
        db.set_progress_handler(on_progress, 1)
        cursor = db.cursor()
        cursor.execute("""
            create table foo (a, b)
            """)
        calls_with_interval_one = len(seen)

        # Start counting afresh before re-installing with interval 2.
        del seen[:]
        db.set_progress_handler(on_progress, 2)
        cursor.execute("""
            create table bar (a, b)
            """)
        calls_with_interval_two = len(seen)

        self.assertGreaterEqual(calls_with_interval_one,
                                calls_with_interval_two)

    def CheckCancelOperation(self):
        """
        A handler returning a non-zero value aborts the running statement
        with an OperationalError.
        """
        seen = []

        def on_progress():
            seen.append(None)
            return 1

        db = sqlite.connect(":memory:")
        db.set_progress_handler(on_progress, 1)
        cursor = db.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cursor.execute("create table bar (a, b)")

    def CheckClearHandler(self):
        """
        Installing None as the handler removes the previously set handler.
        """
        fired = []

        def on_progress():
            fired.append(1)
            return 0

        db = sqlite.connect(":memory:")
        db.set_progress_handler(on_progress, 1)
        db.set_progress_handler(None, 1)
        rows = db.execute("select 1 union select 2 union select 3")
        rows.fetchall()
        self.assertEqual(len(fired), 0, "progress handler was not cleared")
218
def suite():
    """Return a TestSuite holding every ``Check*`` test of this module.

    The tests are collected with a ``TestLoader`` whose method prefix is
    set to "Check" rather than with ``unittest.makeSuite``, which is
    deprecated since Python 3.11 and removed in Python 3.13.

    Returns:
        A ``unittest.TestSuite`` containing one sub-suite for
        ``CollationTests`` and one for ``ProgressTests``, in that order.
    """
    loader = unittest.TestLoader()
    # Test methods in this module are named Check*, not test*.
    loader.testMethodPrefix = "Check"
    collation_suite = loader.loadTestsFromTestCase(CollationTests)
    progress_suite = loader.loadTestsFromTestCase(ProgressTests)
    return unittest.TestSuite((collation_suite, progress_suite))
223
def test():
    """Run the suite built by ``suite()`` with a ``TextTestRunner``."""
    unittest.TextTestRunner().run(suite())
227
# Run the tests only when this file is executed as a script, not on import.
if __name__ == "__main__":
    test()
230