Pythonic way to chain Python generator functions to form a pipeline

I'm refactoring some pipeline code in Python.

Suppose we have a series of generator functions and we want to chain them together to form a data-processing pipeline.

Example:

#!/usr/bin/python

def foo1(g):
    for i in g:
        yield i + 1

def foo2(g):
    for i in g:
        yield 10 + i

def foo3(g):
    for i in g:
        yield 'foo3:' + str(i)

res = foo3(foo2(foo1(range(0, 5))))

for i in res:
    print(i)

Output:

foo3:11
foo3:12
foo3:13
foo3:14
foo3:15

I do not think foo3(foo2(foo1(range(0, 5)))) is a Pythonic way to achieve my pipeline goal, especially when the number of stages in the pipeline is large.

I wish I could rewrite it like chaining in jQuery. Something similar to:

range(0, 5).foo1().foo2().foo3()

Or maybe

l = [range(0, 5), foo1, foo2, foo3]
res = runner.run(l)

But I'm new to generators and couldn't find a way to achieve this.

Any help will be welcome.



Solution 1:[1]

I do not think foo3(foo2(foo1(range(0, 5)))) is a Pythonic way to achieve my pipeline goal, especially when the number of stages in the pipeline is large.

There is a fairly trivial, and in my opinion clear, way of chaining generators: assigning the result of each to a variable, where each can have a descriptive name.

range_iter = range(0, 5)
foo1_iter = foo1(range_iter)
foo2_iter = foo2(foo1_iter)
foo3_iter = foo3(foo2_iter)

for i in foo3_iter:
  print(i)

I prefer this to something that uses a higher-order function, e.g. a reduce or similar:

  • In my real cases, each foo* generator function often needs its own extra parameters, which is tricky when using a reduce.

  • In my real cases, the steps in the pipeline are not dynamic at runtime: it seems a bit odd/unexpected (to me) to have a pattern that seems more appropriate for a dynamic case.

  • It's a bit inconsistent with how regular functions are typically written where each is called explicitly, and the result of each is passed to the call of the next. Yes, I guess a bit of duplication, but I'm happy with "calling a function" being duplicated since (to me) it's really clear.

  • No need for an import: it uses core language features.
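To illustrate the first point, a stage that takes its own extra parameter chains just as explicitly in this style. Here `add_n` and `fmt` are hypothetical stages (not from the question), sketched to show a parameterized step:

```python
def add_n(g, n):
    # hypothetical pipeline stage that needs an extra parameter
    for i in g:
        yield i + n

def fmt(g):
    # hypothetical formatting stage
    for i in g:
        yield 'n:' + str(i)

range_iter = range(0, 5)
add_n_iter = add_n(range_iter, 10)  # the extra argument fits naturally here
fmt_iter = fmt(add_n_iter)

print(list(fmt_iter))  # ['n:10', 'n:11', 'n:12', 'n:13', 'n:14']
```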

Solution 2:[2]

Following up on your runner.run approach, let's define this utility function:

def recur(ops):
    return ops[0](recur(ops[1:])) if len(ops) > 1 else ops[0]

As an example:

>>> ops = foo3, foo2, foo1, range(0, 5)
>>> list( recur(ops) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']

Alternative: backward ordering

def backw(ops):
    return ops[-1](backw(ops[:-1])) if len(ops) > 1 else ops[0]

For example:

>>> list( backw([range(0, 5), foo1, foo2, foo3]) )
['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']

Solution 3:[3]

Here is another answer, in case the functions in your example are one-time (single-use) functions. Descriptive variable names and generator expressions can be helpful for small operations.

>>> g = range(0, 5)
>>> foo1 = (x+1 for x in g)
>>> foo2 = (x+10 for x in foo1)
>>> foo3 = ('foo3:' + str(x) for x in foo2)
>>> for x in foo3:
...     print(x)
...
foo3:11
foo3:12
foo3:13
foo3:14
foo3:15
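If the intermediate names aren't needed, the same generator expressions can be nested into a single expression; this is a sketch of the same pipeline written in one step:

```python
g = range(0, 5)
# innermost expression runs first: +1, then +10, then the string formatting
res = ('foo3:' + str(x) for x in (x + 10 for x in (x + 1 for x in g)))
print(list(res))  # ['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']
```

Readability suffers quickly as stages are added, though, which is why the named-variable form above is usually preferable.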

Solution 4:[4]

You can compose curried generator functions using PyMonad:

# Requires the PyMonad package (import paths for curry and Just vary by
# PyMonad version) plus: from typing import Iterable, Sized
def main():
    odds = list * \
         non_divisibles(2) * \
         lengths * \
         Just(["1", "22", "333", "4444", "55555"])
    print(odds.getValue())    #prints [1, 3, 5]


@curry
def lengths(words: Iterable[Sized]) -> Iterable[int]:
    return map(len, words)


@curry
def non_divisibles(div: int, numbers: Iterable[int]) -> Iterable[int]:
    return (n for n in numbers if n % div)

Another alternative is to start with a Monad and compose the generators using fmap calls - this syntax is familiar to Java 8 Stream users:

def main():
    odds = Just(["1", "22", "333", "4444", "55555"]) \
        .fmap(lengths) \
        .fmap(non_divisibles(2)) \
        .fmap(list) \
        .getValue()
    print(odds)   #prints [1, 3, 5]


def lengths(words: Iterable[Sized]) -> Iterable[int]:
    return map(len, words)


@curry
def non_divisibles(div: int, numbers: Iterable[int]) -> Iterable[int]:
    return (n for n in numbers if n % div)

Note that lengths no longer needs the @curry decorator in this case; non_divisibles keeps it only because it is partially applied as non_divisibles(2). The entire chain of transformations is not evaluated until the terminal getValue() call.

Solution 5:[5]

For future readers: another solution that is very pythonic (IMHO):

steps = [
    foo1,
    foo2,
    foo3,
]

res = range(0, 5)
for step in steps:
    res = step(res)

for i in res:
    print(i)

Output:

foo3:11
foo3:12
foo3:13
foo3:14
foo3:15

This is essentially the same as using functools.reduce to fold the steps over the initial iterable. The laziness of generators allows this simple formulation without functools.
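For reference, the same loop written with functools.reduce, using the question's foo1/foo2/foo3; a sketch of the equivalent fold:

```python
from functools import reduce

def foo1(g):
    for i in g:
        yield i + 1

def foo2(g):
    for i in g:
        yield 10 + i

def foo3(g):
    for i in g:
        yield 'foo3:' + str(i)

# fold each step over the initial iterable, exactly like the for loop above
res = reduce(lambda acc, step: step(acc), [foo1, foo2, foo3], range(0, 5))
print(list(res))  # ['foo3:11', 'foo3:12', 'foo3:13', 'foo3:14', 'foo3:15']
```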

Solution 6:[6]

Although this implementation requires some overhead, I prefer to use the >> operator for chaining pipeline steps, similar to how tasks are arranged into a DAG in Airflow.

def foo1(g):
    for i in g:
        yield i + 1


def foo2(g):
    for i in g:
        yield 10 + i


def foo3(g):
    for i in g:
        yield "foo3:" + str(i)


def print_loop(g):
    for i in g:
        print(i)


class PipelineOperator:
    def __init__(self, task):
        self.task = task

    def __rrshift__(self, x):
        return self.task(x)


foo1_t = PipelineOperator(foo1)
foo2_t = PipelineOperator(foo2)
foo3_t = PipelineOperator(foo3)
print_loop_t = PipelineOperator(print_loop)

(range(0, 5) >> foo1_t >> foo2_t >> foo3_t >> print_loop_t)
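In the same spirit, a small wrapper class with a pipe method gives jQuery-like chaining without creating an operator object per step. Pipe is a hypothetical helper sketched here, not a standard API:

```python
class Pipe:
    """Wraps an iterable so pipeline stages can be chained fluently."""

    def __init__(self, iterable):
        self.iterable = iterable

    def pipe(self, step, *args):
        # apply one stage and re-wrap the result so calls keep chaining
        return Pipe(step(self.iterable, *args))

    def __iter__(self):
        return iter(self.iterable)


def foo1(g):
    for i in g:
        yield i + 1


res = Pipe(range(0, 5)).pipe(foo1)
print(list(res))  # [1, 2, 3, 4, 5]
```

Extra stage arguments pass through `pipe(step, *args)`, which addresses the parameterized-stage concern from Solution 1.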

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Michal Charemza
Solution 2
Solution 3 Jay Rajput
Solution 4
Solution 5 examiner
Solution 6 Matthew Thomas