1import doctest 2import unittest 3from test import support 4from test.support import threading_helper 5from itertools import * 6import weakref 7from decimal import Decimal 8from fractions import Fraction 9import operator 10import random 11import copy 12import pickle 13from functools import reduce 14import sys 15import struct 16import threading 17import gc 18 19maxsize = support.MAX_Py_ssize_t 20minsize = -maxsize-1 21 22def lzip(*args): 23 return list(zip(*args)) 24 25def onearg(x): 26 'Test function of one argument' 27 return 2*x 28 29def errfunc(*args): 30 'Test function that raises an error' 31 raise ValueError 32 33def gen3(): 34 'Non-restartable source sequence' 35 for i in (0, 1, 2): 36 yield i 37 38def isEven(x): 39 'Test predicate' 40 return x%2==0 41 42def isOdd(x): 43 'Test predicate' 44 return x%2==1 45 46def tupleize(*args): 47 return args 48 49def irange(n): 50 for i in range(n): 51 yield i 52 53class StopNow: 54 'Class emulating an empty iterable.' 55 def __iter__(self): 56 return self 57 def __next__(self): 58 raise StopIteration 59 60def take(n, seq): 61 'Convenience function for partially consuming a long of infinite iterable' 62 return list(islice(seq, n)) 63 64def prod(iterable): 65 return reduce(operator.mul, iterable, 1) 66 67def fact(n): 68 'Factorial' 69 return prod(range(1, n+1)) 70 71# root level methods for pickling ability 72def testR(r): 73 return r[0] 74 75def testR2(r): 76 return r[2] 77 78def underten(x): 79 return x<10 80 81picklecopiers = [lambda s, proto=proto: pickle.loads(pickle.dumps(s, proto)) 82 for proto in range(pickle.HIGHEST_PROTOCOL + 1)] 83 84class TestBasicOps(unittest.TestCase): 85 86 def pickletest(self, protocol, it, stop=4, take=1, compare=None): 87 """Test that an iterator is the same after pickling, also when part-consumed""" 88 def expand(it, i=0): 89 # Recursively expand iterables, within sensible bounds 90 if i > 10: 91 raise RuntimeError("infinite recursion encountered") 92 if isinstance(it, str): 93 return it 94 try: 95 l = list(islice(it, stop)) 96 except TypeError: 97 return it # can't expand it 98 return [expand(e, i+1) for e in l] 99 100 # Test the initial copy against the original 101 dump = pickle.dumps(it, protocol) 102 i2 = pickle.loads(dump) 103 self.assertEqual(type(it), type(i2)) 104 a, b = expand(it), expand(i2) 105 self.assertEqual(a, b) 106 if compare: 107 c = expand(compare) 108 self.assertEqual(a, c) 109 110 # Take from the copy, and create another copy and compare them. 111 i3 = pickle.loads(dump) 112 took = 0 113 try: 114 for i in range(take): 115 next(i3) 116 took += 1 117 except StopIteration: 118 pass #in case there is less data than 'take' 119 dump = pickle.dumps(i3, protocol) 120 i4 = pickle.loads(dump) 121 a, b = expand(i3), expand(i4) 122 self.assertEqual(a, b) 123 if compare: 124 c = expand(compare[took:]) 125 self.assertEqual(a, c); 126 127 def test_accumulate(self): 128 self.assertEqual(list(accumulate(range(10))), # one positional arg 129 [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]) 130 self.assertEqual(list(accumulate(iterable=range(10))), # kw arg 131 [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]) 132 for typ in int, complex, Decimal, Fraction: # multiple types 133 self.assertEqual( 134 list(accumulate(map(typ, range(10)))), 135 list(map(typ, [0, 1, 3, 6, 10, 15, 21, 28, 36, 45]))) 136 self.assertEqual(list(accumulate('abc')), ['a', 'ab', 'abc']) # works with non-numeric 137 self.assertEqual(list(accumulate([])), []) # empty iterable 138 self.assertEqual(list(accumulate([7])), [7]) # iterable of length one 139 self.assertRaises(TypeError, accumulate, range(10), 5, 6) # too many args 140 self.assertRaises(TypeError, accumulate) # too few args 141 self.assertRaises(TypeError, accumulate, x=range(10)) # unexpected kwd arg 142 self.assertRaises(TypeError, list, accumulate([1, []])) # args that don't add 143 144 s = [2, 8, 9, 5, 7, 0, 3, 4, 1, 6] 145 self.assertEqual(list(accumulate(s, min)), 146 [2, 2, 2, 2, 2, 0, 0, 0, 0, 0]) 147 self.assertEqual(list(accumulate(s, max)), 148 [2, 8, 9, 9, 9, 9, 9, 9, 9, 9]) 149 self.assertEqual(list(accumulate(s, operator.mul)), 150 [2, 16, 144, 720, 5040, 0, 0, 0, 0, 0]) 151 with self.assertRaises(TypeError): 152 list(accumulate(s, chr)) # unary-operation 153 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 154 self.pickletest(proto, accumulate(range(10))) # test pickling 155 self.pickletest(proto, accumulate(range(10), initial=7)) 156 self.assertEqual(list(accumulate([10, 5, 1], initial=None)), [10, 15, 16]) 157 self.assertEqual(list(accumulate([10, 5, 1], initial=100)), [100, 110, 115, 116]) 158 self.assertEqual(list(accumulate([], initial=100)), [100]) 159 with self.assertRaises(TypeError): 160 list(accumulate([10, 20], 100)) 161 162 def test_chain(self): 163 164 def chain2(*iterables): 165 'Pure python version in the docs' 166 for it in iterables: 167 for element in it: 168 yield element 169 170 for c in (chain, chain2): 171 self.assertEqual(list(c('abc', 'def')), list('abcdef')) 172 self.assertEqual(list(c('abc')), list('abc')) 173 self.assertEqual(list(c('')), []) 174 self.assertEqual(take(4, c('abc', 'def')), list('abcd')) 175 self.assertRaises(TypeError, list,c(2, 3)) 176 177 def test_chain_from_iterable(self): 178 self.assertEqual(list(chain.from_iterable(['abc', 'def'])), list('abcdef')) 179 self.assertEqual(list(chain.from_iterable(['abc'])), list('abc')) 180 self.assertEqual(list(chain.from_iterable([''])), []) 181 self.assertEqual(take(4, chain.from_iterable(['abc', 'def'])), list('abcd')) 182 self.assertRaises(TypeError, list, chain.from_iterable([2, 3])) 183 184 def test_chain_reducible(self): 185 for oper in [copy.deepcopy] + picklecopiers: 186 it = chain('abc', 'def') 187 self.assertEqual(list(oper(it)), list('abcdef')) 188 self.assertEqual(next(it), 'a') 189 self.assertEqual(list(oper(it)), list('bcdef')) 190 191 self.assertEqual(list(oper(chain(''))), []) 192 self.assertEqual(take(4, oper(chain('abc', 'def'))), list('abcd')) 193 self.assertRaises(TypeError, list, oper(chain(2, 3))) 194 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 195 self.pickletest(proto, chain('abc', 'def'), compare=list('abcdef')) 196 197 def test_chain_setstate(self): 198 self.assertRaises(TypeError, chain().__setstate__, ()) 199 self.assertRaises(TypeError, chain().__setstate__, []) 200 self.assertRaises(TypeError, chain().__setstate__, 0) 201 self.assertRaises(TypeError, chain().__setstate__, ([],)) 202 self.assertRaises(TypeError, chain().__setstate__, (iter([]), [])) 203 it = chain() 204 it.__setstate__((iter(['abc', 'def']),)) 205 self.assertEqual(list(it), ['a', 'b', 'c', 'd', 'e', 'f']) 206 it = chain() 207 it.__setstate__((iter(['abc', 'def']), iter(['ghi']))) 208 self.assertEqual(list(it), ['ghi', 'a', 'b', 'c', 'd', 'e', 'f']) 209 210 def test_combinations(self): 211 self.assertRaises(TypeError, combinations, 'abc') # missing r argument 212 self.assertRaises(TypeError, combinations, 'abc', 2, 1) # too many arguments 213 self.assertRaises(TypeError, combinations, None) # pool is not iterable 214 self.assertRaises(ValueError, combinations, 'abc', -2) # r is negative 215 216 for op in [lambda a:a] + picklecopiers: 217 self.assertEqual(list(op(combinations('abc', 32))), []) # r > n 218 219 self.assertEqual(list(op(combinations('ABCD', 2))), 220 [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]) 221 testIntermediate = combinations('ABCD', 2) 222 next(testIntermediate) 223 self.assertEqual(list(op(testIntermediate)), 224 [('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]) 225 226 self.assertEqual(list(op(combinations(range(4), 3))), 227 [(0,1,2), (0,1,3), (0,2,3), (1,2,3)]) 228 testIntermediate = combinations(range(4), 3) 229 next(testIntermediate) 230 self.assertEqual(list(op(testIntermediate)), 231 [(0,1,3), (0,2,3), (1,2,3)]) 232 233 234 def combinations1(iterable, r): 235 'Pure python version shown in the docs' 236 pool = tuple(iterable) 237 n = len(pool) 238 if r > n: 239 return 240 indices = list(range(r)) 241 yield tuple(pool[i] for i in indices) 242 while 1: 243 for i in reversed(range(r)): 244 if indices[i] != i + n - r: 245 break 246 else: 247 return 248 indices[i] += 1 249 for j in range(i+1, r): 250 indices[j] = indices[j-1] + 1 251 yield tuple(pool[i] for i in indices) 252 253 def combinations2(iterable, r): 254 'Pure python version shown in the docs' 255 pool = tuple(iterable) 256 n = len(pool) 257 for indices in permutations(range(n), r): 258 if sorted(indices) == list(indices): 259 yield tuple(pool[i] for i in indices) 260 261 def combinations3(iterable, r): 262 'Pure python version from cwr()' 263 pool = tuple(iterable) 264 n = len(pool) 265 for indices in combinations_with_replacement(range(n), r): 266 if len(set(indices)) == r: 267 yield tuple(pool[i] for i in indices) 268 269 for n in range(7): 270 values = [5*x-12 for x in range(n)] 271 for r in range(n+2): 272 result = list(combinations(values, r)) 273 self.assertEqual(len(result), 0 if r>n else fact(n) / fact(r) / fact(n-r)) # right number of combs 274 self.assertEqual(len(result), len(set(result))) # no repeats 275 self.assertEqual(result, sorted(result)) # lexicographic order 276 for c in result: 277 self.assertEqual(len(c), r) # r-length combinations 278 self.assertEqual(len(set(c)), r) # no duplicate elements 279 self.assertEqual(list(c), sorted(c)) # keep original ordering 280 self.assertTrue(all(e in values for e in c)) # elements taken from input iterable 281 self.assertEqual(list(c), 282 [e for e in values if e in c]) # comb is a subsequence of the input iterable 283 self.assertEqual(result, list(combinations1(values, r))) # matches first pure python version 284 self.assertEqual(result, list(combinations2(values, r))) # matches second pure python version 285 self.assertEqual(result, list(combinations3(values, r))) # matches second pure python version 286 287 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 288 self.pickletest(proto, combinations(values, r)) # test pickling 289 290 @support.bigaddrspacetest 291 def test_combinations_overflow(self): 292 with self.assertRaises((OverflowError, MemoryError)): 293 combinations("AA", 2**29) 294 295 # Test implementation detail: tuple re-use 296 @support.impl_detail("tuple reuse is specific to CPython") 297 def test_combinations_tuple_reuse(self): 298 self.assertEqual(len(set(map(id, combinations('abcde', 3)))), 1) 299 self.assertNotEqual(len(set(map(id, list(combinations('abcde', 3))))), 1) 300 301 def test_combinations_with_replacement(self): 302 cwr = combinations_with_replacement 303 self.assertRaises(TypeError, cwr, 'abc') # missing r argument 304 self.assertRaises(TypeError, cwr, 'abc', 2, 1) # too many arguments 305 self.assertRaises(TypeError, cwr, None) # pool is not iterable 306 self.assertRaises(ValueError, cwr, 'abc', -2) # r is negative 307 308 for op in [lambda a:a] + picklecopiers: 309 self.assertEqual(list(op(cwr('ABC', 2))), 310 [('A','A'), ('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')]) 311 testIntermediate = cwr('ABC', 2) 312 next(testIntermediate) 313 self.assertEqual(list(op(testIntermediate)), 314 [('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')]) 315 316 317 def cwr1(iterable, r): 318 'Pure python version shown in the docs' 319 # number items returned: (n+r-1)! / r! / (n-1)! when n>0 320 pool = tuple(iterable) 321 n = len(pool) 322 if not n and r: 323 return 324 indices = [0] * r 325 yield tuple(pool[i] for i in indices) 326 while 1: 327 for i in reversed(range(r)): 328 if indices[i] != n - 1: 329 break 330 else: 331 return 332 indices[i:] = [indices[i] + 1] * (r - i) 333 yield tuple(pool[i] for i in indices) 334 335 def cwr2(iterable, r): 336 'Pure python version shown in the docs' 337 pool = tuple(iterable) 338 n = len(pool) 339 for indices in product(range(n), repeat=r): 340 if sorted(indices) == list(indices): 341 yield tuple(pool[i] for i in indices) 342 343 def numcombs(n, r): 344 if not n: 345 return 0 if r else 1 346 return fact(n+r-1) / fact(r)/ fact(n-1) 347 348 for n in range(7): 349 values = [5*x-12 for x in range(n)] 350 for r in range(n+2): 351 result = list(cwr(values, r)) 352 353 self.assertEqual(len(result), numcombs(n, r)) # right number of combs 354 self.assertEqual(len(result), len(set(result))) # no repeats 355 self.assertEqual(result, sorted(result)) # lexicographic order 356 357 regular_combs = list(combinations(values, r)) # compare to combs without replacement 358 if n == 0 or r <= 1: 359 self.assertEqual(result, regular_combs) # cases that should be identical 360 else: 361 self.assertTrue(set(result) >= set(regular_combs)) # rest should be supersets of regular combs 362 363 for c in result: 364 self.assertEqual(len(c), r) # r-length combinations 365 noruns = [k for k,v in groupby(c)] # combo without consecutive repeats 366 self.assertEqual(len(noruns), len(set(noruns))) # no repeats other than consecutive 367 self.assertEqual(list(c), sorted(c)) # keep original ordering 368 self.assertTrue(all(e in values for e in c)) # elements taken from input iterable 369 self.assertEqual(noruns, 370 [e for e in values if e in c]) # comb is a subsequence of the input iterable 371 self.assertEqual(result, list(cwr1(values, r))) # matches first pure python version 372 self.assertEqual(result, list(cwr2(values, r))) # matches second pure python version 373 374 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 375 self.pickletest(proto, cwr(values,r)) # test pickling 376 377 @support.bigaddrspacetest 378 def test_combinations_with_replacement_overflow(self): 379 with self.assertRaises((OverflowError, MemoryError)): 380 combinations_with_replacement("AA", 2**30) 381 382 # Test implementation detail: tuple re-use 383 @support.impl_detail("tuple reuse is specific to CPython") 384 def test_combinations_with_replacement_tuple_reuse(self): 385 cwr = combinations_with_replacement 386 self.assertEqual(len(set(map(id, cwr('abcde', 3)))), 1) 387 self.assertNotEqual(len(set(map(id, list(cwr('abcde', 3))))), 1) 388 389 def test_permutations(self): 390 self.assertRaises(TypeError, permutations) # too few arguments 391 self.assertRaises(TypeError, permutations, 'abc', 2, 1) # too many arguments 392 self.assertRaises(TypeError, permutations, None) # pool is not iterable 393 self.assertRaises(ValueError, permutations, 'abc', -2) # r is negative 394 self.assertEqual(list(permutations('abc', 32)), []) # r > n 395 self.assertRaises(TypeError, permutations, 'abc', 's') # r is not an int or None 396 self.assertEqual(list(permutations(range(3), 2)), 397 [(0,1), (0,2), (1,0), (1,2), (2,0), (2,1)]) 398 399 def permutations1(iterable, r=None): 400 'Pure python version shown in the docs' 401 pool = tuple(iterable) 402 n = len(pool) 403 r = n if r is None else r 404 if r > n: 405 return 406 indices = list(range(n)) 407 cycles = list(range(n-r+1, n+1))[::-1] 408 yield tuple(pool[i] for i in indices[:r]) 409 while n: 410 for i in reversed(range(r)): 411 cycles[i] -= 1 412 if cycles[i] == 0: 413 indices[i:] = indices[i+1:] + indices[i:i+1] 414 cycles[i] = n - i 415 else: 416 j = cycles[i] 417 indices[i], indices[-j] = indices[-j], indices[i] 418 yield tuple(pool[i] for i in indices[:r]) 419 break 420 else: 421 return 422 423 def permutations2(iterable, r=None): 424 'Pure python version shown in the docs' 425 pool = tuple(iterable) 426 n = len(pool) 427 r = n if r is None else r 428 for indices in product(range(n), repeat=r): 429 if len(set(indices)) == r: 430 yield tuple(pool[i] for i in indices) 431 432 for n in range(7): 433 values = [5*x-12 for x in range(n)] 434 for r in range(n+2): 435 result = list(permutations(values, r)) 436 self.assertEqual(len(result), 0 if r>n else fact(n) / fact(n-r)) # right number of perms 437 self.assertEqual(len(result), len(set(result))) # no repeats 438 self.assertEqual(result, sorted(result)) # lexicographic order 439 for p in result: 440 self.assertEqual(len(p), r) # r-length permutations 441 self.assertEqual(len(set(p)), r) # no duplicate elements 442 self.assertTrue(all(e in values for e in p)) # elements taken from input iterable 443 self.assertEqual(result, list(permutations1(values, r))) # matches first pure python version 444 self.assertEqual(result, list(permutations2(values, r))) # matches second pure python version 445 if r == n: 446 self.assertEqual(result, list(permutations(values, None))) # test r as None 447 self.assertEqual(result, list(permutations(values))) # test default r 448 449 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 450 self.pickletest(proto, permutations(values, r)) # test pickling 451 452 @support.bigaddrspacetest 453 def test_permutations_overflow(self): 454 with self.assertRaises((OverflowError, MemoryError)): 455 permutations("A", 2**30) 456 457 @support.impl_detail("tuple reuse is specific to CPython") 458 def test_permutations_tuple_reuse(self): 459 self.assertEqual(len(set(map(id, permutations('abcde', 3)))), 1) 460 self.assertNotEqual(len(set(map(id, list(permutations('abcde', 3))))), 1) 461 462 def test_combinatorics(self): 463 # Test relationships between product(), permutations(), 464 # combinations() and combinations_with_replacement(). 465 466 for n in range(6): 467 s = 'ABCDEFG'[:n] 468 for r in range(8): 469 prod = list(product(s, repeat=r)) 470 cwr = list(combinations_with_replacement(s, r)) 471 perm = list(permutations(s, r)) 472 comb = list(combinations(s, r)) 473 474 # Check size 475 self.assertEqual(len(prod), n**r) 476 self.assertEqual(len(cwr), (fact(n+r-1) / fact(r)/ fact(n-1)) if n else (not r)) 477 self.assertEqual(len(perm), 0 if r>n else fact(n) / fact(n-r)) 478 self.assertEqual(len(comb), 0 if r>n else fact(n) / fact(r) / fact(n-r)) 479 480 # Check lexicographic order without repeated tuples 481 self.assertEqual(prod, sorted(set(prod))) 482 self.assertEqual(cwr, sorted(set(cwr))) 483 self.assertEqual(perm, sorted(set(perm))) 484 self.assertEqual(comb, sorted(set(comb))) 485 486 # Check interrelationships 487 self.assertEqual(cwr, [t for t in prod if sorted(t)==list(t)]) # cwr: prods which are sorted 488 self.assertEqual(perm, [t for t in prod if len(set(t))==r]) # perm: prods with no dups 489 self.assertEqual(comb, [t for t in perm if sorted(t)==list(t)]) # comb: perms that are sorted 490 self.assertEqual(comb, [t for t in cwr if len(set(t))==r]) # comb: cwrs without dups 491 self.assertEqual(comb, list(filter(set(cwr).__contains__, perm))) # comb: perm that is a cwr 492 self.assertEqual(comb, list(filter(set(perm).__contains__, cwr))) # comb: cwr that is a perm 493 self.assertEqual(comb, sorted(set(cwr) & set(perm))) # comb: both a cwr and a perm 494 495 def test_compress(self): 496 self.assertEqual(list(compress(data='ABCDEF', selectors=[1,0,1,0,1,1])), list('ACEF')) 497 self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF')) 498 self.assertEqual(list(compress('ABCDEF', [0,0,0,0,0,0])), list('')) 499 self.assertEqual(list(compress('ABCDEF', [1,1,1,1,1,1])), list('ABCDEF')) 500 self.assertEqual(list(compress('ABCDEF', [1,0,1])), list('AC')) 501 self.assertEqual(list(compress('ABC', [0,1,1,1,1,1])), list('BC')) 502 n = 10000 503 data = chain.from_iterable(repeat(range(6), n)) 504 selectors = chain.from_iterable(repeat((0, 1))) 505 self.assertEqual(list(compress(data, selectors)), [1,3,5] * n) 506 self.assertRaises(TypeError, compress, None, range(6)) # 1st arg not iterable 507 self.assertRaises(TypeError, compress, range(6), None) # 2nd arg not iterable 508 self.assertRaises(TypeError, compress, range(6)) # too few args 509 self.assertRaises(TypeError, compress, range(6), None) # too many args 510 511 # check copy, deepcopy, pickle 512 for op in [lambda a:copy.copy(a), lambda a:copy.deepcopy(a)] + picklecopiers: 513 for data, selectors, result1, result2 in [ 514 ('ABCDEF', [1,0,1,0,1,1], 'ACEF', 'CEF'), 515 ('ABCDEF', [0,0,0,0,0,0], '', ''), 516 ('ABCDEF', [1,1,1,1,1,1], 'ABCDEF', 'BCDEF'), 517 ('ABCDEF', [1,0,1], 'AC', 'C'), 518 ('ABC', [0,1,1,1,1,1], 'BC', 'C'), 519 ]: 520 521 self.assertEqual(list(op(compress(data=data, selectors=selectors))), list(result1)) 522 self.assertEqual(list(op(compress(data, selectors))), list(result1)) 523 testIntermediate = compress(data, selectors) 524 if result1: 525 next(testIntermediate) 526 self.assertEqual(list(op(testIntermediate)), list(result2)) 527 528 529 def test_count(self): 530 self.assertEqual(lzip('abc',count()), [('a', 0), ('b', 1), ('c', 2)]) 531 self.assertEqual(lzip('abc',count(3)), [('a', 3), ('b', 4), ('c', 5)]) 532 self.assertEqual(take(2, lzip('abc',count(3))), [('a', 3), ('b', 4)]) 533 self.assertEqual(take(2, zip('abc',count(-1))), [('a', -1), ('b', 0)]) 534 self.assertEqual(take(2, zip('abc',count(-3))), [('a', -3), ('b', -2)]) 535 self.assertRaises(TypeError, count, 2, 3, 4) 536 self.assertRaises(TypeError, count, 'a') 537 self.assertEqual(take(10, count(maxsize-5)), 538 list(range(maxsize-5, maxsize+5))) 539 self.assertEqual(take(10, count(-maxsize-5)), 540 list(range(-maxsize-5, -maxsize+5))) 541 self.assertEqual(take(3, count(3.25)), [3.25, 4.25, 5.25]) 542 self.assertEqual(take(3, count(3.25-4j)), [3.25-4j, 4.25-4j, 5.25-4j]) 543 self.assertEqual(take(3, count(Decimal('1.1'))), 544 [Decimal('1.1'), Decimal('2.1'), Decimal('3.1')]) 545 self.assertEqual(take(3, count(Fraction(2, 3))), 546 [Fraction(2, 3), Fraction(5, 3), Fraction(8, 3)]) 547 BIGINT = 1<<1000 548 self.assertEqual(take(3, count(BIGINT)), [BIGINT, BIGINT+1, BIGINT+2]) 549 c = count(3) 550 self.assertEqual(repr(c), 'count(3)') 551 next(c) 552 self.assertEqual(repr(c), 'count(4)') 553 c = count(-9) 554 self.assertEqual(repr(c), 'count(-9)') 555 next(c) 556 self.assertEqual(next(c), -8) 557 self.assertEqual(repr(count(10.25)), 'count(10.25)') 558 self.assertEqual(repr(count(10.0)), 'count(10.0)') 559 self.assertEqual(type(next(count(10.0))), float) 560 for i in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 10, sys.maxsize-5, sys.maxsize+5): 561 # Test repr 562 r1 = repr(count(i)) 563 r2 = 'count(%r)'.__mod__(i) 564 self.assertEqual(r1, r2) 565 566 # check copy, deepcopy, pickle 567 for value in -3, 3, maxsize-5, maxsize+5: 568 c = count(value) 569 self.assertEqual(next(copy.copy(c)), value) 570 self.assertEqual(next(copy.deepcopy(c)), value) 571 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 572 self.pickletest(proto, count(value)) 573 574 #check proper internal error handling for large "step' sizes 575 count(1, maxsize+5); sys.exc_info() 576 577 def test_count_with_stride(self): 578 self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)]) 579 self.assertEqual(lzip('abc',count(start=2,step=3)), 580 [('a', 2), ('b', 5), ('c', 8)]) 581 self.assertEqual(lzip('abc',count(step=-1)), 582 [('a', 0), ('b', -1), ('c', -2)]) 583 self.assertRaises(TypeError, count, 'a', 'b') 584 self.assertEqual(lzip('abc',count(2,0)), [('a', 2), ('b', 2), ('c', 2)]) 585 self.assertEqual(lzip('abc',count(2,1)), [('a', 2), ('b', 3), ('c', 4)]) 586 self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)]) 587 self.assertEqual(take(20, count(maxsize-15, 3)), take(20, range(maxsize-15, maxsize+100, 3))) 588 self.assertEqual(take(20, count(-maxsize-15, 3)), take(20, range(-maxsize-15,-maxsize+100, 3))) 589 self.assertEqual(take(3, count(10, maxsize+5)), 590 list(range(10, 10+3*(maxsize+5), maxsize+5))) 591 self.assertEqual(take(3, count(2, 1.25)), [2, 3.25, 4.5]) 592 self.assertEqual(take(3, count(2, 3.25-4j)), [2, 5.25-4j, 8.5-8j]) 593 self.assertEqual(take(3, count(Decimal('1.1'), Decimal('.1'))), 594 [Decimal('1.1'), Decimal('1.2'), Decimal('1.3')]) 595 self.assertEqual(take(3, count(Fraction(2,3), Fraction(1,7))), 596 [Fraction(2,3), Fraction(17,21), Fraction(20,21)]) 597 BIGINT = 1<<1000 598 self.assertEqual(take(3, count(step=BIGINT)), [0, BIGINT, 2*BIGINT]) 599 self.assertEqual(repr(take(3, count(10, 2.5))), repr([10, 12.5, 15.0])) 600 c = count(3, 5) 601 self.assertEqual(repr(c), 'count(3, 5)') 602 next(c) 603 self.assertEqual(repr(c), 'count(8, 5)') 604 c = count(-9, 0) 605 self.assertEqual(repr(c), 'count(-9, 0)') 606 next(c) 607 self.assertEqual(repr(c), 'count(-9, 0)') 608 c = count(-9, -3) 609 self.assertEqual(repr(c), 'count(-9, -3)') 610 next(c) 611 self.assertEqual(repr(c), 'count(-12, -3)') 612 self.assertEqual(repr(c), 'count(-12, -3)') 613 self.assertEqual(repr(count(10.5, 1.25)), 'count(10.5, 1.25)') 614 self.assertEqual(repr(count(10.5, 1)), 'count(10.5)') # suppress step=1 when it's an int 615 self.assertEqual(repr(count(10.5, 1.00)), 'count(10.5, 1.0)') # do show float values lilke 1.0 616 self.assertEqual(repr(count(10, 1.00)), 'count(10, 1.0)') 617 c = count(10, 1.0) 618 self.assertEqual(type(next(c)), int) 619 self.assertEqual(type(next(c)), float) 620 for i in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 10, sys.maxsize-5, sys.maxsize+5): 621 for j in (-sys.maxsize-5, -sys.maxsize+5 ,-10, -1, 0, 1, 10, sys.maxsize-5, sys.maxsize+5): 622 # Test repr 623 r1 = repr(count(i, j)) 624 if j == 1: 625 r2 = ('count(%r)' % i) 626 else: 627 r2 = ('count(%r, %r)' % (i, j)) 628 self.assertEqual(r1, r2) 629 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 630 self.pickletest(proto, count(i, j)) 631 632 def test_cycle(self): 633 self.assertEqual(take(10, cycle('abc')), list('abcabcabca')) 634 self.assertEqual(list(cycle('')), []) 635 self.assertRaises(TypeError, cycle) 636 self.assertRaises(TypeError, cycle, 5) 637 self.assertEqual(list(islice(cycle(gen3()),10)), [0,1,2,0,1,2,0,1,2,0]) 638 639 def test_cycle_copy_pickle(self): 640 # check copy, deepcopy, pickle 641 c = cycle('abc') 642 self.assertEqual(next(c), 'a') 643 #simple copy currently not supported, because __reduce__ returns 644 #an internal iterator 645 #self.assertEqual(take(10, copy.copy(c)), list('bcabcabcab')) 646 self.assertEqual(take(10, copy.deepcopy(c)), list('bcabcabcab')) 647 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 648 self.assertEqual(take(10, pickle.loads(pickle.dumps(c, proto))), 649 list('bcabcabcab')) 650 next(c) 651 self.assertEqual(take(10, pickle.loads(pickle.dumps(c, proto))), 652 list('cabcabcabc')) 653 next(c) 654 next(c) 655 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 656 self.pickletest(proto, cycle('abc')) 657 658 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 659 # test with partial consumed input iterable 660 it = iter('abcde') 661 c = cycle(it) 662 _ = [next(c) for i in range(2)] # consume 2 of 5 inputs 663 p = pickle.dumps(c, proto) 664 d = pickle.loads(p) # rebuild the cycle object 665 self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab')) 666 667 # test with completely consumed input iterable 668 it = iter('abcde') 669 c = cycle(it) 670 _ = [next(c) for i in range(7)] # consume 7 of 5 inputs 671 p = pickle.dumps(c, proto) 672 d = pickle.loads(p) # rebuild the cycle object 673 self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab')) 674 675 def test_cycle_unpickle_compat(self): 676 testcases = [ 677 b'citertools\ncycle\n(c__builtin__\niter\n((lI1\naI2\naI3\natRI1\nbtR((lI1\naI0\ntb.', 678 b'citertools\ncycle\n(c__builtin__\niter\n(](K\x01K\x02K\x03etRK\x01btR(]K\x01aK\x00tb.', 679 b'\x80\x02citertools\ncycle\nc__builtin__\niter\n](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01aK\x00\x86b.', 680 b'\x80\x03citertools\ncycle\ncbuiltins\niter\n](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01aK\x00\x86b.', 681 b'\x80\x04\x95=\x00\x00\x00\x00\x00\x00\x00\x8c\titertools\x8c\x05cycle\x93\x8c\x08builtins\x8c\x04iter\x93](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01aK\x00\x86b.', 682 683 b'citertools\ncycle\n(c__builtin__\niter\n((lp0\nI1\naI2\naI3\natRI1\nbtR(g0\nI1\ntb.', 684 b'citertools\ncycle\n(c__builtin__\niter\n(]q\x00(K\x01K\x02K\x03etRK\x01btR(h\x00K\x01tb.', 685 b'\x80\x02citertools\ncycle\nc__builtin__\niter\n]q\x00(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00K\x01\x86b.', 686 b'\x80\x03citertools\ncycle\ncbuiltins\niter\n]q\x00(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00K\x01\x86b.', 687 b'\x80\x04\x95<\x00\x00\x00\x00\x00\x00\x00\x8c\titertools\x8c\x05cycle\x93\x8c\x08builtins\x8c\x04iter\x93]\x94(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00K\x01\x86b.', 688 689 b'citertools\ncycle\n(c__builtin__\niter\n((lI1\naI2\naI3\natRI1\nbtR((lI1\naI00\ntb.', 690 b'citertools\ncycle\n(c__builtin__\niter\n(](K\x01K\x02K\x03etRK\x01btR(]K\x01aI00\ntb.', 691 b'\x80\x02citertools\ncycle\nc__builtin__\niter\n](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01a\x89\x86b.', 692 b'\x80\x03citertools\ncycle\ncbuiltins\niter\n](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01a\x89\x86b.', 693 b'\x80\x04\x95<\x00\x00\x00\x00\x00\x00\x00\x8c\titertools\x8c\x05cycle\x93\x8c\x08builtins\x8c\x04iter\x93](K\x01K\x02K\x03e\x85RK\x01b\x85R]K\x01a\x89\x86b.', 694 695 b'citertools\ncycle\n(c__builtin__\niter\n((lp0\nI1\naI2\naI3\natRI1\nbtR(g0\nI01\ntb.', 696 b'citertools\ncycle\n(c__builtin__\niter\n(]q\x00(K\x01K\x02K\x03etRK\x01btR(h\x00I01\ntb.', 697 b'\x80\x02citertools\ncycle\nc__builtin__\niter\n]q\x00(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00\x88\x86b.', 698 b'\x80\x03citertools\ncycle\ncbuiltins\niter\n]q\x00(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00\x88\x86b.', 699 b'\x80\x04\x95;\x00\x00\x00\x00\x00\x00\x00\x8c\titertools\x8c\x05cycle\x93\x8c\x08builtins\x8c\x04iter\x93]\x94(K\x01K\x02K\x03e\x85RK\x01b\x85Rh\x00\x88\x86b.', 700 ] 701 assert len(testcases) == 20 702 for t in testcases: 703 it = pickle.loads(t) 704 self.assertEqual(take(10, it), [2, 3, 1, 2, 3, 1, 2, 3, 1, 2]) 705 706 def test_cycle_setstate(self): 707 # Verify both modes for restoring state 708 709 # Mode 0 is efficient. It uses an incompletely consumed input 710 # iterator to build a cycle object and then passes in state with 711 # a list of previously consumed values. There is no data 712 # overlap between the two. 713 c = cycle('defg') 714 c.__setstate__((list('abc'), 0)) 715 self.assertEqual(take(20, c), list('defgabcdefgabcdefgab')) 716 717 # Mode 1 is inefficient. It starts with a cycle object built 718 # from an iterator over the remaining elements in a partial 719 # cycle and then passes in state with all of the previously 720 # seen values (this overlaps values included in the iterator). 721 c = cycle('defg') 722 c.__setstate__((list('abcdefg'), 1)) 723 self.assertEqual(take(20, c), list('defgabcdefgabcdefgab')) 724 725 # The first argument to setstate needs to be a tuple 726 with self.assertRaises(TypeError): 727 cycle('defg').__setstate__([list('abcdefg'), 0]) 728 729 # The first argument in the setstate tuple must be a list 730 with self.assertRaises(TypeError): 731 c = cycle('defg') 732 c.__setstate__((tuple('defg'), 0)) 733 take(20, c) 734 735 # The second argument in the setstate tuple must be an int 736 with self.assertRaises(TypeError): 737 cycle('defg').__setstate__((list('abcdefg'), 'x')) 738 739 self.assertRaises(TypeError, cycle('').__setstate__, ()) 740 self.assertRaises(TypeError, cycle('').__setstate__, ([],)) 741 742 def test_groupby(self): 743 # Check whether it accepts arguments correctly 744 self.assertEqual([], list(groupby([]))) 745 self.assertEqual([], list(groupby([], key=id))) 746 self.assertRaises(TypeError, list, groupby('abc', [])) 747 self.assertRaises(TypeError, groupby, None) 748 self.assertRaises(TypeError, groupby, 'abc', lambda x:x, 10) 749 750 # Check normal input 751 s = [(0, 10, 20), (0, 11,21), (0,12,21), (1,13,21), (1,14,22), 752 (2,15,22), (3,16,23), (3,17,23)] 753 dup = [] 754 for k, g in groupby(s, lambda r:r[0]): 755 for elem in g: 756 self.assertEqual(k, elem[0]) 757 dup.append(elem) 758 self.assertEqual(s, dup) 759 760 # Check normal pickled 761 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 762 dup = [] 763 for k, g in pickle.loads(pickle.dumps(groupby(s, testR), proto)): 764 for elem in g: 765 self.assertEqual(k, elem[0]) 766 dup.append(elem) 767 self.assertEqual(s, dup) 768 769 # Check nested case 770 dup = [] 771 for k, g in groupby(s, testR): 772 for ik, ig in groupby(g, testR2): 773 for elem in ig: 774 self.assertEqual(k, elem[0]) 775 self.assertEqual(ik, elem[2]) 776 dup.append(elem) 777 self.assertEqual(s, dup) 778 779 # Check nested and pickled 780 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 781 dup = [] 782 for k, g in pickle.loads(pickle.dumps(groupby(s, testR), proto)): 783 for ik, ig in pickle.loads(pickle.dumps(groupby(g, testR2), proto)): 784 for elem in ig: 785 self.assertEqual(k, elem[0]) 786 self.assertEqual(ik, elem[2]) 787 dup.append(elem) 788 self.assertEqual(s, dup) 789 790 791 # Check case where inner iterator is not used 792 keys = [k for k, g in groupby(s, testR)] 793 expectedkeys = set([r[0] for r in s]) 794 self.assertEqual(set(keys), expectedkeys) 795 self.assertEqual(len(keys), len(expectedkeys)) 796 797 # Check case where inner iterator is used after advancing the groupby 798 # iterator 799 s = list(zip('AABBBAAAA', range(9))) 800 it = groupby(s, testR) 801 _, g1 = next(it) 802 _, g2 = next(it) 803 _, g3 = next(it) 804 self.assertEqual(list(g1), []) 805 self.assertEqual(list(g2), []) 806 self.assertEqual(next(g3), ('A', 5)) 807 list(it) # exhaust the groupby iterator 808 self.assertEqual(list(g3), []) 809 810 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 811 it = groupby(s, testR) 812 _, g = next(it) 813 next(it) 814 next(it) 815 self.assertEqual(list(pickle.loads(pickle.dumps(g, proto))), []) 816 817 # Exercise pipes and filters style 818 s = 'abracadabra' 819 # sort s | uniq 820 r = [k for k, g in groupby(sorted(s))] 821 self.assertEqual(r, ['a', 'b', 'c', 'd', 'r']) 822 # sort s | uniq -d 823 r = [k for k, g in groupby(sorted(s)) if list(islice(g,1,2))] 824 self.assertEqual(r, ['a', 'b', 'r']) 825 # sort s | uniq -c 826 r = [(len(list(g)), k) for k, g in groupby(sorted(s))] 827 self.assertEqual(r, [(5, 'a'), (2, 'b'), (1, 'c'), (1, 'd'), (2, 'r')]) 828 # sort s | uniq -c | sort -rn | head -3 829 r = sorted([(len(list(g)) , k) for k, g in groupby(sorted(s))], reverse=True)[:3] 830 self.assertEqual(r, [(5, 'a'), (2, 'r'), (2, 'b')]) 831 832 # iter.__next__ failure 833 class ExpectedError(Exception): 834 pass 835 def delayed_raise(n=0): 836 for i in range(n): 837 yield 'yo' 838 raise ExpectedError 839 def gulp(iterable, keyp=None, func=list): 840 return [func(g) for k, g in groupby(iterable, keyp)] 841 842 # iter.__next__ failure on outer object 843 self.assertRaises(ExpectedError, gulp, delayed_raise(0)) 844 # iter.__next__ failure on inner object 845 self.assertRaises(ExpectedError, gulp, delayed_raise(1)) 846 847 # __eq__ failure 848 class DummyCmp: 849 def __eq__(self, dst): 850 raise ExpectedError 851 s = [DummyCmp(), DummyCmp(), None] 852 853 # __eq__ failure on outer object 854 self.assertRaises(ExpectedError, gulp, s, func=id) 855 # __eq__ failure on inner object 856 self.assertRaises(ExpectedError, gulp, s) 857 858 # keyfunc failure 859 def keyfunc(obj): 860 if keyfunc.skip > 0: 861 keyfunc.skip -= 1 862 return obj 863 else: 864 raise ExpectedError 865 866 # keyfunc failure on outer object 867 keyfunc.skip = 0 868 self.assertRaises(ExpectedError, gulp, [None], keyfunc) 869 keyfunc.skip = 1 870 self.assertRaises(ExpectedError, gulp, [None, None], keyfunc) 871 872 def test_filter(self): 873 self.assertEqual(list(filter(isEven, range(6))), [0,2,4]) 874 self.assertEqual(list(filter(None, [0,1,0,2,0])), [1,2]) 875 self.assertEqual(list(filter(bool, [0,1,0,2,0])), [1,2]) 876 self.assertEqual(take(4, filter(isEven, count())), [0,2,4,6]) 877 self.assertRaises(TypeError, filter) 878 self.assertRaises(TypeError, filter, lambda x:x) 879 self.assertRaises(TypeError, filter, lambda x:x, range(6), 7) 880 self.assertRaises(TypeError, filter, isEven, 3) 881 self.assertRaises(TypeError, next, filter(range(6), range(6))) 882 883 # check copy, deepcopy, pickle 884 ans = [0,2,4] 885 886 c = filter(isEven, range(6)) 887 self.assertEqual(list(copy.copy(c)), ans) 888 c = filter(isEven, range(6)) 889 self.assertEqual(list(copy.deepcopy(c)), ans) 890 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 891 c = filter(isEven, range(6)) 892 self.assertEqual(list(pickle.loads(pickle.dumps(c, proto))), ans) 893 next(c) 894 self.assertEqual(list(pickle.loads(pickle.dumps(c, proto))), ans[1:]) 895 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 896 c = filter(isEven, range(6)) 897 self.pickletest(proto, c) 898 899 def test_filterfalse(self): 900 self.assertEqual(list(filterfalse(isEven, range(6))), [1,3,5]) 901 self.assertEqual(list(filterfalse(None, [0,1,0,2,0])), [0,0,0]) 902 self.assertEqual(list(filterfalse(bool, [0,1,0,2,0])), [0,0,0]) 903 self.assertEqual(take(4, filterfalse(isEven, count())), [1,3,5,7]) 904 self.assertRaises(TypeError, filterfalse) 905 self.assertRaises(TypeError, filterfalse, lambda x:x) 906 self.assertRaises(TypeError, filterfalse, lambda x:x, range(6), 7) 907 self.assertRaises(TypeError, filterfalse, isEven, 3) 908 self.assertRaises(TypeError, next, filterfalse(range(6), range(6))) 909 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 910 self.pickletest(proto, filterfalse(isEven, range(6))) 911 912 def test_zip(self): 913 # XXX This is rather silly now that builtin zip() calls zip()... 914 ans = [(x,y) for x, y in zip('abc',count())] 915 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 916 self.assertEqual(list(zip('abc', range(6))), lzip('abc', range(6))) 917 self.assertEqual(list(zip('abcdef', range(3))), lzip('abcdef', range(3))) 918 self.assertEqual(take(3,zip('abcdef', count())), lzip('abcdef', range(3))) 919 self.assertEqual(list(zip('abcdef')), lzip('abcdef')) 920 self.assertEqual(list(zip()), lzip()) 921 self.assertRaises(TypeError, zip, 3) 922 self.assertRaises(TypeError, zip, range(3), 3) 923 self.assertEqual([tuple(list(pair)) for pair in zip('abc', 'def')], 924 lzip('abc', 'def')) 925 self.assertEqual([pair for pair in zip('abc', 'def')], 926 lzip('abc', 'def')) 927 928 @support.impl_detail("tuple reuse is specific to CPython") 929 def test_zip_tuple_reuse(self): 930 ids = list(map(id, zip('abc', 'def'))) 931 self.assertEqual(min(ids), max(ids)) 932 ids = list(map(id, list(zip('abc', 'def')))) 933 self.assertEqual(len(dict.fromkeys(ids)), len(ids)) 934 935 # check copy, deepcopy, pickle 936 ans = [(x,y) for x, y in copy.copy(zip('abc',count()))] 937 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 938 939 ans = [(x,y) for x, y in copy.deepcopy(zip('abc',count()))] 940 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 941 942 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 943 ans = [(x,y) for x, y in pickle.loads(pickle.dumps(zip('abc',count()), proto))] 944 self.assertEqual(ans, [('a', 0), ('b', 1), ('c', 2)]) 945 946 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 947 testIntermediate = zip('abc',count()) 948 next(testIntermediate) 949 ans = [(x,y) for x, y in pickle.loads(pickle.dumps(testIntermediate, proto))] 950 self.assertEqual(ans, [('b', 1), ('c', 2)]) 951 952 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 953 self.pickletest(proto, zip('abc', count())) 954 955 def test_ziplongest(self): 956 for args in [ 957 ['abc', range(6)], 958 [range(6), 'abc'], 959 [range(1000), range(2000,2100), range(3000,3050)], 960 [range(1000), range(0), range(3000,3050), range(1200), range(1500)], 961 [range(1000), range(0), range(3000,3050), range(1200), range(1500), range(0)], 962 ]: 963 target = [tuple([arg[i] if i < len(arg) else None for arg in args]) 964 for i in range(max(map(len, args)))] 965 self.assertEqual(list(zip_longest(*args)), target) 966 self.assertEqual(list(zip_longest(*args, **{})), target) 967 target = [tuple((e is None and 'X' or e) for e in t) for t in target] # Replace None fills with 'X' 968 self.assertEqual(list(zip_longest(*args, **dict(fillvalue='X'))), target) 969 970 self.assertEqual(take(3,zip_longest('abcdef', count())), list(zip('abcdef', range(3)))) # take 3 from infinite input 971 972 self.assertEqual(list(zip_longest()), list(zip())) 973 self.assertEqual(list(zip_longest([])), list(zip([]))) 974 self.assertEqual(list(zip_longest('abcdef')), list(zip('abcdef'))) 975 976 self.assertEqual(list(zip_longest('abc', 'defg', **{})), 977 list(zip(list('abc')+[None], 'defg'))) # empty keyword dict 978 self.assertRaises(TypeError, zip_longest, 3) 979 self.assertRaises(TypeError, zip_longest, range(3), 3) 980 981 for stmt in [ 982 "zip_longest('abc', fv=1)", 983 "zip_longest('abc', fillvalue=1, bogus_keyword=None)", 984 ]: 985 try: 986 eval(stmt, globals(), locals()) 987 except TypeError: 988 pass 989 else: 990 self.fail('Did not raise Type in: ' + stmt) 991 992 self.assertEqual([tuple(list(pair)) for pair in zip_longest('abc', 'def')], 993 list(zip('abc', 'def'))) 994 self.assertEqual([pair for pair in zip_longest('abc', 'def')], 995 list(zip('abc', 'def'))) 996 997 @support.impl_detail("tuple reuse is specific to CPython") 998 def test_zip_longest_tuple_reuse(self): 999 ids = list(map(id, zip_longest('abc', 'def'))) 1000 self.assertEqual(min(ids), max(ids)) 1001 ids = list(map(id, list(zip_longest('abc', 'def')))) 1002 self.assertEqual(len(dict.fromkeys(ids)), len(ids)) 1003 1004 def test_zip_longest_pickling(self): 1005 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1006 self.pickletest(proto, zip_longest("abc", "def")) 1007 self.pickletest(proto, zip_longest("abc", "defgh")) 1008 self.pickletest(proto, zip_longest("abc", "defgh", fillvalue=1)) 1009 self.pickletest(proto, zip_longest("", "defgh")) 1010 1011 def test_zip_longest_bad_iterable(self): 1012 exception = TypeError() 1013 1014 class BadIterable: 1015 def __iter__(self): 1016 raise exception 1017 1018 with self.assertRaises(TypeError) as cm: 1019 zip_longest(BadIterable()) 1020 1021 self.assertIs(cm.exception, exception) 1022 1023 def test_bug_7244(self): 1024 1025 class Repeater: 1026 # this class is similar to itertools.repeat 1027 def __init__(self, o, t, e): 1028 self.o = o 1029 self.t = int(t) 1030 self.e = e 1031 def __iter__(self): # its iterator is itself 1032 return self 1033 def __next__(self): 1034 if self.t > 0: 1035 self.t -= 1 1036 return self.o 1037 else: 1038 raise self.e 1039 1040 # Formerly this code in would fail in debug mode 1041 # with Undetected Error and Stop Iteration 1042 r1 = Repeater(1, 3, StopIteration) 1043 r2 = Repeater(2, 4, StopIteration) 1044 def run(r1, r2): 1045 result = [] 1046 for i, j in zip_longest(r1, r2, fillvalue=0): 1047 with support.captured_output('stdout'): 1048 print((i, j)) 1049 result.append((i, j)) 1050 return result 1051 self.assertEqual(run(r1, r2), [(1,2), (1,2), (1,2), (0,2)]) 1052 1053 # Formerly, the RuntimeError would be lost 1054 # and StopIteration would stop as expected 1055 r1 = Repeater(1, 3, RuntimeError) 1056 r2 = Repeater(2, 4, StopIteration) 1057 it = zip_longest(r1, r2, fillvalue=0) 1058 self.assertEqual(next(it), (1, 2)) 1059 self.assertEqual(next(it), (1, 2)) 1060 self.assertEqual(next(it), (1, 2)) 1061 self.assertRaises(RuntimeError, next, it) 1062 1063 def test_pairwise(self): 1064 self.assertEqual(list(pairwise('')), []) 1065 self.assertEqual(list(pairwise('a')), []) 1066 self.assertEqual(list(pairwise('ab')), 1067 [('a', 'b')]), 1068 self.assertEqual(list(pairwise('abcde')), 1069 [('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e')]) 1070 self.assertEqual(list(pairwise(range(10_000))), 1071 list(zip(range(10_000), range(1, 10_000)))) 1072 1073 with self.assertRaises(TypeError): 1074 pairwise() # too few arguments 1075 with self.assertRaises(TypeError): 1076 pairwise('abc', 10) # too many arguments 1077 with self.assertRaises(TypeError): 1078 pairwise(iterable='abc') # keyword arguments 1079 with self.assertRaises(TypeError): 1080 pairwise(None) # non-iterable argument 1081 1082 def test_product(self): 1083 for args, result in [ 1084 ([], [()]), # zero iterables 1085 (['ab'], [('a',), ('b',)]), # one iterable 1086 ([range(2), range(3)], [(0,0), (0,1), (0,2), (1,0), (1,1), (1,2)]), # two iterables 1087 ([range(0), range(2), range(3)], []), # first iterable with zero length 1088 ([range(2), range(0), range(3)], []), # middle iterable with zero length 1089 ([range(2), range(3), range(0)], []), # last iterable with zero length 1090 ]: 1091 self.assertEqual(list(product(*args)), result) 1092 for r in range(4): 1093 self.assertEqual(list(product(*(args*r))), 1094 list(product(*args, **dict(repeat=r)))) 1095 self.assertEqual(len(list(product(*[range(7)]*6))), 7**6) 1096 self.assertRaises(TypeError, product, range(6), None) 1097 1098 def product1(*args, **kwds): 1099 pools = list(map(tuple, args)) * kwds.get('repeat', 1) 1100 n = len(pools) 1101 if n == 0: 1102 yield () 1103 return 1104 if any(len(pool) == 0 for pool in pools): 1105 return 1106 indices = [0] * n 1107 yield tuple(pool[i] for pool, i in zip(pools, indices)) 1108 while 1: 1109 for i in reversed(range(n)): # right to left 1110 if indices[i] == len(pools[i]) - 1: 1111 continue 1112 indices[i] += 1 1113 for j in range(i+1, n): 1114 indices[j] = 0 1115 yield tuple(pool[i] for pool, i in zip(pools, indices)) 1116 break 1117 else: 1118 return 1119 1120 def product2(*args, **kwds): 1121 'Pure python version used in docs' 1122 pools = list(map(tuple, args)) * kwds.get('repeat', 1) 1123 result = [[]] 1124 for pool in pools: 1125 result = [x+[y] for x in result for y in pool] 1126 for prod in result: 1127 yield tuple(prod) 1128 1129 argtypes = ['', 'abc', '', range(0), range(4), dict(a=1, b=2, c=3), 1130 set('abcdefg'), range(11), tuple(range(13))] 1131 for i in range(100): 1132 args = [random.choice(argtypes) for j in range(random.randrange(5))] 1133 expected_len = prod(map(len, args)) 1134 self.assertEqual(len(list(product(*args))), expected_len) 1135 self.assertEqual(list(product(*args)), list(product1(*args))) 1136 self.assertEqual(list(product(*args)), list(product2(*args))) 1137 args = map(iter, args) 1138 self.assertEqual(len(list(product(*args))), expected_len) 1139 1140 @support.bigaddrspacetest 1141 def test_product_overflow(self): 1142 with self.assertRaises((OverflowError, MemoryError)): 1143 product(*(['ab']*2**5), repeat=2**25) 1144 1145 @support.impl_detail("tuple reuse is specific to CPython") 1146 def test_product_tuple_reuse(self): 1147 self.assertEqual(len(set(map(id, product('abc', 'def')))), 1) 1148 self.assertNotEqual(len(set(map(id, list(product('abc', 'def'))))), 1) 1149 1150 def test_product_pickling(self): 1151 # check copy, deepcopy, pickle 1152 for args, result in [ 1153 ([], [()]), # zero iterables 1154 (['ab'], [('a',), ('b',)]), # one iterable 1155 ([range(2), range(3)], [(0,0), (0,1), (0,2), (1,0), (1,1), (1,2)]), # two iterables 1156 ([range(0), range(2), range(3)], []), # first iterable with zero length 1157 ([range(2), range(0), range(3)], []), # middle iterable with zero length 1158 ([range(2), range(3), range(0)], []), # last iterable with zero length 1159 ]: 1160 self.assertEqual(list(copy.copy(product(*args))), result) 1161 self.assertEqual(list(copy.deepcopy(product(*args))), result) 1162 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1163 self.pickletest(proto, product(*args)) 1164 1165 def test_product_issue_25021(self): 1166 # test that indices are properly clamped to the length of the tuples 1167 p = product((1, 2),(3,)) 1168 p.__setstate__((0, 0x1000)) # will access tuple element 1 if not clamped 1169 self.assertEqual(next(p), (2, 3)) 1170 # test that empty tuple in the list will result in an immediate StopIteration 1171 p = product((1, 2), (), (3,)) 1172 p.__setstate__((0, 0, 0x1000)) # will access tuple element 1 if not clamped 1173 self.assertRaises(StopIteration, next, p) 1174 1175 def test_repeat(self): 1176 self.assertEqual(list(repeat(object='a', times=3)), ['a', 'a', 'a']) 1177 self.assertEqual(lzip(range(3),repeat('a')), 1178 [(0, 'a'), (1, 'a'), (2, 'a')]) 1179 self.assertEqual(list(repeat('a', 3)), ['a', 'a', 'a']) 1180 self.assertEqual(take(3, repeat('a')), ['a', 'a', 'a']) 1181 self.assertEqual(list(repeat('a', 0)), []) 1182 self.assertEqual(list(repeat('a', -3)), []) 1183 self.assertRaises(TypeError, repeat) 1184 self.assertRaises(TypeError, repeat, None, 3, 4) 1185 self.assertRaises(TypeError, repeat, None, 'a') 1186 r = repeat(1+0j) 1187 self.assertEqual(repr(r), 'repeat((1+0j))') 1188 r = repeat(1+0j, 5) 1189 self.assertEqual(repr(r), 'repeat((1+0j), 5)') 1190 list(r) 1191 self.assertEqual(repr(r), 'repeat((1+0j), 0)') 1192 1193 # check copy, deepcopy, pickle 1194 c = repeat(object='a', times=10) 1195 self.assertEqual(next(c), 'a') 1196 self.assertEqual(take(2, copy.copy(c)), list('a' * 2)) 1197 self.assertEqual(take(2, copy.deepcopy(c)), list('a' * 2)) 1198 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1199 self.pickletest(proto, repeat(object='a', times=10)) 1200 1201 def test_repeat_with_negative_times(self): 1202 self.assertEqual(repr(repeat('a', -1)), "repeat('a', 0)") 1203 self.assertEqual(repr(repeat('a', -2)), "repeat('a', 0)") 1204 self.assertEqual(repr(repeat('a', times=-1)), "repeat('a', 0)") 1205 self.assertEqual(repr(repeat('a', times=-2)), "repeat('a', 0)") 1206 1207 def test_map(self): 1208 self.assertEqual(list(map(operator.pow, range(3), range(1,7))), 1209 [0**1, 1**2, 2**3]) 1210 self.assertEqual(list(map(tupleize, 'abc', range(5))), 1211 [('a',0),('b',1),('c',2)]) 1212 self.assertEqual(list(map(tupleize, 'abc', count())), 1213 [('a',0),('b',1),('c',2)]) 1214 self.assertEqual(take(2,map(tupleize, 'abc', count())), 1215 [('a',0),('b',1)]) 1216 self.assertEqual(list(map(operator.pow, [])), []) 1217 self.assertRaises(TypeError, map) 1218 self.assertRaises(TypeError, list, map(None, range(3), range(3))) 1219 self.assertRaises(TypeError, map, operator.neg) 1220 self.assertRaises(TypeError, next, map(10, range(5))) 1221 self.assertRaises(ValueError, next, map(errfunc, [4], [5])) 1222 self.assertRaises(TypeError, next, map(onearg, [4], [5])) 1223 1224 # check copy, deepcopy, pickle 1225 ans = [('a',0),('b',1),('c',2)] 1226 1227 c = map(tupleize, 'abc', count()) 1228 self.assertEqual(list(copy.copy(c)), ans) 1229 1230 c = map(tupleize, 'abc', count()) 1231 self.assertEqual(list(copy.deepcopy(c)), ans) 1232 1233 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1234 c = map(tupleize, 'abc', count()) 1235 self.pickletest(proto, c) 1236 1237 def test_starmap(self): 1238 self.assertEqual(list(starmap(operator.pow, zip(range(3), range(1,7)))), 1239 [0**1, 1**2, 2**3]) 1240 self.assertEqual(take(3, starmap(operator.pow, zip(count(), count(1)))), 1241 [0**1, 1**2, 2**3]) 1242 self.assertEqual(list(starmap(operator.pow, [])), []) 1243 self.assertEqual(list(starmap(operator.pow, [iter([4,5])])), [4**5]) 1244 self.assertRaises(TypeError, list, starmap(operator.pow, [None])) 1245 self.assertRaises(TypeError, starmap) 1246 self.assertRaises(TypeError, starmap, operator.pow, [(4,5)], 'extra') 1247 self.assertRaises(TypeError, next, starmap(10, [(4,5)])) 1248 self.assertRaises(ValueError, next, starmap(errfunc, [(4,5)])) 1249 self.assertRaises(TypeError, next, starmap(onearg, [(4,5)])) 1250 1251 # check copy, deepcopy, pickle 1252 ans = [0**1, 1**2, 2**3] 1253 1254 c = starmap(operator.pow, zip(range(3), range(1,7))) 1255 self.assertEqual(list(copy.copy(c)), ans) 1256 1257 c = starmap(operator.pow, zip(range(3), range(1,7))) 1258 self.assertEqual(list(copy.deepcopy(c)), ans) 1259 1260 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1261 c = starmap(operator.pow, zip(range(3), range(1,7))) 1262 self.pickletest(proto, c) 1263 1264 def test_islice(self): 1265 for args in [ # islice(args) should agree with range(args) 1266 (10, 20, 3), 1267 (10, 3, 20), 1268 (10, 20), 1269 (10, 10), 1270 (10, 3), 1271 (20,) 1272 ]: 1273 self.assertEqual(list(islice(range(100), *args)), 1274 list(range(*args))) 1275 1276 for args, tgtargs in [ # Stop when seqn is exhausted 1277 ((10, 110, 3), ((10, 100, 3))), 1278 ((10, 110), ((10, 100))), 1279 ((110,), (100,)) 1280 ]: 1281 self.assertEqual(list(islice(range(100), *args)), 1282 list(range(*tgtargs))) 1283 1284 # Test stop=None 1285 self.assertEqual(list(islice(range(10), None)), list(range(10))) 1286 self.assertEqual(list(islice(range(10), None, None)), list(range(10))) 1287 self.assertEqual(list(islice(range(10), None, None, None)), list(range(10))) 1288 self.assertEqual(list(islice(range(10), 2, None)), list(range(2, 10))) 1289 self.assertEqual(list(islice(range(10), 1, None, 2)), list(range(1, 10, 2))) 1290 1291 # Test number of items consumed SF #1171417 1292 it = iter(range(10)) 1293 self.assertEqual(list(islice(it, 3)), list(range(3))) 1294 self.assertEqual(list(it), list(range(3, 10))) 1295 1296 it = iter(range(10)) 1297 self.assertEqual(list(islice(it, 3, 3)), []) 1298 self.assertEqual(list(it), list(range(3, 10))) 1299 1300 # Test invalid arguments 1301 ra = range(10) 1302 self.assertRaises(TypeError, islice, ra) 1303 self.assertRaises(TypeError, islice, ra, 1, 2, 3, 4) 1304 self.assertRaises(ValueError, islice, ra, -5, 10, 1) 1305 self.assertRaises(ValueError, islice, ra, 1, -5, -1) 1306 self.assertRaises(ValueError, islice, ra, 1, 10, -1) 1307 self.assertRaises(ValueError, islice, ra, 1, 10, 0) 1308 self.assertRaises(ValueError, islice, ra, 'a') 1309 self.assertRaises(ValueError, islice, ra, 'a', 1) 1310 self.assertRaises(ValueError, islice, ra, 1, 'a') 1311 self.assertRaises(ValueError, islice, ra, 'a', 1, 1) 1312 self.assertRaises(ValueError, islice, ra, 1, 'a', 1) 1313 self.assertEqual(len(list(islice(count(), 1, 10, maxsize))), 1) 1314 1315 # Issue #10323: Less islice in a predictable state 1316 c = count() 1317 self.assertEqual(list(islice(c, 1, 3, 50)), [1]) 1318 self.assertEqual(next(c), 3) 1319 1320 # check copy, deepcopy, pickle 1321 for args in [ # islice(args) should agree with range(args) 1322 (10, 20, 3), 1323 (10, 3, 20), 1324 (10, 20), 1325 (10, 3), 1326 (20,) 1327 ]: 1328 self.assertEqual(list(copy.copy(islice(range(100), *args))), 1329 list(range(*args))) 1330 self.assertEqual(list(copy.deepcopy(islice(range(100), *args))), 1331 list(range(*args))) 1332 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1333 self.pickletest(proto, islice(range(100), *args)) 1334 1335 # Issue #21321: check source iterator is not referenced 1336 # from islice() after the latter has been exhausted 1337 it = (x for x in (1, 2)) 1338 wr = weakref.ref(it) 1339 it = islice(it, 1) 1340 self.assertIsNotNone(wr()) 1341 list(it) # exhaust the iterator 1342 support.gc_collect() 1343 self.assertIsNone(wr()) 1344 1345 # Issue #30537: islice can accept integer-like objects as 1346 # arguments 1347 class IntLike(object): 1348 def __init__(self, val): 1349 self.val = val 1350 def __index__(self): 1351 return self.val 1352 self.assertEqual(list(islice(range(100), IntLike(10))), list(range(10))) 1353 self.assertEqual(list(islice(range(100), IntLike(10), IntLike(50))), 1354 list(range(10, 50))) 1355 self.assertEqual(list(islice(range(100), IntLike(10), IntLike(50), IntLike(5))), 1356 list(range(10,50,5))) 1357 1358 def test_takewhile(self): 1359 data = [1, 3, 5, 20, 2, 4, 6, 8] 1360 self.assertEqual(list(takewhile(underten, data)), [1, 3, 5]) 1361 self.assertEqual(list(takewhile(underten, [])), []) 1362 self.assertRaises(TypeError, takewhile) 1363 self.assertRaises(TypeError, takewhile, operator.pow) 1364 self.assertRaises(TypeError, takewhile, operator.pow, [(4,5)], 'extra') 1365 self.assertRaises(TypeError, next, takewhile(10, [(4,5)])) 1366 self.assertRaises(ValueError, next, takewhile(errfunc, [(4,5)])) 1367 t = takewhile(bool, [1, 1, 1, 0, 0, 0]) 1368 self.assertEqual(list(t), [1, 1, 1]) 1369 self.assertRaises(StopIteration, next, t) 1370 1371 # check copy, deepcopy, pickle 1372 self.assertEqual(list(copy.copy(takewhile(underten, data))), [1, 3, 5]) 1373 self.assertEqual(list(copy.deepcopy(takewhile(underten, data))), 1374 [1, 3, 5]) 1375 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1376 self.pickletest(proto, takewhile(underten, data)) 1377 1378 def test_dropwhile(self): 1379 data = [1, 3, 5, 20, 2, 4, 6, 8] 1380 self.assertEqual(list(dropwhile(underten, data)), [20, 2, 4, 6, 8]) 1381 self.assertEqual(list(dropwhile(underten, [])), []) 1382 self.assertRaises(TypeError, dropwhile) 1383 self.assertRaises(TypeError, dropwhile, operator.pow) 1384 self.assertRaises(TypeError, dropwhile, operator.pow, [(4,5)], 'extra') 1385 self.assertRaises(TypeError, next, dropwhile(10, [(4,5)])) 1386 self.assertRaises(ValueError, next, dropwhile(errfunc, [(4,5)])) 1387 1388 # check copy, deepcopy, pickle 1389 self.assertEqual(list(copy.copy(dropwhile(underten, data))), [20, 2, 4, 6, 8]) 1390 self.assertEqual(list(copy.deepcopy(dropwhile(underten, data))), 1391 [20, 2, 4, 6, 8]) 1392 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1393 self.pickletest(proto, dropwhile(underten, data)) 1394 1395 def test_tee(self): 1396 n = 200 1397 1398 a, b = tee([]) # test empty iterator 1399 self.assertEqual(list(a), []) 1400 self.assertEqual(list(b), []) 1401 1402 a, b = tee(irange(n)) # test 100% interleaved 1403 self.assertEqual(lzip(a,b), lzip(range(n), range(n))) 1404 1405 a, b = tee(irange(n)) # test 0% interleaved 1406 self.assertEqual(list(a), list(range(n))) 1407 self.assertEqual(list(b), list(range(n))) 1408 1409 a, b = tee(irange(n)) # test dealloc of leading iterator 1410 for i in range(100): 1411 self.assertEqual(next(a), i) 1412 del a 1413 self.assertEqual(list(b), list(range(n))) 1414 1415 a, b = tee(irange(n)) # test dealloc of trailing iterator 1416 for i in range(100): 1417 self.assertEqual(next(a), i) 1418 del b 1419 self.assertEqual(list(a), list(range(100, n))) 1420 1421 for j in range(5): # test randomly interleaved 1422 order = [0]*n + [1]*n 1423 random.shuffle(order) 1424 lists = ([], []) 1425 its = tee(irange(n)) 1426 for i in order: 1427 value = next(its[i]) 1428 lists[i].append(value) 1429 self.assertEqual(lists[0], list(range(n))) 1430 self.assertEqual(lists[1], list(range(n))) 1431 1432 # test argument format checking 1433 self.assertRaises(TypeError, tee) 1434 self.assertRaises(TypeError, tee, 3) 1435 self.assertRaises(TypeError, tee, [1,2], 'x') 1436 self.assertRaises(TypeError, tee, [1,2], 3, 'x') 1437 1438 # tee object should be instantiable 1439 a, b = tee('abc') 1440 c = type(a)('def') 1441 self.assertEqual(list(c), list('def')) 1442 1443 # test long-lagged and multi-way split 1444 a, b, c = tee(range(2000), 3) 1445 for i in range(100): 1446 self.assertEqual(next(a), i) 1447 self.assertEqual(list(b), list(range(2000))) 1448 self.assertEqual([next(c), next(c)], list(range(2))) 1449 self.assertEqual(list(a), list(range(100,2000))) 1450 self.assertEqual(list(c), list(range(2,2000))) 1451 1452 # test values of n 1453 self.assertRaises(TypeError, tee, 'abc', 'invalid') 1454 self.assertRaises(ValueError, tee, [], -1) 1455 for n in range(5): 1456 result = tee('abc', n) 1457 self.assertEqual(type(result), tuple) 1458 self.assertEqual(len(result), n) 1459 self.assertEqual([list(x) for x in result], [list('abc')]*n) 1460 1461 # tee pass-through to copyable iterator 1462 a, b = tee('abc') 1463 c, d = tee(a) 1464 self.assertTrue(a is c) 1465 1466 # test tee_new 1467 t1, t2 = tee('abc') 1468 tnew = type(t1) 1469 self.assertRaises(TypeError, tnew) 1470 self.assertRaises(TypeError, tnew, 10) 1471 t3 = tnew(t1) 1472 self.assertTrue(list(t1) == list(t2) == list(t3) == list('abc')) 1473 1474 # test that tee objects are weak referencable 1475 a, b = tee(range(10)) 1476 p = weakref.proxy(a) 1477 self.assertEqual(getattr(p, '__class__'), type(b)) 1478 del a 1479 support.gc_collect() # For PyPy or other GCs. 1480 self.assertRaises(ReferenceError, getattr, p, '__class__') 1481 1482 ans = list('abc') 1483 long_ans = list(range(10000)) 1484 1485 # check copy 1486 a, b = tee('abc') 1487 self.assertEqual(list(copy.copy(a)), ans) 1488 self.assertEqual(list(copy.copy(b)), ans) 1489 a, b = tee(list(range(10000))) 1490 self.assertEqual(list(copy.copy(a)), long_ans) 1491 self.assertEqual(list(copy.copy(b)), long_ans) 1492 1493 # check partially consumed copy 1494 a, b = tee('abc') 1495 take(2, a) 1496 take(1, b) 1497 self.assertEqual(list(copy.copy(a)), ans[2:]) 1498 self.assertEqual(list(copy.copy(b)), ans[1:]) 1499 self.assertEqual(list(a), ans[2:]) 1500 self.assertEqual(list(b), ans[1:]) 1501 a, b = tee(range(10000)) 1502 take(100, a) 1503 take(60, b) 1504 self.assertEqual(list(copy.copy(a)), long_ans[100:]) 1505 self.assertEqual(list(copy.copy(b)), long_ans[60:]) 1506 self.assertEqual(list(a), long_ans[100:]) 1507 self.assertEqual(list(b), long_ans[60:]) 1508 1509 # check deepcopy 1510 a, b = tee('abc') 1511 self.assertEqual(list(copy.deepcopy(a)), ans) 1512 self.assertEqual(list(copy.deepcopy(b)), ans) 1513 self.assertEqual(list(a), ans) 1514 self.assertEqual(list(b), ans) 1515 a, b = tee(range(10000)) 1516 self.assertEqual(list(copy.deepcopy(a)), long_ans) 1517 self.assertEqual(list(copy.deepcopy(b)), long_ans) 1518 self.assertEqual(list(a), long_ans) 1519 self.assertEqual(list(b), long_ans) 1520 1521 # check partially consumed deepcopy 1522 a, b = tee('abc') 1523 take(2, a) 1524 take(1, b) 1525 self.assertEqual(list(copy.deepcopy(a)), ans[2:]) 1526 self.assertEqual(list(copy.deepcopy(b)), ans[1:]) 1527 self.assertEqual(list(a), ans[2:]) 1528 self.assertEqual(list(b), ans[1:]) 1529 a, b = tee(range(10000)) 1530 take(100, a) 1531 take(60, b) 1532 self.assertEqual(list(copy.deepcopy(a)), long_ans[100:]) 1533 self.assertEqual(list(copy.deepcopy(b)), long_ans[60:]) 1534 self.assertEqual(list(a), long_ans[100:]) 1535 self.assertEqual(list(b), long_ans[60:]) 1536 1537 # check pickle 1538 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1539 self.pickletest(proto, iter(tee('abc'))) 1540 a, b = tee('abc') 1541 self.pickletest(proto, a, compare=ans) 1542 self.pickletest(proto, b, compare=ans) 1543 1544 # Issue 13454: Crash when deleting backward iterator from tee() 1545 def test_tee_del_backward(self): 1546 forward, backward = tee(repeat(None, 20000000)) 1547 try: 1548 any(forward) # exhaust the iterator 1549 del backward 1550 except: 1551 del forward, backward 1552 raise 1553 1554 def test_tee_reenter(self): 1555 class I: 1556 first = True 1557 def __iter__(self): 1558 return self 1559 def __next__(self): 1560 first = self.first 1561 self.first = False 1562 if first: 1563 return next(b) 1564 1565 a, b = tee(I()) 1566 with self.assertRaisesRegex(RuntimeError, "tee"): 1567 next(a) 1568 1569 @threading_helper.requires_working_threading() 1570 def test_tee_concurrent(self): 1571 start = threading.Event() 1572 finish = threading.Event() 1573 class I: 1574 def __iter__(self): 1575 return self 1576 def __next__(self): 1577 start.set() 1578 finish.wait() 1579 1580 a, b = tee(I()) 1581 thread = threading.Thread(target=next, args=[a]) 1582 thread.start() 1583 try: 1584 start.wait() 1585 with self.assertRaisesRegex(RuntimeError, "tee"): 1586 next(b) 1587 finally: 1588 finish.set() 1589 thread.join() 1590 1591 def test_StopIteration(self): 1592 self.assertRaises(StopIteration, next, zip()) 1593 1594 for f in (chain, cycle, zip, groupby): 1595 self.assertRaises(StopIteration, next, f([])) 1596 self.assertRaises(StopIteration, next, f(StopNow())) 1597 1598 self.assertRaises(StopIteration, next, islice([], None)) 1599 self.assertRaises(StopIteration, next, islice(StopNow(), None)) 1600 1601 p, q = tee([]) 1602 self.assertRaises(StopIteration, next, p) 1603 self.assertRaises(StopIteration, next, q) 1604 p, q = tee(StopNow()) 1605 self.assertRaises(StopIteration, next, p) 1606 self.assertRaises(StopIteration, next, q) 1607 1608 self.assertRaises(StopIteration, next, repeat(None, 0)) 1609 1610 for f in (filter, filterfalse, map, takewhile, dropwhile, starmap): 1611 self.assertRaises(StopIteration, next, f(lambda x:x, [])) 1612 self.assertRaises(StopIteration, next, f(lambda x:x, StopNow())) 1613 1614 @support.cpython_only 1615 def test_combinations_result_gc(self): 1616 # bpo-42536: combinations's tuple-reuse speed trick breaks the GC's 1617 # assumptions about what can be untracked. Make sure we re-track result 1618 # tuples whenever we reuse them. 1619 it = combinations([None, []], 1) 1620 next(it) 1621 gc.collect() 1622 # That GC collection probably untracked the recycled internal result 1623 # tuple, which has the value (None,). Make sure it's re-tracked when 1624 # it's mutated and returned from __next__: 1625 self.assertTrue(gc.is_tracked(next(it))) 1626 1627 @support.cpython_only 1628 def test_combinations_with_replacement_result_gc(self): 1629 # Ditto for combinations_with_replacement. 1630 it = combinations_with_replacement([None, []], 1) 1631 next(it) 1632 gc.collect() 1633 self.assertTrue(gc.is_tracked(next(it))) 1634 1635 @support.cpython_only 1636 def test_permutations_result_gc(self): 1637 # Ditto for permutations. 1638 it = permutations([None, []], 1) 1639 next(it) 1640 gc.collect() 1641 self.assertTrue(gc.is_tracked(next(it))) 1642 1643 @support.cpython_only 1644 def test_product_result_gc(self): 1645 # Ditto for product. 1646 it = product([None, []]) 1647 next(it) 1648 gc.collect() 1649 self.assertTrue(gc.is_tracked(next(it))) 1650 1651 @support.cpython_only 1652 def test_zip_longest_result_gc(self): 1653 # Ditto for zip_longest. 1654 it = zip_longest([[]]) 1655 gc.collect() 1656 self.assertTrue(gc.is_tracked(next(it))) 1657 1658 1659class TestExamples(unittest.TestCase): 1660 1661 def test_accumulate(self): 1662 self.assertEqual(list(accumulate([1,2,3,4,5])), [1, 3, 6, 10, 15]) 1663 1664 def test_accumulate_reducible(self): 1665 # check copy, deepcopy, pickle 1666 data = [1, 2, 3, 4, 5] 1667 accumulated = [1, 3, 6, 10, 15] 1668 1669 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1670 it = accumulate(data) 1671 self.assertEqual(list(pickle.loads(pickle.dumps(it, proto))), accumulated[:]) 1672 self.assertEqual(next(it), 1) 1673 self.assertEqual(list(pickle.loads(pickle.dumps(it, proto))), accumulated[1:]) 1674 it = accumulate(data) 1675 self.assertEqual(next(it), 1) 1676 self.assertEqual(list(copy.deepcopy(it)), accumulated[1:]) 1677 self.assertEqual(list(copy.copy(it)), accumulated[1:]) 1678 1679 def test_accumulate_reducible_none(self): 1680 # Issue #25718: total is None 1681 it = accumulate([None, None, None], operator.is_) 1682 self.assertEqual(next(it), None) 1683 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1684 it_copy = pickle.loads(pickle.dumps(it, proto)) 1685 self.assertEqual(list(it_copy), [True, False]) 1686 self.assertEqual(list(copy.deepcopy(it)), [True, False]) 1687 self.assertEqual(list(copy.copy(it)), [True, False]) 1688 1689 def test_chain(self): 1690 self.assertEqual(''.join(chain('ABC', 'DEF')), 'ABCDEF') 1691 1692 def test_chain_from_iterable(self): 1693 self.assertEqual(''.join(chain.from_iterable(['ABC', 'DEF'])), 'ABCDEF') 1694 1695 def test_combinations(self): 1696 self.assertEqual(list(combinations('ABCD', 2)), 1697 [('A','B'), ('A','C'), ('A','D'), ('B','C'), ('B','D'), ('C','D')]) 1698 self.assertEqual(list(combinations(range(4), 3)), 1699 [(0,1,2), (0,1,3), (0,2,3), (1,2,3)]) 1700 1701 def test_combinations_with_replacement(self): 1702 self.assertEqual(list(combinations_with_replacement('ABC', 2)), 1703 [('A','A'), ('A','B'), ('A','C'), ('B','B'), ('B','C'), ('C','C')]) 1704 1705 def test_compress(self): 1706 self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF')) 1707 1708 def test_count(self): 1709 self.assertEqual(list(islice(count(10), 5)), [10, 11, 12, 13, 14]) 1710 1711 def test_cycle(self): 1712 self.assertEqual(list(islice(cycle('ABCD'), 12)), list('ABCDABCDABCD')) 1713 1714 def test_dropwhile(self): 1715 self.assertEqual(list(dropwhile(lambda x: x<5, [1,4,6,4,1])), [6,4,1]) 1716 1717 def test_groupby(self): 1718 self.assertEqual([k for k, g in groupby('AAAABBBCCDAABBB')], 1719 list('ABCDAB')) 1720 self.assertEqual([(list(g)) for k, g in groupby('AAAABBBCCD')], 1721 [list('AAAA'), list('BBB'), list('CC'), list('D')]) 1722 1723 def test_filter(self): 1724 self.assertEqual(list(filter(lambda x: x%2, range(10))), [1,3,5,7,9]) 1725 1726 def test_filterfalse(self): 1727 self.assertEqual(list(filterfalse(lambda x: x%2, range(10))), [0,2,4,6,8]) 1728 1729 def test_map(self): 1730 self.assertEqual(list(map(pow, (2,3,10), (5,2,3))), [32, 9, 1000]) 1731 1732 def test_islice(self): 1733 self.assertEqual(list(islice('ABCDEFG', 2)), list('AB')) 1734 self.assertEqual(list(islice('ABCDEFG', 2, 4)), list('CD')) 1735 self.assertEqual(list(islice('ABCDEFG', 2, None)), list('CDEFG')) 1736 self.assertEqual(list(islice('ABCDEFG', 0, None, 2)), list('ACEG')) 1737 1738 def test_zip(self): 1739 self.assertEqual(list(zip('ABCD', 'xy')), [('A', 'x'), ('B', 'y')]) 1740 1741 def test_zip_longest(self): 1742 self.assertEqual(list(zip_longest('ABCD', 'xy', fillvalue='-')), 1743 [('A', 'x'), ('B', 'y'), ('C', '-'), ('D', '-')]) 1744 1745 def test_permutations(self): 1746 self.assertEqual(list(permutations('ABCD', 2)), 1747 list(map(tuple, 'AB AC AD BA BC BD CA CB CD DA DB DC'.split()))) 1748 self.assertEqual(list(permutations(range(3))), 1749 [(0,1,2), (0,2,1), (1,0,2), (1,2,0), (2,0,1), (2,1,0)]) 1750 1751 def test_product(self): 1752 self.assertEqual(list(product('ABCD', 'xy')), 1753 list(map(tuple, 'Ax Ay Bx By Cx Cy Dx Dy'.split()))) 1754 self.assertEqual(list(product(range(2), repeat=3)), 1755 [(0,0,0), (0,0,1), (0,1,0), (0,1,1), 1756 (1,0,0), (1,0,1), (1,1,0), (1,1,1)]) 1757 1758 def test_repeat(self): 1759 self.assertEqual(list(repeat(10, 3)), [10, 10, 10]) 1760 1761 def test_stapmap(self): 1762 self.assertEqual(list(starmap(pow, [(2,5), (3,2), (10,3)])), 1763 [32, 9, 1000]) 1764 1765 def test_takewhile(self): 1766 self.assertEqual(list(takewhile(lambda x: x<5, [1,4,6,4,1])), [1,4]) 1767 1768 1769class TestPurePythonRoughEquivalents(unittest.TestCase): 1770 1771 @staticmethod 1772 def islice(iterable, *args): 1773 s = slice(*args) 1774 start, stop, step = s.start or 0, s.stop or sys.maxsize, s.step or 1 1775 it = iter(range(start, stop, step)) 1776 try: 1777 nexti = next(it) 1778 except StopIteration: 1779 # Consume *iterable* up to the *start* position. 1780 for i, element in zip(range(start), iterable): 1781 pass 1782 return 1783 try: 1784 for i, element in enumerate(iterable): 1785 if i == nexti: 1786 yield element 1787 nexti = next(it) 1788 except StopIteration: 1789 # Consume to *stop*. 1790 for i, element in zip(range(i + 1, stop), iterable): 1791 pass 1792 1793 def test_islice_recipe(self): 1794 self.assertEqual(list(self.islice('ABCDEFG', 2)), list('AB')) 1795 self.assertEqual(list(self.islice('ABCDEFG', 2, 4)), list('CD')) 1796 self.assertEqual(list(self.islice('ABCDEFG', 2, None)), list('CDEFG')) 1797 self.assertEqual(list(self.islice('ABCDEFG', 0, None, 2)), list('ACEG')) 1798 # Test items consumed. 1799 it = iter(range(10)) 1800 self.assertEqual(list(self.islice(it, 3)), list(range(3))) 1801 self.assertEqual(list(it), list(range(3, 10))) 1802 it = iter(range(10)) 1803 self.assertEqual(list(self.islice(it, 3, 3)), []) 1804 self.assertEqual(list(it), list(range(3, 10))) 1805 # Test that slice finishes in predictable state. 1806 c = count() 1807 self.assertEqual(list(self.islice(c, 1, 3, 50)), [1]) 1808 self.assertEqual(next(c), 3) 1809 1810 1811class TestGC(unittest.TestCase): 1812 1813 def makecycle(self, iterator, container): 1814 container.append(iterator) 1815 next(iterator) 1816 del container, iterator 1817 1818 def test_accumulate(self): 1819 a = [] 1820 self.makecycle(accumulate([1,2,a,3]), a) 1821 1822 def test_chain(self): 1823 a = [] 1824 self.makecycle(chain(a), a) 1825 1826 def test_chain_from_iterable(self): 1827 a = [] 1828 self.makecycle(chain.from_iterable([a]), a) 1829 1830 def test_combinations(self): 1831 a = [] 1832 self.makecycle(combinations([1,2,a,3], 3), a) 1833 1834 def test_combinations_with_replacement(self): 1835 a = [] 1836 self.makecycle(combinations_with_replacement([1,2,a,3], 3), a) 1837 1838 def test_compress(self): 1839 a = [] 1840 self.makecycle(compress('ABCDEF', [1,0,1,0,1,0]), a) 1841 1842 def test_count(self): 1843 a = [] 1844 Int = type('Int', (int,), dict(x=a)) 1845 self.makecycle(count(Int(0), Int(1)), a) 1846 1847 def test_cycle(self): 1848 a = [] 1849 self.makecycle(cycle([a]*2), a) 1850 1851 def test_dropwhile(self): 1852 a = [] 1853 self.makecycle(dropwhile(bool, [0, a, a]), a) 1854 1855 def test_groupby(self): 1856 a = [] 1857 self.makecycle(groupby([a]*2, lambda x:x), a) 1858 1859 def test_issue2246(self): 1860 # Issue 2246 -- the _grouper iterator was not included in GC 1861 n = 10 1862 keyfunc = lambda x: x 1863 for i, j in groupby(range(n), key=keyfunc): 1864 keyfunc.__dict__.setdefault('x',[]).append(j) 1865 1866 def test_filter(self): 1867 a = [] 1868 self.makecycle(filter(lambda x:True, [a]*2), a) 1869 1870 def test_filterfalse(self): 1871 a = [] 1872 self.makecycle(filterfalse(lambda x:False, a), a) 1873 1874 def test_zip(self): 1875 a = [] 1876 self.makecycle(zip([a]*2, [a]*3), a) 1877 1878 def test_zip_longest(self): 1879 a = [] 1880 self.makecycle(zip_longest([a]*2, [a]*3), a) 1881 b = [a, None] 1882 self.makecycle(zip_longest([a]*2, [a]*3, fillvalue=b), a) 1883 1884 def test_map(self): 1885 a = [] 1886 self.makecycle(map(lambda x:x, [a]*2), a) 1887 1888 def test_islice(self): 1889 a = [] 1890 self.makecycle(islice([a]*2, None), a) 1891 1892 def test_pairwise(self): 1893 a = [] 1894 self.makecycle(pairwise([a]*5), a) 1895 1896 def test_permutations(self): 1897 a = [] 1898 self.makecycle(permutations([1,2,a,3], 3), a) 1899 1900 def test_product(self): 1901 a = [] 1902 self.makecycle(product([1,2,a,3], repeat=3), a) 1903 1904 def test_repeat(self): 1905 a = [] 1906 self.makecycle(repeat(a), a) 1907 1908 def test_starmap(self): 1909 a = [] 1910 self.makecycle(starmap(lambda *t: t, [(a,a)]*2), a) 1911 1912 def test_takewhile(self): 1913 a = [] 1914 self.makecycle(takewhile(bool, [1, 0, a, a]), a) 1915 1916def R(seqn): 1917 'Regular generator' 1918 for i in seqn: 1919 yield i 1920 1921class G: 1922 'Sequence using __getitem__' 1923 def __init__(self, seqn): 1924 self.seqn = seqn 1925 def __getitem__(self, i): 1926 return self.seqn[i] 1927 1928class I: 1929 'Sequence using iterator protocol' 1930 def __init__(self, seqn): 1931 self.seqn = seqn 1932 self.i = 0 1933 def __iter__(self): 1934 return self 1935 def __next__(self): 1936 if self.i >= len(self.seqn): raise StopIteration 1937 v = self.seqn[self.i] 1938 self.i += 1 1939 return v 1940 1941class Ig: 1942 'Sequence using iterator protocol defined with a generator' 1943 def __init__(self, seqn): 1944 self.seqn = seqn 1945 self.i = 0 1946 def __iter__(self): 1947 for val in self.seqn: 1948 yield val 1949 1950class X: 1951 'Missing __getitem__ and __iter__' 1952 def __init__(self, seqn): 1953 self.seqn = seqn 1954 self.i = 0 1955 def __next__(self): 1956 if self.i >= len(self.seqn): raise StopIteration 1957 v = self.seqn[self.i] 1958 self.i += 1 1959 return v 1960 1961class N: 1962 'Iterator missing __next__()' 1963 def __init__(self, seqn): 1964 self.seqn = seqn 1965 self.i = 0 1966 def __iter__(self): 1967 return self 1968 1969class E: 1970 'Test propagation of exceptions' 1971 def __init__(self, seqn): 1972 self.seqn = seqn 1973 self.i = 0 1974 def __iter__(self): 1975 return self 1976 def __next__(self): 1977 3 // 0 1978 1979class S: 1980 'Test immediate stop' 1981 def __init__(self, seqn): 1982 pass 1983 def __iter__(self): 1984 return self 1985 def __next__(self): 1986 raise StopIteration 1987 1988def L(seqn): 1989 'Test multiple tiers of iterators' 1990 return chain(map(lambda x:x, R(Ig(G(seqn))))) 1991 1992 1993class TestVariousIteratorArgs(unittest.TestCase): 1994 1995 def test_accumulate(self): 1996 s = [1,2,3,4,5] 1997 r = [1,3,6,10,15] 1998 n = len(s) 1999 for g in (G, I, Ig, L, R): 2000 self.assertEqual(list(accumulate(g(s))), r) 2001 self.assertEqual(list(accumulate(S(s))), []) 2002 self.assertRaises(TypeError, accumulate, X(s)) 2003 self.assertRaises(TypeError, accumulate, N(s)) 2004 self.assertRaises(ZeroDivisionError, list, accumulate(E(s))) 2005 2006 def test_chain(self): 2007 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2008 for g in (G, I, Ig, S, L, R): 2009 self.assertEqual(list(chain(g(s))), list(g(s))) 2010 self.assertEqual(list(chain(g(s), g(s))), list(g(s))+list(g(s))) 2011 self.assertRaises(TypeError, list, chain(X(s))) 2012 self.assertRaises(TypeError, list, chain(N(s))) 2013 self.assertRaises(ZeroDivisionError, list, chain(E(s))) 2014 2015 def test_compress(self): 2016 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2017 n = len(s) 2018 for g in (G, I, Ig, S, L, R): 2019 self.assertEqual(list(compress(g(s), repeat(1))), list(g(s))) 2020 self.assertRaises(TypeError, compress, X(s), repeat(1)) 2021 self.assertRaises(TypeError, compress, N(s), repeat(1)) 2022 self.assertRaises(ZeroDivisionError, list, compress(E(s), repeat(1))) 2023 2024 def test_product(self): 2025 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2026 self.assertRaises(TypeError, product, X(s)) 2027 self.assertRaises(TypeError, product, N(s)) 2028 self.assertRaises(ZeroDivisionError, product, E(s)) 2029 2030 def test_cycle(self): 2031 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2032 for g in (G, I, Ig, S, L, R): 2033 tgtlen = len(s) * 3 2034 expected = list(g(s))*3 2035 actual = list(islice(cycle(g(s)), tgtlen)) 2036 self.assertEqual(actual, expected) 2037 self.assertRaises(TypeError, cycle, X(s)) 2038 self.assertRaises(TypeError, cycle, N(s)) 2039 self.assertRaises(ZeroDivisionError, list, cycle(E(s))) 2040 2041 def test_groupby(self): 2042 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2043 for g in (G, I, Ig, S, L, R): 2044 self.assertEqual([k for k, sb in groupby(g(s))], list(g(s))) 2045 self.assertRaises(TypeError, groupby, X(s)) 2046 self.assertRaises(TypeError, groupby, N(s)) 2047 self.assertRaises(ZeroDivisionError, list, groupby(E(s))) 2048 2049 def test_filter(self): 2050 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2051 for g in (G, I, Ig, S, L, R): 2052 self.assertEqual(list(filter(isEven, g(s))), 2053 [x for x in g(s) if isEven(x)]) 2054 self.assertRaises(TypeError, filter, isEven, X(s)) 2055 self.assertRaises(TypeError, filter, isEven, N(s)) 2056 self.assertRaises(ZeroDivisionError, list, filter(isEven, E(s))) 2057 2058 def test_filterfalse(self): 2059 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2060 for g in (G, I, Ig, S, L, R): 2061 self.assertEqual(list(filterfalse(isEven, g(s))), 2062 [x for x in g(s) if isOdd(x)]) 2063 self.assertRaises(TypeError, filterfalse, isEven, X(s)) 2064 self.assertRaises(TypeError, filterfalse, isEven, N(s)) 2065 self.assertRaises(ZeroDivisionError, list, filterfalse(isEven, E(s))) 2066 2067 def test_zip(self): 2068 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2069 for g in (G, I, Ig, S, L, R): 2070 self.assertEqual(list(zip(g(s))), lzip(g(s))) 2071 self.assertEqual(list(zip(g(s), g(s))), lzip(g(s), g(s))) 2072 self.assertRaises(TypeError, zip, X(s)) 2073 self.assertRaises(TypeError, zip, N(s)) 2074 self.assertRaises(ZeroDivisionError, list, zip(E(s))) 2075 2076 def test_ziplongest(self): 2077 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2078 for g in (G, I, Ig, S, L, R): 2079 self.assertEqual(list(zip_longest(g(s))), list(zip(g(s)))) 2080 self.assertEqual(list(zip_longest(g(s), g(s))), list(zip(g(s), g(s)))) 2081 self.assertRaises(TypeError, zip_longest, X(s)) 2082 self.assertRaises(TypeError, zip_longest, N(s)) 2083 self.assertRaises(ZeroDivisionError, list, zip_longest(E(s))) 2084 2085 def test_map(self): 2086 for s in (range(10), range(0), range(100), (7,11), range(20,50,5)): 2087 for g in (G, I, Ig, S, L, R): 2088 self.assertEqual(list(map(onearg, g(s))), 2089 [onearg(x) for x in g(s)]) 2090 self.assertEqual(list(map(operator.pow, g(s), g(s))), 2091 [x**x for x in g(s)]) 2092 self.assertRaises(TypeError, map, onearg, X(s)) 2093 self.assertRaises(TypeError, map, onearg, N(s)) 2094 self.assertRaises(ZeroDivisionError, list, map(onearg, E(s))) 2095 2096 def test_islice(self): 2097 for s in ("12345", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2098 for g in (G, I, Ig, S, L, R): 2099 self.assertEqual(list(islice(g(s),1,None,2)), list(g(s))[1::2]) 2100 self.assertRaises(TypeError, islice, X(s), 10) 2101 self.assertRaises(TypeError, islice, N(s), 10) 2102 self.assertRaises(ZeroDivisionError, list, islice(E(s), 10)) 2103 2104 def test_pairwise(self): 2105 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2106 for g in (G, I, Ig, S, L, R): 2107 seq = list(g(s)) 2108 expected = list(zip(seq, seq[1:])) 2109 actual = list(pairwise(g(s))) 2110 self.assertEqual(actual, expected) 2111 self.assertRaises(TypeError, pairwise, X(s)) 2112 self.assertRaises(TypeError, pairwise, N(s)) 2113 self.assertRaises(ZeroDivisionError, list, pairwise(E(s))) 2114 2115 def test_starmap(self): 2116 for s in (range(10), range(0), range(100), (7,11), range(20,50,5)): 2117 for g in (G, I, Ig, S, L, R): 2118 ss = lzip(s, s) 2119 self.assertEqual(list(starmap(operator.pow, g(ss))), 2120 [x**x for x in g(s)]) 2121 self.assertRaises(TypeError, starmap, operator.pow, X(ss)) 2122 self.assertRaises(TypeError, starmap, operator.pow, N(ss)) 2123 self.assertRaises(ZeroDivisionError, list, starmap(operator.pow, E(ss))) 2124 2125 def test_takewhile(self): 2126 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2127 for g in (G, I, Ig, S, L, R): 2128 tgt = [] 2129 for elem in g(s): 2130 if not isEven(elem): break 2131 tgt.append(elem) 2132 self.assertEqual(list(takewhile(isEven, g(s))), tgt) 2133 self.assertRaises(TypeError, takewhile, isEven, X(s)) 2134 self.assertRaises(TypeError, takewhile, isEven, N(s)) 2135 self.assertRaises(ZeroDivisionError, list, takewhile(isEven, E(s))) 2136 2137 def test_dropwhile(self): 2138 for s in (range(10), range(0), range(1000), (7,11), range(2000,2200,5)): 2139 for g in (G, I, Ig, S, L, R): 2140 tgt = [] 2141 for elem in g(s): 2142 if not tgt and isOdd(elem): continue 2143 tgt.append(elem) 2144 self.assertEqual(list(dropwhile(isOdd, g(s))), tgt) 2145 self.assertRaises(TypeError, dropwhile, isOdd, X(s)) 2146 self.assertRaises(TypeError, dropwhile, isOdd, N(s)) 2147 self.assertRaises(ZeroDivisionError, list, dropwhile(isOdd, E(s))) 2148 2149 def test_tee(self): 2150 for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): 2151 for g in (G, I, Ig, S, L, R): 2152 it1, it2 = tee(g(s)) 2153 self.assertEqual(list(it1), list(g(s))) 2154 self.assertEqual(list(it2), list(g(s))) 2155 self.assertRaises(TypeError, tee, X(s)) 2156 self.assertRaises(TypeError, tee, N(s)) 2157 self.assertRaises(ZeroDivisionError, list, tee(E(s))[0]) 2158 2159class LengthTransparency(unittest.TestCase): 2160 2161 def test_repeat(self): 2162 self.assertEqual(operator.length_hint(repeat(None, 50)), 50) 2163 self.assertEqual(operator.length_hint(repeat(None, 0)), 0) 2164 self.assertEqual(operator.length_hint(repeat(None), 12), 12) 2165 2166 def test_repeat_with_negative_times(self): 2167 self.assertEqual(operator.length_hint(repeat(None, -1)), 0) 2168 self.assertEqual(operator.length_hint(repeat(None, -2)), 0) 2169 self.assertEqual(operator.length_hint(repeat(None, times=-1)), 0) 2170 self.assertEqual(operator.length_hint(repeat(None, times=-2)), 0) 2171 2172class RegressionTests(unittest.TestCase): 2173 2174 def test_sf_793826(self): 2175 # Fix Armin Rigo's successful efforts to wreak havoc 2176 2177 def mutatingtuple(tuple1, f, tuple2): 2178 # this builds a tuple t which is a copy of tuple1, 2179 # then calls f(t), then mutates t to be equal to tuple2 2180 # (needs len(tuple1) == len(tuple2)). 2181 def g(value, first=[1]): 2182 if first: 2183 del first[:] 2184 f(next(z)) 2185 return value 2186 items = list(tuple2) 2187 items[1:1] = list(tuple1) 2188 gen = map(g, items) 2189 z = zip(*[gen]*len(tuple1)) 2190 next(z) 2191 2192 def f(t): 2193 global T 2194 T = t 2195 first[:] = list(T) 2196 2197 first = [] 2198 mutatingtuple((1,2,3), f, (4,5,6)) 2199 second = list(T) 2200 self.assertEqual(first, second) 2201 2202 2203 def test_sf_950057(self): 2204 # Make sure that chain() and cycle() catch exceptions immediately 2205 # rather than when shifting between input sources 2206 2207 def gen1(): 2208 hist.append(0) 2209 yield 1 2210 hist.append(1) 2211 raise AssertionError 2212 hist.append(2) 2213 2214 def gen2(x): 2215 hist.append(3) 2216 yield 2 2217 hist.append(4) 2218 2219 hist = [] 2220 self.assertRaises(AssertionError, list, chain(gen1(), gen2(False))) 2221 self.assertEqual(hist, [0,1]) 2222 2223 hist = [] 2224 self.assertRaises(AssertionError, list, chain(gen1(), gen2(True))) 2225 self.assertEqual(hist, [0,1]) 2226 2227 hist = [] 2228 self.assertRaises(AssertionError, list, cycle(gen1())) 2229 self.assertEqual(hist, [0,1]) 2230 2231 @support.skip_if_pgo_task 2232 def test_long_chain_of_empty_iterables(self): 2233 # Make sure itertools.chain doesn't run into recursion limits when 2234 # dealing with long chains of empty iterables. Even with a high 2235 # number this would probably only fail in Py_DEBUG mode. 2236 it = chain.from_iterable(() for unused in range(10000000)) 2237 with self.assertRaises(StopIteration): 2238 next(it) 2239 2240 def test_issue30347_1(self): 2241 def f(n): 2242 if n == 5: 2243 list(b) 2244 return n != 6 2245 for (k, b) in groupby(range(10), f): 2246 list(b) # shouldn't crash 2247 2248 def test_issue30347_2(self): 2249 class K: 2250 def __init__(self, v): 2251 pass 2252 def __eq__(self, other): 2253 nonlocal i 2254 i += 1 2255 if i == 1: 2256 next(g, None) 2257 return True 2258 i = 0 2259 g = next(groupby(range(10), K))[1] 2260 for j in range(2): 2261 next(g, None) # shouldn't crash 2262 2263 2264class SubclassWithKwargsTest(unittest.TestCase): 2265 def test_keywords_in_subclass(self): 2266 # count is not subclassable... 2267 testcases = [ 2268 (repeat, (1, 2), [1, 1]), 2269 (zip, ([1, 2], 'ab'), [(1, 'a'), (2, 'b')]), 2270 (filter, (None, [0, 1]), [1]), 2271 (filterfalse, (None, [0, 1]), [0]), 2272 (chain, ([1, 2], [3, 4]), [1, 2, 3]), 2273 (map, (str, [1, 2]), ['1', '2']), 2274 (starmap, (operator.pow, ((2, 3), (3, 2))), [8, 9]), 2275 (islice, ([1, 2, 3, 4], 1, 3), [2, 3]), 2276 (takewhile, (isEven, [2, 3, 4]), [2]), 2277 (dropwhile, (isEven, [2, 3, 4]), [3, 4]), 2278 (cycle, ([1, 2],), [1, 2, 1]), 2279 (compress, ('ABC', [1, 0, 1]), ['A', 'C']), 2280 ] 2281 for cls, args, result in testcases: 2282 with self.subTest(cls): 2283 class subclass(cls): 2284 pass 2285 u = subclass(*args) 2286 self.assertIs(type(u), subclass) 2287 self.assertEqual(list(islice(u, 0, 3)), result) 2288 with self.assertRaises(TypeError): 2289 subclass(*args, newarg=3) 2290 2291 for cls, args, result in testcases: 2292 # Constructors of repeat, zip, compress accept keyword arguments. 2293 # Their subclasses need overriding __new__ to support new 2294 # keyword arguments. 2295 if cls in [repeat, zip, compress]: 2296 continue 2297 with self.subTest(cls): 2298 class subclass_with_init(cls): 2299 def __init__(self, *args, newarg=None): 2300 self.newarg = newarg 2301 u = subclass_with_init(*args, newarg=3) 2302 self.assertIs(type(u), subclass_with_init) 2303 self.assertEqual(list(islice(u, 0, 3)), result) 2304 self.assertEqual(u.newarg, 3) 2305 2306 for cls, args, result in testcases: 2307 with self.subTest(cls): 2308 class subclass_with_new(cls): 2309 def __new__(cls, *args, newarg=None): 2310 self = super().__new__(cls, *args) 2311 self.newarg = newarg 2312 return self 2313 u = subclass_with_new(*args, newarg=3) 2314 self.assertIs(type(u), subclass_with_new) 2315 self.assertEqual(list(islice(u, 0, 3)), result) 2316 self.assertEqual(u.newarg, 3) 2317 2318 2319@support.cpython_only 2320class SizeofTest(unittest.TestCase): 2321 def setUp(self): 2322 self.ssize_t = struct.calcsize('n') 2323 2324 check_sizeof = support.check_sizeof 2325 2326 def test_product_sizeof(self): 2327 basesize = support.calcobjsize('3Pi') 2328 check = self.check_sizeof 2329 check(product('ab', '12'), basesize + 2 * self.ssize_t) 2330 check(product(*(('abc',) * 10)), basesize + 10 * self.ssize_t) 2331 2332 def test_combinations_sizeof(self): 2333 basesize = support.calcobjsize('3Pni') 2334 check = self.check_sizeof 2335 check(combinations('abcd', 3), basesize + 3 * self.ssize_t) 2336 check(combinations(range(10), 4), basesize + 4 * self.ssize_t) 2337 2338 def test_combinations_with_replacement_sizeof(self): 2339 cwr = combinations_with_replacement 2340 basesize = support.calcobjsize('3Pni') 2341 check = self.check_sizeof 2342 check(cwr('abcd', 3), basesize + 3 * self.ssize_t) 2343 check(cwr(range(10), 4), basesize + 4 * self.ssize_t) 2344 2345 def test_permutations_sizeof(self): 2346 basesize = support.calcobjsize('4Pni') 2347 check = self.check_sizeof 2348 check(permutations('abcd'), 2349 basesize + 4 * self.ssize_t + 4 * self.ssize_t) 2350 check(permutations('abcd', 3), 2351 basesize + 4 * self.ssize_t + 3 * self.ssize_t) 2352 check(permutations('abcde', 3), 2353 basesize + 5 * self.ssize_t + 3 * self.ssize_t) 2354 check(permutations(range(10), 4), 2355 basesize + 10 * self.ssize_t + 4 * self.ssize_t) 2356 2357 2358def load_tests(loader, tests, pattern): 2359 tests.addTest(doctest.DocTestSuite()) 2360 return tests 2361 2362 2363if __name__ == "__main__": 2364 unittest.main() 2365