DataCollection Mixins

class towhee.functional.mixins.DatasetMixin[source]

Bases: object

Mixin for dealing with dataset.

classmethod from_glob(*args)[source]

Generate a file list with pattern.

classmethod read_zip(url, pattern, mode='r')[source]

Load files from url/path.

Parameters:
  • zip_src (Union[str, path]) – The path leads to the image.

  • pattern (str) – The filename pattern to extract.

  • mode (str) – file open mode.

Returns:

The file handler for file in the zip file.

Return type:

(File)

to_csv(csv_path: Union[str, pathlib.Path], encoding: str = 'utf-8-sig')[source]

Save dc as a csv file.

Parameters:
  • csv_path (Union[str, Path]) – The path to save the dc to.

  • encoding (str) – The encoding to use in the output file.

split_train_test(size: list = [0.9, 0.1], **kws)[source]

Split DataCollection to train and test data.

Parameters:

size (list) – The size of the train and test.

Examples:

>>> from towhee.functional import DataCollection
>>> dc = DataCollection.range(10)
>>> train, test = dc.split_train_test(shuffle=False)
>>> train.to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>> test.to_list()
[9]
class towhee.functional.mixins.DispatcherMixin[source]

Bases: object

Mixin for call dispatcher for data collection.

>>> import towhee
>>> from towhee import register
>>> @register(name='add_1')
... def add_1(x):
...     return x+1
>>> dc = towhee.range(5).stream()
>>> dc.add_1['a','b','c']() 
<map object at ...>
resolve(path, index, *arg, **kws)[source]

Dispatch unknown operators.

Parameters:
  • path (str) – The operator name.

  • index (str) – The index of data being called on.

Returns:

The operator that corresponds to the path.

Return type:

_OperatorLazyWrapper

class towhee.functional.mixins.ParallelMixin[source]

Bases: object

Mixin for parallel execution.

Examples:

>>> from towhee import DataCollection
>>> def add_1(x):
...     return x+1
>>> result = DataCollection.range(1000).set_parallel(2).map(add_1).to_list()
>>> len(result)
1000
>>> from towhee import dc
>>> dc = dc['a'](range(1000)).set_parallel(5)
>>> dc = dc.runas_op['a', 'b'](lambda x: x+1).to_list()
>>> len(dc)
1000
>>> from towhee import dc
>>> dc = dc['a'](range(1000)).set_parallel(5).set_chunksize(2)
>>> dc = dc.runas_op['a', 'b'](lambda x: x+1)
>>> dc._iterable.chunks()[:2]
[pyarrow.Table
a: int64
b: int64
----
a: [[0,1]]
b: [[1,2]], pyarrow.Table
a: int64
b: int64
----
a: [[2,3]]
b: [[3,4]]]
>>> result = DataCollection.range(1000).pmap(add_1, 10).pmap(add_1, 10).to_list()
>>> result[990:]
[992, 993, 994, 995, 996, 997, 998, 999, 1000, 1001]
set_parallel(num_worker=2, backend='thread')[source]

Set parallel execution for following calls.

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = set()
>>> stage_2_thread_set = set()
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )
>>> len(stage_2_thread_set)>1
True
split(count)[source]

Split a dataframe into multiple dataframes.

Parameters:

count (int) – how many resulting DCs;

Returns:

copies of DC;

Return type:

[DataCollection, ..]

Examples:

  1. Split:

