towhee.runtime.pipeline.Pipeline

class towhee.runtime.pipeline.Pipeline(dag, clo_node='_input')

Bases: object

Pipeline is a tool to create data transformation chains.

Parameters:
  • dag (dict) – The dag for the pipeline.

  • clo_node (str) – The closing node of the pipeline dag, i.e. the node that new actions are chained after; defaults to ‘_input’.
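The relationship between dag and clo_node can be illustrated with a toy builder. This is a hypothetical sketch, not towhee's implementation: the class ToyPipeline, the node layout ({'op': ..., 'next_nodes': [...]}) and the copy-on-chain behaviour are all assumptions made for illustration. Each chaining call copies the DAG, links a new node after clo_node, and returns a new pipeline whose clo_node is that new node.

```python
import copy
import uuid


class ToyPipeline:
    """Hypothetical model of an immutable pipeline builder; not towhee's code."""

    def __init__(self, dag, clo_node='_input'):
        self.dag = dag            # node id -> {'op': ..., 'next_nodes': [...]} (assumed layout)
        self.clo_node = clo_node  # the node that new actions are chained after

    @classmethod
    def input(cls, *schema):
        # The chain starts with a single '_input' node, which is also the closing node.
        return cls({'_input': {'op': ('input', schema), 'next_nodes': []}})

    def _append(self, op):
        dag = copy.deepcopy(self.dag)  # leave self.dag untouched
        uid = uuid.uuid4().hex
        dag[uid] = {'op': op, 'next_nodes': []}
        dag[self.clo_node]['next_nodes'].append(uid)  # link after the closing node
        return type(self)(dag, clo_node=uid)          # the new node closes the chain

    def map(self, input_schema, output_schema, fn):
        return self._append(('map', input_schema, output_schema, fn))


base = ToyPipeline.input('a')
p1 = base.map('a', 'b', lambda x: x + 1)
print(base.clo_node)                                    # _input
print(p1.dag['_input']['next_nodes'] == [p1.clo_node])  # True
```

Copying on every call would keep the original pipeline reusable, which matters when one pipeline is branched several times, as pipe0 is in the concat example.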

Methods

concat

Concatenate this pipeline with one or more other pipelines.

filter

Filter rows based on the values of the selected filter_columns.

flat_map

One-to-many map action.

input

Start a new pipeline chain.

map

One-to-one map of a function on the inputs.

output

Close and preload the pipeline so that it is ready to run.

time_window

Perform an action on time windows.

window

Perform an action on fixed-size windows of rows.

window_all

Read all rows as a single window and perform an action.

Attributes

dag

__init__(dag, clo_node='_input')

concat(*pipes: Pipeline) → Pipeline

Concatenate this pipeline with one or more other pipelines.

Parameters:

pipes (Pipeline) – One or more pipelines to concatenate.

Returns:

The concatenated pipeline.

Return type:

Pipeline

Examples

>>> import towhee
>>> pipe0 = towhee.pipe.input('a', 'b', 'c')
>>> pipe1 = pipe0.map('a', 'd', lambda x: x+1)
>>> pipe2 = pipe0.map(('b', 'c'), 'e', lambda x, y: x - y)
>>> pipe3 = pipe2.concat(pipe1).output('d', 'e')
>>> pipe3(1, 2, 3).get()
[2, -1]
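One way to read this example: both branches start from the same input row, and concat merges the columns each branch adds before output selects d and e. The plain-Python sketch below models that reading; merge_branches is a hypothetical helper, and treating concat as a column-wise merge of rows is an assumption, not a description of how towhee schedules the branches.

```python
def merge_branches(row, *branches):
    """Hypothetical model of concat: every branch receives a copy of the same
    input row, and the columns each branch produces are merged into one row."""
    merged = dict(row)
    for branch in branches:
        merged.update(branch(dict(row)))
    return merged


def branch1(r):  # mirrors pipe1: map('a', 'd', lambda x: x + 1)
    return {'d': r['a'] + 1}


def branch2(r):  # mirrors pipe2: map(('b', 'c'), 'e', lambda x, y: x - y)
    return {'e': r['b'] - r['c']}


row = {'a': 1, 'b': 2, 'c': 3}  # pipe0 input columns a, b, c
out = merge_branches(row, branch2, branch1)
print([out['d'], out['e']])  # [2, -1]
```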
filter(input_schema, output_schema, filter_columns, fn, config=None) → Pipeline

Filter rows based on the values of the selected filter_columns.

Parameters:
  • input_schema (tuple) – The input column/s before filter.

  • output_schema (tuple) – The output columns after filtering; must be the same length as input_schema.

  • filter_columns (str | tuple) – Which columns to filter on.

  • fn (Operation | lambda | callable) – The predicate applied to the filter_columns values; only rows for which it returns a true value are kept.

  • config (dict, optional) – Config for the filter. Defaults to None.

Returns:

Pipeline with filter action added.

Return type:

Pipeline

Examples

>>> import towhee
>>> def filter_func(num):
...     return num > 10
>>> pipe = (towhee.pipe.input('a', 'c')
...         .filter('c', 'd', 'a', filter_func)
...         .output('d'))
>>> pipe(1, 12).get()
None
>>> pipe(11, 12).get()
[12]
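The behaviour shown above (rows whose filter_columns fail fn are dropped; surviving rows get their input_schema values copied to output_schema) can be modelled in plain Python. filter_rows and the dict-per-row representation are hypothetical, for illustration only.

```python
def _cols(schema):
    """Accept a single column name or a tuple of names."""
    return (schema,) if isinstance(schema, str) else tuple(schema)


def filter_rows(rows, input_schema, output_schema, filter_columns, fn):
    in_cols, out_cols = _cols(input_schema), _cols(output_schema)
    if len(in_cols) != len(out_cols):
        raise ValueError('input_schema and output_schema must have the same length')
    kept = []
    for row in rows:
        # The predicate only sees the values of filter_columns.
        if fn(*(row[c] for c in _cols(filter_columns))):
            new_row = dict(row)
            # Copy each input column to its matching output column.
            for src, dst in zip(in_cols, out_cols):
                new_row[dst] = row[src]
            kept.append(new_row)
    return kept


def filter_func(num):
    return num > 10


print(filter_rows([{'a': 1, 'c': 12}], 'c', 'd', 'a', filter_func))   # []
print(filter_rows([{'a': 11, 'c': 12}], 'c', 'd', 'a', filter_func))  # [{'a': 11, 'c': 12, 'd': 12}]
```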
flat_map(input_schema, output_schema, fn, config=None) → Pipeline

One to many map action.

The operator may produce a variable number of outputs; each output is treated as a new row.

Parameters:
  • input_schema (tuple) – The input column/s of fn.

  • output_schema (tuple) – The output column/s of fn.

  • fn (Operation | lambda | callable) – The action to perform on the input_schema.

  • config (dict, optional) – Config for the flat_map. Defaults to None.

Returns:

Pipeline with flat_map action added.

Return type:

Pipeline

Examples

>>> import towhee
>>> pipe = (towhee.pipe.input('a')
...         .flat_map('a', 'b', lambda x: [y for y in x])
...         .output('b'))
>>> res = pipe([1, 2, 3])
>>> res.get()
[1]
>>> res.get()
[2]
>>> res.get()
[3]
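The one-to-many expansion can be modelled in plain Python: every item yielded by fn becomes its own row, which is why the example needs three get() calls. flat_map_rows and the dict-per-row representation are hypothetical, for illustration only.

```python
def _cols(schema):
    """Accept a single column name or a tuple of names."""
    return (schema,) if isinstance(schema, str) else tuple(schema)


def flat_map_rows(rows, input_schema, output_schema, fn):
    in_cols, out_cols = _cols(input_schema), _cols(output_schema)
    out = []
    for row in rows:
        for item in fn(*(row[c] for c in in_cols)):
            # A single output column takes the item as-is; several columns unpack it.
            values = (item,) if len(out_cols) == 1 else item
            new_row = dict(row)
            new_row.update(zip(out_cols, values))
            out.append(new_row)  # one new row per emitted item
    return out


rows = flat_map_rows([{'a': [1, 2, 3]}], 'a', 'b', lambda x: [y for y in x])
print([r['b'] for r in rows])  # [1, 2, 3]
```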
classmethod input(*schema) → Pipeline

Start a new pipeline chain.

Parameters:

schema (list) – The column names for the values passed into the pipeline.

Returns:

Pipeline ready to be chained.

Return type:

Pipeline

Examples

>>> import towhee
>>> pipe = towhee.pipe.input('a', 'b', 'c')
map(input_schema, output_schema, fn, config=None) → Pipeline

One-to-one map of a function on the inputs.

Parameters:
  • input_schema (tuple) – The input column/s of fn.

  • output_schema (tuple) – The output column/s of fn.

  • fn (Operation | lambda | callable) – The action to perform on the input_schema.

  • config (dict, optional) – Config for the map. Defaults to None.

Returns:

Pipeline with map action added.

Return type:

Pipeline

Examples

>>> import towhee
>>> pipe = towhee.pipe.input('a').map('a', 'b', lambda x: x+1).output('a', 'b')
>>> pipe(1).get()
[1, 2]
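In contrast to flat_map, map produces exactly one output row per input row, keeping the existing columns and adding the output columns. A plain-Python model, where map_rows and the dict-per-row representation are hypothetical:

```python
def _cols(schema):
    """Accept a single column name or a tuple of names."""
    return (schema,) if isinstance(schema, str) else tuple(schema)


def map_rows(rows, input_schema, output_schema, fn):
    in_cols, out_cols = _cols(input_schema), _cols(output_schema)
    out = []
    for row in rows:
        result = fn(*(row[c] for c in in_cols))
        # A single output column takes the result as-is; several columns unpack it.
        values = (result,) if len(out_cols) == 1 else result
        new_row = dict(row)  # existing columns stay available downstream
        new_row.update(zip(out_cols, values))
        out.append(new_row)
    return out


[row] = map_rows([{'a': 1}], 'a', 'b', lambda x: x + 1)
print([row['a'], row['b']])  # [1, 2]
```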
output(*output_schema, **config_kws) → RuntimePipeline

Close and preload the pipeline so that it is ready to run.

Parameters:
  • output_schema (tuple) – Which columns to output.

  • config_kws (dict) – The config for this pipeline.

Returns:

The runtime pipeline that can be called on inputs.

Return type:

RuntimePipeline

Examples

>>> import towhee
>>> pipe = towhee.pipe.input('a').map('a', 'b', lambda x: x+1).output('b')
>>> pipe(1).get()
[2]
time_window(input_schema, output_schema, timestamp_col, size, step, fn, config=None) → Pipeline

Perform an action on time windows.

Parameters:
  • input_schema (tuple) – The input column/s of fn.

  • output_schema (tuple) – The output column/s of fn.

  • timestamp_col (str) – Which column to use for creating windows.

  • size (int) – The size of each window.

  • step (int) – How far to advance the window each time.

  • fn (Operation | lambda | callable) – The action to perform on the input_schema for each window built from timestamp_col.

  • config (dict, optional) – Config for the time window. Defaults to None.

Returns:

Pipeline with time_window action added.

Return type:

Pipeline

Examples

>>> import towhee
>>> pipe = (towhee.pipe.input('d')
...         .flat_map('d', ('n1', 'n2', 't'), lambda x: ((a, b, c) for a, b, c in x))
...         .time_window(('n1', 'n2'), ('s1', 's2'), 't', 3, 3, lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> data = [(i, i+1, i * 1000) for i in range(11) if i < 3 or i > 7] #[(0, 1), (1, 2), (2, 3), (8, 9), (9, 10), (10, 11)]
>>> res = pipe(data)
>>> res.get() #[(0, 1), (1, 2), (2, 3)]
[3, 6]
>>> res.get() #(8, 9)
[8, 9]
>>> res.get() #(9, 10), (10, 11)
[19, 21]
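The expected results follow if t is read as milliseconds and size/step as seconds, giving windows [0 s, 3 s), [3 s, 6 s), [6 s, 9 s) and [9 s, 12 s), with the empty [3 s, 6 s) window producing nothing. The sketch below reproduces that reading in plain Python. The units, the alignment of windows to 0 and the skipping of empty windows are inferences from this one example rather than documented behaviour, and time_window_rows is a hypothetical helper.

```python
def _cols(schema):
    """Accept a single column name or a tuple of names."""
    return (schema,) if isinstance(schema, str) else tuple(schema)


def time_window_rows(rows, input_schema, timestamp_col, size, step, fn):
    """Assumes timestamps in milliseconds, size/step in seconds, windows starting at 0."""
    if size <= 0 or step <= 0:
        raise ValueError('size and step must be positive')
    if not rows:
        return []
    in_cols = _cols(input_schema)
    last_ts = max(r[timestamp_col] for r in rows)
    results = []
    start = 0
    while start * 1000 <= last_ts:
        lo, hi = start * 1000, (start + size) * 1000
        group = [r for r in rows if lo <= r[timestamp_col] < hi]
        if group:  # empty windows produce no output
            # fn receives one list of values per input column.
            results.append(fn(*([r[c] for r in group] for c in in_cols)))
        start += step
    return results


data = [(i, i + 1, i * 1000) for i in range(11) if i < 3 or i > 7]
rows = [dict(zip(('n1', 'n2', 't'), d)) for d in data]
results = time_window_rows(rows, ('n1', 'n2'), 't', 3, 3, lambda x, y: (sum(x), sum(y)))
print(results)  # [(3, 6), (8, 9), (19, 21)]
```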
window(input_schema, output_schema, size, step, fn, config=None) → Pipeline

Perform an action on fixed-size windows of rows.

Parameters:
  • input_schema (tuple) – The input column/s of fn.

  • output_schema (tuple) – The output column/s of fn.

  • size (int) – How many rows per window.

  • step (int) – How many rows to iterate after each window.

  • fn (Operation | lambda | callable) – The action to perform on the input_schema after window.

  • config (dict, optional) – Config for the window. Defaults to None.

Returns:

Pipeline with window action added.

Return type:

Pipeline

Examples

>>> import towhee
>>> pipe = (towhee.pipe.input('n1', 'n2')
...         .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...         .window(('n1', 'n2'), ('s1', 's2'), 2, 1, lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> res = pipe([1, 2, 3, 4], [2, 3, 4, 5])
>>> res.get()
[3, 5]
>>> res.get()
[5, 7]
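With size=2 and step=1, consecutive windows overlap by one row: (1, 2)+(2, 3) gives [3, 5], then (2, 3)+(3, 4) gives [5, 7]. The plain-Python sketch below models count-based windows; window_rows is a hypothetical helper. It starts a window at every multiple of step, so it also yields (7, 9) and a final one-row window (4, 5). The example above shows only the first two results, so whether towhee emits that trailing partial window is an assumption.

```python
def _cols(schema):
    """Accept a single column name or a tuple of names."""
    return (schema,) if isinstance(schema, str) else tuple(schema)


def window_rows(rows, input_schema, size, step, fn):
    """Apply fn to rows[start:start + size] for start = 0, step, 2 * step, ..."""
    if size <= 0 or step <= 0:
        raise ValueError('size and step must be positive')
    in_cols = _cols(input_schema)
    results = []
    for start in range(0, len(rows), step):
        group = rows[start:start + size]  # may be shorter near the end (assumption)
        # fn receives one list of values per input column.
        results.append(fn(*([r[c] for r in group] for c in in_cols)))
    return results


rows = [{'n1': a, 'n2': b} for a, b in zip([1, 2, 3, 4], [2, 3, 4, 5])]
results = window_rows(rows, ('n1', 'n2'), 2, 1, lambda x, y: (sum(x), sum(y)))
print(results[:2])  # [(3, 5), (5, 7)]
```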
window_all(input_schema, output_schema, fn, config=None) → Pipeline

Read all rows as a single window and perform an action.

Parameters:
  • input_schema (tuple) – The input column/s of fn.

  • output_schema (tuple) – The output column/s of fn.

  • fn (Operation | lambda | callable) – The action to perform on the input_schema, with all rows treated as a single window.

  • config (dict, optional) – Config for the window_all. Defaults to None.

Returns:

Pipeline with window_all action added.

Return type:

Pipeline

Examples

>>> import towhee
>>> pipe = (towhee.pipe.input('n1', 'n2')
...         .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...         .window_all(('n1', 'n2'), ('s1', 's2'), lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> pipe([1, 2, 3, 4], [2, 3, 4, 5]).get()
[10, 14]
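window_all behaves like a single window that covers every row, so fn sees each input column as one list: sum([1, 2, 3, 4]) = 10 and sum([2, 3, 4, 5]) = 14. A plain-Python model, where window_all_rows and the dict-per-row representation are hypothetical:

```python
def _cols(schema):
    """Accept a single column name or a tuple of names."""
    return (schema,) if isinstance(schema, str) else tuple(schema)


def window_all_rows(rows, input_schema, fn):
    """Treat every row as one window: fn gets one list of values per input column."""
    in_cols = _cols(input_schema)
    return fn(*([r[c] for r in rows] for c in in_cols))


rows = [{'n1': a, 'n2': b} for a, b in zip([1, 2, 3, 4], [2, 3, 4, 5])]
print(window_all_rows(rows, ('n1', 'n2'), lambda x, y: (sum(x), sum(y))))  # (10, 14)
```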