import heapq from random import random, randrange @test def check_invariant(heap): # Check the heap invariant. for pos, item in enumerate(heap): if pos: # pos 0 has no parent parentpos = (pos-1) >> 1 assert heap[parentpos] <= item @test def test_heapify(): for size in list(range(30)) + [20000]: heap = [random() for dummy in range(size)] heapq.heapify(heap) check_invariant(heap) @test def test_naive_nbest(): data = [randrange(2000) for i in range(1000)] heap = List[int]() for item in data: heapq.heappush(heap, item) if len(heap) > 10: heapq.heappop(heap) heap.sort() assert heap == sorted(data)[-10:] def heapiter(heap): # An iterator returning a heap's elements, smallest-first. while heap: yield heapq.heappop(heap) @test def test_nbest(): # Less-naive "N-best" algorithm, much faster (if len(data) is big # enough ) than sorting all of data. However, if we had a max # heap instead of a min heap, it could go faster still via # heapify'ing all of data (linear time), then doing 10 heappops # (10 log-time steps). data = [randrange(2000) for i in range(1000)] heap = data[:10] heapq.heapify(heap) for item in data[10:]: if item > heap[0]: # this gets rarer the longer we run heapq.heapreplace(heap, item) assert list(heapiter(heap)) == sorted(data)[-10:] @test def test_nbest_with_pushpop(): data = [randrange(2000) for i in range(1000)] heap = data[:10] heapq.heapify(heap) for item in data[10:]: heapq.heappushpop(heap, item) assert list(heapiter(heap)) == sorted(data)[-10:] assert heapq.heappushpop(List[str](), 'x') == 'x' @test def test_heappushpop(): h = List[int]() x = heapq.heappushpop(h, 10) assert (h, x) == (List[int](), 10) h = [10] x = heapq.heappushpop(h, 9) assert (h, x) == ([10], 9) h = [10] x = heapq.heappushpop(h, 11) assert (h, x) == ([11], 10) @test def test_heappop_max(): # _heapop_max has an optimization for one-item lists which isn't # covered in other tests, so test that case explicitly here h = [3, 2] assert heapq._heappop_max(h) == 3 assert heapq._heappop_max(h) == 2 @test def test_heapsort(): # Exercise everything with repeated heapsort checks for trial in range(100): size = randrange(50) data = [randrange(25) for i in range(size)] heap = None if trial & 1: # Half of the time, use heapify heap = data[:] heapq.heapify(heap) else: # The rest of the time, use heappush heap = List[int]() for item in data: heapq.heappush(heap, item) heap_sorted = [heapq.heappop(heap) for i in range(size)] assert heap_sorted == sorted(data) ''' def test_merge(self): inputs = [] for i in range(randrange(25)): row = [] for j in range(randrange(100)): tup = choice('ABC'), randrange(-500, 500) row.append(tup) inputs.append(row) for key in [None, itemgetter(0), itemgetter(1), itemgetter(1, 0)]: for reverse in [False, True]: seqs = [] for seq in inputs: seqs.append(sorted(seq, key=key, reverse=reverse)) self.assertEqual(sorted(chain(*inputs), key=key, reverse=reverse), list(heapq.merge(*seqs, key=key, reverse=reverse))) self.assertEqual(list(heapq.merge()), []) def test_empty_merges(self): # Merging two empty lists (with or without a key) should produce # another empty list. self.assertEqual(list(heapq.merge([], [])), []) self.assertEqual(list(heapq.merge([], [], key=lambda: 6)), []) def test_merge_does_not_suppress_index_error(self): # Issue 19018: Heapq.merge suppresses IndexError from user generator def iterable(): s = list(range(10)) for i in range(20): yield s[i] # IndexError when i > 10 with self.assertRaises(IndexError): list(heapq.merge(iterable(), iterable())) def test_merge_stability(self): class Int(int): pass inputs = [[], [], [], []] for i in range(20000): stream = randrange(4) x = randrange(500) obj = Int(x) obj.pair = (x, stream) inputs[stream].append(obj) for stream in inputs: stream.sort() result = [i.pair for i in heapq.merge(*inputs)] self.assertEqual(result, sorted(result)) ''' def mykey(x: Tuple[int,int]): return (x[0] * 547 % 2000, x[1] * 547 % 2000) @test def test_nsmallest(): data = [(randrange(2000), i) for i in range(1000)] for n in (10,): assert list(heapq.nsmallest(n, data)) == sorted(data)[:min(n, len(data))] assert list(heapq.nsmallest(n, data, key=mykey)) == sorted(data, key=mykey)[:min(n, len(data))] assert list(heapq.nsmallest(n, data)) == sorted(data)[:min(n, len(data))] @test def test_nlargest(): data = [(randrange(2000), i) for i in range(1000)] for n in (0, 1, 2, 10, 100, 400, 999, 1000, 1100): assert list(heapq.nlargest(n, data)) == sorted(data)[::-1][:min(n, len(data))] assert list(heapq.nlargest(n, data, key=mykey)) == sorted(data, key=mykey)[::-1][:min(n, len(data))] assert list(heapq.nlargest(n, data)) == sorted(data)[::-1][:min(n, len(data))] @test def test_comparison_operator(): def hsort[T](data: List[float]): data2 = [T(x) for x in data] heapq.heapify(data2) return [heapq.heappop(data2).x for i in range(len(data))] class LT: x: float def __init__(self, x: float): self.x = x def __lt__(self, other: LT): return self.x > other.x data = [random() for i in range(100)] target = list(reversed(sorted(data))) assert hsort(data, LT) == target test_heapify() test_naive_nbest() test_nsmallest() test_nlargest() test_nbest() test_nbest_with_pushpop() test_heappushpop() test_heappop_max() test_heapsort() # test_comparison_operator()