"""Unit tests for ``cachetools.TTLCache`` driven by a manually advanced clock."""

import unittest

from cachetools import TTLCache

from . import CacheTestMixin


class Timer:
    """Deterministic integer clock used in place of a wall-clock timer.

    Calling the instance returns the current tick count.  When *auto* is
    true, every call first advances the clock by one tick, so two
    consecutive reads never return the same value.
    """

    def __init__(self, auto=False):
        self.auto = auto
        self.time = 0

    def __call__(self):
        """Return the current tick, advancing first when ``auto`` is set."""
        if not self.auto:
            return self.time
        self.time += 1
        return self.time

    def tick(self):
        """Advance the clock by exactly one tick."""
        self.time += 1


class TTLTestCache(TTLCache):
    """``TTLCache`` whose ``ttl`` defaults to 0 and whose timer is a fresh ``Timer``.

    Assigned to ``TTLCacheTest.Cache`` below; presumably that is the factory
    the shared ``CacheTestMixin`` tests instantiate (the mixin is defined
    outside this file).
    """

    def __init__(self, maxsize, ttl=0, **kwargs):
        super().__init__(maxsize, ttl=ttl, timer=Timer(), **kwargs)


class TTLCacheTest(unittest.TestCase, CacheTestMixin):
    """TTL-specific tests for ``TTLCache``, plus whatever ``CacheTestMixin`` adds."""

    Cache = TTLTestCache

    def test_ttl(self):
        """With ttl=1 an entry survives one tick after insertion and is gone after two."""
        ttl_cache = TTLCache(maxsize=2, ttl=1, timer=Timer())
        self.assertEqual(0, ttl_cache.timer())
        self.assertEqual(1, ttl_cache.ttl)

        # t=0: the first entry is stored and readable.
        ttl_cache[1] = 1
        self.assertEqual({1}, set(ttl_cache))
        self.assertEqual(1, len(ttl_cache))
        self.assertEqual(1, ttl_cache[1])

        # t=1: the entry is still present one tick later.
        ttl_cache.timer.tick()
        self.assertEqual({1}, set(ttl_cache))
        self.assertEqual(1, len(ttl_cache))
        self.assertEqual(1, ttl_cache[1])

        ttl_cache[2] = 2
        self.assertEqual({1, 2}, set(ttl_cache))
        self.assertEqual(2, len(ttl_cache))
        self.assertEqual(1, ttl_cache[1])
        self.assertEqual(2, ttl_cache[2])

        # t=2: key 1 has expired, key 2 has not.
        ttl_cache.timer.tick()
        self.assertEqual({2}, set(ttl_cache))
        self.assertEqual(1, len(ttl_cache))
        self.assertNotIn(1, ttl_cache)
        self.assertEqual(2, ttl_cache[2])

        ttl_cache[3] = 3
        self.assertEqual({2, 3}, set(ttl_cache))
        self.assertEqual(2, len(ttl_cache))
        self.assertNotIn(1, ttl_cache)
        self.assertEqual(2, ttl_cache[2])
        self.assertEqual(3, ttl_cache[3])

        # t=3: key 2 has expired as well.
        ttl_cache.timer.tick()
        self.assertEqual({3}, set(ttl_cache))
        self.assertEqual(1, len(ttl_cache))
        for expired_key in (1, 2):
            self.assertNotIn(expired_key, ttl_cache)
        self.assertEqual(3, ttl_cache[3])

        # t=4: nothing is left.
        ttl_cache.timer.tick()
        self.assertEqual(set(), set(ttl_cache))
        self.assertEqual(0, len(ttl_cache))
        for expired_key in (1, 2, 3):
            self.assertNotIn(expired_key, ttl_cache)

        # Removing an expired key fails exactly like removing a missing key.
        with self.assertRaises(KeyError):
            del ttl_cache[1]
        with self.assertRaises(KeyError):
            ttl_cache.pop(2)
        with self.assertRaises(KeyError):
            del ttl_cache[3]

    def test_ttl_lru(self):
        """With a clock that never advances, a full cache drops its least recently used key."""
        lru_cache = TTLCache(maxsize=2, ttl=0, timer=Timer())

        # Three inserts into a two-slot cache push out the oldest key.
        lru_cache[1] = 1
        lru_cache[2] = 2
        lru_cache[3] = 3

        self.assertEqual(len(lru_cache), 2)
        self.assertNotIn(1, lru_cache)
        self.assertEqual(lru_cache[2], 2)
        self.assertEqual(lru_cache[3], 3)

        # Read key 2 so that key 3 becomes the least recently used entry.
        lru_cache[2]
        lru_cache[4] = 4
        self.assertEqual(len(lru_cache), 2)
        self.assertNotIn(1, lru_cache)
        self.assertEqual(lru_cache[2], 2)
        self.assertNotIn(3, lru_cache)
        self.assertEqual(lru_cache[4], 4)

        # Key 4 was read after key 2 above, so key 2 is evicted next.
        lru_cache[5] = 5
        self.assertEqual(len(lru_cache), 2)
        for evicted_key in (1, 2, 3):
            self.assertNotIn(evicted_key, lru_cache)
        self.assertEqual(lru_cache[4], 4)
        self.assertEqual(lru_cache[5], 5)

    def test_ttl_expire(self):
        """``expire()`` drops entries according to the current or an explicitly given time."""
        ttl_cache = TTLCache(maxsize=3, ttl=2, timer=Timer())
        with ttl_cache.timer as now:
            self.assertEqual(now, ttl_cache.timer())
        self.assertEqual(2, ttl_cache.ttl)

        # Insert one key per tick: key 1 at t=0, key 2 at t=1, key 3 at t=2.
        ttl_cache[1] = 1
        ttl_cache.timer.tick()
        ttl_cache[2] = 2
        ttl_cache.timer.tick()
        ttl_cache[3] = 3
        self.assertEqual(2, ttl_cache.timer())

        self.assertEqual({1, 2, 3}, set(ttl_cache))
        self.assertEqual(3, len(ttl_cache))
        self.assertEqual(1, ttl_cache[1])
        self.assertEqual(2, ttl_cache[2])
        self.assertEqual(3, ttl_cache[3])

        # Without an argument (clock at t=2) nothing is removed yet.
        ttl_cache.expire()
        self.assertEqual({1, 2, 3}, set(ttl_cache))
        self.assertEqual(3, len(ttl_cache))
        self.assertEqual(1, ttl_cache[1])
        self.assertEqual(2, ttl_cache[2])
        self.assertEqual(3, ttl_cache[3])

        # Expiring as of t=3 removes key 1 only.
        ttl_cache.expire(3)
        self.assertEqual({2, 3}, set(ttl_cache))
        self.assertEqual(2, len(ttl_cache))
        self.assertNotIn(1, ttl_cache)
        self.assertEqual(2, ttl_cache[2])
        self.assertEqual(3, ttl_cache[3])

        # Expiring as of t=4 additionally removes key 2.
        ttl_cache.expire(4)
        self.assertEqual({3}, set(ttl_cache))
        self.assertEqual(1, len(ttl_cache))
        for expired_key in (1, 2):
            self.assertNotIn(expired_key, ttl_cache)
        self.assertEqual(3, ttl_cache[3])

        # Expiring as of t=5 empties the cache.
        ttl_cache.expire(5)
        self.assertEqual(set(), set(ttl_cache))
        self.assertEqual(0, len(ttl_cache))
        for expired_key in (1, 2, 3):
            self.assertNotIn(expired_key, ttl_cache)

    def test_ttl_atomic(self):
        """A just-stored value is still returned when the clock advances on every read.

        The timer is created with ``auto=True``, so each call to it moves
        time forward by one tick.
        """
        ttl_cache = TTLCache(maxsize=1, ttl=1, timer=Timer(auto=True))
        ttl_cache[1] = 1
        self.assertEqual(1, ttl_cache[1])
        ttl_cache[1] = 1
        self.assertEqual(1, ttl_cache.get(1))
        ttl_cache[1] = 1
        self.assertEqual(1, ttl_cache.pop(1))
        ttl_cache[1] = 1
        self.assertEqual(1, ttl_cache.setdefault(1))
        ttl_cache[1] = 1
        ttl_cache.clear()
        self.assertEqual(0, len(ttl_cache))

    def test_ttl_tuple_key(self):
        """A tuple key with ttl=0 is readable at its insertion tick and gone one tick later."""
        tuple_key = (1, 2, 3)
        ttl_cache = TTLCache(maxsize=1, ttl=0, timer=Timer())
        self.assertEqual(0, ttl_cache.ttl)

        ttl_cache[tuple_key] = 42
        self.assertEqual(42, ttl_cache[tuple_key])
        ttl_cache.timer.tick()
        with self.assertRaises(KeyError):
            ttl_cache[tuple_key]
        self.assertNotIn(tuple_key, ttl_cache)

    def test_ttl_datetime(self):
        """``TTLCache`` accepts a ``timedelta`` ttl together with ``datetime.now`` as timer."""
        from datetime import datetime, timedelta

        one_day = timedelta(days=1)
        ttl_cache = TTLCache(maxsize=1, ttl=one_day, timer=datetime.now)

        ttl_cache[1] = 1
        self.assertEqual(1, len(ttl_cache))
        # Expiring as of "now" keeps the entry, which has a full day to live.
        ttl_cache.expire(datetime.now())
        self.assertEqual(1, len(ttl_cache))
        # Expiring as of one day from now removes it.
        ttl_cache.expire(datetime.now() + one_day)
        self.assertEqual(0, len(ttl_cache))