API
- class towhee.runtime.pipeline.Pipeline(dag, clo_node='_input')[source]
Bases:
object
Pipeline is a tool to create data transformation chains.
- Args:
dag (dict): The dag for the pipeline.
clo_node (str): The close node in the pipeline dag, defaults to '_input'.
- classmethod input(*schema) Pipeline [source]
Start a new pipeline chain.
- Args:
schema (list): The schema for the values being passed into the pipeline.
- Returns:
Pipeline: Pipeline ready to be chained.
- Examples:
>>> from towhee import pipe
>>> p = pipe.input('a', 'b', 'c')
- output(*output_schema) RuntimePipeline [source]
Close and preload the pipeline, making it ready to run.
- Args:
output_schema (tuple): Which columns to output.
config_kws (dict): The config for this pipeline.
- Returns:
RuntimePipeline: The runtime pipeline that can be called on inputs.
- Examples:
>>> from towhee import pipe
>>> p = pipe.input('a').map('a', 'b', lambda x: x+1).output('b')
>>> p(1).get()
[2]
- map(input_schema, output_schema, fn, config=None) Pipeline [source]
One-to-one map of a function over the inputs.
- Args:
input_schema (tuple): The input column/s of fn.
output_schema (tuple): The output column/s of fn.
fn (Operation | lambda | callable): The action to perform on the input_schema.
config (dict, optional): Config for the map. Defaults to None.
- Returns:
Pipeline: Pipeline with action added.
- Examples:
>>> from towhee import pipe
>>> p = pipe.input('a').map('a', 'b', lambda x: x+1).output('a', 'b')
>>> p(1).get()
[1, 2]
- concat(*pipes: Pipeline) Pipeline [source]
Concatenate one or more pipelines into this pipeline.
- Args:
pipes (Pipeline): One or more pipelines to concatenate.
- Returns:
Pipeline: The concatenated pipeline.
- Examples:
>>> from towhee import pipe
>>> p0 = pipe.input('a', 'b', 'c')
>>> p1 = p0.map('a', 'd', lambda x: x+1)
>>> p2 = p0.map(('b', 'c'), 'e', lambda x, y: x - y)
>>> p3 = p2.concat(p1).output('d', 'e')
>>> p3(1, 2, 3).get()
[2, -1]
- flat_map(input_schema, output_schema, fn, config=None) Pipeline [source]
One-to-many map action.
The operator may produce a variable number of outputs; each output is treated as a new row.
- Args:
input_schema (tuple): The input column/s of fn.
output_schema (tuple): The output column/s of fn.
fn (Operation | lambda | callable): The action to perform on the input_schema.
config (dict, optional): Config for the flat_map. Defaults to None.
- Returns:
Pipeline: Pipeline with flat_map action added.
- Examples:
>>> from towhee import pipe
>>> p = (pipe.input('a')
...        .flat_map('a', 'b', lambda x: [y for y in x])
...        .output('b'))
>>> res = p([1, 2, 3])
>>> res.get()
[1]
>>> res.get()
[2]
>>> res.get()
[3]
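The row semantics in the example above can be sketched in plain Python; this is an illustration of the behavior, not Towhee's implementation, and `flat_map_rows` is a hypothetical helper:

```python
# Illustrative sketch of flat_map row semantics (not Towhee code):
# every element the function returns becomes a new row downstream.
def flat_map_rows(rows, fn):
    out = []
    for row in rows:
        out.extend(fn(row))  # one output element -> one new row
    return out

# Mirrors the doctest: a single input row [1, 2, 3] expands to three rows.
rows = flat_map_rows([[1, 2, 3]], lambda x: [y for y in x])
# rows == [1, 2, 3]
```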
- filter(input_schema, output_schema, filter_columns, fn, config=None) Pipeline [source]
Filter the input columns based on the selected filter_columns.
- Args:
input_schema (tuple): The input column/s before the filter.
output_schema (tuple): The output column/s after the filter; the length of output_schema must equal that of input_schema.
filter_columns (str | tuple): Which column/s to filter on.
fn (Operation | lambda | callable): The predicate to apply to the filter_columns.
config (dict, optional): Config for the filter. Defaults to None.
- Returns:
Pipeline: Pipeline with filter action added.
- Examples:
>>> from towhee import pipe
>>> def filter_func(num):
...     return num > 10
>>> p = (pipe.input('a', 'c')
...        .filter('c', 'd', 'a', filter_func)
...        .output('d'))
>>> p(1, 12).get()
None
>>> p(11, 12).get()
[12]
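The behavior in the example can be sketched without Towhee (an illustration of the semantics only; `filter_rows` and the dict rows are hypothetical): the predicate runs on the filter column of each row, and only passing rows carry their values through.

```python
# Illustrative sketch of filter semantics (not Towhee code): fn runs on
# the filter_columns; rows where it returns False produce no output.
def filter_rows(rows, predicate, filter_key):
    return [row for row in rows if predicate(row[filter_key])]

rows = [{'a': 1, 'c': 12}, {'a': 11, 'c': 12}]
kept = filter_rows(rows, lambda num: num > 10, 'a')
# kept == [{'a': 11, 'c': 12}] -- matching the doctest, where
# p(1, 12) yields no result and p(11, 12) yields [12]
```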
- window(input_schema, output_schema, size, step, fn, config=None) Pipeline [source]
Window execution of action.
- Args:
input_schema (tuple): The input column/s of fn.
output_schema (tuple): The output column/s of fn.
size (int): How many rows per window.
step (int): How many rows to advance after each window.
fn (Operation | lambda | callable): The action to perform on the input_schema after windowing.
config (dict, optional): Config for the window map. Defaults to None.
- Returns:
Pipeline: Pipeline with window action added.
- Examples:
>>> from towhee import pipe
>>> p = (pipe.input('n1', 'n2')
...        .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...        .window(('n1', 'n2'), ('s1', 's2'), 2, 1, lambda x, y: (sum(x), sum(y)))
...        .output('s1', 's2'))
>>> res = p([1, 2, 3, 4], [2, 3, 4, 5])
>>> res.get()
[3, 5]
>>> res.get()
[5, 7]
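The size/step mechanics in the example can be sketched in plain Python (an illustration of the semantics, not Towhee's implementation; `sliding_windows` is a hypothetical helper): each window collects `size` rows, and the next window starts `step` rows later.

```python
# Illustrative sketch of window semantics (not Towhee code):
# take `size` rows per window, advance by `step` rows between windows.
def sliding_windows(rows, size, step):
    windows = []
    start = 0
    while start < len(rows):
        windows.append(rows[start:start + size])
        start += step
    return windows

# Rows from the doctest after flat_map: (1, 2), (2, 3), (3, 4), (4, 5).
rows = list(zip([1, 2, 3, 4], [2, 3, 4, 5]))
wins = sliding_windows(rows, size=2, step=1)
sums = [tuple(sum(col) for col in zip(*w)) for w in wins]
# sums[:2] == [(3, 5), (5, 7)], the first two doctest outputs
```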
- window_all(input_schema, output_schema, fn, config=None) Pipeline [source]
Read all rows as a single window and perform the action.
- Args:
input_schema (tuple): The input column/s of fn.
output_schema (tuple): The output column/s of fn.
fn (Operation | lambda | callable): The action to perform on the input_schema after windowing all the data.
config (dict, optional): Config for the window_all. Defaults to None.
- Returns:
Pipeline: Pipeline with window_all action added.
- Examples:
>>> from towhee import pipe
>>> p = (pipe.input('n1', 'n2')
...        .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...        .window_all(('n1', 'n2'), ('s1', 's2'), lambda x, y: (sum(x), sum(y)))
...        .output('s1', 's2'))
>>> p([1, 2, 3, 4], [2, 3, 4, 5]).get()
[10, 14]
- reduce(input_schema, output_schema, fn, config=None) Pipeline [source]
Reduce the sequence to a single value.
- Args:
input_schema (tuple): The input column/s of fn.
output_schema (tuple): The output column/s of fn.
fn (Operation | lambda | callable): The action that reduces the input_schema columns to single values.
config (dict, optional): Config for the reduce. Defaults to None.
- Returns:
Pipeline: Pipeline with reduce action added.
- Examples:
>>> from towhee import pipe
>>> p = (pipe.input('n1', 'n2')
...        .flat_map(('n1', 'n2'), ('n1', 'n2'), lambda x, y: [(a, b) for a, b in zip(x, y)])
...        .reduce(('n1', 'n2'), ('s1', 's2'), lambda x, y: (sum(x), sum(y)))
...        .output('s1', 's2'))
>>> p([1, 2, 3, 4], [2, 3, 4, 5]).get()
[10, 14]
- time_window(input_schema, output_schema, timestamp_col, size, step, fn, config=None) Pipeline [source]
Perform action on time windows.
- Args:
input_schema (tuple): The input column/s of fn.
output_schema (tuple): The output column/s of fn.
timestamp_col (str): Which column to use for creating windows.
size (int): The size of each window.
step (int): How far to advance the window.
fn (Operation | lambda | callable): The action to perform on the input_schema after windowing the data by timestamp_col.
config (dict, optional): Config for the time window. Defaults to None.
- Returns:
Pipeline: Pipeline with time_window action added.
- Examples:
>>> from towhee import pipe
>>> p = (pipe.input('d')
...        .flat_map('d', ('n1', 'n2', 't'), lambda x: ((a, b, c) for a, b, c in x))
...        .time_window(('n1', 'n2'), ('s1', 's2'), 't', 3, 3, lambda x, y: (sum(x), sum(y)))
...        .output('s1', 's2'))
>>> data = [(i, i+1, i * 1000) for i in range(11) if i < 3 or i > 7]  # rows (0, 1), (1, 2), (2, 3), (8, 9), (9, 10), (10, 11)
>>> res = p(data)
>>> res.get()  # window (0, 1), (1, 2), (2, 3)
[3, 6]
>>> res.get()  # window (8, 9)
[8, 9]
>>> res.get()  # window (9, 10), (10, 11)
[19, 21]
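The tumbling behavior in the example (size == step == 3, timestamps in milliseconds) can be sketched in plain Python; this is an illustration of the semantics only, not Towhee's implementation, and `tumbling_time_windows` is a hypothetical helper. Note that windows containing no rows produce no output.

```python
# Illustrative sketch of tumbling time-window semantics (not Towhee
# code): with step == size, each timestamp falls into exactly one
# bucket [k*size, (k+1)*size) seconds; empty buckets emit nothing.
from collections import defaultdict

def tumbling_time_windows(rows, timestamps_ms, size_sec):
    buckets = defaultdict(list)
    for row, t in zip(rows, timestamps_ms):
        buckets[(t // 1000) // size_sec].append(row)
    return [buckets[k] for k in sorted(buckets)]

data = [(i, i + 1, i * 1000) for i in range(11) if i < 3 or i > 7]
rows = [(a, b) for a, b, _ in data]
ts = [t for _, _, t in data]
wins = tumbling_time_windows(rows, ts, size_sec=3)
sums = [tuple(sum(col) for col in zip(*w)) for w in wins]
# sums == [(3, 6), (8, 9), (19, 21)], matching the doctest output
```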
- class towhee.AutoConfig[source]
Bases:
object
Auto configuration.
- static load_config(name: str, *args, **kwargs)[source]
Load config from pre-defined pipeline.
- Examples:
>>> from towhee import AutoConfig
>>> config = AutoConfig.load_config('sentence_embedding')
>>> config
SentenceSimilarityConfig(model='all-MiniLM-L6-v2', openai_api_key=None, customize_embedding_op=None, normalize_vec=True, device=-1)
- static LocalCPUConfig()[source]
Auto configuration to run with the local CPU.
- Examples:
>>> from towhee import pipe, AutoConfig
>>> p = (pipe.input('a')
...        .flat_map('a', 'b', lambda x: [y for y in x], config=AutoConfig.LocalCPUConfig())
...        .output('b'))
- static LocalGPUConfig(device: int = 0)[source]
Auto configuration to run with a local GPU.
- Args:
device (int): the GPU device number, defaults to 0.
- Examples:
>>> from towhee import pipe, ops, AutoConfig
>>> p = (pipe.input('url')
...        .map('url', 'image', ops.image_decode.cv2())
...        .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=AutoConfig.LocalGPUConfig())
...        .output('vec')
... )
- static TritonCPUConfig(num_instances_per_device: int = 1, max_batch_size: int | None = None, batch_latency_micros: int | None = None, preferred_batch_size: list | None = None)[source]
Auto configuration to run with a Triton server (CPU).
- Args:
- num_instances_per_device(int):
the number of instances per device, defaults to 1.
- max_batch_size(int):
maximum batch size, defaults to None, in which case it is auto-generated by Triton.
- batch_latency_micros(int):
maximum latency of a request, in microseconds, defaults to None, in which case it is auto-generated by Triton.
- preferred_batch_size(list):
preferred batch sizes for dynamic batching, defaults to None, in which case they are auto-generated by Triton.
- Examples:
>>> from towhee import pipe, ops, AutoConfig
>>> p = (pipe.input('url')
...        .map('url', 'image', ops.image_decode.cv2())
...        .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=AutoConfig.TritonCPUConfig())
...        .output('vec')
... )
You can also set the configuration explicitly:
>>> from towhee import pipe, ops, AutoConfig
>>> config = AutoConfig.TritonCPUConfig(num_instances_per_device=3,
...                                     max_batch_size=128,
...                                     batch_latency_micros=100000,
...                                     preferred_batch_size=[8, 16])
>>> p = (pipe.input('url')
...        .map('url', 'image', ops.image_decode.cv2())
...        .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=config)
...        .output('vec')
... )
- static TritonGPUConfig(device_ids: list | None = None, num_instances_per_device: int = 1, max_batch_size: int | None = None, batch_latency_micros: int | None = None, preferred_batch_size: list | None = None)[source]
Auto configuration to run with a Triton server (GPUs).
- Args:
- device_ids(list):
the list of GPU devices, defaults to [0].
- num_instances_per_device(int):
the number of instances per device, defaults to 1.
- max_batch_size(int):
maximum batch size, defaults to None, in which case it is auto-generated by Triton.
- batch_latency_micros(int):
maximum latency of a request, in microseconds, defaults to None, in which case it is auto-generated by Triton.
- preferred_batch_size(list):
preferred batch sizes for dynamic batching, defaults to None, in which case they are auto-generated by Triton.
- Examples:
>>> from towhee import pipe, ops, AutoConfig
>>> p = (pipe.input('url')
...        .map('url', 'image', ops.image_decode.cv2())
...        .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=AutoConfig.TritonGPUConfig())
...        .output('vec')
... )
You can also set the configuration explicitly:
>>> from towhee import pipe, ops, AutoConfig
>>> config = AutoConfig.TritonGPUConfig(device_ids=[0, 1],
...                                     num_instances_per_device=3,
...                                     max_batch_size=128,
...                                     batch_latency_micros=100000,
...                                     preferred_batch_size=[8, 16])
>>> p = (pipe.input('url')
...        .map('url', 'image', ops.image_decode.cv2())
...        .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=config)
...        .output('vec')
... )
- class towhee.AutoPipes[source]
Bases:
object
Load pre-defined pipelines. Some available pipelines live under [towhee/pipelines](https://github.com/towhee-io/towhee/tree/main/towhee/pipelines); predefined pipelines are also published on the [Towhee Hub](https://towhee.io/).
- Examples:
>>> from towhee import AutoPipes, AutoConfig
>>> # config for the sentence_embedding pipeline
>>> config = AutoConfig.load_config('sentence_embedding')
>>> config.model = 'all-MiniLM-L6-v2'
>>> embed_pipe = AutoPipes.pipeline('sentence_embedding', config=config)
>>> print(embed_pipe('How are you?').to_list())