codon/stdlib/heapq.codon

290 lines
9.4 KiB
Python

# TODO: heapq.merge
# 'heap' is a heap at all indices >= startpos, except possibly for pos. pos
# is the index of a leaf with a possibly out-of-order value. Restore the
# heap invariant.
def _siftdown[T](heap: List[T], startpos: int, pos: int):
    """
    Move the item at index pos toward the root of a min-heap.

    Parents that compare greater than the item are shifted one level down
    until a parent that is not greater is found, or until index startpos
    is reached; the item is then stored in the slot that was freed.
    The list is modified in place and nothing is returned.
    """
    item = heap[pos]
    idx = pos
    while idx > startpos:
        up = (idx - 1) >> 1  # index of the parent of idx
        above = heap[up]
        if not item < above:
            # Parent is already <= item: idx is the final slot.
            break
        heap[idx] = above
        idx = up
    heap[idx] = item
def _siftup[T](heap: List[T], pos: int):
    """
    Restore the min-heap invariant for the subtree rooted at index pos.

    The item at pos is lifted out, the smaller child is promoted into the
    hole level by level until the hole reaches a leaf, and the item is
    then dropped into that leaf and bubbled back up with _siftdown
    (never above the original pos). The list is modified in place.
    """
    size = len(heap)
    root = pos
    item = heap[pos]
    hole = pos
    child = 2 * hole + 1  # left child of the hole
    while child < size:
        sibling = child + 1
        # Prefer the right child unless the left one is strictly smaller.
        if sibling < size and not heap[child] < heap[sibling]:
            child = sibling
        # Promote the chosen child into the hole and descend.
        heap[hole] = heap[child]
        hole = child
        child = 2 * hole + 1
    # The hole is now at a leaf: place the item and sift it toward root.
    heap[hole] = item
    _siftdown(heap, root, hole)
def _siftdown_max[T](heap: List[T], startpos: int, pos: int):
    """
    Max-heap counterpart of _siftdown.

    Moves the item at index pos toward the root: parents that compare
    smaller than the item are shifted one level down until a parent that
    is not smaller is found, or until index startpos is reached. The
    list is modified in place and nothing is returned.
    """
    item = heap[pos]
    idx = pos
    while idx > startpos:
        up = (idx - 1) >> 1  # index of the parent of idx
        above = heap[up]
        if not above < item:
            # Parent is already >= item: idx is the final slot.
            break
        heap[idx] = above
        idx = up
    heap[idx] = item
def _siftup_max[T](heap: List[T], pos: int):
    """
    Max-heap counterpart of _siftup.

    The item at pos is lifted out, the larger child is promoted into the
    hole level by level until the hole reaches a leaf, and the item is
    then dropped into that leaf and bubbled back up with _siftdown_max
    (never above the original pos). The list is modified in place.
    """
    size = len(heap)
    root = pos
    item = heap[pos]
    hole = pos
    child = 2 * hole + 1  # left child of the hole
    while child < size:
        sibling = child + 1
        # Prefer the right child unless it is strictly smaller than the left.
        if sibling < size and not heap[sibling] < heap[child]:
            child = sibling
        # Promote the chosen child into the hole and descend.
        heap[hole] = heap[child]
        hole = child
        child = 2 * hole + 1
    # The hole is now at a leaf: place the item and sift it toward root.
    heap[hole] = item
    _siftdown_max(heap, root, hole)
def heappush[T](heap: List[T], item: T):
    """
    Add item to a min-heap while keeping the heap invariant.

    The item is appended at the end of the list and then sifted toward
    the root with _siftdown.
    """
    heap.append(item)
    last = len(heap) - 1
    _siftdown(heap, 0, last)
def heappop[T](heap: List[T]):
    """
    Remove and return the smallest item of a min-heap.

    The last list element is popped first (so an empty heap fails inside
    heap.pop()); if anything remains, that element replaces the root and
    is sifted into place with _siftup.
    """
    tail = heap.pop()
    if not heap:
        # The heap held a single element, which is the answer.
        return tail
    smallest = heap[0]
    heap[0] = tail
    _siftup(heap, 0)
    return smallest
