import unittest

from cachetools import TTLCache

from . import CacheTestMixin
6
7
class Timer:
    """Deterministic integer clock used in place of a real time source.

    Calling the instance returns the current time.  The clock starts at 0
    and only moves forward when ``tick()`` is called or, if ``auto`` is
    true, each time the instance itself is called.
    """

    def __init__(self, auto=False):
        # auto: when true, every __call__ advances the clock before
        # returning it, so two consecutive reads never see the same time.
        self.auto = auto
        self.time = 0

    def __call__(self):
        """Return the current time, advancing it first if ``auto`` is set."""
        if self.auto:
            self.time += 1
        return self.time

    def tick(self):
        """Advance the clock by exactly one unit."""
        self.time += 1
20
21
class TTLTestCache(TTLCache):
    """``TTLCache`` whose constructor matches the generic cache test mixin.

    ``ttl`` defaults to 0 and the timer is always a fresh, manually driven
    ``Timer``, so callers only need to supply ``maxsize`` (plus any extra
    keyword arguments, which are forwarded to ``TTLCache`` unchanged).
    """

    def __init__(self, maxsize, ttl=0, **kwargs):
        super().__init__(maxsize, ttl=ttl, timer=Timer(), **kwargs)
25
26
class TTLCacheTest(unittest.TestCase, CacheTestMixin):
    """TTL-specific tests for ``TTLCache``.

    ``Cache`` is set to ``TTLTestCache`` for use by ``CacheTestMixin``; the
    methods below drive a ``Timer`` by hand so that expiry is deterministic.
    NOTE: statement order matters in these tests -- the assertions that read
    ``cache[key]`` are also part of the access sequence the later eviction
    assertions rely on.
    """

    Cache = TTLTestCache

    def test_ttl(self):
        # Items stored with ttl=1 are asserted to be still visible one tick
        # after insertion and gone two ticks after insertion.
        cache = TTLCache(maxsize=2, ttl=1, timer=Timer())
        self.assertEqual(0, cache.timer())
        self.assertEqual(1, cache.ttl)

        # time 0: insert key 1
        cache[1] = 1
        self.assertEqual({1}, set(cache))
        self.assertEqual(1, len(cache))
        self.assertEqual(1, cache[1])

        # time 1: key 1 is still present
        cache.timer.tick()
        self.assertEqual({1}, set(cache))
        self.assertEqual(1, len(cache))
        self.assertEqual(1, cache[1])

        cache[2] = 2
        self.assertEqual({1, 2}, set(cache))
        self.assertEqual(2, len(cache))
        self.assertEqual(1, cache[1])
        self.assertEqual(2, cache[2])

        # time 2: key 1 (inserted at 0) has expired, key 2 (inserted at 1)
        # has not
        cache.timer.tick()
        self.assertEqual({2}, set(cache))
        self.assertEqual(1, len(cache))
        self.assertNotIn(1, cache)
        self.assertEqual(2, cache[2])

        cache[3] = 3
        self.assertEqual({2, 3}, set(cache))
        self.assertEqual(2, len(cache))
        self.assertNotIn(1, cache)
        self.assertEqual(2, cache[2])
        self.assertEqual(3, cache[3])

        # time 3: key 2 has expired as well
        cache.timer.tick()
        self.assertEqual({3}, set(cache))
        self.assertEqual(1, len(cache))
        self.assertNotIn(1, cache)
        self.assertNotIn(2, cache)
        self.assertEqual(3, cache[3])

        # time 4: everything has expired
        cache.timer.tick()
        self.assertEqual(set(), set(cache))
        self.assertEqual(0, len(cache))
        self.assertNotIn(1, cache)
        self.assertNotIn(2, cache)
        self.assertNotIn(3, cache)

        # Removing an expired key must fail like removing a missing key,
        # both via ``del`` and via ``pop`` without a default.
        with self.assertRaises(KeyError):
            del cache[1]
        with self.assertRaises(KeyError):
            cache.pop(2)
        with self.assertRaises(KeyError):
            del cache[3]

    def test_ttl_lru(self):
        # The timer is never advanced here, so only the size limit
        # (maxsize=2) removes items; the assertions pin the eviction order.
        cache = TTLCache(maxsize=2, ttl=0, timer=Timer())

        cache[1] = 1
        cache[2] = 2
        cache[3] = 3

        # Inserting a third key pushed out key 1.
        self.assertEqual(len(cache), 2)
        self.assertNotIn(1, cache)
        self.assertEqual(cache[2], 2)
        self.assertEqual(cache[3], 3)

        # Bare read on purpose: key 2 is accessed after key 3, and the
        # assertions below expect key 3 (not key 2) to be evicted next.
        cache[2]
        cache[4] = 4
        self.assertEqual(len(cache), 2)
        self.assertNotIn(1, cache)
        self.assertEqual(cache[2], 2)
        self.assertNotIn(3, cache)
        self.assertEqual(cache[4], 4)

        # Key 4 was read after key 2 just above, so key 2 is expected to go.
        cache[5] = 5
        self.assertEqual(len(cache), 2)
        self.assertNotIn(1, cache)
        self.assertNotIn(2, cache)
        self.assertNotIn(3, cache)
        self.assertEqual(cache[4], 4)
        self.assertEqual(cache[5], 5)

    def test_ttl_expire(self):
        # Exercises explicit ``expire()`` calls, with and without a time
        # argument, on items inserted at times 0, 1 and 2 with ttl=2.
        cache = TTLCache(maxsize=3, ttl=2, timer=Timer())
        # The value yielded by the timer context must equal what calling
        # the timer returns inside that context.
        with cache.timer as time:
            self.assertEqual(time, cache.timer())
        self.assertEqual(2, cache.ttl)

        cache[1] = 1
        cache.timer.tick()
        cache[2] = 2
        cache.timer.tick()
        cache[3] = 3
        self.assertEqual(2, cache.timer())

        self.assertEqual({1, 2, 3}, set(cache))
        self.assertEqual(3, len(cache))
        self.assertEqual(1, cache[1])
        self.assertEqual(2, cache[2])
        self.assertEqual(3, cache[3])

        # expire() without an argument (timer currently reads 2) removes
        # nothing.
        cache.expire()
        self.assertEqual({1, 2, 3}, set(cache))
        self.assertEqual(3, len(cache))
        self.assertEqual(1, cache[1])
        self.assertEqual(2, cache[2])
        self.assertEqual(3, cache[3])

        # An explicit time of 3 removes only key 1 (inserted at time 0).
        cache.expire(3)
        self.assertEqual({2, 3}, set(cache))
        self.assertEqual(2, len(cache))
        self.assertNotIn(1, cache)
        self.assertEqual(2, cache[2])
        self.assertEqual(3, cache[3])

        # Time 4 additionally removes key 2 (inserted at time 1).
        cache.expire(4)
        self.assertEqual({3}, set(cache))
        self.assertEqual(1, len(cache))
        self.assertNotIn(1, cache)
        self.assertNotIn(2, cache)
        self.assertEqual(3, cache[3])

        # Time 5 removes the last key (inserted at time 2).
        cache.expire(5)
        self.assertEqual(set(), set(cache))
        self.assertEqual(0, len(cache))
        self.assertNotIn(1, cache)
        self.assertNotIn(2, cache)
        self.assertNotIn(3, cache)

    def test_ttl_atomic(self):
        # Timer(auto=True) advances on every call.  Each operation issued
        # directly after a store is still expected to see the stored value
        # (presumably checking that one cache operation does not observe
        # the clock moving underneath it -- confirm against TTLCache).
        cache = TTLCache(maxsize=1, ttl=1, timer=Timer(auto=True))
        cache[1] = 1
        self.assertEqual(1, cache[1])
        cache[1] = 1
        self.assertEqual(1, cache.get(1))
        cache[1] = 1
        self.assertEqual(1, cache.pop(1))
        cache[1] = 1
        self.assertEqual(1, cache.setdefault(1))
        cache[1] = 1
        cache.clear()
        self.assertEqual(0, len(cache))

    def test_ttl_tuple_key(self):
        # Tuple keys work, and with ttl=0 an item is readable at the time
        # it was stored but gone after a single tick.
        cache = TTLCache(maxsize=1, ttl=0, timer=Timer())
        self.assertEqual(0, cache.ttl)

        cache[(1, 2, 3)] = 42
        self.assertEqual(42, cache[(1, 2, 3)])
        cache.timer.tick()
        with self.assertRaises(KeyError):
            cache[(1, 2, 3)]
        self.assertNotIn((1, 2, 3), cache)

    def test_ttl_datetime(self):
        # The cache is driven by ``datetime.now`` with a ``timedelta`` ttl
        # instead of the integer Timer used elsewhere in this class.
        from datetime import datetime, timedelta

        cache = TTLCache(maxsize=1, ttl=timedelta(days=1), timer=datetime.now)

        cache[1] = 1
        self.assertEqual(1, len(cache))
        # Expiring as of "now" must keep the one-day item ...
        cache.expire(datetime.now())
        self.assertEqual(1, len(cache))
        # ... while expiring as of one day from now must drop it.
        cache.expire(datetime.now() + timedelta(days=1))
        self.assertEqual(0, len(cache))
197