DataCollection Mixins

class towhee.functional.mixins.DatasetMixin[source]

Bases: object

Mixin for dealing with dataset

classmethod from_glob(*args)[source]

generate a file list with pattern

classmethod from_zip(url, pattern, mode='r')[source]

load files from url/path.

Parameters
  • zip_src (Union[str, path]) – The path leads to the image.

  • pattern (str) – The filename pattern to extract.

  • mode (str) – file open mode.

Returns

The file handler for file in the zip file.

Return type

(File)

classmethod from_camera(device_id=0, limit=- 1)[source]

read images from a camera.

split_train_test(size: list = [0.9, 0.1], **kws)[source]

Split DataCollection to train and test data.

Parameters

size (list) – The size of the train and test.

Examples:

>>> from towhee.functional import DataCollection
>>> dc = DataCollection.range(10)
>>> train, test = dc.split_train_test(shuffle=False)
>>> train.to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>> test.to_list()
[9]
class towhee.functional.mixins.DispatcherMixin[source]

Bases: object

Mixin for call dispatcher for data collection

>>> from towhee import register
>>> from towhee import ops
>>> from towhee import DataCollection
>>> @register(name='add_1')
... def add_1(x):
...     return x+1
>>> dc = DataCollection.range(5).stream()
>>> dc.add_1['a','b','c']() 
<map object at ...>
class towhee.functional.mixins.ParallelMixin[source]

Bases: object

Mixin for parallel execution.

Examples:

>>> from towhee import DataCollection
>>> def add_1(x):
...     return x+1
>>> result = DataCollection.range(1000).set_parallel(2).map(add_1).to_list()
>>> len(result)
1000
>>> result = DataCollection.range(1000).pmap(add_1, 10).pmap(add_1, 10).to_list()
>>> result[990:]
[992, 993, 994, 995, 996, 997, 998, 999, 1000, 1001]
set_parallel(num_worker=2, backend='thread')[source]

Set parallel execution for following calls.

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = set()
>>> stage_2_thread_set = set()
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )
>>> len(stage_2_thread_set)>1
True
unset_parallel()[source]

Unset parallel execution for following calls.

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .unset_parallel()
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )
>>> len(stage_1_thread_set)>1
True
>>> len(stage_2_thread_set)>1
False
split(count)[source]

Split a dataframe into multiple dataframes.

Parameters

count (int) – how many resulting DCs;

Returns

copies of DC;

Return type

[DataCollection, …]

Examples:

  1. Split:

