import heapq
from random import random, randrange


@test
def check_invariant(heap):
    """Assert that every element of `heap` is >= its parent (min-heap order)."""
    # Index 0 is the root and has no parent, so start from index 1.
    for child in range(1, len(heap)):
        parent = (child - 1) // 2
        assert heap[parent] <= heap[child]


@test
def test_heapify():
    """heapify() must establish the heap invariant for a range of sizes."""
    for size in list(range(30)) + [20000]:
        values = [random() for _ in range(size)]
        heapq.heapify(values)
        check_invariant(values)


@test
def test_naive_nbest():
    """Keep the 10 largest items by pushing everything and popping overflow."""
    data = [randrange(2000) for _ in range(1000)]
    best = []
    for value in data:
        heapq.heappush(best, value)
        # Never let the heap hold more than 10 items: drop the smallest.
        if len(best) > 10:
            heapq.heappop(best)
    assert sorted(best) == sorted(data)[-10:]


def heapiter(heap):
    """Yield the elements of `heap` smallest-first, emptying it as it goes."""
    while len(heap) > 0:
        yield heapq.heappop(heap)


@test
def test_nbest():
    """N-best selection using a bounded heap and heapreplace().

    Only items larger than the current minimum displace it, so the full
    data set is never sorted by the algorithm under test.  The original
    note observes that with a max-heap one could instead heapify all of
    the data (linear time) and then perform 10 pops (10 log-time steps).
    """
    data = [randrange(2000) for _ in range(1000)]
    best = data[:10]
    heapq.heapify(best)
    for value in data[10:]:
        # This condition holds less and less often as the loop advances.
        if value > best[0]:
            heapq.heapreplace(best, value)
    assert list(heapiter(best)) == sorted(data)[-10:]


@test
def test_nbest_with_pushpop():
    """N-best selection where heappushpop() does the comparison itself."""
    data = [randrange(2000) for _ in range(1000)]
    best = data[:10]
    heapq.heapify(best)
    for value in data[10:]:
        heapq.heappushpop(best, value)
    assert list(heapiter(best)) == sorted(data)[-10:]

    # Pushing onto an empty heap and popping returns the pushed item.
    assert heapq.heappushpop(List[str](), "x") == "x"


@test
def test_heappushpop():
    """heappushpop() on an empty heap and on one-item heaps."""
    # Empty heap: the pushed item comes straight back, heap stays empty.
    heap = List[int]()
    popped = heapq.heappushpop(heap, 10)
    assert (heap, popped) == (List[int](), 10)

    # Pushed item is smaller than the root: it is returned immediately.
    heap = [10]
    popped = heapq.heappushpop(heap, 9)
    assert (heap, popped) == ([10], 9)

    # Pushed item is larger than the root: the root is returned instead.
    heap = [10]
    popped = heapq.heappushpop(heap, 11)
    assert (heap, popped) == ([11], 10)


@test
def test_heappop_max():
    """Pop from a two-item list until it is empty using _heappop_max()."""
    # _heappop_max has an optimization for one-item lists which isn't
    # covered in other tests, so that case is exercised explicitly here.
    heap = [3, 2]
    assert heapq._heappop_max(heap) == 3
    assert heapq._heappop_max(heap) == 2


@test
def test_heapsort():
    """Repeated heapsort checks, alternating heapify() and heappush()."""
    for trial in range(100):
        size = randrange(50)
        data = [randrange(25) for _ in range(size)]
        # Kept from the original: `heap` is bound before the branches below.
        heap = None
        if trial % 2 == 1:
            # Odd trials: build the heap in one go.
            heap = data[:]
            heapq.heapify(heap)
        else:
            # Even trials: build the heap one push at a time.
            heap = []
            for value in data:
                heapq.heappush(heap, value)
        drained = [heapq.heappop(heap) for _ in range(size)]
        assert drained == sorted(data)