>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> a, b, c = dc.split(3)
>>> a.zip(b, c).to_list()
[(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
pmap(unary_op, num_worker=None, backend=None)[source]

Apply unary_op with parallel execution. Currently supports two backends, ray and thread.

Parameters:
  • unary_op (func) – the op to be mapped;

  • num_worker (int) – how many threads to reserve for this op;

  • backend (str) – whether to use ray or thread

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream()
...     .pmap(lambda x: stage_1_thread_set.add(threading.current_thread().ident), 5)
...     .pmap(lambda x: stage_2_thread_set.add(threading.current_thread().ident), 4).to_list()
... )
>>> len(stage_1_thread_set) > 1
True
>>> len(stage_2_thread_set) > 1
True
mmap(ops: list, num_worker=None, backend=None)[source]

Apply multiple unary_op to data collection. Currently supports two backends, ray and thread.

Parameters:
  • unary_op (func) – the op to be mapped;

  • num_worker (int) – how many threads to reserve for this op;

  • backend (str) – whether to use ray or thread

# TODO: the test is broken with pytest # Examples:

# 1. Using mmap:

# >>> from towhee import DataCollection # >>> dc1 = DataCollection([0,1,2,’3’,4]).stream() # >>> a1, b1 = dc1.mmap([lambda x: x+1, lambda x: x*2]) # >>> c1 = a1.map(lambda x: x+1) # >>> c1.zip(b1).to_list() # [(2, 0), (3, 2), (4, 4), (Empty(), ‘33’), (6, 8)]

# 2. Using map instead of mmap:

# >>> from towhee import DataCollection # >>> dc2 = DataCollection.range(5).stream() # >>> a2, b2, c2 = dc2.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2)) # >>> d2 = a2.map(lambda x: x+1) # >>> d2.zip(b2, c2).to_list() # [(2, 0, 0), (3, 2, 0), (4, 4, 1), (5, 6, 1), (6, 8, 2)]

# 3. DAG execution:

# >>> dc3 = DataCollection.range(5).stream() # >>> a3, b3, c3 = dc3.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2)) # >>> d3 = a3.map(lambda x: x+1) # >>> d3.zip(b3, c3).map(lambda x: x[0]+x[1]+x[2]).to_list() # [2, 5, 9, 12, 16]

class towhee.functional.mixins.ComputerVisionMixin[source]

Bases: object

Mixin for computer vision problems.

image_imshow(title='image')[source]

Produce a CV2 imshow window.

Parameters:

title (str, optional) – The title for the image. Defaults to ‘image’.

classmethod read_camera(device_id=0, limit=- 1)[source]

Read images from a camera.

Parameters:
  • device_id (int, optional) – The camera device ID. Defaults to 0.

  • limit (int, optional) – The amount of images to capture. Defaults to -1.

Returns:

Collection with images.

Return type:

DataCollection

classmethod read_video(path, format='rgb24')[source]

Load a video as a DataCollection.

Parameters:
  • path (str) – The path of the video.

  • format (str, optional) – The color format of video. Defaults to ‘rgb24’.

Returns:

DataCollection with the video.

Return type:

DataCollection

to_video(output_path, codec=None, rate=None, width=None, height=None, format=None, template=None, audio_src=None)[source]

Encode a video; with audio if provided.

Parameters:
  • output_path (str) – Path to output the video to.

  • codec (str, optional) – Which codec to use for encoding. Defaults to None.

  • rate (int, optional) – The framrate of the video. Defaults to None.

  • width (int, optional) – The width of the video image. Defaults to None.

  • height (int, optional) – The height of the video image. Defaults to None.

  • format (str, optional) – The color format of the video. Defaults to None.

  • template (str, optional) – The template video stream of the ouput video stream. Defaults to None.

  • audio_src (str, optional) – Audio path to include in video. Defaults to None.

class towhee.functional.mixins.StateMixin[source]

Bases: object

Mixin for state tracking.

The most common use case is for tracking training and evaluation states.

Examples:

>>> import towhee
>>> from towhee import param_scope, State
>>> dc = towhee.range(10).set_state(State(a=1))
>>> dc.get_state()
{'a': 1}
>>> dc = dc.map(lambda x: x+1).map(lambda x: x*2)
>>> dc.get_state()
{'a': 1}
get_state()[source]

Get the state storage for DataCollection

Returns:

the state storage

Return type:

State

set_state(state)[source]

Set the state storage for DataCollection

Parameters:

state (State) – state storage

Returns:

data collection itself

Return type:

DataCollection

set_training(state=None)[source]

Set training mode for stateful operators

Parameters:

state (State, optional) – Update the state storage. Defaults to None.

Returns:

data collection itself

Return type:

DataCollection

set_evaluating(state=None)[source]

Set evaluating mode for stateful operators

Parameters:

state (State, optional) – Update the state storage. Defaults to None.

Returns:

data collection itself

Return type:

DataCollection

class towhee.functional.mixins.MetricMixin[source]

Bases: object

Mixin for metric

report()[source]

Report the metric scores, and if you are using ‘confusion matrix’ metric, please use jupyter to display the matrix.

Examples:

>>> from towhee import DataCollection
>>> from towhee import Entity
>>> dc1 = DataCollection([Entity(a=a, b=b, c=c) for a, b, c in zip([0,1,1,0,0], [0,1,1,1,0], [0,1,1,0,0])])
>>> dc1.with_metrics(['accuracy', 'recall']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report()
    accuracy  recall
lr       1.0     1.0
rf       0.8     0.8
{'lr': {'accuracy': 1.0, 'recall': 1.0}, 'rf': {'accuracy': 0.8, 'recall': 0.8}}
>>> dc1.with_metrics(['confusion_matrix']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report() 
<IPython.core.display.HTML object>
{'lr': {'confusion_matrix': array([[3, 0],
       [0, 2]])}, 'rf': {'confusion_matrix': array([[2, 1],
       [0, 2]])}}
>>> dc2 = DataCollection([Entity(pred=[1,6,2,7,8,3,9,10,4,5], act=[1,2,3,4,5])])
>>> dc2.with_metrics(['mean_average_precision', 'mean_hit_ratio']).evaluate['act', 'pred'](name='test').report()
      mean_average_precision  mean_hit_ratio
test                0.622222             1.0
{'test': {'mean_average_precision': 0.6222222222222221, 'mean_hit_ratio': 1.0}}
class towhee.functional.mixins.MilvusMixin[source]

Bases: object

Mixins for Milvus, such as loading data into Milvus collections. Note that the Milvus collection is created before loading the data. Refer to https://milvus.io/docs/v2.0.x/create_collection.md.

Parameters:
  • collection (Union[str, Collection]) – The collection name or pymilvus.Collection in Milvus.

  • batch (str) – The batch size to load into Milvus, defaults to 1.

  • stream (bool, optional) – Whther the stream mode with to_milvus, defaults to True.

Returns:

The DataCollection.

Examples:

Note

The shape of embedding vector refer to https://towhee.io/image-embedding/timm. And the dimension of the “test” collection should be the same as it.

>>> import towhee
>>> mr = ( 
...     towhee.glob['path']('./*.jpg')
...           .image_decode['path', 'img']()
...           .image_embedding.timm['img', 'vec'](model_name='resnet50')
...           .to_milvus['vec'](collection='test', batch=1000)
... )
class towhee.functional.mixins.FaissMixin[source]

Bases: object

Mixins for Faiss, such as loading data into Faiss. And ids and vectors need to be passed as index. If ids is a string, KV storage will be started, and the kv data will be saved to the specified directory as “kv.bin”.

Parameters:
  • findex (str or faiss.INDEX, optional) – The path to faiss index file(defaults to ‘./index.bin’) or faiss index.

  • string (str, optional) – A string to produce a composite Faiss index, which is the same parameter in faiss.index_factor, defaults to ‘IDMap,Flat’, and you can refer to https://github.com/facebookresearch/faiss/wiki/The-index-factory.

  • metric (faiss.METRIC, optional) – The metric for Faiss index, defaults to faiss.METRIC_L2.

Returns:

A DC, and will save the Faiss index file and kv file(if ids is string).

Examples:

Note

Please make sure the path to index_file is authorized, and it will write the Faiss index file and kv file(if ids is string).

>>> import towhee
>>> dc = ( 
...     towhee.glob['path']('./*.jpg')
...           .image_decode['path', 'img']()
...           .image_embedding.timm['img', 'vec'](model_name='resnet50')
...           .to_faiss['path', 'vec'](findex='./faiss/faiss.index')
... )
class towhee.functional.mixins.ConfigMixin[source]

Bases: object

Mixin to config DC, such as set the parallel, chunksize, jit and format_priority.

Examples

>>> import towhee
>>> dc = towhee.dc['a'](range(20))
>>> dc = dc.set_chunksize(10)
>>> dc = dc.set_parallel(2)
>>> dc = dc.set_jit('numba')
>>> dc.get_config()
{'parallel': 2, 'chunksize': 10, 'jit': 'numba', 'format_priority': None}
>>> dc1 = towhee.dc([1,2,3]).config(jit='numba')
>>> dc2 = towhee.dc['a'](range(40)).config(parallel=2, chunksize=20)
>>> dc1.get_config()
{'parallel': None, 'chunksize': None, 'jit': 'numba', 'format_priority': None}
>>> dc2.get_config()
{'parallel': 2, 'chunksize': 20, 'jit': None, 'format_priority': None}
>>> dc3 = towhee.dc['a'](range(10)).config(format_priority=['tensorrt', 'onnx'])
>>> dc3.get_config()
{'parallel': None, 'chunksize': None, 'jit': None, 'format_priority': ['tensorrt', 'onnx']}
>>> import towhee
>>> dc = towhee.dc['a'](range(20))
>>> dc = dc.set_chunksize(10)
>>> dc = dc.set_parallel(2)
>>> dc = dc.set_jit('numba')
>>> dc.get_pipeline_config()
{'parallel': 2, 'chunksize': 10, 'jit': 'numba', 'format_priority': None}
>>> dc1 = towhee.dc([1,2,3]).pipeline_config(jit='numba')
>>> dc2 = towhee.dc['a'](range(40)).pipeline_config(parallel=2, chunksize=20)
>>> dc1.get_pipeline_config()
{'parallel': None, 'chunksize': None, 'jit': 'numba', 'format_priority': None}
>>> dc2.get_pipeline_config()
{'parallel': 2, 'chunksize': 20, 'jit': None, 'format_priority': None}
>>> dc3 = towhee.dc['a'](range(10)).pipeline_config(format_priority=['tensorrt', 'onnx'])
>>> dc3.get_pipeline_config()
{'parallel': None, 'chunksize': None, 'jit': None, 'format_priority': ['tensorrt', 'onnx']}
config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)[source]

Set the parameters for the DC.

Parameters:
  • parallel (int, optional) – Set the number of parallel execution for the following calls, defaults to None.

  • chunksize (int, optional) – Set the chunk size for arrow, defaults to None.

  • jit (Union[str, dict], optional) – Can be set to “numba”, this mode will speed up the Operator’s function, but it may also need to return to python mode due to JIT failure, which will take longer, so please set it carefully, defaults to None.

  • format_priority (List[str], optional) – The priority list of formats, defaults to None.

Returns:

Self.

Return type:

DataCollection

get_config()[source]

Return the config of the DC, including parameters such as parallel, chunksize, jit and format_priority.

Returns:

A dict of config parameters.

Return type:

dict

pipeline_config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)[source]

Set the parameters in DC.

Parameters:
  • parallel (int, optional) – Set the number of parallel executions for the following calls, defaults to None.

  • chunksize (int, optional) – Set the chunk size for arrow, defaults to None.

  • jit (Union[str, dict], optional) – Can be set to “numba”, this mode will speed up the Operator’s function, but it may also need to return to python mode due to JIT failure, which will take longer, so please set it carefully, defaults to None.

  • format_priority (List[str], optional) – The priority list of format, defaults to None.

Returns:

Self

Return type:

DataCollection

get_pipeline_config()[source]

Return the config of the DC, including parameters such as parallel, chunksize, jit and format_priority.

Returns:

A dict of config parameters.

Return type:

dict

class towhee.functional.mixins.CompileMixin[source]

Bases: object

Mixin to just-in-time compile the Operator. More information about just-in-time compilation can be found at https://en.wikipedia.org/wiki/Just-in-time_compilation.

Examples:

  1. compile the python function with numba:

>>> import numpy
>>> import towhee
>>> import time
>>> from towhee import register
>>> @register(name='inner_distance')
... def inner_distance(query, data):
...     dists = []
...     for vec in data:
...         dist = 0
...         for i in range(len(vec)):
...             dist += vec[i] * query[i]
...         dists.append(dist)
...     return dists
>>> data = [numpy.random.random((10000, 128)) for _ in range(10)]
>>> query = numpy.random.random(128)
>>> t1 = time.time()
>>> dc1 = (
...     towhee.dc['a'](data)
...     .runas_op['a', 'b'](func=lambda _: query)
...     .inner_distance[('b', 'a'), 'c']()
... )
>>> t2 = time.time()
>>> dc2 = (
...     towhee.dc['a'](data)
...     .config(jit='numba')
...     .runas_op['a', 'b'](func=lambda _: query)
...     .inner_distance[('b', 'a'), 'c']()
... )
>>> t3 = time.time()
>>> assert(t3-t2 < t2-t1)
  1. compile the models with towhee.compiler:

Note

It will take some time to compile in the first time.

More information about towhee.compiler refer to https://github.com/towhee-io/towhee-compiler.

>>> import towhee
>>> dc1 = (towhee.dc(['hello world']) 
...     .set_jit('towhee')
...     .text_embedding.transformers(model_name='distilbert-base-cased')
... )
class towhee.functional.mixins.DataProcessingMixin[source]

Bases: object

Mixin for processing data.

classmethod combine(*datacollections)[source]

Combine dataframes to be able to access schemas from seperate DF chains.

Parameters:

datacollections (DataFrame) – DataFrames to combine.

Examples

>>> import towhee
>>> a = towhee.range['a'](1,5)
>>> b = towhee.range['b'](5,10)
>>> c = towhee.range['c'](10, 15)
>>> z = towhee.DataFrame.combine(a, b, c)
>>> z.as_raw().to_list()
[(1, 5, 10), (2, 6, 11), (3, 7, 12), (4, 8, 13)]
select_from(other)[source]

Select data from dc with list(self).

Parameters:

other (DataCollection) – DataCollection to select from.

Examples

>>> from towhee import DataCollection
>>> dc1 = DataCollection([0.8, 0.9, 8.1, 9.2])
>>> dc2 = DataCollection([[1, 2, 0], [2, 3, 0]])
>>> dc3 = dc2.select_from(dc1)
>>> list(dc3)
[[0.9, 8.1, 0.8], [8.1, 9.2, 0.8]]
zip(*others)DataCollection[source]

Combine multiple data collections.

Parameters:

*others (DataCollection) – The other data collections.

Returns:

Data collection with zipped values.

Return type:

DataCollection

Examples

>>> from towhee import DataCollection
>>> dc1 = DataCollection([1,2,3,4])
>>> dc2 = DataCollection([1,2,3,4]).map(lambda x: x+1)
>>> dc3 = dc1.zip(dc2)
>>> list(dc3)
[(1, 2), (2, 3), (3, 4), (4, 5)]
head(n: int = 5)[source]

Return the first n values of a DataCollection.

Parameters:

n (int, optional) – The amount to select, defaults to 5.

Returns:

DataCollection with the selected values.

Return type:

DataCollection

sample(ratio=1.0)DataCollection[source]

Sample the data collection.

Parameters:

ratio (float) – sample ratio.

Returns:

Sampled data collection.

Return type:

DataCollection

Examples

>>> from towhee import DataCollection
>>> dc = DataCollection(range(10000))
>>> result = dc.sample(0.1)
>>> ratio = len(result.to_list()) / 10000.
>>> 0.09 < ratio < 0.11
True
batch(size, drop_tail=False)[source]

Create batches from the DataCollection.

Parameters:
  • size (int) – Window size.

  • drop_tail (bool) – Drop trailing window that is not full, defaults to False.

Returns:

Batched DataCollection.

Return type:

DataCollection

Examples

>>> from towhee import DataCollection
>>> dc = DataCollection(range(10))
>>> [list(batch) for batch in dc.batch(2)]
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
>>> dc = DataCollection(range(10))
>>> dc.batch(3)
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
>>> dc = DataCollection(range(10))
>>> dc.batch(3, drop_tail=True)
[[0, 1, 2], [3, 4, 5], [6, 7, 8]]
>>> from towhee import Entity
>>> dc = DataCollection([Entity(a=a, b=b) for a,b in zip(['abc', 'vdfvcd', 'cdsc'], [1,2,3])])
>>> dc.batch(2)
[[<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>], [<Entity dict_keys(['a', 'b'])>]]
rolling(size: int, step: int = 1, drop_head=True, drop_tail=True)[source]

Create rolling windows from DataCollection.

Parameters:
  • size (int) – Window size.

  • drop_head (bool) – Drop head windows that are not full.

  • drop_tail (bool) – Drop trailing windows that are not full.

Returns:

DataCollection of rolling windows.

Return type:

DataCollection

Examples

>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_head=False)]
[[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_tail=False)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]
>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> dc.rolling(2, 2, drop_head=False, drop_tail=False)
[[0], [0, 1], [2, 3], [4]]
>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> dc.rolling(2, 4, drop_head=False, drop_tail=False)
[[0], [0, 1], [4]]
flatten(index=None)DataCollection[source]

