DataCollection Mixins
- class towhee.functional.mixins.DatasetMixin
Bases:
object
Mixin for working with datasets.
- classmethod read_zip(url, pattern, mode='r')
Load files from a zip archive at a URL or local path.
- Parameters:
url (Union[str, path]) – The URL or path of the zip file.
pattern (str) – The filename pattern to extract.
mode (str) – The file open mode.
- Returns:
The file handler for each matching file in the zip file.
- Return type:
(File)
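The pattern-based extraction described above can be sketched with the standard library alone. This is a minimal illustration, not towhee's implementation: `iter_zip_members` is a hypothetical helper, it handles only local paths and file-like objects (no URL download), and it assumes `pattern` is a shell-style glob.

```python
import fnmatch
import io
import zipfile


def iter_zip_members(zip_src, pattern, mode="r"):
    """Yield an open file handle for every archive member matching pattern.

    Hypothetical helper: only local paths and file-like objects are handled.
    """
    with zipfile.ZipFile(zip_src) as archive:
        for name in archive.namelist():
            if fnmatch.fnmatch(name, pattern):
                # An archive opened for reading only supports mode "r" here.
                with archive.open(name, mode=mode) as handle:
                    yield handle


# Build a small archive in memory so the example needs no files on disk.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("a.txt", "alpha")
    archive.writestr("b.csv", "x,y")
    archive.writestr("c.txt", "gamma")

buffer.seek(0)
texts = [h.read().decode("utf-8") for h in iter_zip_members(buffer, "*.txt")]
print(texts)
```

Each handle is only valid while the generator is positioned on it, so the content is read inside the loop rather than collected for later.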
- to_csv(csv_path: Union[str, Path], encoding: str = 'utf-8-sig')
Save the DataCollection as a CSV file.
- Parameters:
csv_path (Union[str, Path]) – The path to save the dc to.
encoding (str) – The encoding to use in the output file.
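The effect of the default `'utf-8-sig'` encoding can be seen with the standard `csv` module. The sketch below is not towhee code; `write_rows_csv` is a hypothetical helper that writes plain row tuples.

```python
import csv
import os
import tempfile


def write_rows_csv(rows, csv_path, encoding="utf-8-sig"):
    """Write an iterable of row sequences to csv_path (hypothetical helper)."""
    # newline="" lets the csv module control line endings itself.
    with open(csv_path, "w", newline="", encoding=encoding) as f:
        csv.writer(f).writerows(rows)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "dc.csv")
    write_rows_csv([("id", "name"), (1, "café")], path)
    with open(path, "rb") as f:
        raw = f.read()

# "utf-8-sig" prepends the UTF-8 byte order mark to the file.
has_bom = raw.startswith(b"\xef\xbb\xbf")
decoded = raw.decode("utf-8-sig")
print(has_bom)
```

The byte order mark is what lets some spreadsheet applications detect the file as UTF-8; pass `encoding='utf-8'` if the consumer does not expect it.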
- split_train_test(size: list = [0.9, 0.1], **kws)
Split the DataCollection into train and test data.
- Parameters:
size (list) – The fractions of the data assigned to the train and test sets, in that order.
Examples:
>>> from towhee.functional import DataCollection
>>> dc = DataCollection.range(10)
>>> train, test = dc.split_train_test(shuffle=False)
>>> train.to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>> test.to_list()
[9]
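A fraction-based split like the one in the example can be sketched in plain Python. This is an illustration under assumptions, not the library's code: `split_by_fractions` and its `seed` argument are hypothetical, and only the first fraction is used to place the cut.

```python
import random


def split_by_fractions(items, size=(0.9, 0.1), shuffle=True, seed=None):
    """Split items into train and test lists by the fractions in size.

    Hypothetical helper; seed is an assumed convenience argument.
    """
    items = list(items)
    if shuffle:
        random.Random(seed).shuffle(items)
    # The first fraction decides where the cut falls.
    cut = round(len(items) * size[0])
    return items[:cut], items[cut:]


train, test = split_by_fractions(range(10), shuffle=False)
print(train, test)
```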
- class towhee.functional.mixins.DispatcherMixin
Bases:
object
Mixin that dispatches operator calls for a data collection.
>>> from towhee import register
>>> from towhee import ops
>>> from towhee import DataCollection
>>> @register(name='add_1')
... def add_1(x):
...     return x+1
>>> dc = DataCollection.range(5).stream()
>>> dc.add_1['a','b','c']()
<map object at ...>
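Name-based dispatch of this kind is commonly built on `__getattr__` plus a registry. The sketch below shows the general pattern only; `REGISTRY`, this `register`, and `TinyCollection` are hypothetical stand-ins and do not reproduce towhee's index syntax (`['a','b','c']`).

```python
REGISTRY = {}


def register(name):
    """Store a function in REGISTRY under name (hypothetical decorator)."""
    def decorator(func):
        REGISTRY[name] = func
        return func
    return decorator


class TinyCollection:
    """Minimal list wrapper that resolves unknown attributes via REGISTRY."""

    def __init__(self, items):
        self._items = list(items)

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        try:
            func = REGISTRY[name]
        except KeyError:
            raise AttributeError(name) from None
        return lambda: TinyCollection(func(x) for x in self._items)

    def to_list(self):
        return list(self._items)


@register(name="add_1")
def add_1(x):
    return x + 1


result = TinyCollection(range(5)).add_1().to_list()
print(result)
```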
- class towhee.functional.mixins.ParallelMixin
Bases:
object
Mixin for parallel execution.
Examples:
>>> from towhee import DataCollection
>>> def add_1(x):
...     return x+1
>>> result = DataCollection.range(1000).set_parallel(2).map(add_1).to_list()
>>> len(result)
1000
>>> from towhee import dc
>>> dc = dc['a'](range(1000)).set_parallel(5)
>>> dc = dc.runas_op['a', 'b'](lambda x: x+1).to_list()
>>> len(dc)
1000
>>> from towhee import dc
>>> dc = dc['a'](range(1000)).set_parallel(5).set_chunksize(2)
>>> dc = dc.runas_op['a', 'b'](lambda x: x+1)
>>> dc._iterable.chunks()[:2]
[pyarrow.Table
a: int64
b: int64
----
a: [[0,1]]
b: [[1,2]], pyarrow.Table
a: int64
b: int64
----
a: [[2,3]]
b: [[3,4]]]
>>> result = DataCollection.range(1000).pmap(add_1, 10).pmap(add_1, 10).to_list()
>>> result[990:]
[992, 993, 994, 995, 996, 997, 998, 999, 1000, 1001]
- set_parallel(num_worker=2, backend='thread')
Set parallel execution for following calls.
Examples:
>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = set()
>>> stage_2_thread_set = set()
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )
>>> len(stage_2_thread_set)>1
True
- split(count)
Split a DataCollection into multiple copies.
- Parameters:
count (int) – The number of resulting DCs.
- Returns:
Copies of the DC.
- Return type:
[DataCollection, …]
Examples:
Split:
>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> a, b, c = dc.split(3)
>>> a.zip(b, c).to_list()
[(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
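The same behaviour on a plain Python iterator can be obtained with `itertools.tee`, which hands out independent iterators over one underlying stream. This is an analogy in standard-library terms and makes no claim about how `split` is implemented.

```python
import itertools

# A one-shot iterator, comparable to a streamed collection.
stream = iter([0, 1, 2, 3, 4])

# tee returns independent iterators that each see every item.
a, b, c = itertools.tee(stream, 3)
rows = list(zip(a, b, c))
print(rows)  # [(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
```

`tee` buffers items that one copy has consumed and another has not, so copies that advance at very different rates can hold a lot of data in memory.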
- pmap(unary_op, num_worker=None, backend=None)
Apply unary_op with parallel execution. Currently supports two backends, ray and thread.
- Parameters:
unary_op (func) – The op to be mapped.
num_worker (int) – How many threads to reserve for this op.
backend (str) – Whether to use ray or thread.
Examples:
>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream()
...     .pmap(lambda x: stage_1_thread_set.add(threading.current_thread().ident), 5)
...     .pmap(lambda x: stage_2_thread_set.add(threading.current_thread().ident), 4).to_list()
... )
>>> len(stage_1_thread_set) > 1
True
>>> len(stage_2_thread_set) > 1
True
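For the thread backend, an order-preserving parallel map can be sketched with `concurrent.futures`. This standalone `pmap` function is a hypothetical stand-in, not the mixin method: it runs each stage to completion before the next one starts, whereas a streamed collection may overlap stages.

```python
from concurrent.futures import ThreadPoolExecutor


def add_1(x):
    return x + 1


def pmap(func, items, num_worker):
    """Apply func to items on num_worker threads, keeping input order."""
    with ThreadPoolExecutor(max_workers=num_worker) as pool:
        # Executor.map yields results in the order of the inputs.
        return list(pool.map(func, items))


stage_1 = pmap(add_1, range(1000), 10)
result = pmap(add_1, stage_1, 10)
print(result[990:])  # [992, 993, 994, 995, 996, 997, 998, 999, 1000, 1001]
```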
- mmap(ops: list, num_worker=None, backend=None)
Apply multiple unary ops to the data collection. Currently supports two backends, ray and thread.
- Parameters:
ops (list) – the ops to be mapped;
num_worker (int) – how many threads to reserve for this op;
backend (str) – whether to use ray or thread
# TODO: the test is broken with pytest
# Examples:
# 1. Using mmap:
# >>> from towhee import DataCollection
# >>> dc1 = DataCollection([0,1,2,'3',4]).stream()
# >>> a1, b1 = dc1.mmap([lambda x: x+1, lambda x: x*2])
# >>> c1 = a1.map(lambda x: x+1)
# >>> c1.zip(b1).to_list()
# [(2, 0), (3, 2), (4, 4), (Empty(), '33'), (6, 8)]
# 2. Using map instead of mmap:
# >>> from towhee import DataCollection
# >>> dc2 = DataCollection.range(5).stream()
# >>> a2, b2, c2 = dc2.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2))
# >>> d2 = a2.map(lambda x: x+1)
# >>> d2.zip(b2, c2).to_list()
# [(2, 0, 0), (3, 2, 0), (4, 4, 1), (5, 6, 1), (6, 8, 2)]
# 3. DAG execution:
# >>> dc3 = DataCollection.range(5).stream()
# >>> a3, b3, c3 = dc3.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2))
# >>> d3 = a3.map(lambda x: x+1)
# >>> d3.zip(b3, c3).map(lambda x: x[0]+x[1]+x[2]).to_list()
# [2, 5, 9, 12, 16]
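The first commented example above, including the `Empty()` placeholder for the failing item, can be reproduced in plain Python. The sketch is sequential and illustrative only: `Empty`, `safe`, and `apply_ops` are hypothetical names, and treating any exception as an empty result is an assumption inferred from the expected output.

```python
class Empty:
    """Placeholder for an item whose operator raised (hypothetical)."""

    def __repr__(self):
        return "Empty()"


EMPTY = Empty()


def safe(op):
    """Wrap op so that failures and EMPTY inputs both yield EMPTY."""
    def wrapper(x):
        if x is EMPTY:
            return EMPTY
        try:
            return op(x)
        except Exception:
            return EMPTY
    return wrapper


def apply_ops(items, ops):
    """Return one output list per operator in ops."""
    items = list(items)
    return [[safe(op)(x) for x in items] for op in ops]


a1, b1 = apply_ops([0, 1, 2, "3", 4], [lambda x: x + 1, lambda x: x * 2])
c1 = [safe(lambda x: x + 1)(x) for x in a1]
pairs = list(zip(c1, b1))
print(pairs)  # [(2, 0), (3, 2), (4, 4), (Empty(), '33'), (6, 8)]
```

Adding 1 to the string `'3'` raises `TypeError`, so that branch carries the placeholder forward, while doubling the same string succeeds and gives `'33'`.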
- class towhee.functional.mixins.ComputerVisionMixin
Bases:
object
Mixin for computer vision problems.
- classmethod read_video(path, format='rgb24')
Load a video as a DataCollection.
- Parameters:
path – The path to the target video.
format – The format of the images loaded from video.
- to_video(output_path, codec=None, rate=None, width=None, height=None, format=None, template=None, audio_src=None)
Encode a video with audio if provided.
- Parameters:
output_path – The path of the output video.
codec – The codec to encode and decode the video.
rate – The rate of the video.
width – The width of the video.
height – The height of the video.
format – The format of the video frame image.
template – The template video stream of the output video stream.
audio_src – The audio to encode with the video.
- class towhee.functional.mixins.StateMixin
Bases:
object
Mixin for state tracking.
Examples:
>>> from towhee import DataCollection, State
>>> from towhee import param_scope
>>> dc = DataCollection.range(10).set_state(State(a=1))
>>> dc.get_state()
{'a': 1}
>>> dc = dc.map(lambda x: x+1).map(lambda x: x*2)
>>> dc.get_state()
{'a': 1}
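The property shown above, that state survives chained `map` calls, follows from each derived collection carrying a reference to the same state object. A minimal sketch of that pattern, with a hypothetical `TinyCollection` and a dict-based `State` standing in for the real classes:

```python
class State(dict):
    """Plain dict standing in for the state storage (hypothetical)."""


class TinyCollection:
    def __init__(self, items, state=None):
        self._items = list(items)
        self._state = state

    def set_state(self, state):
        self._state = state
        return self  # returning self allows chaining

    def get_state(self):
        return self._state

    def map(self, func):
        # The derived collection shares the same state object.
        return TinyCollection((func(x) for x in self._items), self._state)

    def to_list(self):
        return list(self._items)


dc = TinyCollection(range(10)).set_state(State(a=1))
dc = dc.map(lambda x: x + 1).map(lambda x: x * 2)
print(dc.get_state(), dc.to_list()[:3])  # {'a': 1} [2, 4, 6]
```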
- get_state()
Get the state storage for the DataCollection.
- Returns:
the state storage
- Return type:
State
- set_state(state)
Set the state storage for the DataCollection.
- Parameters:
state (State) – state storage
- Returns:
data collection itself
- Return type:
DataCollection
- set_training(state=None)
Set training mode for stateful operators.
- Parameters:
state (State, optional) – Update the state storage. Defaults to None.
- Returns:
data collection itself
- Return type:
DataCollection
- class towhee.functional.mixins.MetricMixin
Bases:
object
Mixin for metric
- report()
Report the metric scores.
Examples:
>>> from towhee import DataCollection
>>> from towhee import Entity
>>> dc1 = DataCollection([Entity(a=a, b=b, c=c) for a, b, c in zip([0,1,1,0,0], [0,1,1,1,0], [0,1,1,0,0])])
>>> dc1.with_metrics(['accuracy', 'recall']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report()
    accuracy  recall
lr       1.0     1.0
rf       0.8     0.8
{'lr': {'accuracy': 1.0, 'recall': 1.0}, 'rf': {'accuracy': 0.8, 'recall': 0.8}}
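The numbers in this first report can be checked by hand. The sketch below computes accuracy and a support-weighted recall in plain Python; the weighted averaging is an assumption inferred from the output, since a binary recall on class 1 would not give 0.8 for `rf`. For single-label data, support-weighted recall always equals accuracy, which is consistent with the two columns matching.

```python
from collections import Counter


def accuracy(actual, predicted):
    """Fraction of positions where the two label lists agree."""
    hits = sum(a == p for a, p in zip(actual, predicted))
    return hits / len(actual)


def weighted_recall(actual, predicted):
    """Per-class recall averaged with class support as the weight."""
    support = Counter(actual)
    true_pos = Counter(a for a, p in zip(actual, predicted) if a == p)
    total = sum(support[c] * (true_pos[c] / support[c]) for c in support)
    return total / len(actual)


a = [0, 1, 1, 0, 0]
b = [0, 1, 1, 1, 0]
c = [0, 1, 1, 0, 0]
scores = {
    "lr": {"accuracy": accuracy(a, c), "recall": weighted_recall(a, c)},
    "rf": {"accuracy": accuracy(a, b), "recall": weighted_recall(a, b)},
}
print(scores)
```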
>>> dc2 = DataCollection([Entity(pred=[1,6,2,7,8,3,9,10,4,5], act=[1,2,3,4,5])])
>>> dc2.with_metrics(['mean_average_precision', 'mean_hit_ratio']).evaluate['act', 'pred'](name='test').report()
      mean_average_precision  mean_hit_ratio
test                0.622222             1.0
{'test': {'mean_average_precision': 0.6222222222222221, 'mean_hit_ratio': 1.0}}
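The ranking scores for `pred` and `act` can be derived by hand with the textbook definition of average precision: the mean of precision@k over the ranks k at which a relevant item appears. The functions below are a sketch under that definition, not the library's code, and the hit-ratio definition (share of relevant items found anywhere in the prediction list) is an assumption. With a single query, the mean over queries equals the per-query value.

```python
def average_precision(actual, predicted):
    """Mean of precision@k at each rank k where a relevant item appears."""
    relevant = set(actual)
    hits = 0
    precisions = []
    for rank, item in enumerate(predicted, start=1):
        if item in relevant:
            hits += 1
            precisions.append(hits / rank)
    return sum(precisions) / len(relevant) if relevant else 0.0


def hit_ratio(actual, predicted):
    """Fraction of relevant items that appear anywhere in predicted."""
    relevant = set(actual)
    return len(relevant & set(predicted)) / len(relevant) if relevant else 0.0


pred = [1, 6, 2, 7, 8, 3, 9, 10, 4, 5]
act = [1, 2, 3, 4, 5]

# Relevant items sit at ranks 1, 3, 6, 9, 10, giving
# (1/1 + 2/3 + 3/6 + 4/9 + 5/10) / 5.
ap = average_precision(act, pred)
hr = hit_ratio(act, pred)
print(round(ap, 6), hr)
```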
- class towhee.functional.mixins.MilvusMixin
Bases:
object
Mixins for Milvus, such as loading data into Milvus collections. Note that the Milvus collection is created before loading the data. Refer to https://milvus.io/docs/v2.0.x/create_collection.md.
- Parameters:
collection (Union[str, Collection]) – The collection name or pymilvus.Collection in Milvus.
batch (int) – The batch size to load into Milvus, defaults to 1.
stream (bool, optional) – Whether to use stream mode with to_milvus, defaults to True.
- Returns:
A MutationResult object whose insert_count gives the number of inserted entities and whose primary_keys is the list of primary keys.
Examples:
Note
For the shape of the embedding vector, refer to https://towhee.io/image-embedding/timm. The dimension of the “test” collection must match it.
>>> import towhee
>>> from pymilvus import connections
>>> mr = (
...     towhee.glob['path']('./*.jpg')
...     .image_decode['path', 'img']()
...     .image_embedding.timm['img', 'vec'](model_name='resnet50')
...     .to_milvus['vec'](collection='test', batch=1000)
... )
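The role of a batch size such as `batch=1000` can be illustrated without a Milvus server: rows are grouped into lists of at most `batch` items, and each list would then go into one insert call. The `batched` generator below is a hypothetical helper, and the insert call itself is deliberately omitted.

```python
def batched(rows, batch):
    """Yield lists of at most batch rows (hypothetical helper)."""
    chunk = []
    for row in rows:
        chunk.append(row)
        if len(chunk) == batch:
            yield chunk
            chunk = []
    if chunk:
        yield chunk  # final partial batch


# 2500 rows with batch=1000 give two full batches and one partial batch.
sizes = [len(chunk) for chunk in batched(range(2500), 1000)]
print(sizes)  # [1000, 1000, 500]
```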
- class towhee.functional.mixins.FaissMixin
Bases:
object
Mixins for Faiss, such as loading data into Faiss. The ids and vectors must be passed as the index, for example to_faiss['path', 'vec']. If the ids are strings, KV storage is started and the kv data is saved to the specified directory as “kv.bin”.
- Parameters:
findex (str or faiss.INDEX, optional) – The path to the Faiss index file (defaults to ‘./index.bin’), or a Faiss index.
string (str, optional) – A string to produce a composite Faiss index, the same parameter as in faiss.index_factory; defaults to ‘IDMap,Flat’. Refer to https://github.com/facebookresearch/faiss/wiki/The-index-factory.
metric (faiss.METRIC, optional) – The metric for the Faiss index, defaults to faiss.METRIC_L2.
- Returns:
A DC. The Faiss index file and the kv file (if the ids are strings) are also saved.
Examples:
Note
Please make sure the path to index_file is writable; the Faiss index file and the kv file (if the ids are strings) are written there.
>>> import towhee
>>> dc = (
...     towhee.glob['path']('./*.jpg')
...     .image_decode['path', 'img']()
...     .image_embedding.timm['img', 'vec'](model_name='resnet50')
...     .to_faiss['path', 'vec'](findex='./faiss/faiss.index')
... )
- class towhee.functional.mixins.ConfigMixin
Bases:
object
Mixin to configure a DC, for example to set parallel, chunksize, and jit.
Examples:
>>> import towhee
>>> dc = towhee.dc['a'](range(20))
>>> dc = dc.set_chunksize(10)
>>> dc = dc.set_parallel(2)
>>> dc = dc.set_jit('numba')
>>> dc.get_config()
{'parallel': 2, 'chunksize': 10, 'jit': 'numba', 'format_priority': None}
>>> dc1 = towhee.dc([1,2,3]).config(jit='numba')
>>> dc2 = towhee.dc['a'](range(40)).config(parallel=2, chunksize=20)
>>> dc1.get_config()
{'parallel': None, 'chunksize': None, 'jit': 'numba', 'format_priority': None}
>>> dc2.get_config()
{'parallel': 2, 'chunksize': 20, 'jit': None, 'format_priority': None}
>>> dc3 = towhee.dc['a'](range(10)).config(format_priority=['tensorrt', 'onnx'])
>>> dc3.get_config()
{'parallel': None, 'chunksize': None, 'jit': None, 'format_priority': ['tensorrt', 'onnx']}
>>> import towhee
>>> dc = towhee.dc['a'](range(20))
>>> dc = dc.set_chunksize(10)
>>> dc = dc.set_parallel(2)
>>> dc = dc.set_jit('numba')
>>> dc.get_pipeline_config()
{'parallel': 2, 'chunksize': 10, 'jit': 'numba', 'format_priority': None}
>>> dc1 = towhee.dc([1,2,3]).pipeline_config(jit='numba')
>>> dc2 = towhee.dc['a'](range(40)).pipeline_config(parallel=2, chunksize=20)
>>> dc1.get_pipeline_config()
{'parallel': None, 'chunksize': None, 'jit': 'numba', 'format_priority': None}
>>> dc2.get_pipeline_config()
{'parallel': 2, 'chunksize': 20, 'jit': None, 'format_priority': None}
>>> dc3 = towhee.dc['a'](range(10)).pipeline_config(format_priority=['tensorrt', 'onnx'])
>>> dc3.get_pipeline_config()
{'parallel': None, 'chunksize': None, 'jit': None, 'format_priority': ['tensorrt', 'onnx']}
- config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)
Set the parameters in the DC.
- Parameters:
parallel (int) – The number of parallel workers for the following calls.
chunksize (int) – The chunk size for Arrow.
jit (Union[str, dict]) – Can be set to “numba”. This mode speeds up the Operator’s function, but a JIT failure forces a fallback to Python mode, which takes longer, so set it with care.
format_priority (List[str]) – The priority list of formats.
- get_config()
Return the config in DC, such as parallel, chunksize, jit and format_priority.
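The set-then-read behaviour of `config` and `get_config` can be sketched as a small mixin. This is an illustration under one assumption, namely that an argument left at `None` keeps the existing value; `TinyConfigMixin` is a hypothetical class, not the library's.

```python
class TinyConfigMixin:
    """Hypothetical stand-in showing chained configuration calls."""

    def __init__(self):
        self._config = {
            "parallel": None,
            "chunksize": None,
            "jit": None,
            "format_priority": None,
        }

    def config(self, parallel=None, chunksize=None, jit=None, format_priority=None):
        updates = {
            "parallel": parallel,
            "chunksize": chunksize,
            "jit": jit,
            "format_priority": format_priority,
        }
        for key, value in updates.items():
            # Assumption: None means "leave the current value unchanged".
            if value is not None:
                self._config[key] = value
        return self

    def get_config(self):
        # Return a copy so callers cannot mutate the stored config.
        return dict(self._config)


cfg = TinyConfigMixin().config(parallel=2, chunksize=20).config(jit="numba").get_config()
print(cfg)
```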
- pipeline_config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)
Set the parameters in the DC.
- Parameters:
parallel (int) – The number of parallel workers for the following calls.
chunksize (int) – The chunk size for Arrow.
jit (Union[str, dict]) – Can be set to “numba”. This mode speeds up the Operator’s function, but a JIT failure forces a fallback to Python mode, which takes longer, so set it with care.
format_priority (List[str]) – The priority list of formats.
- class towhee.functional.mixins.CompileMixin
Bases:
object
Mixin to just-in-time compile the Operator.
Examples:
>>> import numpy
>>> import towhee
>>> import time
>>> from towhee import register
>>> @register(name='inner_distance')
... def inner_distance(query, data):
...     dists = []
...     for vec in data:
...         dist = 0
...         for i in range(len(vec)):
...             dist += vec[i] * query[i]
...         dists.append(dist)
...     return dists
>>> data = [numpy.random.random((10000, 128)) for _ in range(10)]
>>> query = numpy.random.random(128)
>>> t1 = time.time()
>>> dc1 = (
...     towhee.dc['a'](data)
...     .runas_op['a', 'b'](func=lambda _: query)
...     .inner_distance[('b', 'a'), 'c']()
... )
>>> t2 = time.time()
>>> dc2 = (
...     towhee.dc['a'](data)
...     .config(jit='numba')
...     .runas_op['a', 'b'](func=lambda _: query)
...     .inner_distance[('b', 'a'), 'c']()
... )
>>> t3 = time.time()
>>> assert(t3-t2 < t2-t1)
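The general idea of compiling when possible and falling back to plain Python otherwise can be sketched as a decorator. `maybe_jit` is a hypothetical helper and not how towhee wires in numba; it only relies on `numba.njit`, and it runs unchanged when numba is not installed.

```python
def maybe_jit(func):
    """Return a numba-compiled version of func when possible, else func itself.

    Hypothetical helper: any failure while compiling or running the compiled
    version falls back to the original Python function.
    """
    try:
        import numba
    except Exception:  # numba missing or unusable in this environment
        return func

    compiled = numba.njit(func)  # compilation is lazy and happens on first call

    def wrapper(*args):
        try:
            return compiled(*args)
        except Exception:
            # A typing or compilation failure ends up here; the fallback makes
            # this call slower than plain Python would have been.
            return func(*args)

    return wrapper


def dot(query, vec):
    total = 0.0
    for i in range(len(vec)):
        total += vec[i] * query[i]
    return total


fast_dot = maybe_jit(dot)
value = fast_dot((1.0, 2.0, 3.0), (4.0, 5.0, 6.0))
print(value)
```

The first compiled call includes compilation time, so a speed-up is only visible when the function is called many times or on large inputs.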