DataCollection Mixins¶
- class towhee.functional.mixins.DatasetMixin[source]¶
Bases:
object
Mixin for dealing with dataset.
- classmethod read_zip(url, pattern, mode='r')[source]¶
Load files from url/path.
- Parameters:
url (Union[str, Path]) – The URL or local path of the zip file.
pattern (str) – The filename pattern to extract.
mode (str) – file open mode.
- Returns:
The file handle for each matching file in the zip file.
- Return type:
(File)
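The following is a minimal standard-library sketch of the idea behind read_zip: open an archive and yield a handle for every member whose name matches a pattern. The helper name iter_zip_members is hypothetical, glob-style matching via fnmatch is an assumption (the documentation does not say how pattern is interpreted), and downloading from a URL is left out.

```python
import fnmatch
import io
import zipfile


def iter_zip_members(zip_src, pattern, mode="r"):
    """Yield an open handle for each archive member whose name matches pattern.

    Hypothetical helper, not towhee's implementation. Glob-style matching
    is an assumption.
    """
    with zipfile.ZipFile(zip_src, "r") as archive:
        for name in archive.namelist():
            if fnmatch.fnmatchcase(name, pattern):
                with archive.open(name, mode) as handle:
                    yield handle


# Build a small in-memory archive so the sketch runs without touching disk.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("a.txt", "alpha")
    archive.writestr("b.txt", "beta")
    archive.writestr("c.jpg", "not text")
buffer.seek(0)

texts = [handle.read().decode("utf-8") for handle in iter_zip_members(buffer, "*.txt")]
print(texts)  # ['alpha', 'beta']
```

Each handle is only valid while the generator is being consumed, so the sketch reads every member immediately.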
- to_csv(csv_path: Union[str, Path], encoding: str = 'utf-8-sig')[source]¶
Save dc as a csv file.
- Parameters:
csv_path (Union[str, Path]) – The path to save the dc to.
encoding (str) – The encoding to use in the output file.
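The default encoding, 'utf-8-sig', is the Python codec that writes a UTF-8 byte order mark (BOM) at the start of the file, which helps some spreadsheet applications detect the encoding. The sketch below shows that effect with the standard csv module; it does not use towhee, and the rows are made-up sample data.

```python
import csv
import os
import tempfile

# Made-up sample rows standing in for the contents of a DataCollection.
rows = [{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]

with tempfile.TemporaryDirectory() as tmp_dir:
    csv_path = os.path.join(tmp_dir, "dc.csv")
    # 'utf-8-sig' prepends the BOM bytes EF BB BF to the output.
    with open(csv_path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=["a", "b"])
        writer.writeheader()
        writer.writerows(rows)
    with open(csv_path, "rb") as f:
        raw = f.read()

has_bom = raw.startswith(b"\xef\xbb\xbf")
print(has_bom)  # True
```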
- split_train_test(size: list = [0.9, 0.1], **kws)[source]¶
Split DataCollection to train and test data.
- Parameters:
size (list) – The proportions of the train and test sets, defaults to [0.9, 0.1].
Examples:
>>> from towhee.functional import DataCollection
>>> dc = DataCollection.range(10)
>>> train, test = dc.split_train_test(shuffle=False)
>>> train.to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>> test.to_list()
[9]
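As a rough illustration of what a proportional split does, the pure-Python sketch below reproduces the result of the example above. The function split_by_ratio and its seed parameter are hypothetical and are not towhee's implementation.

```python
import random


def split_by_ratio(items, size=(0.9, 0.1), shuffle=True, seed=None):
    """Split items into (train, test) using the first proportion in size.

    Hypothetical helper for illustration only.
    """
    items = list(items)
    if shuffle:
        random.Random(seed).shuffle(items)
    cut = round(len(items) * size[0])  # number of training elements
    return items[:cut], items[cut:]


train, test = split_by_ratio(range(10), shuffle=False)
print(train)  # [0, 1, 2, 3, 4, 5, 6, 7, 8]
print(test)   # [9]
```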
- class towhee.functional.mixins.DispatcherMixin[source]¶
Bases:
object
Mixin for call dispatcher for data collection.
>>> import towhee
>>> from towhee import register
>>> @register(name='add_1')
... def add_1(x):
...     return x+1

>>> dc = towhee.range(5).stream()
>>> dc.add_1['a','b','c']()
<map object at ...>
- class towhee.functional.mixins.ParallelMixin[source]¶
Bases:
object
Mixin for parallel execution.
Examples:
>>> from towhee import DataCollection
>>> def add_1(x):
...     return x+1
>>> result = DataCollection.range(1000).set_parallel(2).map(add_1).to_list()
>>> len(result)
1000

>>> from towhee import dc
>>> dc = dc['a'](range(1000)).set_parallel(5)
>>> dc = dc.runas_op['a', 'b'](lambda x: x+1).to_list()
>>> len(dc)
1000

>>> from towhee import dc
>>> dc = dc['a'](range(1000)).set_parallel(5).set_chunksize(2)
>>> dc = dc.runas_op['a', 'b'](lambda x: x+1)
>>> dc._iterable.chunks()[:2]
[pyarrow.Table
a: int64
b: int64
----
a: [[0,1]]
b: [[1,2]], pyarrow.Table
a: int64
b: int64
----
a: [[2,3]]
b: [[3,4]]]

>>> result = DataCollection.range(1000).pmap(add_1, 10).pmap(add_1, 10).to_list()
>>> result[990:]
[992, 993, 994, 995, 996, 997, 998, 999, 1000, 1001]
- set_parallel(num_worker=2, backend='thread')[source]¶
Set parallel execution for following calls.
Examples:
>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = set()
>>> stage_2_thread_set = set()
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )

>>> len(stage_2_thread_set) > 1
True
- split(count)[source]¶
Split a dataframe into multiple dataframes.
- Parameters:
count (int) – how many resulting DCs;
- Returns:
copies of DC;
- Return type:
[DataCollection, …]
Examples:
Split:
>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> a, b, c = dc.split(3)
>>> a.zip(b, c).to_list()
[(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
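Splitting a stream into several independent copies can be sketched with itertools.tee from the standard library. This only illustrates the behaviour shown in the example above; whether towhee builds split on tee is not stated in this documentation.

```python
import itertools

# A one-shot stream: once consumed, it cannot be iterated again.
stream = iter([0, 1, 2, 3, 4])

# tee buffers items so each copy can be consumed independently.
a, b, c = itertools.tee(stream, 3)

zipped = list(zip(a, b, c))
print(zipped)  # [(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
```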
- pmap(unary_op, num_worker=None, backend=None)[source]¶
Apply unary_op with parallel execution. Currently supports two backends, ray and thread.
- Parameters:
unary_op (func) – the op to be mapped;
num_worker (int) – how many threads to reserve for this op;
backend (str) – whether to use ray or thread
Examples:
>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream()
...     .pmap(lambda x: stage_1_thread_set.add(threading.current_thread().ident), 5)
...     .pmap(lambda x: stage_2_thread_set.add(threading.current_thread().ident), 4).to_list()
... )
>>> len(stage_1_thread_set) > 1
True
>>> len(stage_2_thread_set) > 1
True
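For readers unfamiliar with the thread backend, the sketch below shows an order-preserving parallel map built on concurrent.futures. The helper parallel_map is hypothetical and evaluates eagerly, whereas pmap on a streamed DataCollection is lazy; it is meant only to illustrate the concept.

```python
from concurrent.futures import ThreadPoolExecutor


def parallel_map(func, items, num_worker=2):
    """Apply func to every item with a pool of worker threads.

    Hypothetical helper. Executor.map returns results in input order.
    """
    with ThreadPoolExecutor(max_workers=num_worker) as pool:
        return list(pool.map(func, items))


def add_1(x):
    return x + 1


# Two chained stages, mirroring .pmap(add_1, 10).pmap(add_1, 10).
result = parallel_map(add_1, parallel_map(add_1, range(1000), 10), 10)
print(result[990:])  # [992, 993, 994, 995, 996, 997, 998, 999, 1000, 1001]
```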
- mmap(ops: list, num_worker=None, backend=None)[source]¶
Apply multiple unary_op to data collection. Currently supports two backends, ray and thread.
- Parameters:
ops (list) – the ops to be mapped;
num_worker (int) – how many threads to reserve for this op;
backend (str) – whether to use ray or thread
# TODO: the test is broken with pytest
# Examples:
# 1. Using mmap:
# >>> from towhee import DataCollection
# >>> dc1 = DataCollection([0,1,2,'3',4]).stream()
# >>> a1, b1 = dc1.mmap([lambda x: x+1, lambda x: x*2])
# >>> c1 = a1.map(lambda x: x+1)
# >>> c1.zip(b1).to_list()
# [(2, 0), (3, 2), (4, 4), (Empty(), '33'), (6, 8)]
# 2. Using map instead of mmap:
# >>> from towhee import DataCollection
# >>> dc2 = DataCollection.range(5).stream()
# >>> a2, b2, c2 = dc2.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2))
# >>> d2 = a2.map(lambda x: x+1)
# >>> d2.zip(b2, c2).to_list()
# [(2, 0, 0), (3, 2, 0), (4, 4, 1), (5, 6, 1), (6, 8, 2)]
# 3. DAG execution:
# >>> dc3 = DataCollection.range(5).stream()
# >>> a3, b3, c3 = dc3.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2))
# >>> d3 = a3.map(lambda x: x+1)
# >>> d3.zip(b3, c3).map(lambda x: x[0]+x[1]+x[2]).to_list()
# [2, 5, 9, 12, 16]
- class towhee.functional.mixins.ComputerVisionMixin[source]¶
Bases:
object
Mixin for computer vision problems.
- image_imshow(title='image')[source]¶
Produce a CV2 imshow window.
- Parameters:
title (str, optional) – The title for the image. Defaults to ‘image’.
- classmethod read_camera(device_id=0, limit=-1)[source]¶
Read images from a camera.
- Parameters:
device_id (int, optional) – The camera device ID. Defaults to 0.
limit (int, optional) – The number of images to capture. Defaults to -1.
- Returns:
Collection with images.
- Return type:
DataCollection
- classmethod read_video(path, format='rgb24')[source]¶
Load a video as a DataCollection.
- Parameters:
path (str) – The path of the video.
format (str, optional) – The color format of video. Defaults to ‘rgb24’.
- Returns:
DataCollection with the video.
- Return type:
DataCollection
- to_video(output_path, codec=None, rate=None, width=None, height=None, format=None, template=None, audio_src=None)[source]¶
Encode a video; with audio if provided.
- Parameters:
output_path (str) – Path to output the video to.
codec (str, optional) – Which codec to use for encoding. Defaults to None.
rate (int, optional) – The frame rate of the video. Defaults to None.
width (int, optional) – The width of the video image. Defaults to None.
height (int, optional) – The height of the video image. Defaults to None.
format (str, optional) – The color format of the video. Defaults to None.
template (str, optional) – The template video stream of the output video stream. Defaults to None.
audio_src (str, optional) – Audio path to include in video. Defaults to None.
- class towhee.functional.mixins.StateMixin[source]¶
Bases:
object
Mixin for state tracking.
The most common use case is for tracking training and evaluation states.
Examples:
>>> import towhee
>>> from towhee import param_scope, State
>>> dc = towhee.range(10).set_state(State(a=1))
>>> dc.get_state()
{'a': 1}

>>> dc = dc.map(lambda x: x+1).map(lambda x: x*2)
>>> dc.get_state()
{'a': 1}
- get_state()[source]¶
Get the state storage for DataCollection
- Returns:
the state storage
- Return type:
State
- set_state(state)[source]¶
Set the state storage for DataCollection
- Parameters:
state (State) – state storage
- Returns:
data collection itself
- Return type:
DataCollection
- set_training(state=None)[source]¶
Set training mode for stateful operators
- Parameters:
state (State, optional) – Update the state storage. Defaults to None.
- Returns:
data collection itself
- Return type:
DataCollection
- class towhee.functional.mixins.MetricMixin[source]¶
Bases:
object
Mixin for metrics.
- report()[source]¶
Report the metric scores. If you use the ‘confusion_matrix’ metric, run the report in Jupyter to display the matrix.
Examples:
>>> from towhee import DataCollection
>>> from towhee import Entity
>>> dc1 = DataCollection([Entity(a=a, b=b, c=c) for a, b, c in zip([0,1,1,0,0], [0,1,1,1,0], [0,1,1,0,0])])
>>> dc1.with_metrics(['accuracy', 'recall']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report()
    accuracy  recall
lr       1.0     1.0
rf       0.8     0.8
{'lr': {'accuracy': 1.0, 'recall': 1.0}, 'rf': {'accuracy': 0.8, 'recall': 0.8}}
>>> dc1.with_metrics(['confusion_matrix']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report()
<IPython.core.display.HTML object>
{'lr': {'confusion_matrix': array([[3, 0], [0, 2]])}, 'rf': {'confusion_matrix': array([[2, 1], [0, 2]])}}
>>> dc2 = DataCollection([Entity(pred=[1,6,2,7,8,3,9,10,4,5], act=[1,2,3,4,5])])
>>> dc2.with_metrics(['mean_average_precision', 'mean_hit_ratio']).evaluate['act', 'pred'](name='test').report()
      mean_average_precision  mean_hit_ratio
test                0.622222             1.0
{'test': {'mean_average_precision': 0.6222222222222221, 'mean_hit_ratio': 1.0}}
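To see where the 'rf' numbers in the example come from, the accuracy and confusion matrix can be recomputed by hand. The sketch below is plain Python and assumes that the first evaluated column ('a') holds the actual labels, the second ('b') holds the predictions, and that matrix rows correspond to actual labels; these assumptions are consistent with the reported values but are not stated in this documentation.

```python
actual = [0, 1, 1, 0, 0]     # column 'a', assumed to be the ground truth
predicted = [0, 1, 1, 1, 0]  # column 'b', assumed to be the predictions

labels = sorted(set(actual) | set(predicted))

# matrix[i][j] counts samples with actual label i and predicted label j.
matrix = [[0] * len(labels) for _ in labels]
for y_true, y_pred in zip(actual, predicted):
    matrix[labels.index(y_true)][labels.index(y_pred)] += 1

accuracy = sum(t == p for t, p in zip(actual, predicted)) / len(actual)

print(matrix)    # [[2, 1], [0, 2]]
print(accuracy)  # 0.8
```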
- class towhee.functional.mixins.MilvusMixin[source]¶
Bases:
object
Mixins for Milvus, such as loading data into Milvus collections. Note that the Milvus collection must be created before loading the data. Refer to https://milvus.io/docs/v2.0.x/create_collection.md.
- Parameters:
collection (Union[str, Collection]) – The collection name or pymilvus.Collection in Milvus.
batch (int) – The batch size to load into Milvus, defaults to 1.
stream (bool, optional) – Whether to use stream mode with to_milvus, defaults to True.
- Returns:
The DataCollection.
Examples:
Note
For the shape of the embedding vector, refer to https://towhee.io/image-embedding/timm. The dimension of the “test” collection must match it.
>>> import towhee
>>> mr = (
...     towhee.glob['path']('./*.jpg')
...     .image_decode['path', 'img']()
...     .image_embedding.timm['img', 'vec'](model_name='resnet50')
...     .to_milvus['vec'](collection='test', batch=1000)
... )
- class towhee.functional.mixins.FaissMixin[source]¶
Bases:
object
Mixins for Faiss, such as loading data into Faiss. The ids and vectors must be passed as the index (for example, to_faiss['path', 'vec']). If ids is a string, KV storage will be started, and the KV data will be saved to the specified directory as “kv.bin”.
- Parameters:
findex (str or faiss.INDEX, optional) – The path to the Faiss index file (defaults to ‘./index.bin’) or a Faiss index.
string (str, optional) – A string describing a composite Faiss index, the same as the argument to faiss.index_factory; defaults to ‘IDMap,Flat’. Refer to https://github.com/facebookresearch/faiss/wiki/The-index-factory.
metric (faiss.METRIC, optional) – The metric for Faiss index, defaults to faiss.METRIC_L2.
- Returns:
A DC. The Faiss index file and the KV file (if ids is a string) are also saved.
Examples:
Note
Please make sure you have write permission for the index file path, because the Faiss index file and the KV file (if ids is a string) are written there.
>>> import towhee
>>> dc = (
...     towhee.glob['path']('./*.jpg')
...     .image_decode['path', 'img']()
...     .image_embedding.timm['img', 'vec'](model_name='resnet50')
...     .to_faiss['path', 'vec'](findex='./faiss/faiss.index')
... )
- class towhee.functional.mixins.ConfigMixin[source]¶
Bases:
object
Mixin to configure a DC, such as setting parallel, chunksize, jit and format_priority.
Examples
>>> import towhee
>>> dc = towhee.dc['a'](range(20))
>>> dc = dc.set_chunksize(10)
>>> dc = dc.set_parallel(2)
>>> dc = dc.set_jit('numba')
>>> dc.get_config()
{'parallel': 2, 'chunksize': 10, 'jit': 'numba', 'format_priority': None}
>>> dc1 = towhee.dc([1,2,3]).config(jit='numba')
>>> dc2 = towhee.dc['a'](range(40)).config(parallel=2, chunksize=20)
>>> dc1.get_config()
{'parallel': None, 'chunksize': None, 'jit': 'numba', 'format_priority': None}
>>> dc2.get_config()
{'parallel': 2, 'chunksize': 20, 'jit': None, 'format_priority': None}
>>> dc3 = towhee.dc['a'](range(10)).config(format_priority=['tensorrt', 'onnx'])
>>> dc3.get_config()
{'parallel': None, 'chunksize': None, 'jit': None, 'format_priority': ['tensorrt', 'onnx']}

>>> import towhee
>>> dc = towhee.dc['a'](range(20))
>>> dc = dc.set_chunksize(10)
>>> dc = dc.set_parallel(2)
>>> dc = dc.set_jit('numba')
>>> dc.get_pipeline_config()
{'parallel': 2, 'chunksize': 10, 'jit': 'numba', 'format_priority': None}
>>> dc1 = towhee.dc([1,2,3]).pipeline_config(jit='numba')
>>> dc2 = towhee.dc['a'](range(40)).pipeline_config(parallel=2, chunksize=20)
>>> dc1.get_pipeline_config()
{'parallel': None, 'chunksize': None, 'jit': 'numba', 'format_priority': None}
>>> dc2.get_pipeline_config()
{'parallel': 2, 'chunksize': 20, 'jit': None, 'format_priority': None}
>>> dc3 = towhee.dc['a'](range(10)).pipeline_config(format_priority=['tensorrt', 'onnx'])
>>> dc3.get_pipeline_config()
{'parallel': None, 'chunksize': None, 'jit': None, 'format_priority': ['tensorrt', 'onnx']}
- config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)[source]¶
Set the parameters for the DC.
- Parameters:
parallel (int, optional) – Set the number of parallel executions for the following calls, defaults to None.
chunksize (int, optional) – Set the chunk size for arrow, defaults to None.
jit (Union[str, dict], optional) – Can be set to “numba” to speed up the Operator’s function. If JIT compilation fails, execution falls back to Python mode, which takes longer, so set it carefully. Defaults to None.
format_priority (List[str], optional) – The priority list of formats, defaults to None.
- Returns:
Self.
- Return type:
- get_config()[source]¶
Return the config of the DC, including parameters such as parallel, chunksize, jit and format_priority.
- Returns:
A dict of config parameters.
- Return type:
dict
- pipeline_config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)[source]¶
Set the parameters in DC.
- Parameters:
parallel (int, optional) – Set the number of parallel executions for the following calls, defaults to None.
chunksize (int, optional) – Set the chunk size for arrow, defaults to None.
jit (Union[str, dict], optional) – Can be set to “numba” to speed up the Operator’s function. If JIT compilation fails, execution falls back to Python mode, which takes longer, so set it carefully. Defaults to None.
format_priority (List[str], optional) – The priority list of formats, defaults to None.
- Returns:
Self
- Return type:
- class towhee.functional.mixins.CompileMixin[source]¶
Bases:
object
Mixin to just-in-time compile the Operator. More information about just-in-time compilation can be found at https://en.wikipedia.org/wiki/Just-in-time_compilation.
Examples:
Compile the Python function with numba:
>>> import numpy
>>> import towhee
>>> import time
>>> from towhee import register
>>> @register(name='inner_distance')
... def inner_distance(query, data):
...     dists = []
...     for vec in data:
...         dist = 0
...         for i in range(len(vec)):
...             dist += vec[i] * query[i]
...         dists.append(dist)
...     return dists
>>> data = [numpy.random.random((10000, 128)) for _ in range(10)]
>>> query = numpy.random.random(128)

>>> t1 = time.time()
>>> dc1 = (
...     towhee.dc['a'](data)
...     .runas_op['a', 'b'](func=lambda _: query)
...     .inner_distance[('b', 'a'), 'c']()
... )
>>> t2 = time.time()
>>> dc2 = (
...     towhee.dc['a'](data)
...     .config(jit='numba')
...     .runas_op['a', 'b'](func=lambda _: query)
...     .inner_distance[('b', 'a'), 'c']()
... )
>>> t3 = time.time()
>>> assert(t3-t2 < t2-t1)
Compile the models with towhee.compiler:
Note
The first compilation will take some time.
For more information about towhee.compiler, refer to https://github.com/towhee-io/towhee-compiler.
>>> import towhee
>>> dc1 = (towhee.dc(['hello world'])
...     .set_jit('towhee')
...     .text_embedding.transformers(model_name='distilbert-base-cased')
... )
- class towhee.functional.mixins.DataProcessingMixin[source]¶
Bases:
object
Mixin for processing data.
- classmethod combine(*datacollections)[source]¶
Combine dataframes to be able to access schemas from separate DF chains.
- Parameters:
datacollections (DataFrame) – DataFrames to combine.
Examples
>>> import towhee
>>> a = towhee.range['a'](1,5)
>>> b = towhee.range['b'](5,10)
>>> c = towhee.range['c'](10, 15)
>>> z = towhee.DataFrame.combine(a, b, c)
>>> z.as_raw().to_list()
[(1, 5, 10), (2, 6, 11), (3, 7, 12), (4, 8, 13)]
- select_from(other)[source]¶
Select data from other, using the elements of self as indices.
- Parameters:
other (DataCollection) – DataCollection to select from.
Examples
>>> from towhee import DataCollection
>>> dc1 = DataCollection([0.8, 0.9, 8.1, 9.2])
>>> dc2 = DataCollection([[1, 2, 0], [2, 3, 0]])

>>> dc3 = dc2.select_from(dc1)
>>> list(dc3)
[[0.9, 8.1, 0.8], [8.1, 9.2, 0.8]]
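The example above amounts to index-based lookup: each row of the calling collection is a list of positions into the other collection. A plain-Python equivalent, with illustrative variable names, looks like this:

```python
source = [0.8, 0.9, 8.1, 9.2]        # plays the role of dc1 (other)
index_rows = [[1, 2, 0], [2, 3, 0]]  # plays the role of dc2 (self)

# Replace every index with the element it points to in source.
selected = [[source[i] for i in row] for row in index_rows]
print(selected)  # [[0.9, 8.1, 0.8], [8.1, 9.2, 0.8]]
```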
- zip(*others) DataCollection [source]¶
Combine multiple data collections.
- Parameters:
*others (DataCollection) – The other data collections.
- Returns:
Data collection with zipped values.
- Return type:
DataCollection
Examples
>>> from towhee import DataCollection
>>> dc1 = DataCollection([1,2,3,4])
>>> dc2 = DataCollection([1,2,3,4]).map(lambda x: x+1)
>>> dc3 = dc1.zip(dc2)
>>> list(dc3)
[(1, 2), (2, 3), (3, 4), (4, 5)]
- head(n: int = 5)[source]¶
Return the first n values of a DataCollection.
- Parameters:
n (int, optional) – The number of values to select, defaults to 5.
- Returns:
DataCollection with the selected values.
- Return type:
DataCollection
- sample(ratio=1.0) DataCollection [source]¶
Sample the data collection.
- Parameters:
ratio (float) – sample ratio.
- Returns:
Sampled data collection.
- Return type:
DataCollection
Examples
>>> from towhee import DataCollection
>>> dc = DataCollection(range(10000))
>>> result = dc.sample(0.1)
>>> ratio = len(result.to_list()) / 10000.
>>> 0.09 < ratio < 0.11
True
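One common way to sample a stream at a given ratio is to keep each element independently with probability ratio, so the result size is only approximately ratio times the input size. The sketch below assumes that approach; the helper sample_stream and its seed parameter are hypothetical, and towhee's actual sampling strategy is not described here.

```python
import random


def sample_stream(items, ratio=1.0, seed=None):
    """Lazily keep each item with probability ratio (hypothetical helper)."""
    rng = random.Random(seed)
    # random() returns a value in [0.0, 1.0), so ratio=1.0 keeps everything.
    return (item for item in items if rng.random() < ratio)


result = list(sample_stream(range(10000), 0.1, seed=0))
observed_ratio = len(result) / 10000
print(observed_ratio)  # close to 0.1, varies with the seed
```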
- batch(size, drop_tail=False)[source]¶
Create batches from the DataCollection.
- Parameters:
size (int) – Window size.
drop_tail (bool) – Drop trailing window that is not full, defaults to False.
- Returns:
Batched DataCollection.
- Return type:
DataCollection
Examples
>>> from towhee import DataCollection
>>> dc = DataCollection(range(10))
>>> [list(batch) for batch in dc.batch(2)]
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

>>> dc = DataCollection(range(10))
>>> dc.batch(3)
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

>>> dc = DataCollection(range(10))
>>> dc.batch(3, drop_tail=True)
[[0, 1, 2], [3, 4, 5], [6, 7, 8]]

>>> from towhee import Entity
>>> dc = DataCollection([Entity(a=a, b=b) for a,b in zip(['abc', 'vdfvcd', 'cdsc'], [1,2,3])])
>>> dc.batch(2)
[[<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>], [<Entity dict_keys(['a', 'b'])>]]
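The batching behaviour, including the effect of drop_tail, can be sketched as a small generator. The function iter_batches is hypothetical and is not towhee's implementation; it reproduces the integer examples above.

```python
def iter_batches(items, size, drop_tail=False):
    """Yield consecutive lists of at most size items (hypothetical helper)."""
    buff = []
    for item in items:
        buff.append(item)
        if len(buff) == size:
            yield buff
            buff = []
    # Emit the final partial batch unless the caller asked to drop it.
    if buff and not drop_tail:
        yield buff


print(list(iter_batches(range(10), 3)))
# [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(list(iter_batches(range(10), 3, drop_tail=True)))
# [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```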
- rolling(size: int, step: int = 1, drop_head=True, drop_tail=True)[source]¶
Create rolling windows from DataCollection.
- Parameters:
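size (int) – Window size.
step (int) – The number of elements to advance between windows, defaults to 1.
drop_head (bool) – Drop head windows that are not full, defaults to True.
drop_tail (bool) – Drop trailing windows that are not full, defaults to True.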
- Returns:
DataCollection of rolling windows.
- Return type:
DataCollection
Examples
>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4]]

>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_head=False)]
[[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]]

>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_tail=False)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]

>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> dc.rolling(2, 2, drop_head=False, drop_tail=False)
[[0], [0, 1], [2, 3], [4]]

>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> dc.rolling(2, 4, drop_head=False, drop_tail=False)
[[0], [0, 1], [4]]
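The interaction of size, step, drop_head and drop_tail is easier to follow in code. The generator below is a sketch written to reproduce the outputs of the examples above; rolling_windows is a hypothetical name, it assumes step is at least 1, and it should not be read as towhee's actual implementation.

```python
def rolling_windows(items, size, step=1, drop_head=True, drop_tail=True):
    """Yield rolling windows over items (hypothetical helper, step >= 1)."""
    buff = []
    gap = 0         # items still to skip when step is larger than size
    in_head = True  # True until the first full window has been emitted
    for item in items:
        if gap:
            gap -= 1
            continue
        buff.append(item)
        # Emit full windows always, and partial head windows on request.
        if len(buff) == size or (in_head and not drop_head):
            yield list(buff)
        if len(buff) == size:
            in_head = False
            buff = buff[step:]
            gap = max(step - size, 0)
    # Optionally emit the shrinking windows left at the end.
    while buff and not drop_tail:
        yield buff
        buff = buff[step:]


print(list(rolling_windows(range(5), 3)))
# [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
print(list(rolling_windows(range(5), 2, 2, drop_head=False, drop_tail=False)))
# [[0], [0, 1], [2, 3], [4]]
```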
- flatten(*args) DataCollection [source]¶
Flatten nested data within DataCollection.
- Returns:
Flattened DataCollection.
- Return type:
DataCollection
Examples
>>> from towhee import DataCollection, Entity
>>> dc = DataCollection(range(10))
>>> nested_dc = dc.batch(2)
>>> nested_dc.flatten().to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

>>> g = (i for i in range(3))
>>> e = Entity(a=1, b=2, c=g)
>>> dc = DataCollection([e]).flatten('c')
>>> [str(i) for i in dc]
["{'a': 1, 'b': 2, 'c': 0}", "{'a': 1, 'b': 2, 'c': 1}", "{'a': 1, 'b': 2, 'c': 2}"]
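Both flattening modes shown above have simple plain-Python analogues: chaining nested lists, and expanding one iterable column into one record per value. In the sketch below, dictionaries stand in for Entity objects and flatten_column is a hypothetical helper.

```python
import itertools

# Flatten one level of nesting, as in nested_dc.flatten().
nested = [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
flat = list(itertools.chain.from_iterable(nested))
print(flat)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


def flatten_column(record, column):
    """Yield one copy of record per value in record[column] (hypothetical)."""
    for value in record[column]:
        yield {**record, column: value}


record = {"a": 1, "b": 2, "c": (i for i in range(3))}
rows = list(flatten_column(record, "c"))
print(rows)
# [{'a': 1, 'b': 2, 'c': 0}, {'a': 1, 'b': 2, 'c': 1}, {'a': 1, 'b': 2, 'c': 2}]
```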
- shuffle() DataCollection [source]¶
Shuffle an unstreamed data collection in place.
- Returns:
Shuffled data collection.
- Return type:
DataCollection
Examples
1. Shuffle:
>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> a = dc.shuffle()
>>> tuple(a) == tuple(range(5))
False
2. Streamed data collection is not supported:
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> _ = dc.shuffle()
Traceback (most recent call last):
TypeError: shuffle is not supported for streamed data collection.
- group_by(index) DataCollection [source]¶
Merge columns in DataCollection. Unstreamed data only.
Examples
>>> import towhee
>>> dc = towhee.dc['a']([1,1,2,2,3,3])
>>> [i.a for i in dc]
[1, 1, 2, 2, 3, 3]

>>> dc = dc.group_by('a')
>>> [i.a for i in dc]
[1, 2, 3]