DataCollection Mixins

class towhee.functional.mixins.DatasetMixin[source]

Bases: object

Mixin for dealing with dataset

classmethod from_glob(*args)[source]

generate a file list with pattern

classmethod read_zip(url, pattern, mode='r')[source]

load files from url/path.

Parameters:
  • zip_src (Union[str, path]) – The path leads to the image.

  • pattern (str) – The filename pattern to extract.

  • mode (str) – file open mode.

Returns:

The file handler for file in the zip file.

Return type:

(File)

to_csv(csv_path: Union[str, Path], encoding: str = 'utf-8-sig')[source]

Save dc as a csv file.

Parameters:
  • csv_path (Union[str, Path]) – The path to save the dc to.

  • encoding (str) – The encoding to use in the output file.

split_train_test(size: list = [0.9, 0.1], **kws)[source]

Split DataCollection to train and test data.

Parameters:

size (list) – The size of the train and test.

Examples:

>>> from towhee.functional import DataCollection
>>> dc = DataCollection.range(10)
>>> train, test = dc.split_train_test(shuffle=False)
>>> train.to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>> test.to_list()
[9]
class towhee.functional.mixins.DispatcherMixin[source]

Bases: object

Mixin for call dispatcher for data collection

>>> from towhee import register
>>> from towhee import ops
>>> from towhee import DataCollection
>>> @register(name='add_1')
... def add_1(x):
...     return x+1
>>> dc = DataCollection.range(5).stream()
>>> dc.add_1['a','b','c']() 
<map object at ...>
class towhee.functional.mixins.ParallelMixin[source]

Bases: object

Mixin for parallel execution.

Examples:

>>> from towhee import DataCollection
>>> def add_1(x):
...     return x+1
>>> result = DataCollection.range(1000).set_parallel(2).map(add_1).to_list()
>>> len(result)
1000
>>> from towhee import dc
>>> dc = dc['a'](range(1000)).set_parallel(5)
>>> dc = dc.runas_op['a', 'b'](lambda x: x+1).to_list()
>>> len(dc)
1000
>>> from towhee import dc
>>> dc = dc['a'](range(1000)).set_parallel(5).set_chunksize(2)
>>> dc = dc.runas_op['a', 'b'](lambda x: x+1)
>>> dc._iterable.chunks()[:2]
[pyarrow.Table
a: int64
b: int64
----
a: [[0,1]]
b: [[1,2]], pyarrow.Table
a: int64
b: int64
----
a: [[2,3]]
b: [[3,4]]]
>>> result = DataCollection.range(1000).pmap(add_1, 10).pmap(add_1, 10).to_list()
>>> result[990:]
[992, 993, 994, 995, 996, 997, 998, 999, 1000, 1001]
set_parallel(num_worker=2, backend='thread')[source]

Set parallel execution for following calls.

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = set()
>>> stage_2_thread_set = set()
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )
>>> len(stage_2_thread_set)>1
True
split(count)[source]

Split a dataframe into multiple dataframes.

Parameters:

count (int) – how many resulting DCs;

Returns:

copies of DC;

Return type:

[DataCollection, …]

Examples:

  1. Split:

