API

class towhee.runtime.pipeline.Pipeline(dag, clo_node='_input')[source]

Bases: object

Pipeline is a tool to create data transformation chains.

Parameters:
  • dag (dict) – The dag for the pipeline.

  • clo_node (str) – The close node in the pipeline dag, defaults to ‘_input’.

classmethod input(*schema) → Pipeline[source]

Start a new pipeline chain.

Parameters:

schema (list) – The schema for the values being inputted into the pipeline.

Returns:

Pipeline ready to be chained.

Return type:

Pipeline

Examples

>>> from towhee import pipe
>>> p = pipe.input('a', 'b', 'c')
output(*output_schema, **config_kws) → RuntimePipeline[source]

Close and preload the pipeline, making it ready to run.

Parameters:
  • output_schema (tuple) – Which columns to output.

  • config_kws (dict) – The config for this pipeline.

Returns:

The runtime pipeline that can be called on inputs.

Return type:

RuntimePipeline

Examples

>>> from towhee import pipe
>>> p = pipe.input('a').map('a', 'b', lambda x: x+1).output('b')
>>> p(1).get()
[2]
map(input_schema, output_schema, fn, config=None) → Pipeline[source]

One-to-one map of a function over the inputs.

Parameters:
  • input_schema (tuple) – The input column/s of fn.

  • output_schema (tuple) – The output column/s of fn.

  • fn (Operation | lambda | callable) – The action to perform on the input_schema.

  • config (dict, optional) – Config for the map. Defaults to None.

Returns:

Pipeline with action added.

Return type:

Pipeline

Examples

>>> from towhee import pipe
>>> p = pipe.input('a').map('a', 'b', lambda x: x+1).output('a', 'b')
>>> p(1).get()
[1, 2]
concat(*pipes: Pipeline) → Pipeline[source]

Concatenate one or more pipelines with this pipeline.

Parameters:

pipes – one or more pipelines to concat.

Returns:

The concatenated pipeline.

Return type:

Pipeline

Examples

>>> from towhee import pipe
>>> p0 = pipe.input('a', 'b', 'c')
>>> p1 = p0.map('a', 'd', lambda x: x+1)
>>> p2 = p0.map(('b', 'c'), 'e', lambda x, y: x - y)
>>> p3 = p2.concat(p1).output('d', 'e')
>>> p3(1, 2, 3).get()
[2, -1]
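The example above can be sketched in plain Python (a simplified illustration of concat semantics, not the towhee implementation): both branches derive columns from the same input row, and concat makes all of their computed columns available to output().

```python
# Simplified sketch of concat semantics (not the towhee implementation):
# p1 and p2 both read from the shared input row; concat merges their
# computed columns so output() can select from either branch.
def run_concat(a, b, c):
    row = {'a': a, 'b': b, 'c': c}
    row['d'] = row['a'] + 1           # branch p1: map('a', 'd', lambda x: x+1)
    row['e'] = row['b'] - row['c']    # branch p2: map(('b', 'c'), 'e', lambda x, y: x - y)
    return [row['d'], row['e']]       # output('d', 'e')

print(run_concat(1, 2, 3))  # [2, -1]
```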
flat_map(input_schema, output_schema, fn, config=None) → Pipeline[source]

One-to-many map action.

The operator may produce a variable number of outputs; each output is treated as a new row.

Parameters:
  • input_schema (tuple) – The input column/s of fn.

  • output_schema (tuple) – The output column/s of fn.

  • fn (Operation | lambda | callable) – The action to perform on the input_schema.

  • config (dict, optional) – Config for the flat_map. Defaults to None.

Returns:

Pipeline with flat_map action added.

Return type:

Pipeline

Examples

>>> from towhee import pipe
>>> p = (pipe.input('a')
...         .flat_map('a', 'b', lambda x: [y for y in x])
...         .output('b'))
>>> res = p([1, 2, 3])
>>> res.get()
[1]
>>> res.get()
[2]
>>> res.get()
[3]
filter(input_schema, output_schema, filter_columns, fn, config=None) → Pipeline[source]

Filter the input columns based on the selected filter_columns.

Parameters:
  • input_schema (tuple) – The input column/s before filter.

  • output_schema (tuple) – The output columns after the filter; input_schema and output_schema must have the same length.

  • filter_columns (str | tuple) – Which columns to filter on.

  • fn (Operation | lambda | callable) – The action to perform on the filter_columns.

  • config (dict, optional) – Config for the filter. Defaults to None.

Returns:

Pipeline with filter action added.

Return type:

Pipeline

Examples

>>> from towhee import pipe
>>> def filter_func(num):
...     return num > 10
>>> p = (pipe.input('a', 'c')
...         .filter('c', 'd', 'a', filter_func)
...         .output('d'))
>>> p(1, 12).get()
None
>>> p(11, 12).get()
[12]
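The semantics of the example above can be sketched in plain Python (a simplified illustration, not the towhee implementation): fn is applied to the filter_columns of each row, and only rows where it returns a truthy value pass through, carrying the input columns along unchanged.

```python
# Simplified sketch of filter semantics (not the towhee implementation).
def filter_rows(rows, input_cols, filter_cols, fn):
    """Keep rows where fn(*filter column values) is truthy,
    passing the input columns through unchanged."""
    kept = []
    for row in rows:
        if fn(*(row[col] for col in filter_cols)):
            kept.append([row[col] for col in input_cols])
    return kept

rows = [{'a': 1, 'c': 12}, {'a': 11, 'c': 12}]
print(filter_rows(rows, ['c'], ['a'], lambda num: num > 10))  # [[12]]
```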
window(input_schema, output_schema, size, step, fn, config=None) → Pipeline[source]

Window execution of action.

Parameters:
  • input_schema (tuple) – The input column/s of fn.

  • output_schema (tuple) – The output column/s of fn.

  • size (int) – How many rows per window.

  • step (int) – How many rows to iterate after each window.

  • fn (Operation | lambda | callable) – The action to perform on the input_schema after window.

  • config (dict, optional) – Config for the window map. Defaults to None

Returns:

Pipeline with window action added.

Return type:

Pipeline

Examples

>>> from towhee import pipe
>>> p = (pipe.input('n1', 'n2')
...         .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...         .window(('n1', 'n2'), ('s1', 's2'), 2, 1, lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> res = p([1, 2, 3, 4], [2, 3, 4, 5])
>>> res.get()
[3, 5]
>>> res.get()
[5, 7]
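The size/step behavior can be sketched in plain Python (a simplified illustration, not the towhee implementation): each window collects up to size rows, then the window start advances by step rows, so consecutive windows overlap when step < size.

```python
# Simplified sketch of window semantics (not the towhee implementation).
def windows(rows, size, step):
    """Collect successive windows of up to `size` rows, advancing `step` rows each time."""
    out = []
    for start in range(0, len(rows), step):
        out.append(rows[start:start + size])
    return out

pairs = list(zip([1, 2, 3, 4], [2, 3, 4, 5]))
sums = [(sum(n1), sum(n2)) for n1, n2 in (zip(*w) for w in windows(pairs, size=2, step=1))]
print(sums[:2])  # [(3, 5), (5, 7)] -- the first two rows in the example above
```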
window_all(input_schema, output_schema, fn, config=None) → Pipeline[source]

Read all rows as a single window and perform the action.

Parameters:
  • input_schema (tuple) – The input column/s of fn.

  • output_schema (tuple) – The output column/s of fn.

  • fn (Operation | lambda | callable) – The action to perform on the input_schema after windowing all the data.

  • config (dict, optional) – Config for the window_all. Defaults to None.

Returns:

Pipeline with window_all action added.

Return type:

Pipeline

Examples

>>> from towhee import pipe
>>> p = (pipe.input('n1', 'n2')
...         .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...         .window_all(('n1', 'n2'), ('s1', 's2'), lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> p([1, 2, 3, 4], [2, 3, 4, 5]).get()
[10, 14]
reduce(input_schema, output_schema, fn, config=None) → Pipeline[source]

Reduce the sequence to a single value.

Parameters:
  • input_schema (tuple) – The input column/s of fn.

  • output_schema (tuple) – The output column/s of fn.

  • fn (Operation | lambda | callable) – The action that reduces the sequence of input values to a single value.

  • config (dict, optional) – Config for the reduce. Defaults to None.

Returns:

Pipeline with reduce action added.

Return type:

Pipeline

Examples

>>> from towhee import pipe
>>> p = (pipe.input('n1', 'n2')
...         .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...         .reduce(('n1', 'n2'), ('s1', 's2'), lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> p([1, 2, 3, 4], [2, 3, 4, 5]).get()
[10, 14]
time_window(input_schema, output_schema, timestamp_col, size, step, fn, config=None) → Pipeline[source]

Perform action on time windows.

Parameters:
  • input_schema (tuple) – The input column/s of fn.

  • output_schema (tuple) – The output column/s of fn.

  • timestamp_col (str) – Which column to use for creating windows.

  • size (int) – The size of each time window.

  • step (int) – How far to advance the window after each step.

  • fn (Operation | lambda | callable) – The action to perform on the input_schema after windowing the data by timestamp_col.

  • config (dict, optional) – Config for the time window. Defaults to None.

Returns:

Pipeline with time_window action added.

Return type:

Pipeline

Examples

>>> from towhee import pipe
>>> p = (pipe.input('d')
...         .flat_map('d', ('n1', 'n2', 't'), lambda x: ((a, b, c) for a, b, c in x))
...         .time_window(('n1', 'n2'), ('s1', 's2'), 't', 3, 3, lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> data = [(i, i+1, i * 1000) for i in range(11) if i < 3 or i > 7] #[(0, 1), (1, 2), (2, 3), (8, 9), (9, 10), (10, 11)]
>>> res = p(data)
>>> res.get() #[(0, 1), (1, 2), (2, 3)]
[3, 6]
>>> res.get() #(8, 9)
[8, 9]
>>> res.get() #(9, 10), (10, 11)
[19, 21]
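The windowing in the example above can be sketched in plain Python. This is a simplified illustration, not the towhee implementation, under the assumptions that timestamps are in milliseconds, windows are aligned to t=0, and empty windows produce no output row (which is consistent with the output shown above):

```python
# Simplified sketch of time_window semantics (assumptions: timestamps in
# milliseconds, windows aligned to t=0, empty windows skipped).
def time_windows(rows, ts_index, size, step):
    out = []
    start = 0
    last = max(row[ts_index] for row in rows) / 1000
    while start <= last:
        win = [row for row in rows if start <= row[ts_index] / 1000 < start + size]
        if win:  # an empty window (e.g. [3, 6) below) produces no output row
            out.append(win)
        start += step
    return out

data = [(i, i + 1, i * 1000) for i in range(11) if i < 3 or i > 7]
for win in time_windows(data, ts_index=2, size=3, step=3):
    print((sum(r[0] for r in win), sum(r[1] for r in win)))
# (3, 6)
# (8, 9)
# (19, 21)
```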
class towhee.AutoConfig[source]

Bases: object

Auto configuration.

static load_config(name: str, *args, **kwargs)[source]

Load config from pre-defined pipeline.

Examples

>>> from towhee import AutoConfig
>>> config = AutoConfig.load_config('sentence_embedding')
>>> config
SentenceSimilarityConfig(model='all-MiniLM-L6-v2', openai_api_key=None, customize_embedding_op=None, normalize_vec=True, device=-1)
static LocalCPUConfig()[source]

Auto configuration to run with local CPU.

Examples

>>> from towhee import pipe, AutoConfig
>>> p = (pipe.input('a')
...          .flat_map('a', 'b', lambda x: [y for y in x], config=AutoConfig.LocalCPUConfig())
...          .output('b'))
static LocalGPUConfig(device: int = 0)[source]

Auto configuration to run with local GPU.

Parameters:

device (int) – the GPU device number, defaults to 0.

Examples

>>> from towhee import pipe, ops, AutoConfig
>>> p = (pipe.input('url')
...          .map('url', 'image', ops.image_decode.cv2())
...          .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=AutoConfig.LocalGPUConfig())
...          .output('vec')
... )
static TritonCPUConfig(num_instances_per_device: int = 1, max_batch_size: Optional[int] = None, batch_latency_micros: Optional[int] = None, preferred_batch_size: Optional[list] = None)[source]

Auto configuration to run with a Triton server (CPU).

Parameters:
  • max_batch_size (int) – maximum batch size; defaults to None and will be auto-generated by Triton.

  • batch_latency_micros (int) – maximum latency for a request, in microseconds; defaults to None and will be auto-generated by Triton.

  • num_instances_per_device (int) – the number of instances per device, defaults to 1.

  • preferred_batch_size (list) – preferred batch sizes for dynamic batching; defaults to None and will be auto-generated by Triton.

Examples

>>> from towhee import pipe, ops, AutoConfig
>>> p = (pipe.input('url')
...          .map('url', 'image', ops.image_decode.cv2())
...          .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=AutoConfig.TritonCPUConfig())
...          .output('vec')
... )

You can also set the configuration explicitly:

>>> from towhee import pipe, ops, AutoConfig
>>> config = AutoConfig.TritonCPUConfig(num_instances_per_device=3,
...                                     max_batch_size=128,
...                                     batch_latency_micros=100000,
...                                     preferred_batch_size=[8, 16])
>>> p = (pipe.input('url')
...          .map('url', 'image', ops.image_decode.cv2())
...          .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=config)
...          .output('vec')
... )
static TritonGPUConfig(device_ids: Optional[list] = None, num_instances_per_device: int = 1, max_batch_size: Optional[int] = None, batch_latency_micros: Optional[int] = None, preferred_batch_size: Optional[list] = None)[source]

Auto configuration to run with a Triton server (GPUs).

Parameters:
  • device_ids (list) – list of GPU device ids, defaults to [0].

  • max_batch_size (int) – maximum batch size; defaults to None and will be auto-generated by Triton.

  • batch_latency_micros (int) – maximum latency for a request, in microseconds; defaults to None and will be auto-generated by Triton.

  • num_instances_per_device (int) – the number of instances per device, defaults to 1.

  • preferred_batch_size (list) – preferred batch sizes for dynamic batching; defaults to None and will be auto-generated by Triton.

Examples

>>> from towhee import pipe, ops, AutoConfig
>>> p = (pipe.input('url')
...          .map('url', 'image', ops.image_decode.cv2())
...          .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=AutoConfig.TritonGPUConfig())
...          .output('vec')
... )

You can also set the configuration explicitly:

>>> from towhee import pipe, ops, AutoConfig
>>> config = AutoConfig.TritonGPUConfig(device_ids=[0, 1],
...                                     num_instances_per_device=3,
...                                     max_batch_size=128,
...                                     batch_latency_micros=100000,
...                                     preferred_batch_size=[8, 16])
>>> p = (pipe.input('url')
...          .map('url', 'image', ops.image_decode.cv2())
...          .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=config)
...          .output('vec')
... )
class towhee.AutoPipes[source]

Bases: object

Load pre-defined pipelines. Some available pipelines live under [towhee/pipelines](https://github.com/towhee-io/towhee/tree/main/towhee/pipelines); pre-defined pipelines are also published on the [Towhee Hub](https://towhee.io/).

Examples

>>> from towhee import AutoPipes, AutoConfig
>>> # config for sentence_embedding pipeline
>>> config = AutoConfig.load_config('sentence_embedding')
>>> config.model = 'all-MiniLM-L6-v2'
>>> embed_pipe = AutoPipes.pipeline('sentence_embedding', config=config)
>>> print(embed_pipe('How are you?').to_list())