>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> a, b, c = dc.split(3)
>>> a.zip(b, c).to_list()
[(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
pmap(unary_op, num_worker=None, backend=None)[source]

Apply unary_op with parallel execution. Currently supports two backends, ray and thread.

Parameters
  • unary_op (func) – the op to be mapped;

  • num_worker (int) – how many threads to reserve for this op;

  • backend (str) – whether to use ray or thread

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream()
...     .pmap(lambda x: stage_1_thread_set.add(threading.current_thread().ident), 5)
...     .pmap(lambda x: stage_2_thread_set.add(threading.current_thread().ident), 4).to_list()
... )
>>> len(stage_1_thread_set) > 1
True
>>> len(stage_2_thread_set) > 1
True
mmap(ops: list, num_worker=None, backend=None)[source]

Apply multiple unary_op to data collection. Currently supports two backends, ray and thread.

Parameters
  • unary_op (func) – the op to be mapped;

  • num_worker (int) – how many threads to reserve for this op;

  • backend (str) – whether to use ray or thread

# TODO: the test is broken with pytest # Examples:

# 1. Using mmap:

# >>> from towhee import DataCollection # >>> dc1 = DataCollection([0,1,2,’3’,4]).stream() # >>> a1, b1 = dc1.mmap([lambda x: x+1, lambda x: x*2]) # >>> c1 = a1.map(lambda x: x+1) # >>> c1.zip(b1).to_list() # [(2, 0), (3, 2), (4, 4), (Empty(), ‘33’), (6, 8)]

# 2. Using map instead of mmap:

# >>> from towhee import DataCollection # >>> dc2 = DataCollection.range(5).stream() # >>> a2, b2, c2 = dc2.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2)) # >>> d2 = a2.map(lambda x: x+1) # >>> d2.zip(b2, c2).to_list() # [(2, 0, 0), (3, 2, 0), (4, 4, 1), (5, 6, 1), (6, 8, 2)]

# 3. DAG execution:

# >>> dc3 = DataCollection.range(5).stream() # >>> a3, b3, c3 = dc3.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2)) # >>> d3 = a3.map(lambda x: x+1) # >>> d3.zip(b3, c3).map(lambda x: x[0]+x[1]+x[2]).to_list() # [2, 5, 9, 12, 16]

class towhee.functional.mixins.ComputerVisionMixin[source]

Bases: object

Mixin for computer vision problems.

class towhee.functional.mixins.EntityMixin[source]

Bases: object

Mixin to help deal with Entity.

Examples:

  1. define an operator with register decorator

>>> from towhee import register
>>> from towhee import DataCollection
>>> @register
... def add_1(x):
...     return x+1
  1. apply the operator to named field of entity and save result to another named field

>>> (
...     DataCollection([dict(a=1, b=2), dict(a=2, b=3)])
...         .as_entity()
...         .add_1['a', 'c']() # <-- use field `a` as input and filed `c` as output
...         .as_str()
...         .to_list()
... )
['{"a": 1, "b": 2, "c": 2}', '{"a": 2, "b": 3, "c": 3}']

Select the entity on the specified fields.

Examples:

  1. Select the entity on one specified field:

>>> from towhee import Entity
>>> from towhee import DataCollection
>>> dc = DataCollection([Entity(a=i, b=i, c=i) for i in range(2)])
>>> dc.select['a']().to_list()
[<Entity dict_keys(['a'])>, <Entity dict_keys(['a'])>]
  1. Select multiple fields and unpack the entity:

>>> (
...     DataCollection([Entity(a=i, b=i, c=i) for i in range(5)])
...         .select['a', 'b']()
...         .as_raw()
...         .to_list()
... )
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
  1. Another field selection syntax (not suggested):

>>> (
...     DataCollection([Entity(a=i, b=i, c=i) for i in range(5)])
...         .select('a', 'b')
...         .as_raw()
...         .to_list()
... )
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
fill_entity(_DefaultKVs: Optional[Dict[str, Any]] = None, _ReplaceNoneValue: bool = False, **kws)[source]

When DataCollection’s iterable exists of Entities and some indexes missing, fill default value for those indexes.

Parameters
  • _ReplaceNoneValue (bool) – Whether to replace None in Entity’s value.

  • _DefaultKVs (Dict[str, Any]) – The key-value pairs stored in a dict.

as_entity(schema: Optional[List[str]] = None)[source]

Convert elements into Entities.

Parameters

schema (Optional[List[str]]) – schema contains field names.

Examples: 1. convert dicts into entities:

>>> from towhee import DataCollection
>>> (
...     DataCollection([dict(a=1, b=2), dict(a=2, b=3)])
...         .as_entity()
...         .as_str()
...         .to_list()
... )
['{"a": 1, "b": 2}', '{"a": 2, "b": 3}']
  1. convert tuples into entities:

>>> from towhee import DataCollection
>>> (
...     DataCollection([(1, 2), (2, 3)])
...         .as_entity(schema=['a', 'b'])
...         .as_str()
...         .to_list()
... )
['{"a": 1, "b": 2}', '{"a": 2, "b": 3}']
  1. convert single value into entities:

>>> from towhee import DataCollection
>>> (
...     DataCollection([1, 2])
...         .as_entity(schema=['a'])
...         .as_str()
...         .to_list()
... )
['{"a": 1}', '{"a": 2}']
parse_json()[source]

Parse string to entities.

Examples:

>>> from towhee import DataCollection
>>> dc = (
...     DataCollection(['{"x": 1}'])
...         .parse_json()
... )
>>> dc[0].x
1
as_json()[source]

Convert entities to json

Examples:

>>> from towhee import DataCollection, Entity
>>> (
...     DataCollection([Entity(x=1)])
...         .as_json()
... )
['{"x": 1}']
as_raw()[source]

Convert entitis into raw python values

Examples:

  1. unpack multiple values from entities:

>>> from towhee import DataCollection
>>> (
...     DataCollection([(1, 2), (2, 3)])
...         .as_entity(schema=['a', 'b'])
...         .as_raw()
...         .to_list()
... )
[(1, 2), (2, 3)]
  1. unpack single value from entities:

>>> (
...     DataCollection([1, 2])
...         .as_entity(schema=['a'])
...         .as_raw()
...         .to_list()
... )
[1, 2]
replace(**kws)[source]

Replace specific attributes with given vlues.

dropna(na: Set[str] = {'', None}) Union[bool, DataCollection][source]

Drop entities that contain some specific values.

Parameters

na (Set[str]) – Those entities contain values in na will be dropped.

rename(column: Dict[str, str])[source]

Rename an column in DataCollection.

Parameters

column (Dict[str, str]) – The columns to rename and their corresponding new name.

class towhee.functional.mixins.StateMixin[source]

Bases: object

Mixin for state tracking.

Examples:

>>> from towhee import DataCollection, State
>>> from towhee import param_scope
>>> dc = DataCollection.range(10).set_state(State(a=1))
>>> dc.get_state()
{'a': 1}
>>> dc = dc.map(lambda x: x+1).map(lambda x: x*2)
>>> dc.get_state()
{'a': 1}
get_state()[source]

Get the state storage for DataCollection

Returns

the state storage

Return type

State

set_state(state)[source]

Set the state storage for DataCollection

Parameters

state (State) – state storage

Returns

data collection itself

Return type

DataCollection

set_training(state=None)[source]

Set training mode for stateful operators

Parameters

state (State, optional) – Update the state storage. Defaults to None.

Returns

data collection itself

Return type

DataCollection

set_evaluating(state=None)[source]

Set evaluating mode for stateful operators

Parameters

state (State, optional) – Update the state storage. Defaults to None.

Returns

data collection itself

Return type

DataCollection

class towhee.functional.mixins.MetricMixin[source]

Bases: object

Mixin for metric

report()[source]

report the metric scores

Examples:

>>> from towhee import DataCollection
>>> from towhee import Entity
>>> dc1 = DataCollection([Entity(a=a, b=b, c=c) for a, b, c in zip([0,1,1,0,0], [0,1,1,1,0], [0,1,1,0,0])])
>>> dc1.with_metrics(['accuracy', 'recall']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report()
    accuracy  recall
lr       1.0     1.0
rf       0.8     0.8
{'lr': {'accuracy': 1.0, 'recall': 1.0}, 'rf': {'accuracy': 0.8, 'recall': 0.8}}
>>> dc2 = DataCollection([Entity(pred=[1,6,2,7,8,3,9,10,4,5], act=[1,2,3,4,5])])
>>> dc2.with_metrics(['mean_average_precision', 'mean_hit_ratio']).evaluate['act', 'pred'](name='test').report()
      mean_average_precision  mean_hit_ratio
test                0.622222             1.0
{'test': {'mean_average_precision': 0.6222222222222221, 'mean_hit_ratio': 1.0}}
class towhee.functional.mixins.MilvusMixin[source]

Bases: object

Mixins for Milvus, such as loading data into Milvus collections. Note that the Milvus collection is created before loading the data. Refer to https://milvus.io/docs/v2.0.x/create_collection.md.

Parameters
  • collection (Union[str, Collection]) – The collection name or pymilvus.Collection in Milvus.

  • batch (str) – The batch size to load into Milvus, defaults to 1.

Returns

A MutationResult object contains insert_count represents how many and a primary_keys is a list of primary keys.

Examples:

Note

The shape of embedding vector refer to https://towhee.io/image-embedding/timm. And the dimension of the “test” collection should be the same as it.

>>> import towhee
>>> from pymilvus import connections 
>>> mr = ( 
...     towhee.glob['path']('./*.jpg')
...           .image_decode['path', 'img']()
...           .image_embedding.timm['img', 'vec'](model_name='resnet50')
...           .to_milvus['vec'](collection='test', batch=1000)
... )