def heapreplace[T](heap: List[T], item: T):
    """
    Return the current smallest value and put item on the heap in its place.

    The heap size does not change, which makes this cheaper than a
    heappop() followed by a heappush() and suitable for fixed-size heaps.
    The returned value may be larger than item; guard the call when that
    matters:
    ``if item > heap[0]: item = heapreplace(heap, item)``.

    An empty heap fails on the heap[0] access.
    """
    smallest = heap[0]
    heap[0] = item
    # The new root may violate the invariant; push it down into place.
    _siftup(heap, 0)
    return smallest
def heappushpop[T](heap: List[T], item: T):
    """
    Push item on the heap, then pop and return the smallest value.

    When the heap is empty or its root is not smaller than item, item
    itself is returned and the heap is left untouched. Otherwise the
    root is swapped out for item and the heap is repaired with _siftup.
    """
    if not heap or not heap[0] < item:
        return item
    smallest = heap[0]
    heap[0] = item
    _siftup(heap, 0)
    return smallest
def heapify[T](x: List[T]):
    """
    Rearrange list x into a min-heap, in-place, in O(len(x)) time.

    Works bottom-up. Index i has a child only when 2*i + 1 < len(x),
    so the last index worth sifting is len(x)//2 - 1; everything after
    it is a leaf.
    """
    for i in range(len(x) // 2 - 1, -1, -1):
        _siftup(x, i)
def _heappop_max[T](heap: List[T]):
    """
    Remove and return the largest item of a max-heap.

    The last list element is popped first (so an empty heap fails inside
    heap.pop()); if anything remains, that element replaces the root and
    is sifted into place with _siftup_max.
    """
    tail = heap.pop()
    if not heap:
        # The heap held a single element, which is the answer.
        return tail
    largest = heap[0]
    heap[0] = tail
    _siftup_max(heap, 0)
    return largest
def _heapreplace_max[T](heap: List[T], item: T):
    """
    Return the current largest value of a max-heap and store item instead.

    The heap size does not change. An empty heap fails on the heap[0]
    access.
    """
    largest = heap[0]
    heap[0] = item
    # The new root may violate the invariant; push it down into place.
    _siftup_max(heap, 0)
    return largest
def _heapify_max[T](x: List[T]):
    """
    Rearrange list x into a max-heap, in-place, in O(len(x)) time.

    Only indices below len(x)//2 can have children, so those are sifted
    from the last such index back to the root.
    """
    for i in range(len(x) // 2 - 1, -1, -1):
        _siftup_max(x, i)
def nsmallest[T](n: int, iterable: Generator[T], key = Optional[int]()):
    """
    Find the n smallest elements in a dataset.

    Equivalent to: sorted(iterable, key=key)[:n] for n >= 0. A negative
    n is treated like 0 and yields an empty list.

    n        -- maximum number of elements to return.
    iterable -- generator producing the candidates; it is consumed (or
                destroyed when it runs dry early).
    key      -- optional one-argument function used to rank elements;
                when omitted the elements themselves are compared.

    Returns a list of at most n elements in ascending order. Elements
    that compare equal keep their order of appearance, because each one
    is decorated with an increasing sequence number.
    """
    if n == 1:
        # Single pass that keeps only the current minimum. The strict
        # '<' means the first of several equal minima is the one kept.
        v = List[T](1)
        for a in iterable:
            if not v:
                v.append(a)
            else:
                if not isinstance(key, Optional):
                    if key(a) < key(v[0]):
                        v[0] = a
                elif a < v[0]:
                    v[0] = a
        return v

    # n is handed to the List constructor below (apparently a capacity
    # hint, since the list starts out empty). Never forward a negative
    # value; with capacity 0 the normal "no result" path returns [].
    capacity = n if n > 0 else 0

    # When key is none, use simpler decoration
    if isinstance(key, Optional):
        it = iter(iterable)
        # Seed a max-heap with the first n elements, tagged with their
        # position so that ties are resolved by order of appearance.
        result = List[Tuple[T,int]](capacity)
        done = False
        for i in range(n):
            if it.done():
                done = True
                break
            result.append((it.next(), i))
        if not result:
            it.destroy()
            return List[T](0)
        _heapify_max(result)
        top = result[0][0]  # largest element currently kept
        order = n
        if not done:
            for elem in it:
                if elem < top:
                    # elem beats the worst kept element: swap it in.
                    _heapreplace_max(result, (elem, order))
                    top, _order = result[0]
                    order += 1
        else:
            # Fewer than n elements were available; release the iterator.
            it.destroy()
        result.sort()
        return [elem for elem, order in result]
    else:
        # General case, slowest method
        it = iter(iterable)
        # Entries are (key value, sequence number, element).
        result = List[Tuple[type(key(T())),int,T]](capacity)
        done = False
        for i in range(n):
            if it.done():
                done = True
                break
            elem = it.next()
            result.append((key(elem), i, elem))
        if not result:
            it.destroy()
            return List[T](0)
        _heapify_max(result)
        top = result[0][0]  # largest key currently kept
        order = n
        if not done:
            for elem in it:
                k = key(elem)
                if k < top:
                    _heapreplace_max(result, (k, order, elem))
                    top, _order, _elem = result[0]
                    order += 1
        else:
            # Fewer than n elements were available; release the iterator.
            it.destroy()
        result.sort()
        return [elem for k, order, elem in result]
def nlargest[T](n: int, iterable: Generator[T], key = Optional[int]()):
    """
    Find the n largest elements in a dataset.

    Equivalent to: sorted(iterable, key=key, reverse=True)[:n] for
    n >= 0. A negative n is treated like 0 and yields an empty list.

    n        -- maximum number of elements to return.
    iterable -- generator producing the candidates; it is consumed (or
                destroyed when it runs dry early).
    key      -- optional one-argument function used to rank elements;
                when omitted the elements themselves are compared.

    Returns a list of at most n elements in descending order. Elements
    that compare equal keep their order of appearance, because each one
    is decorated with a decreasing sequence number and the sorted result
    is reversed.

    Only the '<' operator is applied to elements and key values.
    """
    if n == 1:
        # Single pass that keeps only the current maximum. The strict
        # '<' means the first of several equal maxima is the one kept.
        v = List[T](1)
        for a in iterable:
            if not v:
                v.append(a)
            else:
                if not isinstance(key, Optional):
                    if key(v[0]) < key(a):
                        v[0] = a
                elif v[0] < a:
                    v[0] = a
        return v

    # n is handed to the List constructor below (apparently a capacity
    # hint, since the list starts out empty). Never forward a negative
    # value; with capacity 0 the normal "no result" path returns [].
    capacity = n if n > 0 else 0

    # When key is none, use simpler decoration
    if isinstance(key, Optional):
        it = iter(iterable)
        # Seed a min-heap with the first n elements, tagged 0, -1, -2, ...
        # so that earlier elements rank higher among equals.
        result = List[Tuple[T,int]](capacity)
        done = False
        for i in range(0, -n, -1):
            if it.done():
                done = True
                break
            result.append((it.next(), i))
        if not result:
            it.destroy()
            return List[T](0)
        heapify(result)
        top = result[0][0]  # smallest element currently kept
        order = -n
        if not done:
            for elem in it:
                if top < elem:
                    # elem beats the worst kept element: swap it in.
                    heapreplace(result, (elem, order))
                    top, _order = result[0]
                    order -= 1
        else:
            # Fewer than n elements were available; release the iterator.
            it.destroy()
        result.sort()
        return [elem for elem, order in reversed(result)]
    else:
        # General case, slowest method
        it = iter(iterable)
        # Entries are (key value, sequence number, element).
        result = List[Tuple[type(key(T())),int,T]](capacity)
        done = False
        for i in range(0, -n, -1):
            if it.done():
                done = True
                break
            elem = it.next()
            result.append((key(elem), i, elem))
        if not result:
            # Release the iterator here too, like every other early exit.
            it.destroy()
            return List[T](0)
        heapify(result)
        top = result[0][0]  # smallest key currently kept
        order = -n
        if not done:
            for elem in it:
                k = key(elem)
                if top < k:
                    heapreplace(result, (k, order, elem))
                    top, _order, _elem = result[0]
                    order -= 1
        else:
            # Fewer than n elements were available; release the iterator.
            it.destroy()
        result.sort()
        return [elem for k, order, elem in reversed(result)]