API

class towhee.runtime.pipeline.Pipeline(dag, clo_node='_input')[source]

Bases: object

Pipeline is a tool to create data transformation chains.

Args:

dag (dict): The DAG for the pipeline.
clo_node (str): The close node in the pipeline DAG, defaults to '_input'.

classmethod input(*schema) Pipeline[source]

Start a new pipeline chain.

Args:

schema (list): The schema of the values passed into the pipeline.

Returns:

Pipeline: Pipeline ready to be chained.

Examples:
>>> from towhee import pipe
>>> p = pipe.input('a', 'b', 'c')
output(*output_schema) RuntimePipeline[source]

Close and preload the pipeline so that it is ready to run.

Args:

output_schema (tuple): Which columns to output.
config_kws (dict): The config for this pipeline.

Returns:

RuntimePipeline: The runtime pipeline that can be called on inputs.

Examples:
>>> from towhee import pipe
>>> p = pipe.input('a').map('a', 'b', lambda x: x+1).output('b')
>>> p(1).get()
[2]
map(input_schema, output_schema, fn, config=None) Pipeline[source]

One-to-one map of a function over the inputs.

Args:

input_schema (tuple): The input column/s of fn.
output_schema (tuple): The output column/s of fn.
fn (Operation | lambda | callable): The action to perform on the input_schema.
config (dict, optional): Config for the map. Defaults to None.

Returns:

Pipeline: Pipeline with action added.

Examples:
>>> from towhee import pipe
>>> p = pipe.input('a').map('a', 'b', lambda x: x+1).output('a', 'b')
>>> p(1).get()
[1, 2]
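
To make the role of input_schema and output_schema concrete, the following is a minimal plain-Python sketch of what a one-to-one map does to a single row. It is not Towhee's implementation: the helpers as_tuple and apply_map are hypothetical, and representing a row as a dict of column names is an assumption made for illustration.

```python
def as_tuple(schema):
    """Normalize a schema given as 'a' or ('a', 'b') to a tuple of names."""
    return (schema,) if isinstance(schema, str) else tuple(schema)


def apply_map(row, input_schema, output_schema, fn):
    """Apply fn to the input columns of one row and store the result
    in the output columns, leaving the other columns untouched."""
    inputs = as_tuple(input_schema)
    outputs = as_tuple(output_schema)
    result = fn(*(row[name] for name in inputs))
    # A single output column receives the return value as-is;
    # several output columns receive one element of the result each.
    values = (result,) if len(outputs) == 1 else tuple(result)
    new_row = dict(row)
    new_row.update(zip(outputs, values))
    return new_row


row = apply_map({'a': 1}, 'a', 'b', lambda x: x + 1)
print([row[name] for name in ('a', 'b')])  # prints [1, 2]
```

Selecting the columns 'a' and 'b' from the resulting row mirrors the output('a', 'b') call in the example above.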
concat(*pipes: Pipeline) Pipeline[source]

Concatenate this pipeline with one or more other pipelines.

Args:

pipes (Pipeline): One or more pipelines to concatenate.

Returns:

Pipeline: The concatenated pipeline.

Examples:
>>> from towhee import pipe
>>> p0 = pipe.input('a', 'b', 'c')
>>> p1 = p0.map('a', 'd', lambda x: x+1)
>>> p2 = p0.map(('b', 'c'), 'e', lambda x, y: x - y)
>>> p3 = p2.concat(p1).output('d', 'e')
>>> p3(1, 2, 3).get()
[2, -1]
flat_map(input_schema, output_schema, fn, config=None) Pipeline[source]

One-to-many map action.

The operator may produce a variable number of outputs; each output is treated as a new row.

Args:

input_schema (tuple): The input column/s of fn.
output_schema (tuple): The output column/s of fn.
fn (Operation | lambda | callable): The action to perform on the input_schema.
config (dict, optional): Config for the flat_map. Defaults to None.

Returns:

Pipeline: Pipeline with flat_map action added.

Examples:
>>> from towhee import pipe
>>> p = (pipe.input('a')
...         .flat_map('a', 'b', lambda x: [y for y in x])
...         .output('b'))
>>> res = p([1, 2, 3])
>>> res.get()
[1]
>>> res.get()
[2]
>>> res.get()
[3]
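
The "each output is treated as a new row" behaviour can be sketched in plain Python as follows. This is an illustration only, not Towhee's implementation: apply_flat_map is a hypothetical helper, and rows are modelled as dicts for simplicity.

```python
def apply_flat_map(row, input_col, output_col, fn):
    """Yield one new row per element returned by fn for the input column."""
    for item in fn(row[input_col]):
        new_row = dict(row)          # copy so each output row is independent
        new_row[output_col] = item
        yield new_row


rows = list(apply_flat_map({'a': [1, 2, 3]}, 'a', 'b', lambda x: [y for y in x]))
print([r['b'] for r in rows])  # prints [1, 2, 3]
```

One input row becomes three output rows, which is why the example above needs three get() calls to read all results.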
filter(input_schema, output_schema, filter_columns, fn, config=None) Pipeline[source]

Filter the input columns based on the selected filter_columns.

Args:

input_schema (tuple): The input column/s before the filter.
output_schema (tuple): The output column/s after the filter. Its length must equal the length of input_schema.
filter_columns (str | tuple): Which columns to filter on.
fn (Operation | lambda | callable): The action to perform on the filter_columns.
config (dict, optional): Config for the filter. Defaults to None.

Returns:

Pipeline: Pipeline with filter action added.

Examples:
>>> from towhee import pipe
>>> def filter_func(num):
...     return num > 10
>>> p = (pipe.input('a', 'c')
...         .filter('c', 'd', 'a', filter_func)
...         .output('d'))
>>> p(1, 12).get()
None
>>> p(11, 12).get()
[12]
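
The relationship between input_schema, output_schema and filter_columns can be sketched in plain Python. This is a hedged illustration rather than Towhee's implementation: apply_filter and as_tuple are hypothetical helpers, and a dropped row is represented here by None.

```python
def as_tuple(schema):
    """Normalize a schema given as 'a' or ('a', 'b') to a tuple of names."""
    return (schema,) if isinstance(schema, str) else tuple(schema)


def apply_filter(row, input_schema, output_schema, filter_columns, fn):
    """Copy input columns to output columns if fn accepts the filter
    columns; otherwise drop the row (returned as None in this sketch)."""
    inputs = as_tuple(input_schema)
    outputs = as_tuple(output_schema)
    if len(inputs) != len(outputs):
        raise ValueError('input_schema and output_schema must have the same length')
    # The predicate only sees the filter columns, not the input columns.
    if not fn(*(row[name] for name in as_tuple(filter_columns))):
        return None
    new_row = dict(row)
    for src, dst in zip(inputs, outputs):
        new_row[dst] = row[src]
    return new_row


def filter_func(num):
    return num > 10


print(apply_filter({'a': 1, 'c': 12}, 'c', 'd', 'a', filter_func))   # prints None
print(apply_filter({'a': 11, 'c': 12}, 'c', 'd', 'a', filter_func))  # prints {'a': 11, 'c': 12, 'd': 12}
```

The predicate is evaluated on column 'a', while the value that passes through comes from column 'c', matching the example above.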
window(input_schema, output_schema, size, step, fn, config=None) Pipeline[source]

Execute the action over windows of rows.

Args:

input_schema (tuple): The input column/s of fn.
output_schema (tuple): The output column/s of fn.
size (int): How many rows per window.
step (int): How many rows to advance after each window.
fn (Operation | lambda | callable): The action to perform on the input_schema after windowing.
config (dict, optional): Config for the window map. Defaults to None.

Returns:

Pipeline: Pipeline with window action added.

Examples:
>>> from towhee import pipe
>>> p = (pipe.input('n1', 'n2')
...         .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...         .window(('n1', 'n2'), ('s1', 's2'), 2, 1, lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> res = p([1, 2, 3, 4], [2, 3, 4, 5])
>>> res.get()
[3, 5]
>>> res.get()
[5, 7]
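
How size and step interact can be sketched in plain Python. This is an illustration under stated assumptions, not Towhee's implementation: sliding_windows is a hypothetical helper, and dropping an incomplete trailing window is an assumption of this sketch (the example above does not show how the final, shorter window is handled).

```python
def sliding_windows(rows, size, step):
    """Yield lists of `size` consecutive rows, advancing `step` rows each time.

    Assumption of this sketch: incomplete trailing windows are dropped.
    """
    for start in range(0, len(rows) - size + 1, step):
        yield rows[start:start + size]


n1 = [1, 2, 3, 4]
n2 = [2, 3, 4, 5]
rows = list(zip(n1, n2))  # one (n1, n2) row per element, as flat_map produces

results = []
for window in sliding_windows(rows, size=2, step=1):
    xs, ys = zip(*window)  # regroup the window's rows into columns
    results.append([sum(xs), sum(ys)])

print(results)  # prints [[3, 5], [5, 7], [7, 9]]
```

The first two results match the two get() calls in the example above; fn receives each column of the window as a sequence, which is why sum can be applied directly.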
window_all(input_schema, output_schema, fn, config=None) Pipeline[source]

Read all rows as a single window and perform the action.

Args:

input_schema (tuple): The input column/s of fn.
output_schema (tuple): The output column/s of fn.
fn (Operation | lambda | callable): The action to perform on the input_schema after collecting all rows into one window.
config (dict, optional): Config for the window_all. Defaults to None.

Returns:

Pipeline: Pipeline with window_all action added.

Examples:
>>> from towhee import pipe
>>> p = (pipe.input('n1', 'n2')
...         .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...         .window_all(('n1', 'n2'), ('s1', 's2'), lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> p([1, 2, 3, 4], [2, 3, 4, 5]).get()
[10, 14]
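
As a rough plain-Python picture of window_all, every row is gathered first and fn is then called once with each column as a list. This is a sketch, not Towhee's implementation; apply_window_all is a hypothetical helper and it assumes at least one row is present.

```python
def apply_window_all(rows, fn):
    """Collect all rows into one window and call fn once on its columns."""
    columns = [list(col) for col in zip(*rows)]  # transpose rows into columns
    return list(fn(*columns))


rows = list(zip([1, 2, 3, 4], [2, 3, 4, 5]))
print(apply_window_all(rows, lambda x, y: (sum(x), sum(y))))  # prints [10, 14]
```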
reduce(input_schema, output_schema, fn, config=None) Pipeline[source]

Reduce the sequence to a single value.

Args:

input_schema (tuple): The input column/s of fn.
output_schema (tuple): The output column/s of fn.
fn (Operation | lambda | callable): The action to perform on the input_schema after collecting all rows.
config (dict, optional): Config for the reduce. Defaults to None.

Returns:

Pipeline: Pipeline with reduce action added.

Examples:
>>> from towhee import pipe
>>> p = (pipe.input('n1', 'n2')
...         .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...         .reduce(('n1', 'n2'), ('s1', 's2'), lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> p([1, 2, 3, 4], [2, 3, 4, 5]).get()
[10, 14]
time_window(input_schema, output_schema, timestamp_col, size, step, fn, config=None) Pipeline[source]

Perform action on time windows.

Args:

input_schema (tuple): The input column/s of fn.
output_schema (tuple): The output column/s of fn.
timestamp_col (str): Which column to use for creating windows.
size (int): Size of the window.
step (int): How far to advance the window.
fn (Operation | lambda | callable): The action to perform on the input_schema after windowing the data by timestamp_col.
config (dict, optional): Config for the time window. Defaults to None.

Returns:

Pipeline: Pipeline with time_window action added.

Examples:
>>> from towhee import pipe
>>> p = (pipe.input('d')
...         .flat_map('d', ('n1', 'n2', 't'), lambda x: ((a, b, c) for a, b, c in x))
...         .time_window(('n1', 'n2'), ('s1', 's2'), 't', 3, 3, lambda x, y: (sum(x), sum(y)))
...         .output('s1', 's2'))
>>> data = [(i, i+1, i * 1000) for i in range(11) if i < 3 or i > 7] #[(0, 1), (1, 2), (2, 3), (8, 9), (9, 10), (10, 11)]
>>> res = p(data)
>>> res.get() #[(0, 1), (1, 2), (2, 3)]
[3, 6]
>>> res.get() #(8, 9)
[8, 9]
>>> res.get() #(9, 10), (10, 11)
[19, 21]
class towhee.AutoConfig[source]

Bases: object

Auto configuration.

static load_config(name: str, *args, **kwargs)[source]

Load the config of a pre-defined pipeline.

Examples:
>>> from towhee import AutoConfig
>>> config = AutoConfig.load_config('sentence_embedding')
>>> config
SentenceSimilarityConfig(model='all-MiniLM-L6-v2', openai_api_key=None, customize_embedding_op=None, normalize_vec=True, device=-1)
static LocalCPUConfig()[source]

Auto configuration to run on the local CPU.

Examples:
>>> from towhee import pipe, AutoConfig
>>> p = (pipe.input('a')
...          .flat_map('a', 'b', lambda x: [y for y in x], config=AutoConfig.LocalCPUConfig())
...          .output('b'))
static LocalGPUConfig(device: int = 0)[source]

Auto configuration to run on a local GPU.

Args:

device (int): The index of the GPU device to use, defaults to 0.

Examples:
>>> from towhee import pipe, ops, AutoConfig
>>> p = (pipe.input('url')
...          .map('url', 'image', ops.image_decode.cv2())
...          .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=AutoConfig.LocalGPUConfig())
...          .output('vec')
... )
static TritonCPUConfig(num_instances_per_device: int = 1, max_batch_size: int | None = None, batch_latency_micros: int | None = None, preferred_batch_size: list | None = None)[source]

Auto configuration to run with Triton server (CPU).

Args:
max_batch_size (int): Maximum batch size. Defaults to None, in which case it is auto-generated by Triton.
batch_latency_micros (int): Maximum time a request may be delayed for batching, in microseconds. Defaults to None, in which case it is auto-generated by Triton.
num_instances_per_device (int): The number of instances per device. Defaults to 1.
preferred_batch_size (list): Preferred batch sizes for dynamic batching. Defaults to None, in which case they are auto-generated by Triton.

Examples:
>>> from towhee import pipe, ops, AutoConfig
>>> p = (pipe.input('url')
...          .map('url', 'image', ops.image_decode.cv2())
...          .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=AutoConfig.TritonCPUConfig())
...          .output('vec')
... )

You can also set the configuration explicitly:

>>> from towhee import pipe, ops, AutoConfig
>>> config = AutoConfig.TritonCPUConfig(num_instances_per_device=3,
...                                     max_batch_size=128,
...                                     batch_latency_micros=100000,
...                                     preferred_batch_size=[8, 16])
>>> p = (pipe.input('url')
...          .map('url', 'image', ops.image_decode.cv2())
...          .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=config)
...          .output('vec')
... )
static TritonGPUConfig(device_ids: list | None = None, num_instances_per_device: int = 1, max_batch_size: int | None = None, batch_latency_micros: int | None = None, preferred_batch_size: list | None = None)[source]

Auto configuration to run with Triton server (GPUs).

Args:
device_ids (list): List of GPU device IDs, defaults to [0].
max_batch_size (int): Maximum batch size. Defaults to None, in which case it is auto-generated by Triton.
batch_latency_micros (int): Maximum time a request may be delayed for batching, in microseconds. Defaults to None, in which case it is auto-generated by Triton.
num_instances_per_device (int): The number of instances per device. Defaults to 1.
preferred_batch_size (list): Preferred batch sizes for dynamic batching. Defaults to None, in which case they are auto-generated by Triton.

Examples:
>>> from towhee import pipe, ops, AutoConfig
>>> p = (pipe.input('url')
...          .map('url', 'image', ops.image_decode.cv2())
...          .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=AutoConfig.TritonGPUConfig())
...          .output('vec')
... )

You can also set the configuration explicitly:

>>> from towhee import pipe, ops, AutoConfig
>>> config = AutoConfig.TritonGPUConfig(device_ids=[0, 1],
...                                     num_instances_per_device=3,
...                                     max_batch_size=128,
...                                     batch_latency_micros=100000,
...                                     preferred_batch_size=[8, 16])
>>> p = (pipe.input('url')
...          .map('url', 'image', ops.image_decode.cv2())
...          .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=config)
...          .output('vec')
... )
class towhee.AutoPipes[source]

Bases: object

Load pre-defined pipelines. Some available pipelines are under [towhee/pipelines](https://github.com/towhee-io/towhee/tree/main/towhee/pipelines). Predefined pipelines are also published on the [Towhee Hub](https://towhee.io/).

Examples:
>>> from towhee import AutoPipes, AutoConfig
>>> # config for sentence_embedding pipeline
>>> config = AutoConfig.load_config('sentence_embedding')
>>> config.model = 'all-MiniLM-L6-v2'
>>> embed_pipe = AutoPipes.pipeline('sentence_embedding', config=config)
>>> print(embed_pipe('How are you?').to_list())