>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> a, b, c = dc.split(3)
>>> a.zip(b, c).to_list()
[(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
pmap(unary_op, num_worker=None, backend=None)[source]

Apply unary_op with parallel execution. Currently supports two backends, ray and thread.

Parameters:
  • unary_op (func) – the op to be mapped;

  • num_worker (int) – how many threads to reserve for this op;

  • backend (str) – whether to use ray or thread

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream()
...     .pmap(lambda x: stage_1_thread_set.add(threading.current_thread().ident), 5)
...     .pmap(lambda x: stage_2_thread_set.add(threading.current_thread().ident), 4).to_list()
... )
>>> len(stage_1_thread_set) > 1
True
>>> len(stage_2_thread_set) > 1
True
mmap(ops: list, num_worker=None, backend=None)[source]

Apply multiple unary_op to data collection. Currently supports two backends, ray and thread.

Parameters:
  • unary_op (func) – the op to be mapped;

  • num_worker (int) – how many threads to reserve for this op;

  • backend (str) – whether to use ray or thread

# TODO: the test is broken with pytest # Examples:

# 1. Using mmap:

# >>> from towhee import DataCollection # >>> dc1 = DataCollection([0,1,2,’3’,4]).stream() # >>> a1, b1 = dc1.mmap([lambda x: x+1, lambda x: x*2]) # >>> c1 = a1.map(lambda x: x+1) # >>> c1.zip(b1).to_list() # [(2, 0), (3, 2), (4, 4), (Empty(), ‘33’), (6, 8)]

# 2. Using map instead of mmap:

# >>> from towhee import DataCollection # >>> dc2 = DataCollection.range(5).stream() # >>> a2, b2, c2 = dc2.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2)) # >>> d2 = a2.map(lambda x: x+1) # >>> d2.zip(b2, c2).to_list() # [(2, 0, 0), (3, 2, 0), (4, 4, 1), (5, 6, 1), (6, 8, 2)]

# 3. DAG execution:

# >>> dc3 = DataCollection.range(5).stream() # >>> a3, b3, c3 = dc3.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2)) # >>> d3 = a3.map(lambda x: x+1) # >>> d3.zip(b3, c3).map(lambda x: x[0]+x[1]+x[2]).to_list() # [2, 5, 9, 12, 16]

class towhee.functional.mixins.ComputerVisionMixin[source]

Bases: object

Mixin for computer vision problems.

classmethod read_camera(device_id=0, limit=-1)[source]

read images from a camera.

classmethod read_video(path, format='rgb24')[source]

Load video as a datacollection.

Parameters:
  • path – The path to the target video.

  • format – The format of the images loaded from video.

to_video(output_path, codec=None, rate=None, width=None, height=None, format=None, template=None, audio_src=None)[source]

Encode a video with audio if provided.

Parameters:
  • output_path – The path of the output video.

  • codec – The codec to encode and decode the video.

  • rate – The rate of the video.

  • width – The width of the video.

  • height – The height of the video.

  • format – The format of the video frame image.

  • template – The template video stream of the ouput video stream.

  • audio_src – The audio to encode with the video.

class towhee.functional.mixins.StateMixin[source]

Bases: object

Mixin for state tracking.

Examples:

>>> from towhee import DataCollection, State
>>> from towhee import param_scope
>>> dc = DataCollection.range(10).set_state(State(a=1))
>>> dc.get_state()
{'a': 1}
>>> dc = dc.map(lambda x: x+1).map(lambda x: x*2)
>>> dc.get_state()
{'a': 1}
get_state()[source]

Get the state storage for DataCollection

Returns:

the state storage

Return type:

State

set_state(state)[source]

Set the state storage for DataCollection

Parameters:

state (State) – state storage

Returns:

data collection itself

Return type:

DataCollection

set_training(state=None)[source]

Set training mode for stateful operators

Parameters:

state (State, optional) – Update the state storage. Defaults to None.

Returns:

data collection itself

Return type:

DataCollection

set_evaluating(state=None)[source]

Set evaluating mode for stateful operators

Parameters:

state (State, optional) – Update the state storage. Defaults to None.

Returns:

data collection itself

Return type:

DataCollection

class towhee.functional.mixins.MetricMixin[source]

Bases: object

Mixin for metric

report()[source]

report the metric scores

Examples:

>>> from towhee import DataCollection
>>> from towhee import Entity
>>> dc1 = DataCollection([Entity(a=a, b=b, c=c) for a, b, c in zip([0,1,1,0,0], [0,1,1,1,0], [0,1,1,0,0])])
>>> dc1.with_metrics(['accuracy', 'recall']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report()
    accuracy  recall
lr       1.0     1.0
rf       0.8     0.8
{'lr': {'accuracy': 1.0, 'recall': 1.0}, 'rf': {'accuracy': 0.8, 'recall': 0.8}}
>>> dc2 = DataCollection([Entity(pred=[1,6,2,7,8,3,9,10,4,5], act=[1,2,3,4,5])])
>>> dc2.with_metrics(['mean_average_precision', 'mean_hit_ratio']).evaluate['act', 'pred'](name='test').report()
      mean_average_precision  mean_hit_ratio
test                0.622222             1.0
{'test': {'mean_average_precision': 0.6222222222222221, 'mean_hit_ratio': 1.0}}
class towhee.functional.mixins.MilvusMixin[source]

Bases: object

Mixins for Milvus, such as loading data into Milvus collections. Note that the Milvus collection is created before loading the data. Refer to https://milvus.io/docs/v2.0.x/create_collection.md.

Parameters:
  • collection (Union[str, Collection]) – The collection name or pymilvus.Collection in Milvus.

  • batch (str) – The batch size to load into Milvus, defaults to 1.

  • stream (bool, optional) – Whther the stream mode with to_milvus, defaults to True.

Returns:

A MutationResult object contains insert_count represents how many and a primary_keys is a list of primary keys.

Examples:

Note

The shape of embedding vector refer to https://towhee.io/image-embedding/timm. And the dimension of the “test” collection should be the same as it.

>>> import towhee
>>> from pymilvus import connections 
>>> mr = ( 
...     towhee.glob['path']('./*.jpg')
...           .image_decode['path', 'img']()
...           .image_embedding.timm['img', 'vec'](model_name='resnet50')
...           .to_milvus['vec'](collection='test', batch=1000)
... )
class towhee.functional.mixins.FaissMixin[source]

Bases: object

Mixins for Faiss, such as loading data into Faiss. And ids and vectors need to be passed as index. If ids is a string, KV storage will be started, and the kv data will be saved to the specified directory as “kv.bin”.

Parameters:
  • findex (str or faiss.INDEX, optional) – The path to faiss index file(defaults to ‘./index.bin’) or faiss index.

  • string (str, optional) – A string to produce a composite Faiss index, which is the same parameter in faiss.index_factor, defaults to ‘IDMap,Flat’, and you can refer to https://github.com/facebookresearch/faiss/wiki/The-index-factory.

  • metric (faiss.METRIC, optional) – The metric for Faiss index, defaults to faiss.METRIC_L2.

Returns:

A DC, and will save the Faiss index file and kv file(if ids is string).

Examples:

Note

Please make sure the path to index_file is authorized, and it will write the Faiss index file and kv file(if ids is string).

>>> import towhee
>>> dc = ( 
...     towhee.glob['path']('./*.jpg')
...           .image_decode['path', 'img']()
...           .image_embedding.timm['img', 'vec'](model_name='resnet50')
...           .to_faiss['path', 'vec'](findex='./faiss/faiss.index')
... )
class towhee.functional.mixins.ConfigMixin[source]

Bases: object

Mixin to config DC, such as set the parallel, chunksize, jit.

Examples:

>>> import towhee
>>> dc = towhee.dc['a'](range(20))
>>> dc = dc.set_chunksize(10)
>>> dc = dc.set_parallel(2)
>>> dc = dc.set_jit('numba')
>>> dc.get_config()
{'parallel': 2, 'chunksize': 10, 'jit': 'numba', 'format_priority': None}
>>> dc1 = towhee.dc([1,2,3]).config(jit='numba')
>>> dc2 = towhee.dc['a'](range(40)).config(parallel=2, chunksize=20)
>>> dc1.get_config()
{'parallel': None, 'chunksize': None, 'jit': 'numba', 'format_priority': None}
>>> dc2.get_config()
{'parallel': 2, 'chunksize': 20, 'jit': None, 'format_priority': None}
>>> dc3 = towhee.dc['a'](range(10)).config(format_priority=['tensorrt', 'onnx'])
>>> dc3.get_config()
{'parallel': None, 'chunksize': None, 'jit': None, 'format_priority': ['tensorrt', 'onnx']}
>>> import towhee
>>> dc = towhee.dc['a'](range(20))
>>> dc = dc.set_chunksize(10)
>>> dc = dc.set_parallel(2)
>>> dc = dc.set_jit('numba')
>>> dc.get_pipeline_config()
{'parallel': 2, 'chunksize': 10, 'jit': 'numba', 'format_priority': None}
>>> dc1 = towhee.dc([1,2,3]).pipeline_config(jit='numba')
>>> dc2 = towhee.dc['a'](range(40)).pipeline_config(parallel=2, chunksize=20)
>>> dc1.get_pipeline_config()
{'parallel': None, 'chunksize': None, 'jit': 'numba', 'format_priority': None}
>>> dc2.get_pipeline_config()
{'parallel': 2, 'chunksize': 20, 'jit': None, 'format_priority': None}
>>> dc3 = towhee.dc['a'](range(10)).pipeline_config(format_priority=['tensorrt', 'onnx'])
>>> dc3.get_pipeline_config()
{'parallel': None, 'chunksize': None, 'jit': None, 'format_priority': ['tensorrt', 'onnx']}
config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)[source]

Set the parameters in DC.

Parameters:
  • parallel (int) – Set the number of parallel execution for following calls.

  • chunksize (int) – Set the chunk size for arrow.

  • jit (Union[str, dict]) – It can set to “numba”, this mode will speed up the Operator’s function, but it may also need to return to python mode due to JIT failure, which will take longer, so please set it carefully.

  • format_priority (List[str]) – The priority list of format.

get_config()[source]

Return the config in DC, such as parallel, chunksize, jit and format_priority.

pipeline_config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)[source]

