towhee.runtime.pipeline.Pipeline
- class towhee.runtime.pipeline.Pipeline(dag, clo_node='_input')
Bases: object
Pipeline is a tool to create data transformation chains.
- Parameters:
dag (dict) – The dag for the pipeline.
clo_node (str) – The close node in the pipeline dag, defaults to ‘_input’.
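Methods
concat(*pipes) – Concatenate this pipeline with one or more other pipelines.
filter(input_schema, output_schema, filter_columns, fn) – Filter the input columns based on the selected filter_columns.
flat_map(input_schema, output_schema, fn) – One to many map action.
input(*schema) – Start a new pipeline chain.
map(input_schema, output_schema, fn) – One to one map of function on inputs.
output(*output_schema) – Close and preload the pipeline so that it is ready to run.
time_window(input_schema, output_schema, timestamp_col, size, step, fn) – Perform action on time windows.
window(input_schema, output_schema, size, step, fn) – Window execution of action.
window_all(input_schema, output_schema, fn) – Read all rows as single window and perform action.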
Attributes
dag
- concat(*pipes: Pipeline) -> Pipeline
Concatenate this pipeline with one or more other pipelines.
- Parameters:
pipes (Pipeline) – One or more pipelines to concatenate.
- Returns:
The concatenated pipeline.
- Return type:
Pipeline
Examples
>>> import towhee
>>> pipe0 = towhee.pipe.input('a', 'b', 'c')
>>> pipe1 = pipe0.map('a', 'd', lambda x: x+1)
>>> pipe2 = pipe0.map(('b', 'c'), 'e', lambda x, y: x - y)
>>> pipe3 = pipe2.concat(pipe1).output('d', 'e')
>>> pipe3(1, 2, 3).get()
[2, -1]
- filter(input_schema, output_schema, filter_columns, fn, config=None) -> Pipeline
Filter the input columns based on the selected filter_columns.
- Parameters:
input_schema (tuple) – The input column/s before filter.
output_schema (tuple) – The output column/s after filter; it must have the same length as input_schema.
filter_columns (str | tuple) – Which columns to filter on.
fn (Operation | lambda | callable) – The action to perform on the filter_columns.
config (dict, optional) – Config for the filter. Defaults to None.
- Returns:
Pipeline with filter action added.
- Return type:
Pipeline
Examples
>>> import towhee
>>> def filter_func(num):
...     return num > 10
>>> pipe = (towhee.pipe.input('a', 'c')
...         .filter('c', 'd', 'a', filter_func)
...         .output('d'))
>>> pipe(1, 12).get()
None
>>> pipe(11, 12).get()
[12]
- flat_map(input_schema, output_schema, fn, config=None) -> Pipeline
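The row-level behaviour in the example above can be sketched in plain Python. This is only an illustration of the semantics the example suggests, not towhee's implementation: the helper `filter_rows` and the dict-per-row representation are hypothetical, and no towhee code is involved.

```python
def filter_rows(rows, input_schema, output_schema, filter_columns, fn):
    """Keep rows where fn(*filter values) is truthy.

    For kept rows, each input column is copied to the matching output column.
    Hypothetical helper for illustration; not part of towhee.
    """
    kept = []
    for row in rows:
        if fn(*(row[c] for c in filter_columns)):
            new_row = dict(row)
            for src, dst in zip(input_schema, output_schema):
                new_row[dst] = row[src]
            kept.append(new_row)
    return kept


rows = [{'a': 1, 'c': 12}, {'a': 11, 'c': 12}]
# Filter on column 'a', pass column 'c' through as 'd'.
result = filter_rows(rows, ('c',), ('d',), ('a',), lambda num: num > 10)
d_values = [r['d'] for r in result]
```

Only the second row survives, because its `a` value exceeds 10; this mirrors the `None` and `[12]` results in the doctest.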
One to many map action.
The operator may produce a variable number of outputs; each output is treated as a new row.
- Parameters:
input_schema (tuple) – The input column/s of fn.
output_schema (tuple) – The output column/s of fn.
fn (Operation | lambda | callable) – The action to perform on the input_schema.
config (dict, optional) – Config for the flat_map. Defaults to None.
- Returns:
Pipeline with flat_map action added.
- Return type:
Pipeline
Examples
>>> import towhee
>>> pipe = (towhee.pipe.input('a')
...         .flat_map('a', 'b', lambda x: [y for y in x])
...         .output('b'))
>>> res = pipe([1, 2, 3])
>>> res.get()
[1]
>>> res.get()
[2]
>>> res.get()
[3]
- classmethod input(*schema) -> Pipeline
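As a rough mental model of the one-to-many behaviour, the following plain-Python sketch expands one input row into one output row per item returned by `fn`. The helper `flat_map_rows` and the dict-per-row representation are assumptions made for illustration; they are not towhee APIs.

```python
def flat_map_rows(rows, input_col, output_col, fn):
    """Emit one new row for every item that fn returns for a row.

    Hypothetical helper for illustration; not part of towhee.
    """
    out = []
    for row in rows:
        for item in fn(row[input_col]):
            new_row = dict(row)
            new_row[output_col] = item
            out.append(new_row)
    return out


expanded = flat_map_rows([{'a': [1, 2, 3]}], 'a', 'b', lambda x: [y for y in x])
b_values = [r['b'] for r in expanded]
```

One input row yields three output rows here, which matches the three successive `res.get()` calls in the doctest.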
Start a new pipeline chain.
- Parameters:
schema (list) – The names of the columns that the pipeline takes as input.
- Returns:
Pipeline ready to be chained.
- Return type:
Pipeline
Examples
>>> import towhee
>>> pipe = towhee.pipe.input('a', 'b', 'c')
- map(input_schema, output_schema, fn, config=None) -> Pipeline
One to one map of function on inputs.
- Parameters:
input_schema (tuple) – The input column/s of fn.
output_schema (tuple) – The output column/s of fn.
fn (Operation | lambda | callable) – The action to perform on the input_schema.
config (dict, optional) – Config for the map. Defaults to None.
- Returns:
Pipeline with action added.
- Return type:
Pipeline
Examples
>>> import towhee
>>> pipe = towhee.pipe.input('a').map('a', 'b', lambda x: x+1).output('a', 'b')
>>> pipe(1).get()
[1, 2]
- output(*output_schema, **config_kws) -> RuntimePipeline
Close and preload the pipeline so that it is ready to run.
- Parameters:
output_schema (tuple) – Which columns to output.
config_kws (dict) – The config for this pipeline.
- Returns:
The runtime pipeline that can be called on inputs.
- Return type:
RuntimePipeline
Examples
>>> import towhee
>>> pipe = towhee.pipe.input('a').map('a', 'b', lambda x: x+1).output('b')
>>> pipe(1).get()
[2]
- time_window(input_schema, output_schema, timestamp_col, size, step, fn, config=None) -> Pipeline
Perform action on time windows.
- Parameters:
input_schema (tuple) – The input column/s of fn.
output_schema (tuple) – The output column/s of fn.
timestamp_col (str) – Which column to use for creating windows.
size (int) – Size of each time window.
step (int) – How far the window advances between consecutive windows.
fn (Operation | lambda | callable) – The action to perform on the input_schema after windowing the data by timestamp_col.
config (dict, optional) – Config for the time window. Defaults to None.
- Returns:
Pipeline with time_window action added.
- Return type:
Pipeline
Examples
>>> import towhee
>>> pipe = (towhee.pipe.input('d')
...         .flat_map('d', ('n1', 'n2', 't'), lambda x: ((a, b, c) for a, b, c in x))
...         .time_window(('n1', 'n2'), ('s1', 's2'), 't', 3, 3, lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> data = [(i, i+1, i * 1000) for i in range(11) if i < 3 or i > 7]  # [(0, 1), (1, 2), (2, 3), (8, 9), (9, 10), (10, 11)]
>>> res = pipe(data)
>>> res.get()  # [(0, 1), (1, 2), (2, 3)]
[3, 6]
>>> res.get()  # (8, 9)
[8, 9]
>>> res.get()  # (9, 10), (10, 11)
[19, 21]
- window(input_schema, output_schema, size, step, fn, config=None) -> Pipeline
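The grouping in the example above can be reproduced with a small plain-Python sketch. It assumes, as the example data suggests but the parameter descriptions do not state, that timestamps are in milliseconds while size and step are in seconds, and that empty windows are skipped. The helper `time_windows` and its `ms_per_unit` parameter are hypothetical and not part of towhee.

```python
def time_windows(rows, ts_index, size, step, ms_per_unit=1000):
    """Group rows into windows [k*step, k*step + size), skipping empty ones.

    Assumes millisecond timestamps and size/step in seconds (an inference
    from the example data). Hypothetical helper; not part of towhee.
    """
    if not rows:
        return []
    max_t = max(r[ts_index] for r in rows) / ms_per_unit
    windows = []
    k = 0
    while k * step <= max_t:
        lo, hi = k * step, k * step + size
        win = [r for r in rows if lo <= r[ts_index] / ms_per_unit < hi]
        if win:  # windows with no rows produce no output
            windows.append(win)
        k += 1
    return windows


data = [(i, i + 1, i * 1000) for i in range(11) if i < 3 or i > 7]
sums = [
    [sum(r[0] for r in win), sum(r[1] for r in win)]
    for win in time_windows(data, ts_index=2, size=3, step=3)
]
```

Under these assumptions the rows fall into the windows [0, 3), [6, 9) and [9, 12), and the window [3, 6) is empty, so `sums` holds the same three results as the doctest.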
Window execution of action.
- Parameters:
input_schema (tuple) – The input column/s of fn.
output_schema (tuple) – The output column/s of fn.
size (int) – How many rows per window.
step (int) – How many rows to iterate after each window.
fn (Operation | lambda | callable) – The action to perform on the input_schema after window.
config (dict, optional) – Config for the window map. Defaults to None.
- Returns:
Pipeline with window action added.
- Return type:
Pipeline
Examples
>>> import towhee
>>> pipe = (towhee.pipe.input('n1', 'n2')
...         .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...         .window(('n1', 'n2'), ('s1', 's2'), 2, 1, lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> res = pipe([1, 2, 3, 4], [2, 3, 4, 5])
>>> res.get()
[3, 5]
>>> res.get()
[5, 7]
- window_all(input_schema, output_schema, fn, config=None) -> Pipeline
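The interplay of size and step can be sketched in plain Python as a sliding window over rows. The helpers `sliding_windows` and `window_apply` are hypothetical and not part of towhee; in particular, emitting shorter trailing windows at the end of the data is an assumption of this sketch, since the example above only shows the first two results.

```python
def sliding_windows(rows, size, step):
    """Yield up to `size` consecutive rows, advancing `step` rows each time.

    Trailing windows may be shorter than `size` (an assumption of this sketch).
    """
    for start in range(0, len(rows), step):
        yield rows[start:start + size]


def window_apply(rows, size, step, fn):
    """Call fn once per window, passing each column as a tuple of values."""
    results = []
    for win in sliding_windows(rows, size, step):
        cols = list(zip(*win))  # transpose rows into columns
        results.append(list(fn(*cols)))
    return results


rows = list(zip([1, 2, 3, 4], [2, 3, 4, 5]))
results = window_apply(rows, 2, 1, lambda x, y: (sum(x), sum(y)))
first_two = results[:2]
```

With size 2 and step 1, consecutive windows overlap by one row, so the first two results match the doctest: rows 1 and 2, then rows 2 and 3.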
Read all rows as single window and perform action.
- Parameters:
input_schema (tuple) – The input column/s of fn.
output_schema (tuple) – The output column/s of fn.
fn (Operation | lambda | callable) – The action to perform on the input_schema after window all data.
config (dict, optional) – Config for the window_all. Defaults to None.
- Returns:
Pipeline with window_all action added.
- Return type:
Pipeline
Examples
>>> import towhee
>>> pipe = (towhee.pipe.input('n1', 'n2')
...         .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...         .window_all(('n1', 'n2'), ('s1', 's2'), lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> pipe([1, 2, 3, 4], [2, 3, 4, 5]).get()
[10, 14]
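For comparison with the windowed variants, a plain-Python sketch of the "all rows as one window" idea is shown below. The helper `window_all_apply` is hypothetical and only illustrates the aggregation; it does not use or reproduce towhee internals.

```python
def window_all_apply(rows, fn):
    """Treat every row as part of a single window and call fn once.

    Each column is passed to fn as a tuple of values.
    Hypothetical helper for illustration; not part of towhee.
    """
    cols = zip(*rows)  # transpose rows into columns
    return list(fn(*cols))


rows = list(zip([1, 2, 3, 4], [2, 3, 4, 5]))
totals = window_all_apply(rows, lambda x, y: (sum(x), sum(y)))
```

Because there is a single window, `fn` runs exactly once and a single result row is produced, as in the doctest.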