codon/stdlib/itertools.codon

424 lines
12 KiB
Python

def combinations(pool, r: int):
"""
Return successive r-length combinations of elements in the iterable.
combinations(range(4), 3) --> (0,1,2), (0,1,3), (0,2,3), (1,2,3)
"""
def combinations_helper(pool_list, r):
n = len(pool_list)
if r > n:
return
indices = list(range(r))
yield [pool_list[i] for i in indices]
while True:
b = -1
for i in reversed(range(r)):
if indices[i] != i + n - r:
b = i
break
if b == -1:
return
indices[b] += 1
for j in range(b+1, r):
indices[j] = indices[j-1] + 1
yield [pool_list[i] for i in indices]
if r < 0:
raise ValueError("r must be non-negative")
if hasattr(pool, "__getitem__") and hasattr(pool, "__len__"):
return combinations_helper(pool, r)
else:
return combinations_helper([a for a in pool], r)
def combinations_with_replacement(pool, r: int):
"""
Return successive r-length combinations of elements in the iterable
allowing individual elements to have successive repeats.
"""
def combinations_with_replacement_helper(pool_list, r):
n = len(pool_list)
if not n and r:
return
indices = [0 for _ in range(r)]
yield [pool_list[i] for i in indices]
while True:
b = -1
for i in reversed(range(r)):
if indices[i] != n - 1:
b = i
break
if b == -1:
return
newval = indices[b] + 1
for j in range(r - b):
indices[b+j] = newval
yield [pool_list[i] for i in indices]
if r < 0:
raise ValueError("r must be non-negative")
if hasattr(pool, "__getitem__") and hasattr(pool, "__len__"):
return combinations_with_replacement_helper(pool, r)
else:
return combinations_with_replacement_helper([a for a in pool], r)
@tuple
class islice:
"""
Make an iterator that returns selected elements from the iterable.
"""
def __new__[T](iterable: Generator[T], stop: Optional[int]):
if stop and ~stop < 0:
raise ValueError("Indices for islice() must be None or an integer: 0 <= x <= sys.maxsize.")
i = 0
for x in iterable:
if stop and i >= ~stop:
break
yield x
i += 1
def __new__[T](iterable: Generator[T], start: Optional[int], stop: Optional[int], step: Optional[int] = None):
from sys import maxsize
startx = 0
stopx = maxsize
stepx = 1
have_stop = False
if start:
startx = ~start
if stop:
stopx = ~stop
if step:
stepx = ~step
if startx < 0 or stopx < 0:
raise ValueError("Indices for islice() must be None or an integer: 0 <= x <= sys.maxsize.")
elif stepx < 0:
raise ValueError("Step for islice() must be a positive integer or None.")
it = range(startx, stopx, stepx)
N = len(it)
idx = 0
b = -1
if N == 0:
for i, element in zip(range(start), iterable):
pass
return
nexti = it[0]
for i, element in enumerate(iterable):
if i == nexti:
yield element
idx += 1
if idx >= N:
b = i
break
nexti = it[idx]
if b >= 0:
for i, element in zip(range(b + 1, stopx), iterable):
pass
@inline
def count(start = 0, step = 1):
"""
Return a count object whose ``__next__`` method returns consecutive values.
"""
n = start
while True:
yield n
n += step
@inline
def repeat(object, times: Optional[int] = None):
"""
Make an iterator that returns a given object over and over again.
"""
if times is None:
while True:
yield object
else:
for i in range(~times):
yield object
@inline
def cycle(iterable):
"""
Cycles repeatedly through an iterable.
"""
saved = []
for element in iterable:
yield element
saved.append(element)
while saved:
for element in saved:
yield element
@inline
def compress(data, selectors):
"""
Return data elements corresponding to true selector elements.
Forms a shorter iterator from selected data elements using the selectors to
choose the data elements.
"""
for d,s in zip(data, selectors):
if s:
yield d
@inline
def dropwhile(predicate, iterable):
"""
Drop items from the iterable while predicate(item) is true.
Afterwards, return every element until the iterable is exhausted.
"""
b = False
for x in iterable:
if not b and not predicate(x):
b = True
if b:
yield x
@inline
def takewhile(predicate, iterable):
"""
Return successive entries from an iterable as long as the predicate evaluates to true for each entry.
"""
for x in iterable:
if predicate(x):
yield x
else:
break
@inline
def filterfalse(predicate, iterable):
"""
Return those items of iterable for which function(item) is false.
"""
for x in iterable:
if not predicate(x):
yield x
def permutations(pool, r: Optional[int] = None):
"""
Return successive r-length permutations of elements in the iterable.
"""
def permutations_helper(pool_list, r):
n = len(pool_list)
rx = ~r if r else n
if rx > n:
return
indices = list(range(n))
cycles = list(range(n, n - rx, -1))
yield [pool_list[i] for i in indices[:rx]]
while n:
b = -1
for i in reversed(range(rx)):
cycles[i] -= 1
if cycles[i] == 0:
indices = indices[:i] + indices[i+1:] + indices[i:i+1]
cycles[i] = n - i
else:
b = i
j = cycles[i]
indices[i], indices[-j] = indices[-j], indices[i]
yield [pool_list[i] for i in indices[:rx]]
break
if b == -1:
return
if r is not None and ~r < 0:
raise ValueError("r must be non-negative")
if hasattr(pool, "__getitem__") and hasattr(pool, "__len__"):
return permutations_helper(pool, r)
else:
return permutations_helper([a for a in pool], r)
@tuple
class accumulate:
"""
Make an iterator that returns accumulated sums, or accumulated results
of other binary functions (specified via the optional func argument).
"""
@inline
def __new__(iterable, func = lambda a, b: a + b, initial = 0):
total = initial
yield total
for element in iterable:
total = func(total, element)
yield total
@inline
def __new__(iterable, func = lambda a, b: a + b):
total = None
for element in iterable:
total = element if total is None else func(~total, element)
yield ~total
@tuple
class chain:
"""
Make an iterator that returns elements from the first iterable until it is exhausted,
then proceeds to the next iterable, until all of the iterables are exhausted.
"""
@inline
def __new__(*iterables):
for it in iterables:
for element in it:
yield element
@inline
def from_iterable(iterables):
for it in iterables:
for element in it:
yield element
@inline
def starmap(function, iterable):
"""
Return an iterator whose values are returned from the function
evaluated with an argument tuple taken from the given sequence.
"""
for args in iterable:
yield function(*args)
# TODO: fix this once Optional[Callable] lands
@inline
def groupby(iterable, key = Optional[int]()):
"""
Make an iterator that returns consecutive keys and groups from the iterable.
"""
currkey = None
group = []
for currvalue in iterable:
k = currvalue if isinstance(key, Optional) else key(currvalue)
if currkey is None:
currkey = k
if k != ~currkey:
yield ~currkey, group
currkey = k
group = []
group.append(currvalue)
if currkey is not None:
yield ~currkey, group
@tuple
class zip_longest:
"""
Make an iterator that aggregates elements from each of the iterables.
If the iterables are of uneven length, missing values are filled-in
with fillvalue. Iteration continues until the longest iterable is
exhausted.
"""
@inline
def __new__(*args, fillvalue):
if staticlen(args) == 2:
a = iter(args[0])
b = iter(args[1])
a_done = False
b_done = False
while not a.done():
a_val = a.next()
b_val = fillvalue
if not b_done:
b_done = b.done()
if not b_done: b_val = b.next()
yield a_val, b_val
if not b_done:
while not b.done():
yield fillvalue, b.next()
a.destroy()
b.destroy()
else:
iterators = tuple(iter(it) for it in args)
num_active = len(iterators)
if not num_active:
return
while True:
values = []
for it in iterators:
if it.__done__(): # already done
values.append(fillvalue)
elif it.done(): # resume and check
num_active -= 1
if not num_active:
return
values.append(fillvalue)
else:
values.append(it.next())
yield values
@inline
def __new__(*args):
def get_next(it):
if it.__done__() or it.done():
return None
return it.next()
iters = tuple(iter(arg) for arg in args)
while True:
done_count = 0
result = tuple(get_next(it) for it in iters)
all_none = True
for a in result:
if a is not None:
all_none = False
if all_none:
return
yield result
for it in iters:
it.destroy()
@tuple
class product:
"""
Cartesian product of input iterables.
"""
@inline
def __new__(*args):
if staticlen(args) == 0:
yield ()
else:
for a in args[0]:
rest = args[1:]
for b in product(*rest):
yield (a, *b)
@inline
def __new__(*args, repeat: int):
if repeat < 0:
raise ValueError("repeat argument cannot be negative")
pools = [list(pool) for _ in range(repeat) for pool in args]
result = [List[type(pools[0][0])]()]
for pool in pools:
result = [x+[y] for x in result for y in pool]
for prod in result:
yield prod
def tee[T](iterable: Generator[T], n: int = 2):
"""
Return n independent iterators from a single iterable.
"""
from collections import deque
it = iter(iterable)
deques = [deque[T]() for i in range(n)]
def gen(mydeque):
while True:
if not mydeque: # when the local deque is empty
if it.__done__():
return
it.__resume__()
if it.__done__():
return
newval = it.next()
for d in deques: # load it to all the deques
d.append(newval)
yield mydeque.popleft()
return [gen(d) for d in deques]