Sam's world

A stream of thought, whim, and wisdom

Python function pipelines


If your Python code represents a function pipeline, it should look like a function pipeline. This post presents a simple, strongly-typed function pipeline for your personal projects to make beautiful, explicit, Unix-like pipelines in Python. Requires Python 3.6 or greater.

I was reading through the PyTorch reinforcement learning documentation today and came across the following irksome pattern:

...

def forward(self, x):
    x = F.relu(self.bn1(self.conv1(x)))
    x = F.relu(self.bn2(self.conv2(x)))
    x = F.relu(self.bn3(self.conv3(x)))
    return self.head(x.view(x.size(0), -1))
...

The manipulation and reassignment of a variable x to itself as it is passed through a neural network’s gateways makes neural networks that much harder to follow. IMHO, explicit variable reassignment is almost always bad because it makes it hard for my limited human brain to track a variable name’s associated data during program execution. If a program tells me that x is an integer on line 10 and then reassigns it to a string on line 20, I will want to delete said program from my computer, take a cold shower, and exact revenge on the program’s author. With that established, let’s just avoid reassignment altogether.

The problem

In the above example, variable reassignment is convenient. It prevents the programmer from needing to come up with new names for each state in the connected pipeline. How can we find an elegant way of avoiding explicit variable reassignment in the pipeline use-case while still producing readable, performant code? The answer: take inspiration from Unix pipes!

Unix pipes

In the Unix shell, virtually everything we work with has the same type: a file. “Everything is a file” makes it easy to chain commands together because each command reads from a file-like input stream and writes to a file-like output stream. One common way to chain commands in Unix is the anonymous pipe, which connects the standard output of one command line program to the standard input of the next so that programs can be chained together to manipulate a text stream. See the following example:

#!/bin/bash

# Count the number of words
echo "hello hello hello world world" | wc -w

# Count the number of words that are not "hello"
echo "hello hello hello world world" | sed 's/hello//g' | wc -w

# Count the number of words that are not "world"
echo "hello hello hello world world" | sed 's/world//g' | wc -w

# Should print the following to console:
# 5
# 2
# 3

Notice that data flows through the pipeline and no variable reassignment is used. The Unix pipeline is beautiful in this regard; I’d like to build something similar in Python.

Example

Let’s say we have the following three functions:

def add_5(value: int) -> int:
    return value + 5

def add_6(value: int) -> int:
    return value + 6

def add_7(value: int) -> int:
    return value + 7

Our example problem: take an initial integer 0 and add 5, then 6, then 7 to it using our three functions so that our final result is 18.

Solution 1: name each step in the pipeline

We can do slightly better than the PyTorch example and create a unique variable name for each step in the process.

x_0 = 0
x_5 = add_5(x_0)
x_6 = add_6(x_5)
x = add_7(x_6)

print(f'x={x}')

Besides being pretty ugly / hard to read, this creates some useless names in our module’s scope. Do we really need to give each step its own name? We don’t use the steps anywhere else, and the names make the pipeline pretty hard to edit: adding or removing a step means renaming the steps around it. This solution almost makes me wish we could go back to reassignment. Fortunately, the entire pipeline can be expressed with one name thanks to the “reduce” function.

Solution 2: use the “reduce” function

from functools import reduce
y = reduce(
    lambda value, function: function(value),
    (
        add_5,
        add_6,
        add_7,
    ),
    0,
)

print(f'y={y}')

This example uses the “reduce” function from the standard library’s functools module. Originally, “reduce” was intended to take a list of values and collapse them into one value. This use-case is described well in the Python documentation and in this YouTube video. Here, we use it a bit differently. Instead of taking a list of values and applying a “collapsing” function to them, we take a list of functions and pass a value through them, one function at a time, from left to right. This has obvious advantages over solution 1: we can easily swap new functions in and out of the pipeline to suit our needs without needing to adjust any other code / rename pipeline steps (because we haven’t named the steps at all!).
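To make the contrast concrete, here is a minimal sketch that puts the classic “collapse a list of values” use next to the pipeline use. The helper functions double and increment are made up for this illustration and are not part of the example above.

```python
from functools import reduce
from operator import add

# Classic use: collapse a list of values into a single value.
# Evaluates as ((((0 + 1) + 2) + 3) + 4).
total = reduce(add, [1, 2, 3, 4], 0)

# Hypothetical int -> int steps, defined only for this sketch.
def double(value: int) -> int:
    return value * 2

def increment(value: int) -> int:
    return value + 1

# Pipeline use: the sequence holds functions, and the accumulator is the
# value being passed from one function to the next.
steps = (double, increment, double)
piped = reduce(lambda value, function: function(value), steps, 3)

# The reduce call above unrolls to nested calls, applied left to right.
unrolled = double(increment(double(3)))

print(f'total={total} piped={piped} unrolled={unrolled}')
# prints: total=10 piped=14 unrolled=14
```

In both cases “reduce” threads an accumulator through a sequence; the only difference is whether the sequence holds the data or the functions.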

This solution frees us from naming each step in our pipeline but it does have some disadvantages: “reduce” is kind of hard to read and this use-case isn’t quite standard. Whenever we find ourselves using a standard library function in a confusing way, that’s a signal that we should probably define our own function to make this clearer to ourselves and to those who read our code.

Solution 3: custom “pipeline” function

from functools import reduce
from typing import TypeVar, Callable, Sequence

T = TypeVar('T')

def pipeline(
        value: T,
        function_pipeline: Sequence[Callable[[T], T]],
) -> T:
    '''A generic Unix-like pipeline

    :param value: the value you want to pass through a pipeline
    :param function_pipeline: an ordered list of functions that
        comprise your pipeline
    '''
    return reduce(lambda v, f: f(v), function_pipeline, value)

z = pipeline(
    value=0,
    function_pipeline=(
        add_5,
        add_6,
        add_7,
    )
)

print(f'z={z}')

This solution is elegant and explicit. It is generic and works with mypy. As long as our function pipeline contains only functions that take our value’s type and return our value’s type (similar to Unix command line utilities, where everything is a file), this pipeline will successfully pass our value in a type-safe way.
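Because the pipeline is generic, the same function should work for types other than int. As a rough sketch, here is the earlier shell example (dropping “hello” and counting the remaining words) expressed with str -> str steps. The step names remove_hello and collapse_whitespace are hypothetical and exist only for this illustration; the pipeline function is repeated so the snippet runs on its own.

```python
from functools import reduce
from typing import Callable, Sequence, TypeVar

T = TypeVar('T')

def pipeline(
        value: T,
        function_pipeline: Sequence[Callable[[T], T]],
) -> T:
    '''Pass value through each function in order.'''
    return reduce(lambda v, f: f(v), function_pipeline, value)

# Hypothetical str -> str steps, named for this example only.
def remove_hello(text: str) -> str:
    '''Rough equivalent of: sed 's/hello//g' '''
    return text.replace('hello', '')

def collapse_whitespace(text: str) -> str:
    '''Squeeze runs of whitespace into single spaces and trim the ends.'''
    return ' '.join(text.split())

cleaned = pipeline(
    value='hello hello hello world world',
    function_pipeline=(
        remove_hello,
        collapse_whitespace,
    )
)

# Counting words changes the type from str to int, so that step does not
# fit Callable[[T], T] and is applied outside the pipeline.
word_count = len(cleaned.split())

print(f'cleaned={cleaned!r} word_count={word_count}')
# prints: cleaned='world world' word_count=2
```

The last step shows the limit of the “everything has one type” design: a step that changes the type of the value has to live outside the pipeline, or the signature would need to be loosened.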

Full script

#!/usr/bin/env python3
'''Simple function example'''

def add_5(value: int) -> int:
    return value + 5

def add_6(value: int) -> int:
    return value + 6

def add_7(value: int) -> int:
    return value + 7

x_0 = 0
x_5 = add_5(x_0)
x_6 = add_6(x_5)
x = add_7(x_6)

print(f'x={x}')

from functools import reduce
y = reduce(
    lambda value, function: function(value),
    (
        add_5,
        add_6,
        add_7,
    ),
    0,
)

print(f'y={y}')

from typing import TypeVar, Callable, Sequence

T = TypeVar('T')

def pipeline(
        value: T,
        function_pipeline: Sequence[Callable[[T], T]],
) -> T:
    '''A generic Unix-like pipeline

    :param value: the value you want to pass through a pipeline
    :param function_pipeline: an ordered list of functions that
        comprise your pipeline
    '''
    return reduce(lambda v, f: f(v), function_pipeline, value)

z = pipeline(
    value=0,
    function_pipeline=(
        add_5,
        add_6,
        add_7,
    )
)

print(f'z={z}')

Conclusion

If your system resembles a pipeline, don’t reassign your piped variable to itself. There is a better way and it’s pretty much built into Python. You’ll just need to care enough to use it.