"""
def test_merge(self):
    inputs = []
    for i in range(randrange(25)):
        row = []
        for j in range(randrange(100)):
            tup = choice('ABC'), randrange(-500, 500)
            row.append(tup)
        inputs.append(row)

    for key in [None, itemgetter(0), itemgetter(1), itemgetter(1, 0)]:
        for reverse in [False, True]:
            seqs = []
            for seq in inputs:
                seqs.append(sorted(seq, key=key, reverse=reverse))
            self.assertEqual(sorted(chain(*inputs), key=key, reverse=reverse),
                             list(heapq.merge(*seqs, key=key, reverse=reverse)))
            self.assertEqual(list(heapq.merge()), [])

def test_empty_merges(self):
    # Merging two empty lists (with or without a key) should produce
    # another empty list.
    self.assertEqual(list(heapq.merge([], [])), [])
    self.assertEqual(list(heapq.merge([], [], key=lambda: 6)), [])

def test_merge_does_not_suppress_index_error(self):
    # Issue 19018: Heapq.merge suppresses IndexError from user generator
    def iterable():
        s = list(range(10))
        for i in range(20):
            yield s[i]       # IndexError when i > 10
    with self.assertRaises(IndexError):
        list(heapq.merge(iterable(), iterable()))

def test_merge_stability(self):
    class Int(int):
        pass
    inputs = [[], [], [], []]
    for i in range(20000):
        stream = randrange(4)
        x = randrange(500)
        obj = Int(x)
        obj.pair = (x, stream)
        inputs[stream].append(obj)
    for stream in inputs:
        stream.sort()
    result = [i.pair for i in heapq.merge(*inputs)]
    self.assertEqual(result, sorted(result))
"""


def mykey(x: Tuple[int, int]):
    """Map a pair to a scrambled pair, used as a sort key in the tests below."""
    first = x[0] * 547 % 2000
    second = x[1] * 547 % 2000
    return (first, second)


@test
def test_nsmallest():
    """nsmallest() must agree with a slice of the fully sorted data."""
    data = [(randrange(2000), i) for i in range(1000)]
    for n in (10,):
        limit = min(n, len(data))
        assert list(heapq.nsmallest(n, data)) == sorted(data)[:limit]
        assert (
            list(heapq.nsmallest(n, data, key=mykey))
            == sorted(data, key=mykey)[:limit]
        )
        # Same check as the first one, repeated after the keyed call.
        assert list(heapq.nsmallest(n, data)) == sorted(data)[:limit]


@test
def test_nlargest():
    """nlargest() must agree with a slice of the reverse-sorted data."""
    data = [(randrange(2000), i) for i in range(1000)]
    for n in (0, 1, 2, 10, 100, 400, 999, 1000, 1100):
        limit = min(n, len(data))
        assert list(heapq.nlargest(n, data)) == sorted(data)[::-1][:limit]
        assert (
            list(heapq.nlargest(n, data, key=mykey))
            == sorted(data, key=mykey)[::-1][:limit]
        )
        # Same check as the first one, repeated after the keyed call.
        assert list(heapq.nlargest(n, data)) == sorted(data)[::-1][:limit]


@test
def test_comparison_operator():
    """Heap operations should order elements through `__lt__`.

    NOTE: the call to this test is commented out in the driver at the
    bottom of the file, so it is currently not executed.
    """

    def hsort(data: List[float], T: type):
        # Wrap each value in T, heapify, then pop everything back out.
        wrapped = [T(x) for x in data]
        heapq.heapify(wrapped)
        return [heapq.heappop(wrapped).x for i in range(len(data))]

    class LT:
        # Wrapper whose `<` is deliberately inverted (uses `>` on x).
        x: float

        def __init__(self, x: float):
            self.x = x

        def __lt__(self, other: LT):
            return self.x > other.x

    data = [random() for _ in range(100)]
    target = list(reversed(sorted(data)))
    assert hsort(data, LT) == target


@test
def test_merge():
    """merge() with plain, keyed, reversed and keyed+reversed inputs."""
    # Plain ascending merge, including an empty input.
    assert [0, 1, 2, 3, 4, 5, 5, 7, 8, 10, 15, 20, 25] == list(
        heapq.merge([1, 3, 5, 7], [0, 2, 4, 8], [5, 10, 15, 20], List[int](), [25])
    )
    # Merge ordered by string length.
    assert ['dog', 'cat', 'fish', 'horse', 'kangaroo'] == list(
        heapq.merge(['dog', 'horse'], ['cat', 'fish', 'kangaroo'], key=len)
    )
    # Descending merge of descending inputs.
    assert [25, 20, 15, 10, 8, 7, 5, 5, 4, 3, 2, 1, 0] == list(
        heapq.merge(
            [7, 5, 3, 1], [8, 4, 2, 0], [20, 15, 10, 5], List[int](), [25],
            reverse=True,
        )
    )
    # Descending merge ordered by string length.
    assert ['kangaroo', 'horse', 'fish', 'dog', 'cat'] == list(
        heapq.merge(
            ['horse', 'dog'], ['kangaroo', 'fish', 'cat'], key=len, reverse=True
        )
    )


# Test driver: run every enabled test in the original order.
test_heapify()
test_naive_nbest()
test_nsmallest()
test_nlargest()
test_nbest()
test_nbest_with_pushpop()
test_heappushpop()
test_heappop_max()
test_heapsort()
# test_comparison_operator()
test_merge()