DataCollection Mixins

class towhee.functional.mixins.DatasetMixin[source]

Bases: object

Mixin for dealing with dataset

classmethod from_glob(*args)[source]

Generate a file list from one or more glob patterns.
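
As a rough illustration of what building a file list from patterns involves, the sketch below uses only the standard-library `glob` module. It is not the Towhee implementation; the helper name `list_files` and the temporary files are assumptions made for the example.

```python
import glob
import os
import tempfile


def list_files(*patterns):
    """Hypothetical helper: collect the paths matching any of the glob patterns."""
    files = []
    for pattern in patterns:
        # sorted() makes the order deterministic; glob itself guarantees no ordering.
        files.extend(sorted(glob.glob(pattern)))
    return files


# Build a throwaway directory so the example is self-contained.
tmp_dir = tempfile.mkdtemp()
for name in ("a.jpg", "b.jpg", "c.png"):
    open(os.path.join(tmp_dir, name), "w").close()

jpg_files = list_files(os.path.join(tmp_dir, "*.jpg"))
print([os.path.basename(p) for p in jpg_files])
```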

classmethod from_zip(url, pattern, mode='r')[source]

Load files from a zip archive given by URL or path.

Parameters
  • url (Union[str, path]) – The URL or local path of the zip file.

  • pattern (str) – The filename pattern to extract.

  • mode (str) – file open mode.

Returns

The file handle for each matching file in the zip archive.

Return type

(File)
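
The idea of yielding file handles for archive members that match a filename pattern can be sketched with the standard-library `zipfile` and `fnmatch` modules. This is only an approximation under stated assumptions: the helper `iter_zip_files` is hypothetical, it handles a local file or file-like object only (no URL download), and it may differ from what `from_zip` does internally.

```python
import fnmatch
import io
import zipfile


def iter_zip_files(source, pattern, mode="r"):
    """Hypothetical helper: yield an open handle for each member matching pattern."""
    with zipfile.ZipFile(source, "r") as archive:
        for name in archive.namelist():
            if fnmatch.fnmatch(name, pattern):
                # Each handle is closed again once the consumer asks for the next one.
                with archive.open(name, mode) as handle:
                    yield handle


# Create a small in-memory archive so the example needs no files on disk.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("img_1.txt", "first")
    archive.writestr("img_2.txt", "second")
    archive.writestr("notes.md", "skip me")

contents = [handle.read() for handle in iter_zip_files(buffer, "img_*.txt")]
print(contents)
```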

classmethod from_camera(device_id=0, limit=-1)[source]

read images from a camera.

split_train_test(size: list = [0.9, 0.1], **kws)[source]

Split the DataCollection into train and test data.

Parameters

size (list) – The fractions of the data assigned to the train and test splits, in that order.

Examples:

>>> from towhee.functional import DataCollection
>>> dc = DataCollection.range(10)
>>> train, test = dc.split_train_test(shuffle=False)
>>> train.to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>> test.to_list()
[9]
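
The unshuffled case shown above amounts to cutting the sequence at a fraction of its length. A minimal sketch, assuming a plain Python list and a hypothetical helper `split_by_fraction` (shuffling and any extra keyword arguments are left out):

```python
def split_by_fraction(items, size=(0.9, 0.1)):
    """Hypothetical helper: split items into two consecutive parts, without shuffling."""
    items = list(items)
    # Only the first fraction is needed; the second part is whatever remains.
    cut = round(len(items) * size[0])
    return items[:cut], items[cut:]


train, test = split_by_fraction(range(10))
print(train)
print(test)
```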

class towhee.functional.mixins.DispatcherMixin[source]

Bases: object

Mixin that dispatches operator calls for a data collection.

>>> from towhee import register
>>> from towhee import ops
>>> from towhee import DataCollection
>>> @register(name='add_1')
... def add_1(x):
...     return x+1
>>> dc = DataCollection.range(5).stream()
>>> dc.add_1['a','b','c']() 
<map object at ...>
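
One common way to make registered functions callable as attributes is a name registry combined with `__getattr__`. The sketch below illustrates that general pattern only; `_REGISTRY`, this `register` decorator, and `MiniCollection` are hypothetical stand-ins, not Towhee's dispatcher, and the bracketed column selection seen above is omitted.

```python
_REGISTRY = {}


def register(name):
    """Hypothetical decorator: store a function in a module-level registry."""
    def decorator(func):
        _REGISTRY[name] = func
        return func
    return decorator


class MiniCollection:
    """Toy collection that resolves unknown attributes through the registry."""

    def __init__(self, items):
        self._items = list(items)

    def __getattr__(self, name):
        # __getattr__ is only called when normal attribute lookup fails.
        try:
            func = _REGISTRY[name]
        except KeyError:
            raise AttributeError(name) from None
        return lambda: MiniCollection(func(x) for x in self._items)

    def to_list(self):
        return list(self._items)


@register(name="add_1")
def add_1(x):
    return x + 1


result = MiniCollection(range(5)).add_1().to_list()
print(result)
```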

class towhee.functional.mixins.ParallelMixin[source]

Bases: object

Mixin for parallel execution.

Examples:

>>> from towhee import DataCollection
>>> def add_1(x):
...     return x+1
>>> result = DataCollection.range(1000).set_parallel(2).map(add_1).to_list()
>>> len(result)
1000
>>> result = DataCollection.range(1000).pmap(add_1, 10).pmap(add_1, 10).to_list()
>>> result[990:]
[992, 993, 994, 995, 996, 997, 998, 999, 1000, 1001]

set_parallel(num_worker=2, backend='thread')[source]

Set parallel execution for following calls.

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = set()
>>> stage_2_thread_set = set()
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )
>>> len(stage_2_thread_set)>1
True

unset_parallel()[source]

Unset parallel execution for following calls.

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .unset_parallel()
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )
>>> len(stage_1_thread_set)>1
True
>>> len(stage_2_thread_set)>1
False
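
The effect of switching parallel mode on and off for the following calls can be mimicked with a toy class built on `concurrent.futures.ThreadPoolExecutor`. This is a sketch under assumptions: `MiniCollection` is hypothetical, it evaluates eagerly rather than as a stream, and it supports only a thread backend.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class MiniCollection:
    """Toy collection whose map() uses a thread pool only while parallel mode is set."""

    def __init__(self, items, num_worker=None):
        self._items = list(items)
        self._num_worker = num_worker

    def set_parallel(self, num_worker=2):
        return MiniCollection(self._items, num_worker)

    def unset_parallel(self):
        return MiniCollection(self._items, None)

    def map(self, func):
        if self._num_worker is None:
            results = [func(x) for x in self._items]
        else:
            with ThreadPoolExecutor(max_workers=self._num_worker) as pool:
                results = list(pool.map(func, self._items))
        # The parallel setting is carried over to the next stage.
        return MiniCollection(results, self._num_worker)

    def to_list(self):
        return list(self._items)


def current_ident(_):
    return threading.current_thread().ident


main_thread = threading.current_thread().ident
parallel_ids = MiniCollection(range(100)).set_parallel(4).map(current_ident).to_list()
serial_ids = (
    MiniCollection(range(100)).set_parallel(4).unset_parallel().map(current_ident).to_list()
)
print(main_thread in parallel_ids, set(serial_ids) == {main_thread})
```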

split(count)[source]

Split a DataCollection into multiple DataCollections, each yielding the same items.

Parameters

count (int) – the number of resulting DataCollections.

Returns

copies of the DataCollection.

Return type

[DataCollection, …]

Examples:

  1. Split:

>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> a, b, c = dc.split(3)
>>> a.zip(b, c).to_list()
[(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
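
For plain Python iterators, the same "several independent copies of one stream" behaviour is available through `itertools.tee`. The sketch below reproduces the result above on that basis; the helper `split_stream` is hypothetical, and whether Towhee uses `tee` internally is not stated in this document.

```python
import itertools


def split_stream(iterable, count):
    """Hypothetical helper: return `count` independent iterators over one stream."""
    return list(itertools.tee(iterable, count))


a, b, c = split_stream(iter([0, 1, 2, 3, 4]), 3)
zipped = list(zip(a, b, c))
print(zipped)
```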

pmap(unary_op, num_worker=None, backend=None)[source]

Apply unary_op with parallel execution. Currently supports two backends, ray and thread.

Parameters
  • unary_op (func) – the op to be mapped.

  • num_worker (int) – how many threads to reserve for this op.

  • backend (str) – the backend to use, either ray or thread.

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream()
...     .pmap(lambda x: stage_1_thread_set.add(threading.current_thread().ident), 5)
...     .pmap(lambda x: stage_2_thread_set.add(threading.current_thread().ident), 4).to_list()
... )
>>> len(stage_1_thread_set) > 1
True
>>> len(stage_2_thread_set) > 1
True
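
A per-stage parallel map with its own worker count can be approximated with `ThreadPoolExecutor.map`, which returns results in input order. The function below is a hypothetical stand-in that shares only its name with `pmap`: it is eager rather than streaming and covers the thread case only.

```python
from concurrent.futures import ThreadPoolExecutor


def pmap(func, items, num_worker):
    """Hypothetical stand-in: map func over items using num_worker threads."""
    with ThreadPoolExecutor(max_workers=num_worker) as pool:
        # Executor.map yields results in input order even if tasks finish out of order.
        return list(pool.map(func, items))


def add_1(x):
    return x + 1


stage_1 = pmap(add_1, range(1000), num_worker=10)
stage_2 = pmap(add_1, stage_1, num_worker=10)
print(stage_2[990:])
```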

mmap(ops: list, num_worker=None, backend=None)[source]

Apply multiple unary ops to the data collection, producing one output per op. Currently supports two backends, ray and thread.

Parameters
  • ops (list) – the ops to be mapped.

  • num_worker (int) – how many threads to reserve for the ops.

  • backend (str) – the backend to use, either ray or thread.

# TODO: the test is broken with pytest
# Examples:

# 1. Using mmap:

# >>> from towhee import DataCollection
# >>> dc1 = DataCollection([0,1,2,'3',4]).stream()
# >>> a1, b1 = dc1.mmap([lambda x: x+1, lambda x: x*2])
# >>> c1 = a1.map(lambda x: x+1)
# >>> c1.zip(b1).to_list()
# [(2, 0), (3, 2), (4, 4), (Empty(), '33'), (6, 8)]

# 2. Using map instead of mmap:

# >>> from towhee import DataCollection
# >>> dc2 = DataCollection.range(5).stream()
# >>> a2, b2, c2 = dc2.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2))
# >>> d2 = a2.map(lambda x: x+1)
# >>> d2.zip(b2, c2).to_list()
# [(2, 0, 0), (3, 2, 0), (4, 4, 1), (5, 6, 1), (6, 8, 2)]

# 3. DAG execution:

# >>> dc3 = DataCollection.range(5).stream()
# >>> a3, b3, c3 = dc3.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2))
# >>> d3 = a3.map(lambda x: x+1)
# >>> d3.zip(b3, c3).map(lambda x: x[0]+x[1]+x[2]).to_list()
# [2, 5, 9, 12, 16]
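
Since the doctests above are disabled, the following standalone sketch shows the underlying idea of fanning one stream out to several functions, using `itertools.tee` and the built-in `map`. The helper `multi_map` is hypothetical, runs sequentially, and does not reproduce Towhee's error handling (the `Empty()` entry in the first example).

```python
import itertools


def multi_map(iterable, funcs):
    """Hypothetical helper: return one lazy output stream per function."""
    branches = itertools.tee(iterable, len(funcs))
    return [map(func, branch) for func, branch in zip(funcs, branches)]


a, b = multi_map(range(5), [lambda x: x + 1, lambda x: x * 2])
c = map(lambda x: x + 1, a)  # continue processing one branch only
pairs = list(zip(c, b))
print(pairs)
```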

class towhee.functional.mixins.ComputerVisionMixin[source]

Bases: object

Mixin for computer vision problems.

classmethod read_video(path, format='rgb24')[source]

Load video as a datacollection.

Parameters
  • path – The path to the target video.

  • format – The format of the images loaded from video.

to_video(output_path, codec=None, rate=None, width=None, height=None, format=None, template=None, audio_src=None)[source]

Encode a video with audio if provided.

Parameters
  • output_path – The path of the output video.

  • codec – The codec to encode and decode the video.

  • rate – The rate of the video.

  • width – The width of the video.

  • height – The height of the video.

  • format – The format of the video frame image.

  • template – The template video stream of the output video stream.

  • audio_src – The audio to encode with the video.

class towhee.functional.mixins.StateMixin[source]

Bases: object

Mixin for state tracking.

Examples:

>>> from towhee import DataCollection, State
>>> from towhee import param_scope
>>> dc = DataCollection.range(10).set_state(State(a=1))
>>> dc.get_state()
{'a': 1}
>>> dc = dc.map(lambda x: x+1).map(lambda x: x*2)
>>> dc.get_state()
{'a': 1}

get_state()[source]

Get the state storage for DataCollection

Returns

the state storage

Return type

State

set_state(state)[source]

Set the state storage for DataCollection

Parameters

state (State) – state storage

Returns

data collection itself

Return type

DataCollection

set_training(state=None)[source]

Set training mode for stateful operators

Parameters

state (State, optional) – Update the state storage. Defaults to None.

Returns

data collection itself

Return type

DataCollection

set_evaluating(state=None)[source]

Set evaluating mode for stateful operators

Parameters

state (State, optional) – Update the state storage. Defaults to None.

Returns

data collection itself

Return type

DataCollection
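
The state-tracking behaviour documented for this mixin, where the state survives chained `map` calls and `set_state` returns the collection itself, can be sketched with a toy class. `MiniCollection` is hypothetical and a plain dict stands in for `State`; training and evaluating modes are not modelled.

```python
class MiniCollection:
    """Toy collection that carries a state object through chained calls."""

    def __init__(self, items, state=None):
        self._items = list(items)
        self._state = state

    def set_state(self, state):
        self._state = state
        return self  # return the collection itself so calls can be chained

    def get_state(self):
        return self._state

    def map(self, func):
        # Hand the same state object on to the derived collection.
        return MiniCollection((func(x) for x in self._items), self._state)

    def to_list(self):
        return list(self._items)


dc = MiniCollection(range(10)).set_state({"a": 1})
dc = dc.map(lambda x: x + 1).map(lambda x: x * 2)
print(dc.get_state())
```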

class towhee.functional.mixins.MetricMixin[source]

Bases: object

Mixin for metric

report()[source]

report the metric scores

Examples:

>>> from towhee import DataCollection
>>> from towhee import Entity
>>> dc1 = DataCollection([Entity(a=a, b=b, c=c) for a, b, c in zip([0,1,1,0,0], [0,1,1,1,0], [0,1,1,0,0])])
>>> dc1.with_metrics(['accuracy', 'recall']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report()
    accuracy  recall
lr       1.0     1.0
rf       0.8     0.8
{'lr': {'accuracy': 1.0, 'recall': 1.0}, 'rf': {'accuracy': 0.8, 'recall': 0.8}}
>>> dc2 = DataCollection([Entity(pred=[1,6,2,7,8,3,9,10,4,5], act=[1,2,3,4,5])])
>>> dc2.with_metrics(['mean_average_precision', 'mean_hit_ratio']).evaluate['act', 'pred'](name='test').report()
      mean_average_precision  mean_hit_ratio
test                0.622222             1.0
{'test': {'mean_average_precision': 0.6222222222222221, 'mean_hit_ratio': 1.0}}
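
Two of the reported scores can be checked by hand. The sketch below uses one common definition of accuracy and of average precision for a single ranked list; the function names are hypothetical and Towhee's own metric code may be defined differently, although these definitions agree with the 0.8 and 0.622222 values shown above.

```python
def accuracy(actual, predicted):
    """Fraction of positions where the prediction equals the actual label."""
    matches = sum(1 for a, p in zip(actual, predicted) if a == p)
    return matches / len(actual)


def average_precision(actual, predicted):
    """Mean of precision@k over the ranks k that hold a relevant item."""
    relevant = set(actual)
    hits = 0
    precision_sum = 0.0
    for rank, item in enumerate(predicted, start=1):
        if item in relevant:
            hits += 1
            precision_sum += hits / rank
    return precision_sum / len(relevant) if relevant else 0.0


acc = accuracy([0, 1, 1, 0, 0], [0, 1, 1, 1, 0])
ap = average_precision([1, 2, 3, 4, 5], [1, 6, 2, 7, 8, 3, 9, 10, 4, 5])
print(round(acc, 6), round(ap, 6))
```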

class towhee.functional.mixins.MilvusMixin[source]

Bases: object

Mixin for Milvus, for example loading data into Milvus collections. Note that the Milvus collection must be created before loading the data. Refer to https://milvus.io/docs/v2.0.x/create_collection.md.

Parameters
  • collection (Union[str, Collection]) – The collection name or pymilvus.Collection in Milvus.

  • batch (int) – The batch size to load into Milvus, defaults to 1.

  • stream (bool, optional) – Whether to use stream mode with to_milvus, defaults to True.

Returns

A MutationResult object containing insert_count, the number of inserted entities, and primary_keys, a list of primary keys.

Examples:

Note

The shape of the embedding vector is described at https://towhee.io/image-embedding/timm. The dimension of the "test" collection must match it.

>>> import towhee
>>> from pymilvus import connections 
>>> mr = ( 
...     towhee.glob['path']('./*.jpg')
...           .image_decode['path', 'img']()
...           .image_embedding.timm['img', 'vec'](model_name='resnet50')
...           .to_milvus['vec'](collection='test', batch=1000)
... )
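
The role of the `batch` parameter can be illustrated without a running Milvus server: items are grouped into chunks of at most `batch` rows and each chunk is inserted in one call. Everything in this sketch is an assumption made for illustration; `batched` and `FakeCollection` are hypothetical, and `FakeCollection.insert` only records rows instead of calling pymilvus.

```python
def batched(items, batch):
    """Hypothetical helper: yield consecutive lists of at most `batch` items."""
    chunk = []
    for item in items:
        chunk.append(item)
        if len(chunk) == batch:
            yield chunk
            chunk = []
    if chunk:
        yield chunk  # final, possibly smaller batch


class FakeCollection:
    """Stand-in for a real collection object; it only records what was inserted."""

    def __init__(self):
        self.rows = []

    def insert(self, rows):
        self.rows.extend(rows)
        return len(rows)


collection = FakeCollection()
vectors = [[float(i), float(i) + 0.5] for i in range(2500)]
insert_counts = [collection.insert(chunk) for chunk in batched(vectors, batch=1000)]
print(insert_counts)
```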