DataCollection Mixins
- class towhee.functional.mixins.DatasetMixin[source]
Bases:
object
Mixin for dealing with datasets.
- classmethod from_zip(url, pattern, mode='r')[source]
Load files from a zip archive located at a URL or path.
- Parameters
zip_src (Union[str, path]) – The URL or path of the zip file.
pattern (str) – The filename pattern to extract.
mode (str) – file open mode.
- Returns
The file handler for file in the zip file.
- Return type
(File)
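As a rough illustration of what loading files from a zip archive by filename pattern involves, the sketch below uses only the standard library. It is not towhee's implementation: the helper name `iter_zip_members` is hypothetical, it accepts only a local path or file-like object (no URL download), and it assumes glob-style patterns, which the description above does not confirm.

```python
import fnmatch
import io
import zipfile


def iter_zip_members(zip_src, pattern, mode="r"):
    """Yield (name, file handle) for each archive member matching `pattern`.

    Hypothetical helper; glob-style matching is an assumption.
    """
    with zipfile.ZipFile(zip_src, "r") as archive:
        for name in archive.namelist():
            if fnmatch.fnmatch(name, pattern):
                with archive.open(name, mode) as handle:
                    yield name, handle


# Build a small in-memory archive to exercise the helper.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("images/cat.jpg", b"cat-bytes")
    archive.writestr("images/dog.jpg", b"dog-bytes")
    archive.writestr("readme.txt", b"not an image")

buffer.seek(0)
# Read each handle while it is still open (the generator closes it afterwards).
matched = {name: handle.read() for name, handle in iter_zip_members(buffer, "*.jpg")}
```

Each handle is only valid while the generator is paused on it, so the content is read inside the loop.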
- split_train_test(size: list = [0.9, 0.1], **kws)[source]
Split DataCollection to train and test data.
- Parameters
size (list) – The fractions of the data assigned to the train and test sets, in that order. Defaults to [0.9, 0.1].
Examples:
>>> from towhee.functional import DataCollection
>>> dc = DataCollection.range(10)
>>> train, test = dc.split_train_test(shuffle=False)
>>> train.to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>> test.to_list()
[9]
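The split in the example above can be reproduced on a plain list. This is a minimal sketch rather than the library's code: `split_by_fractions` and its `seed` parameter are hypothetical, and it assumes the first fraction determines the cut point while the remainder goes to the test set.

```python
import random


def split_by_fractions(items, size=(0.9, 0.1), shuffle=True, seed=None):
    """Split `items` into (train, test) using the first fraction as the cut."""
    items = list(items)
    if shuffle:
        # A seeded Random instance keeps the shuffle reproducible.
        random.Random(seed).shuffle(items)
    cut = round(len(items) * size[0])
    return items[:cut], items[cut:]


train, test = split_by_fractions(range(10), shuffle=False)
```

With `shuffle=False` and ten elements, `train` holds the first nine items and `test` the last one, matching the doctest above.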
- class towhee.functional.mixins.DispatcherMixin[source]
Bases:
object
Mixin that dispatches operator calls for a data collection.
>>> from towhee import register
>>> from towhee import ops
>>> from towhee import DataCollection
>>> @register(name='add_1')
... def add_1(x):
...     return x+1

>>> dc = DataCollection.range(5).stream()
>>> dc.add_1['a','b','c']()
<map object at ...>
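One common way to make registered functions callable as attributes of a collection is a registry combined with `__getattr__`. The sketch below shows that general pattern only; `_REGISTRY`, this `register`, and `DispatchingList` are hypothetical stand-ins, not towhee's classes, and the `['a','b','c']` index syntax is left out.

```python
_REGISTRY = {}  # hypothetical global table: operator name -> function


def register(name):
    """Decorator that records a function under `name`."""
    def decorator(func):
        _REGISTRY[name] = func
        return func
    return decorator


class DispatchingList:
    """Toy collection that resolves unknown attributes through the registry."""

    def __init__(self, items):
        self._items = list(items)

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails.
        try:
            op = _REGISTRY[name]
        except KeyError:
            raise AttributeError(name) from None
        # Return a callable that maps the operator over the elements.
        return lambda: DispatchingList(op(x) for x in self._items)

    def to_list(self):
        return list(self._items)


@register(name="add_1")
def add_1(x):
    return x + 1


result = DispatchingList(range(5)).add_1().to_list()
```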
- class towhee.functional.mixins.ParallelMixin[source]
Bases:
object
Mixin for parallel execution.
Examples:
>>> from towhee import DataCollection
>>> def add_1(x):
...     return x+1

>>> result = DataCollection.range(1000).set_parallel(2).map(add_1).to_list()
>>> len(result)
1000
>>> result = DataCollection.range(1000).pmap(add_1, 10).pmap(add_1, 10).to_list()
>>> result[990:]
[992, 993, 994, 995, 996, 997, 998, 999, 1000, 1001]
- set_parallel(num_worker=2, backend='thread')[source]
Set parallel execution for following calls.
Examples:
>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = set()
>>> stage_2_thread_set = set()
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )
>>> len(stage_2_thread_set)>1
True
- unset_parallel()[source]
Unset parallel execution for following calls.
Examples:
>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .unset_parallel()
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )
>>> len(stage_1_thread_set)>1
True
>>> len(stage_2_thread_set)>1
False
- split(count)[source]
Split a dataframe into multiple dataframes.
- Parameters
count (int) – how many resulting DCs;
- Returns
copies of DC;
- Return type
[DataCollection, …]
Examples:
Split:
>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> a, b, c = dc.split(3)
>>> a.zip(b, c).to_list()
[(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
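For a plain Python iterator, the same effect (several independent copies of one stream) is available through `itertools.tee`. This is only an analogy under that assumption; the source does not say how `split` is implemented.

```python
import itertools

# A one-shot stream that can normally be consumed only once.
stream = iter([0, 1, 2, 3, 4])

# tee() returns independent iterators that each replay the same elements.
a, b, c = itertools.tee(stream, 3)

zipped = list(zip(a, b, c))
```

After calling `tee`, the original `stream` should no longer be consumed directly, since doing so would advance it without the copies seeing those elements.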
- pmap(unary_op, num_worker=None, backend=None)[source]
Apply unary_op with parallel execution. Currently supports two backends, ray and thread.
- Parameters
unary_op (func) – the op to be mapped;
num_worker (int) – how many threads to reserve for this op;
backend (str) – whether to use ray or thread
Examples:
>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream()
...     .pmap(lambda x: stage_1_thread_set.add(threading.current_thread().ident), 5)
...     .pmap(lambda x: stage_2_thread_set.add(threading.current_thread().ident), 4).to_list()
... )
>>> len(stage_1_thread_set) > 1
True
>>> len(stage_2_thread_set) > 1
True
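To make the thread backend idea concrete, the sketch below maps a unary function over a sequence with a standard-library thread pool. It is an approximation of the concept, not the library's code; `pmap_threads` is a hypothetical name and the ray backend is not covered.

```python
from concurrent.futures import ThreadPoolExecutor


def pmap_threads(unary_op, items, num_worker=2):
    """Apply `unary_op` to every item using `num_worker` threads.

    Executor.map yields results in input order, regardless of which
    worker finishes first.
    """
    with ThreadPoolExecutor(max_workers=num_worker) as pool:
        return list(pool.map(unary_op, items))


def add_1(x):
    return x + 1


once = pmap_threads(add_1, range(1000), num_worker=10)
twice = pmap_threads(add_1, once, num_worker=10)
```

Chaining two parallel stages this way mirrors the `pmap(add_1, 10).pmap(add_1, 10)` example in the class description, except that each stage here finishes before the next one starts.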
- mmap(ops: list, num_worker=None, backend=None)[source]
Apply multiple unary ops to the data collection. Currently supports two backends, ray and thread.
- Parameters
ops (list) – the ops to be mapped;
num_worker (int) – how many threads to reserve for this op;
backend (str) – whether to use ray or thread
# TODO: the test is broken with pytest
# Examples:
# 1. Using mmap:
# >>> from towhee import DataCollection
# >>> dc1 = DataCollection([0,1,2,'3',4]).stream()
# >>> a1, b1 = dc1.mmap([lambda x: x+1, lambda x: x*2])
# >>> c1 = a1.map(lambda x: x+1)
# >>> c1.zip(b1).to_list()
# [(2, 0), (3, 2), (4, 4), (Empty(), '33'), (6, 8)]
# 2. Using map instead of mmap:
# >>> from towhee import DataCollection
# >>> dc2 = DataCollection.range(5).stream()
# >>> a2, b2, c2 = dc2.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2))
# >>> d2 = a2.map(lambda x: x+1)
# >>> d2.zip(b2, c2).to_list()
# [(2, 0, 0), (3, 2, 0), (4, 4, 1), (5, 6, 1), (6, 8, 2)]
# 3. DAG execution:
# >>> dc3 = DataCollection.range(5).stream()
# >>> a3, b3, c3 = dc3.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2))
# >>> d3 = a3.map(lambda x: x+1)
# >>> d3.zip(b3, c3).map(lambda x: x[0]+x[1]+x[2]).to_list()
# [2, 5, 9, 12, 16]
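The branching behaviour described for `mmap` (one input stream, one output stream per op) can be imitated with plain iterators. The sketch is sequential, ignores the `num_worker` and `backend` parameters, and does not reproduce the `Empty()` placeholder shown in the commented example; `mmap_sketch` is a hypothetical name.

```python
import itertools


def mmap_sketch(ops, items):
    """Return one lazy output stream per op, all fed from the same input."""
    # Give every op its own independent copy of the input stream.
    branches = itertools.tee(items, len(ops))
    return [map(op, branch) for op, branch in zip(ops, branches)]


a, b = mmap_sketch([lambda x: x + 1, lambda x: x * 2], range(5))
c = map(lambda x: x + 1, a)  # extend one branch with a further step
pairs = list(zip(c, b))
```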
- class towhee.functional.mixins.ComputerVisionMixin[source]
Bases:
object
Mixin for computer vision problems.
- class towhee.functional.mixins.EntityMixin[source]
Bases:
object
Mixin to help deal with Entity.
Examples:
Define an operator with the register decorator:
>>> from towhee import register
>>> from towhee import DataCollection
>>> @register
... def add_1(x):
...     return x+1

Apply the operator to a named field of the entity and save the result to another named field:
>>> (
...     DataCollection([dict(a=1, b=2), dict(a=2, b=3)])
...     .as_entity()
...     .add_1['a', 'c']() # <-- use field `a` as input and field `c` as output
...     .as_str()
...     .to_list()
... )
['{"a": 1, "b": 2, "c": 2}', '{"a": 2, "b": 3, "c": 3}']
Select the entity on the specified fields.
Examples:
Select the entity on one specified field:
>>> from towhee import Entity
>>> from towhee import DataCollection
>>> dc = DataCollection([Entity(a=i, b=i, c=i) for i in range(2)])
>>> dc.select['a']().to_list()
[<Entity dict_keys(['a'])>, <Entity dict_keys(['a'])>]
Select multiple fields and unpack the entity:
>>> (
...     DataCollection([Entity(a=i, b=i, c=i) for i in range(5)])
...     .select['a', 'b']()
...     .as_raw()
...     .to_list()
... )
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
Another field selection syntax (not suggested):
>>> (
...     DataCollection([Entity(a=i, b=i, c=i) for i in range(5)])
...     .select('a', 'b')
...     .as_raw()
...     .to_list()
... )
[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
- fill_entity(_DefaultKVs: Optional[Dict[str, Any]] = None, _ReplaceNoneValue: bool = False, **kws)[source]
When the DataCollection’s iterable consists of Entities and some indexes are missing, fill in default values for those indexes.
- Parameters
_ReplaceNoneValue (bool) – Whether to replace None in Entity’s value.
_DefaultKVs (Dict[str, Any]) – The key-value pairs stored in a dict.
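The filling behaviour can be pictured with plain dicts standing in for Entities. This is a sketch under assumptions: `fill_defaults` is a hypothetical function, and treating extra keyword arguments as additional defaults is a guess at what `**kws` does.

```python
def fill_defaults(records, default_kvs=None, replace_none=False, **kws):
    """Return copies of `records` with missing keys filled from the defaults.

    Keyword arguments are merged into `default_kvs` (assumed behaviour).
    """
    defaults = dict(default_kvs or {}, **kws)
    filled = []
    for record in records:
        out = dict(record)
        for key, value in defaults.items():
            # Fill when the key is absent, or when it is None and
            # replacement of None values was requested.
            if key not in out or (replace_none and out[key] is None):
                out[key] = value
        filled.append(out)
    return filled


rows = [{"a": 1, "b": None}, {"a": 2}]
kept_none = fill_defaults(rows, {"b": 0})
no_none = fill_defaults(rows, {"b": 0}, replace_none=True)
```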
- as_entity(schema: Optional[List[str]] = None)[source]
Convert elements into Entities.
- Parameters
schema (Optional[List[str]]) – schema contains field names.
Examples:
convert dicts into entities:
>>> from towhee import DataCollection
>>> (
...     DataCollection([dict(a=1, b=2), dict(a=2, b=3)])
...     .as_entity()
...     .as_str()
...     .to_list()
... )
['{"a": 1, "b": 2}', '{"a": 2, "b": 3}']
convert tuples into entities:
>>> from towhee import DataCollection
>>> (
...     DataCollection([(1, 2), (2, 3)])
...     .as_entity(schema=['a', 'b'])
...     .as_str()
...     .to_list()
... )
['{"a": 1, "b": 2}', '{"a": 2, "b": 3}']
convert single value into entities:
>>> from towhee import DataCollection
>>> (
...     DataCollection([1, 2])
...     .as_entity(schema=['a'])
...     .as_str()
...     .to_list()
... )
['{"a": 1}', '{"a": 2}']
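The three conversions above follow one rule: dicts supply their own field names, while tuples and single values take names from the schema. The sketch below expresses that rule with plain dicts in place of Entity objects; `to_record` is a hypothetical helper, not part of towhee.

```python
import json


def to_record(element, schema=None):
    """Convert a dict, tuple/list, or single value into a field mapping."""
    if isinstance(element, dict):
        return dict(element)  # field names come from the dict itself
    if isinstance(element, (tuple, list)):
        return dict(zip(schema, element))  # pair schema names with values
    return {schema[0]: element}  # single value uses the first schema name


from_dicts = [json.dumps(to_record(e)) for e in [dict(a=1, b=2), dict(a=2, b=3)]]
from_tuples = [json.dumps(to_record(e, ["a", "b"])) for e in [(1, 2), (2, 3)]]
from_values = [json.dumps(to_record(e, ["a"])) for e in [1, 2]]
```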
- parse_json()[source]
Parse string to entities.
Examples:
>>> from towhee import DataCollection
>>> dc = (
...     DataCollection(['{"x": 1}'])
...     .parse_json()
... )
>>> dc[0].x
1
- as_json()[source]
Convert entities to json
Examples:
>>> from towhee import DataCollection, Entity
>>> (
...     DataCollection([Entity(x=1)])
...     .as_json()
... )
['{"x": 1}']
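The round trip that `parse_json` and `as_json` describe can be approximated with the standard `json` module. In this sketch `SimpleNamespace` stands in for Entity to give attribute access; that substitution is an assumption made for illustration.

```python
import json
from types import SimpleNamespace

strings = ['{"x": 1}']

# Parse: turn each JSON object into an attribute-style record.
parsed = [json.loads(s, object_hook=lambda d: SimpleNamespace(**d)) for s in strings]

# Dump: serialise each record's fields back to a JSON string.
dumped = [json.dumps(vars(entity)) for entity in parsed]
```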
- as_raw()[source]
Convert entities into raw Python values.
Examples:
unpack multiple values from entities:
>>> from towhee import DataCollection
>>> (
...     DataCollection([(1, 2), (2, 3)])
...     .as_entity(schema=['a', 'b'])
...     .as_raw()
...     .to_list()
... )
[(1, 2), (2, 3)]
unpack single value from entities:
>>> (
...     DataCollection([1, 2])
...     .as_entity(schema=['a'])
...     .as_raw()
...     .to_list()
... )
[1, 2]
- dropna(na: Set[str] = {'', None}) → Union[bool, DataCollection][source]
Drop entities that contain some specific values.
- Parameters
na (Set[str]) – Entities that contain any value in na will be dropped.
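A minimal sketch of this filtering rule, using dicts in place of Entities. `drop_na` is a hypothetical function, and the assumption is that an entity is dropped when any one of its values appears in `na`.

```python
def drop_na(records, na=("", None)):
    """Keep only records in which no value is one of the `na` markers."""
    return [r for r in records if not any(v in na for v in r.values())]


rows = [
    {"a": 1, "b": "x"},
    {"a": "", "b": "y"},   # dropped: empty string
    {"a": 2, "b": None},   # dropped: None
]
clean = drop_na(rows)
```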
- class towhee.functional.mixins.StateMixin[source]
Bases:
object
Mixin for state tracking.
Examples:
>>> from towhee import DataCollection, State
>>> from towhee import param_scope
>>> dc = DataCollection.range(10).set_state(State(a=1))
>>> dc.get_state()
{'a': 1}
>>> dc = dc.map(lambda x: x+1).map(lambda x: x*2)
>>> dc.get_state()
{'a': 1}
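The example shows the state surviving chained `map` calls. One simple way to get that behaviour is to hand the same state object to every derived collection, as in the toy class below. `StatefulCollection` is hypothetical and a plain dict stands in for `State`; the library's actual mechanism is not described in this section.

```python
class StatefulCollection:
    """Toy collection that carries a shared state object through map()."""

    def __init__(self, iterable, state=None):
        self._items = list(iterable)
        self._state = state

    def set_state(self, state):
        self._state = state
        return self  # return the collection itself to allow chaining

    def get_state(self):
        return self._state

    def map(self, fn):
        # The derived collection receives the same state object.
        return StatefulCollection((fn(x) for x in self._items), self._state)

    def to_list(self):
        return list(self._items)


dc = StatefulCollection(range(10)).set_state({"a": 1})
dc = dc.map(lambda x: x + 1).map(lambda x: x * 2)
state_after = dc.get_state()
values = dc.to_list()
```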
- get_state()[source]
Get the state storage for DataCollection
- Returns
the state storage
- Return type
State
- set_state(state)[source]
Set the state storage for DataCollection
- Parameters
state (State) – state storage
- Returns
data collection itself
- Return type
- set_training(state=None)[source]
Set training mode for stateful operators
- Parameters
state (State, optional) – Update the state storage. Defaults to None.
- Returns
data collection itself
- Return type
- class towhee.functional.mixins.MetricMixin[source]
Bases:
object
Mixin for metric
- report()[source]
report the metric scores
Examples:
>>> from towhee import DataCollection
>>> from towhee import Entity
>>> dc1 = DataCollection([Entity(a=a, b=b, c=c) for a, b, c in zip([0,1,1,0,0], [0,1,1,1,0], [0,1,1,0,0])])
>>> dc1.with_metrics(['accuracy', 'recall']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report()
    accuracy  recall
lr  1.0       1.0
rf  0.8       0.8
{'lr': {'accuracy': 1.0, 'recall': 1.0}, 'rf': {'accuracy': 0.8, 'recall': 0.8}}
>>> dc2 = DataCollection([Entity(pred=[1,6,2,7,8,3,9,10,4,5], act=[1,2,3,4,5])])
>>> dc2.with_metrics(['mean_average_precision', 'mean_hit_ratio']).evaluate['act', 'pred'](name='test').report()
      mean_average_precision  mean_hit_ratio
test  0.622222                1.0
{'test': {'mean_average_precision': 0.6222222222222221, 'mean_hit_ratio': 1.0}}
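The reported scores can be checked by hand. The sketch below recomputes them in plain Python under two assumptions: recall is averaged across labels weighted by support (plain binary recall on label 1 does not give 0.8 for these columns), and average precision is the mean of the precision values at each rank where a relevant item appears. Whether the library computes the metrics exactly this way is not stated here; all function names are hypothetical.

```python
def accuracy(actual, predicted):
    """Fraction of positions where prediction equals the actual label."""
    return sum(a == p for a, p in zip(actual, predicted)) / len(actual)


def weighted_recall(actual, predicted):
    """Per-label recall, averaged with each label's support as weight."""
    total = 0.0
    for label in set(actual):
        support = sum(1 for a in actual if a == label)
        hits = sum(1 for a, p in zip(actual, predicted) if a == label and p == label)
        total += support * (hits / support)
    return total / len(actual)


def average_precision(actual, predicted):
    """Mean of precision@rank over the ranks that hold a relevant item."""
    relevant = set(actual)
    hits, precision_sum = 0, 0.0
    for rank, item in enumerate(predicted, start=1):
        if item in relevant:
            hits += 1
            precision_sum += hits / rank
    return precision_sum / len(relevant)


a = [0, 1, 1, 0, 0]
b = [0, 1, 1, 1, 0]
c = [0, 1, 1, 0, 0]
lr = (accuracy(a, c), weighted_recall(a, c))
rf = (accuracy(a, b), weighted_recall(a, b))
ap = average_precision([1, 2, 3, 4, 5], [1, 6, 2, 7, 8, 3, 9, 10, 4, 5])
```

For the second example the relevant items sit at ranks 1, 3, 6, 9 and 10, so the average precision is (1/1 + 2/3 + 3/6 + 4/9 + 5/10) / 5, which is about 0.6222.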
- class towhee.functional.mixins.MilvusMixin[source]
Bases:
object
Mixins for Milvus, such as loading data into Milvus collections. Note that the Milvus collection is created before loading the data. Refer to https://milvus.io/docs/v2.0.x/create_collection.md.
- Parameters
collection (Union[str, Collection]) – The collection name or pymilvus.Collection in Milvus.
batch (int) – The batch size to load into Milvus, defaults to 1.
- Returns
A MutationResult object whose insert_count gives the number of inserted entities and whose primary_keys is a list of primary keys.
Examples:
Note
For the shape of the embedding vector, refer to https://towhee.io/image-embedding/timm. The dimension of the “test” collection must match it.
>>> import towhee
>>> from pymilvus import connections
>>> mr = (
...     towhee.glob['path']('./*.jpg')
...     .image_decode['path', 'img']()
...     .image_embedding.timm['img', 'vec'](model_name='resnet50')
...     .to_milvus['vec'](collection='test', batch=1000)
... )
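The `batch` parameter implies that vectors are grouped into chunks before being loaded. The sketch below shows only that grouping step in plain Python; it makes no Milvus calls, `batched` is a hypothetical helper, and the idea that each chunk corresponds to one insert request is an assumption.

```python
import itertools


def batched(iterable, batch=1):
    """Yield successive lists of at most `batch` items from `iterable`."""
    iterator = iter(iterable)
    while True:
        chunk = list(itertools.islice(iterator, batch))
        if not chunk:
            return
        yield chunk


# Five toy 2-dimensional vectors, grouped two at a time.
vectors = [[float(i), float(i + 1)] for i in range(5)]
batches = list(batched(vectors, batch=2))
```

The last chunk may be smaller than `batch` when the number of vectors is not a multiple of the batch size.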