Set the parameters in DC.

Parameters:
  • parallel (int) – Set the number of parallel execution for following calls.

  • chunksize (int) – Set the chunk size for arrow.

  • jit (Union[str, dict]) – It can set to “numba”, this mode will speed up the Operator’s function, but it may also need to return to python mode due to JIT failure, which will take longer, so please set it carefully.

  • format_priority (List[str]) – The priority list of format.

get_pipeline_config()[source]

Return the config in DC, such as parallel, chunksize, jit and format_priority.

class towhee.functional.mixins.CompileMixin[source]

Bases: object

Mixin to just-in-time complie the Operator.

Examples:

>>> import numpy
>>> import towhee
>>> import time
>>> from towhee import register
>>> @register(name='inner_distance')
... def inner_distance(query, data):
...     dists = []
...     for vec in data:
...         dist = 0
...         for i in range(len(vec)):
...             dist += vec[i] * query[i]
...         dists.append(dist)
...     return dists
>>> data = [numpy.random.random((10000, 128)) for _ in range(10)]
>>> query = numpy.random.random(128)
>>> t1 = time.time()
>>> dc1 = (
...     towhee.dc['a'](data)
...     .runas_op['a', 'b'](func=lambda _: query)
...     .inner_distance[('b', 'a'), 'c']()
... )
>>> t2 = time.time()
>>> dc2 = (
...     towhee.dc['a'](data)
...     .config(jit='numba')
...     .runas_op['a', 'b'](func=lambda _: query)
...     .inner_distance[('b', 'a'), 'c']()
... )
>>> t3 = time.time()
>>> assert(t3-t2 < t2-t1)