Flatten nested data within DataCollection.

Returns:

Flattened DataCollection.

Return type:

DataCollection

Examples

>>> from towhee import DataCollection, Entity
>>> dc = DataCollection(range(10))
>>> nested_dc = dc.batch(2)
>>> nested_dc.flatten().to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> g = (i for i in range(3))
>>> e = Entity(a=1, b=2, c=g)
>>> dc = DataCollection([e]).flatten('c')
>>> [str(i) for i in dc]
["{'a': 1, 'b': 2, 'c': 0}", "{'a': 1, 'b': 2, 'c': 1}", "{'a': 1, 'b': 2, 'c': 2}"]
shuffle()DataCollection[source]

Shuffle an unstreamed data collection in place.

Returns:

Shuffled data collection.

Return type:

DataCollection

Examples

1. Shuffle: >>> from towhee import DataCollection >>> dc = DataCollection([0, 1, 2, 3, 4]) >>> a = dc.shuffle() >>> tuple(a) == tuple(range(5)) False

2. Streamed data collection is not supported: >>> dc = DataCollection([0, 1, 2, 3, 4]).stream() >>> _ = dc.shuffle() Traceback (most recent call last): TypeError: shuffle is not supported for streamed data collection.

group_by(index)DataCollection[source]

Merge columns in DataCollection. Unstreamed data only.

Examples: >>> import towhee >>> dc = towhee.dc[‘a’]([1,1,2,2,3,3]) >>> [i.a for i in dc] [1, 1, 2, 2, 3, 3] >>> dc = dc.group_by(‘a’) >>> [i.a for i in dc] [1, 2, 3]