Pythonic way to chain Python generator functions to form a pipeline
I'm refactoring some pipeline code in Python.
Suppose we have a series of generator functions and we want to chain them together to form a data-processing pipeline.
Example:
#!/usr/bin/python
def foo1(g):
    for i in g:
        yield i + 1

def foo2(g):
    for i in g:
        yield 10 + i

def foo3(g):
    for i in g:
        yield 'foo3:' + str(i)

res = foo3(foo2(foo1(range(0, 5))))
for i in res:
    print(i)
Output:
foo3:11
foo3:12
foo3:13
foo3:14
foo3:15
I do not think foo3(foo2(foo1(range(0, 5)))) is a Pythonic way to achieve my pipeline goal, especially when the number of stages in the pipeline is large.
I wish I could chain the calls the way jQuery does. Something similar to:
range(0, 5).foo1().foo2().foo3()
Or maybe
l = [range(0, 5), foo1, foo2, foo3]
res = runner.run(l)
But I'm new to generators and couldn't find a way to achieve this.
Any help will be welcome.
Solution 1:[1]
I do not think foo3(foo2(foo1(range(0, 5)))) is a pythonic way to achieve my pipeline goal. Especially when the number of stages in the pipeline is large.
There is a fairly trivial, and in my opinion clear, way of chaining generators: assigning the result of each to a variable, where each can have a descriptive name.
range_iter = range(0, 5)
foo1_iter = foo1(range_iter)
foo2_iter = foo2(foo1_iter)
foo3_iter = foo3(foo2_iter)
for i in foo3_iter:
    print(i)
I prefer this to something that uses a higher-order function, e.g. a reduce or similar, for a few reasons:
- In my real cases, each foo* generator function often needs its own extra parameters, which is tricky if using a reduce (a reduce-based version is sketched below for comparison).
- In my real cases, the steps in the pipeline are not dynamic at runtime: it seems a bit odd/unexpected (to me) to use a pattern that is more appropriate for a dynamic case.
- It's a bit inconsistent with how regular functions are typically written, where each is called explicitly and the result of each is passed to the call of the next. Yes, there is a bit of duplication, but I'm happy with "calling a function" being duplicated since (to me) it's really clear.
- There is no need for an import: it uses core language features.
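For comparison, the reduce-based version mentioned above might look like the following. This is a minimal sketch using functools.reduce with the foo1/foo2/foo3 generators from the question:

from functools import reduce

# Fold each stage over the iterator produced by the previous one.
res = reduce(lambda it, step: step(it), [foo1, foo2, foo3], range(0, 5))
for i in res:
    print(i)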
Solution 2:[2]
Following up on your runner.run approach, let's define this utility function:
def recur(ops):
    return ops[0](recur(ops[1:])) if len(ops) > 1 else ops[0]
As an example:
>>> ops = foo3, foo2, foo1, range(0, 5)
>>> list( recur(ops) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']
Alternative: backward ordering
def backw(ops):
    return ops[-1](backw(ops[:-1])) if len(ops) > 1 else ops[0]
For example:
>>> list( backw([range(0, 5), foo1, foo2, foo3]) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']
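If some stages need extra parameters (the concern raised in Solution 1), they can be bound with functools.partial before being passed to backw. A minimal sketch, where the parameterized add stage is hypothetical:

from functools import partial

def add(n, g):
    # Hypothetical parameterized stage: adds n to every item of g.
    for i in g:
        yield i + n

>>> list(backw([range(0, 5), partial(add, 1), partial(add, 10), foo3]))
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']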
Solution 3:[3]
Here is another approach, in case the functions in your example are one-time (single-use) functions. Descriptive variable names and generator expressions can be helpful for small operations.
>>> g = range(0, 5)
>>> foo1 = (x+1 for x in g)
>>> foo2 = (x+10 for x in foo1)
>>> foo3 = ('foo3:' + str(x) for x in foo2)
>>> for x in foo3:
...     print(x)
...
foo3:11
foo3:12
foo3:13
foo3:14
foo3:15
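If even the intermediate names are unnecessary, the same stages can be collapsed into nested generator expressions; a minimal sketch equivalent to the session above:

>>> res = ('foo3:' + str(x) for x in (x + 10 for x in (x + 1 for x in range(0, 5))))
>>> list(res)
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']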
Solution 4:[4]
You can compose curried generator functions using PyMonad:
from typing import Iterable, Sized
# PyMonad 1.x style imports (the module layout differs in PyMonad 2.x):
from pymonad.Maybe import Just
from pymonad.Reader import curry

def main():
    odds = list * \
        non_divisibles(2) * \
        lengths * \
        Just(["1", "22", "333", "4444", "55555"])
    print(odds.getValue())  # prints [1, 3, 5]

@curry
def lengths(words: Iterable[Sized]) -> Iterable[int]:
    return map(len, words)

@curry
def non_divisibles(div: int, numbers: Iterable[int]) -> Iterable[int]:
    return (n for n in numbers if n % div)
Another alternative is to start with a Monad and compose the generators using fmap calls - this syntax is familiar to Java 8 Stream users:
def main():
    odds = Just(["1", "22", "333", "4444", "55555"]) \
        .fmap(lengths) \
        .fmap(non_divisibles(2)) \
        .fmap(list) \
        .getValue()
    print(odds)  # prints [1, 3, 5]

def lengths(words: Iterable[Sized]) -> Iterable[int]:
    return map(len, words)

@curry
def non_divisibles(div: int, numbers: Iterable[int]) -> Iterable[int]:
    return (n for n in numbers if n % div)
Note that functions passed whole to fmap (like lengths here) don't need the @curry decorator; non_divisibles keeps it so that non_divisibles(2) can be partially applied. Because lengths and non_divisibles return lazy iterators, nothing is materialized until the final fmap(list) step forces the chain.
Solution 5:[5]
For future readers: another solution that is very Pythonic (IMHO):
steps = [
    foo1,
    foo2,
    foo3,
]

res = range(0, 5)
for step in steps:
    res = step(res)

for i in res:
    print(i)
Output:
foo3:11
foo3:12
foo3:13
foo3:14
foo3:15
This is essentially the same as using functools.reduce, as in maxymoo's answer; the laziness of generators allows this simple formulation without functools.
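The loop also wraps naturally into a reusable helper in the spirit of the runner.run idea from the question. A minimal sketch, where pipe is a hypothetical name:

def pipe(source, *steps):
    # Feed the source through each step; generators keep the result lazy.
    res = source
    for step in steps:
        res = step(res)
    return res

for i in pipe(range(0, 5), foo1, foo2, foo3):
    print(i)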
Solution 6:[6]
Although this implementation requires some overhead, I prefer to use the >> operator for chaining pipeline steps, similar to how tasks are arranged into a DAG in Airflow.
def foo1(g):
    for i in g:
        yield i + 1

def foo2(g):
    for i in g:
        yield 10 + i

def foo3(g):
    for i in g:
        yield "foo3:" + str(i)

def print_loop(g):
    for i in g:
        print(i)

class PipelineOperator:
    def __init__(self, task):
        self.task = task

    def __rrshift__(self, x):
        # Called for `x >> self` when x itself has no matching __rshift__,
        # so each stage receives the iterable produced by the previous one.
        return self.task(x)

foo1_t = PipelineOperator(foo1)
foo2_t = PipelineOperator(foo2)
foo3_t = PipelineOperator(foo3)
print_loop_t = PipelineOperator(print_loop)

(range(0, 5) >> foo1_t >> foo2_t >> foo3_t >> print_loop_t)
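Because PipelineOperator just wraps a callable, it can also be applied as a decorator so that a stage is chainable as soon as it is defined. A small sketch with a made-up times_two stage:

@PipelineOperator
def times_two(g):
    # Made-up stage for illustration: doubles every item.
    for i in g:
        yield i * 2

# range(0, 5) has no __rshift__, so Python falls back to times_two.__rrshift__:
print(list(range(0, 5) >> times_two))  # prints [0, 2, 4, 6, 8]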
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source
---|---
Solution 1 | Michal Charemza
Solution 2 |
Solution 3 | Jay Rajput
Solution 4 |
Solution 5 | examiner
Solution 6 | Matthew Thomas