towhee.functional.mixins.DCMixins
- class towhee.functional.mixins.DCMixins
Bases: DatasetMixin, DispatcherMixin, DisplayMixin, ParallelMixin, ComputerVisionMixin, StateMixin, MetricMixin, RayMixin, ServeMixin, MilvusMixin, DagMixin, FaissMixin, ConfigMixin, CompileMixin, ListMixin, DataProcessingMixin, SafeMixin, StreamMixin, FormatPriorityMixin, AudioMixin, KVStorageMixin
Methods
api
append – Append item to data collection.
as_function – Make the DataFrame a callable function.
as_str
batch – Create batches from the DataCollection.
clear – Clear a DataCollection.
combine – Combine dataframes to be able to access schemas from separate DF chains.
compile_dag – Compile the dag.
config – Set the parameters for the DC.
copy – Copy a DataCollection.
count – Count an element in DataCollection.
drop_empty – Unbox Option values and drop Empty.
exception_safe – Make the data collection exception-safe by wrapping elements with Option.
extend – Extend a DataCollection.
fill_empty – Unbox Option values and fill Empty with default values.
flatten – Flatten nested data within DataCollection.
from_df
from_glob – Generate a file list with pattern.
get_backend
get_config – Return the config of the DC, including parameters such as parallel, chunksize, jit and format_priority.
get_executor
get_formate_priority
get_num_worker
get_pipeline_config – Return the config of the DC, including parameters such as parallel, chunksize, jit and format_priority.
get_state – Get the state storage for DataCollection.
group_by – Merge columns in DataCollection.
head – Return the first n values of a DataCollection.
image_imshow – Produce a CV2 imshow window.
insert – Insert data into a DataCollection.
jit_resolve
mmap – Apply multiple unary_op to data collection.
netx – Show dags' relations.
notify_consumed – Notify that a DataCollection was consumed.
pipeline_config – Set the parameters in DC.
pmap – Apply unary_op with parallel execution.
pop – Pop an element from a DataCollection.
ray_resolve
ray_start – Start the ray service.
read_audio
read_camera – Read images from a camera.
read_csv
read_json
read_video – Load a video as a DataCollection.
read_zip – Load files from url/path.
register_dag – Function that can be called within the function trying to be added to dag.
remove – Remove element from DataCollection.
report – Report the metric scores, and if you are using 'confusion matrix' metric, please use jupyter to display the matrix.
resolve – Dispatch unknown operators.
reverse – Reverse a DataCollection.
rolling – Create rolling windows from DataCollection.
safe – Shortcut for exception_safe.
sample – Sample the data collection.
select_from – Select data from dc with list(self).
serve – Serve the DataFrame as a RESTful API.
set_evaluating – Set evaluating mode for stateful operators.
set_format_priority – Set format priority.
set_jit
set_parallel – Set parallel execution for following calls.
set_state – Set the state storage for DataCollection.
set_training – Set training mode for stateful operators.
show – Print the first n lines of a DataCollection.
shuffle – Shuffle an unstreamed data collection in place.
smap
sort – Sort a DataCollection.
split – Split a dataframe into multiple dataframes.
split_train_test – Split DataCollection into train and test data.
stream – Create a stream data collection.
to_csv – Save dc as a csv file.
to_video – Encode a video; with audio if provided.
unstream – Create an unstreamed data collection.
with_metrics
zip – Combine multiple data collections.
Attributes
control_plane
is_stream – Check whether the data collection is streamed or unstreamed.
- append(*args) → DataCollection
Append item to data collection.
- Parameters:
item (Any) – the item to append
- Returns:
self
- Return type:
DataCollection
Examples:
>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2])
>>> dc.append(3).append(4)
[0, 1, 2, 3, 4]
- as_function()
Make the DataFrame a callable function.
- Returns:
a callable function
- Return type:
Callable
Examples:
>>> import towhee
>>> with towhee.api() as api:
...     func1 = (
...         api.map(lambda x: x+' -> 1')
...         .map(lambda x: x+' => 1')
...         .as_function()
...     )
>>> with towhee.api['x']() as api:
...     func2 = (
...         api.runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 2')
...         .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 2')
...         .select['y']()
...         .as_raw()
...         .as_function()
...     )
>>> with towhee.api() as api:
...     func3 = (
...         api.parse_json()
...         .runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 3')
...         .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 3')
...         .select['y']()
...         .as_json()
...         .as_function()
...     )
>>> func1('1')
'1 -> 1 => 1'
>>> func2('2')
'2 -> 2 => 2'
>>> func3('{"x": "3"}')
'{"y": "3 -> 3 => 3"}'
- batch(size, drop_tail=False)
Create batches from the DataCollection.
- Parameters:
size (int) – Window size.
drop_tail (bool) – Drop trailing window that is not full, defaults to False.
- Returns:
Batched DataCollection.
- Return type:
DataCollection
Examples
>>> from towhee import DataCollection
>>> dc = DataCollection(range(10))
>>> [list(batch) for batch in dc.batch(2)]
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
>>> dc = DataCollection(range(10))
>>> dc.batch(3)
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
>>> dc = DataCollection(range(10))
>>> dc.batch(3, drop_tail=True)
[[0, 1, 2], [3, 4, 5], [6, 7, 8]]
>>> from towhee import Entity
>>> dc = DataCollection([Entity(a=a, b=b) for a,b in zip(['abc', 'vdfvcd', 'cdsc'], [1,2,3])])
>>> dc.batch(2)
[[<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>], [<Entity dict_keys(['a', 'b'])>]]
- clear(*args) → DataCollection
Clear a DataCollection.
Examples:
>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.clear()
[]
- classmethod combine(*datacollections)
Combine dataframes to be able to access schemas from separate DF chains.
- Parameters:
datacollections (DataFrame) – DataFrames to combine.
Examples
>>> import towhee
>>> a = towhee.range['a'](1,5)
>>> b = towhee.range['b'](5,10)
>>> c = towhee.range['c'](10, 15)
>>> z = towhee.DataFrame.combine(a, b, c)
>>> z.as_raw().to_list()
[(1, 5, 10), (2, 6, 11), (3, 7, 12), (4, 8, 13)]
- compile_dag()
Compile the dag.
Runs a schema of commands that removes unnecessary steps and cleans the DAG.
- Returns:
The compiled DAG.
- Return type:
dict
- config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)
Set the parameters for the DC.
- Parameters:
parallel (int, optional) – Set the number of parallel executions for the following calls, defaults to None.
chunksize (int, optional) – Set the chunk size for arrow, defaults to None.
jit (Union[str, dict], optional) – Can be set to “numba”. This mode speeds up the Operator’s function, but execution may have to fall back to Python mode if JIT compilation fails, which takes longer, so set it with care; defaults to None.
format_priority (List[str], optional) – The priority list of formats, defaults to None.
- Returns:
Self.
- Return type:
DataCollection
- copy(*args) → DataCollection
Copy a DataCollection.
Examples:
>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc_1 = dc.copy()
>>> dc_1._iterable.append(4)
>>> dc, dc_1
([1, 2, 3], [1, 2, 3, 4])
- count(*args) → int
Count an element in DataCollection.
Examples:
>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.count(1)
1
- drop_empty(callback: Callable = None) → DataCollection
Unbox Option values and drop Empty.
- Parameters:
callback (Callable) – handler for empty values;
- Returns:
A DataCollection that drops empty values;
- Return type:
DataCollection
Examples:
>>> from towhee import DataCollection
>>> dc = DataCollection.range(5)
>>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty().to_list()
[0.0, 0.5, 1.0, 2.0]
Get inputs that cause exceptions:
>>> exception_inputs = []
>>> result = dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty(lambda x: exception_inputs.append(x.get().value))
>>> exception_inputs
[3]
- exception_safe()
Make the data collection exception-safe by wrapping elements with Option.
Examples:
Exception breaks pipeline execution:
>>> from towhee import DataCollection
>>> dc = DataCollection.range(5)
>>> dc.map(lambda x: x / (0 if x == 3 else 2)).to_list()
Traceback (most recent call last):
ZeroDivisionError: division by zero
Exception-safe execution:
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).to_list()
[Some(0.0), Some(0.5), Some(1.0), Empty(), Some(2.0)]
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5).to_list()
[Some(0.0), Some(0.5), Some(1.0), Empty()]
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5, drop_empty=True).to_list()
[Some(0.0), Some(0.5), Some(1.0)]
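The Option wrapping used by exception_safe, drop_empty and fill_empty can be pictured with a small plain-Python sketch. It is only an illustration of the idea, not towhee's implementation: the names Some, Empty, safe_map and the fields on Empty are hypothetical stand-ins chosen to mirror the doctest output above.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Optional, Union


@dataclass
class Some:
    """Box around a successfully computed value."""
    value: Any


@dataclass
class Empty:
    """Marker for a failed element; keeps the input and the exception (hypothetical fields)."""
    value: Any = None
    exception: Optional[Exception] = None


def safe_map(func: Callable[[Any], Any], items: Iterable[Any]) -> List[Union[Some, Empty]]:
    """Apply func to every item, boxing results instead of raising."""
    results = []
    for item in items:
        try:
            results.append(Some(func(item)))
        except Exception as exc:  # any failure becomes an Empty marker
            results.append(Empty(item, exc))
    return results


def drop_empty(boxed):
    """Unbox Some values and discard Empty markers."""
    return [b.value for b in boxed if isinstance(b, Some)]


def fill_empty(boxed, default=None):
    """Unbox Some values and replace Empty markers with a default."""
    return [b.value if isinstance(b, Some) else default for b in boxed]


boxed = safe_map(lambda x: x / (0 if x == 3 else 2), range(5))
print(drop_empty(boxed))        # [0.0, 0.5, 1.0, 2.0]
print(fill_empty(boxed, -1.0))  # [0.0, 0.5, 1.0, -1.0, 2.0]
```

Boxing each result keeps one failing element from aborting the whole pipeline, and the failing input stays available for later inspection.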
- extend(*args) → DataCollection
Extend a DataCollection.
Examples:
>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.extend([4, 5])
[1, 2, 3, 4, 5]
- fill_empty(default: Any = None) → DataCollection
Unbox Option values and fill Empty with default values.
- Parameters:
default (Any) – default value to replace empty values;
- Returns:
data collection with empty values filled with default;
- Return type:
DataCollection
Examples:
>>> from towhee import DataCollection
>>> dc = DataCollection.range(5)
>>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).fill_empty(-1.0).to_list()
[0.0, 0.5, 1.0, -1.0, 2.0]
- flatten(*args) → DataCollection
Flatten nested data within DataCollection.
- Returns:
Flattened DataCollection.
- Return type:
DataCollection
Examples
>>> from towhee import DataCollection, Entity
>>> dc = DataCollection(range(10))
>>> nested_dc = dc.batch(2)
>>> nested_dc.flatten().to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> g = (i for i in range(3))
>>> e = Entity(a=1, b=2, c=g)
>>> dc = DataCollection([e]).flatten('c')
>>> [str(i) for i in dc]
["{'a': 1, 'b': 2, 'c': 0}", "{'a': 1, 'b': 2, 'c': 1}", "{'a': 1, 'b': 2, 'c': 2}"]
- classmethod from_glob(*args)
Generate a file list with pattern.
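As a rough illustration of building a file list from patterns, the sketch below uses the standard-library glob module. It assumes that each positional argument is a glob pattern; list_files is a hypothetical helper, not the towhee method itself.

```python
import glob
import os
import tempfile


def list_files(*patterns):
    """Return the sorted paths matching any of the given glob patterns."""
    paths = []
    for pattern in patterns:
        paths.extend(glob.glob(pattern))
    return sorted(paths)


# Build a throwaway directory so the example is self-contained.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.jpg", "b.jpg", "notes.txt"):
        open(os.path.join(tmp, name), "w").close()
    matches = [os.path.basename(p) for p in list_files(os.path.join(tmp, "*.jpg"))]

print(matches)  # ['a.jpg', 'b.jpg']
```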
- get_config()
Return the config of the DC, including parameters such as parallel, chunksize, jit and format_priority.
- Returns:
A dict of config parameters.
- Return type:
dict
- get_pipeline_config()
Return the config of the DC, including parameters such as parallel, chunksize, jit and format_priority.
- Returns:
A dict of config parameters.
- Return type:
dict
- get_state()
Get the state storage for DataCollection
- Returns:
the state storage
- Return type:
State
- group_by(index) → DataCollection
Merge columns in DataCollection. Unstreamed data only.
Examples
>>> import towhee
>>> dc = towhee.dc['a']([1,1,2,2,3,3])
>>> [i.a for i in dc]
[1, 1, 2, 2, 3, 3]
>>> dc = dc.group_by('a')
>>> [i.a for i in dc]
[1, 2, 3]
- head(n: int = 5)
Return the first n values of a DataCollection.
- Parameters:
n (int, optional) – The number of values to select, defaults to 5.
- Returns:
DataCollection with the selected values.
- Return type:
DataCollection
- image_imshow(title='image')
Produce a CV2 imshow window.
- Parameters:
title (str, optional) – The title for the image. Defaults to ‘image’.
- insert(*args) → DataCollection
Insert data into a DataCollection.
Examples:
>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.insert(0, 0)
[0, 1, 2, 3]
- property is_stream
Check whether the data collection is streamed or unstreamed.
Examples:
>>> from towhee import DataCollection
>>> from typing import Iterable
>>> dc = DataCollection([0,1,2,3,4])
>>> dc.is_stream
False
>>> result = dc.map(lambda x: x+1)
>>> result.is_stream
False
>>> result._iterable
[1, 2, 3, 4, 5]
>>> dc = DataCollection(iter(range(5)))
>>> dc.is_stream
True
>>> result = dc.map(lambda x: x+1)
>>> result.is_stream
True
>>> isinstance(result._iterable, Iterable)
True
- mmap(ops: list, num_worker=None, backend=None)
Apply multiple unary_op to data collection. Currently supports two backends, ray and thread.
- Parameters:
ops (list) – the ops to be mapped;
num_worker (int) – how many threads to reserve for this op;
backend (str) – whether to use ray or thread
# TODO: the test is broken with pytest
# Examples:
# 1. Using mmap:
# >>> from towhee import DataCollection
# >>> dc1 = DataCollection([0,1,2,'3',4]).stream()
# >>> a1, b1 = dc1.mmap([lambda x: x+1, lambda x: x*2])
# >>> c1 = a1.map(lambda x: x+1)
# >>> c1.zip(b1).to_list()
# [(2, 0), (3, 2), (4, 4), (Empty(), '33'), (6, 8)]
# 2. Using map instead of mmap:
# >>> from towhee import DataCollection
# >>> dc2 = DataCollection.range(5).stream()
# >>> a2, b2, c2 = dc2.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2))
# >>> d2 = a2.map(lambda x: x+1)
# >>> d2.zip(b2, c2).to_list()
# [(2, 0, 0), (3, 2, 0), (4, 4, 1), (5, 6, 1), (6, 8, 2)]
# 3. DAG execution:
# >>> dc3 = DataCollection.range(5).stream()
# >>> a3, b3, c3 = dc3.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2))
# >>> d3 = a3.map(lambda x: x+1)
# >>> d3.zip(b3, c3).map(lambda x: x[0]+x[1]+x[2]).to_list()
# [2, 5, 9, 12, 16]
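Since the examples above are disabled, here is a plain-Python sketch of the underlying idea: one input stream fanned out to several ops, each producing its own result stream. It is sequential and uses itertools.tee, so it says nothing about how the ray or thread backends schedule work; multi_map is a hypothetical name.

```python
from itertools import tee


def multi_map(ops, items):
    """Return one lazy result stream per op, all fed from the same input."""
    copies = tee(items, len(ops))  # independent iterators over one source
    return [map(op, copy) for op, copy in zip(ops, copies)]


plus_one, doubled = multi_map([lambda x: x + 1, lambda x: x * 2], range(5))
pairs = list(zip(plus_one, doubled))
print(pairs)  # [(1, 0), (2, 2), (3, 4), (4, 6), (5, 8)]
```

Because tee buffers items that one consumer has read and another has not, the result streams are best consumed together, as the zip call does here.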
- netx()
Show dags’ relations.
- Returns:
The dags’ relations
- Return type:
image
- notify_consumed(new_id)
Notify that a DataCollection was consumed.
When a DataCollection is consumed by a call on another DataCollection, the DAG needs to be made aware of it. Any function that consumes DataCollections other than the one it is called on, e.g. zip(), needs to call this function.
- Parameters:
new_id (str) – The ID of the DataCollection that did the consuming.
- pipeline_config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)
Set the parameters in DC.
- Parameters:
parallel (int, optional) – Set the number of parallel executions for the following calls, defaults to None.
chunksize (int, optional) – Set the chunk size for arrow, defaults to None.
jit (Union[str, dict], optional) – Can be set to “numba”. This mode speeds up the Operator’s function, but execution may have to fall back to Python mode if JIT compilation fails, which takes longer, so set it with care; defaults to None.
format_priority (List[str], optional) – The priority list of formats, defaults to None.
- Returns:
Self
- Return type:
DataCollection
- pmap(unary_op, num_worker=None, backend=None)
Apply unary_op with parallel execution. Currently supports two backends, ray and thread.
- Parameters:
unary_op (func) – the op to be mapped;
num_worker (int) – how many threads to reserve for this op;
backend (str) – whether to use ray or thread
Examples:
>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream()
...     .pmap(lambda x: stage_1_thread_set.add(threading.current_thread().ident), 5)
...     .pmap(lambda x: stage_2_thread_set.add(threading.current_thread().ident), 4).to_list()
... )
>>> len(stage_1_thread_set) > 1
True
>>> len(stage_2_thread_set) > 1
True
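For readers unfamiliar with thread-backed mapping, the following sketch shows the general technique with the standard library's ThreadPoolExecutor. It is an analogy for the thread backend only and makes no claim about how towhee implements pmap; parallel_map is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor


def parallel_map(unary_op, items, num_worker=2):
    """Apply unary_op to items on a thread pool, preserving input order."""
    with ThreadPoolExecutor(max_workers=num_worker) as pool:
        return list(pool.map(unary_op, items))


squares = parallel_map(lambda x: x * x, range(6), num_worker=3)
print(squares)  # [0, 1, 4, 9, 16, 25]
```

Executor.map yields results in input order even when the calls finish out of order, which is why the output lines up with the input.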
- pop(*args) → DataCollection
Pop an element from a DataCollection.
Examples:
>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.pop()
[1, 2]
- ray_start(address=None, local_packages: Optional[list] = None, pip_packages: Optional[list] = None, silence=True)
Start the ray service. When using a remote cluster, all dependencies for custom functions and operators defined locally need to be sent to the ray cluster. If using ray locally, within the runtime, avoid passing in any arguments.
- Parameters:
address (str) – The address of the ray service to connect to. If using a remote ray cluster through a kubectl-forwarded port, the most likely address is “ray://localhost:10001”.
local_packages (list[str]) – Any locally defined modules used within a custom function supplied to the pipeline, whether in lambda functions, locally registered operators, or the functions themselves.
pip_packages (list[str]) – Any pip-installed modules used within a custom function supplied to the pipeline, whether in lambda functions, locally registered operators, or the functions themselves.
- classmethod read_camera(device_id=0, limit=-1)
Read images from a camera.
- Parameters:
device_id (int, optional) – The camera device ID. Defaults to 0.
limit (int, optional) – The number of images to capture. Defaults to -1.
- Returns:
Collection with images.
- Return type:
DataCollection
- classmethod read_video(path, format='rgb24')
Load a video as a DataCollection.
- Parameters:
path (str) – The path of the video.
format (str, optional) – The color format of the video. Defaults to ‘rgb24’.
- Returns:
DataCollection with the video.
- Return type:
DataCollection
- classmethod read_zip(url, pattern, mode='r')
Load files from url/path.
- Parameters:
url (Union[str, path]) – The URL or path of the zip file.
pattern (str) – The filename pattern to extract.
mode (str) – file open mode.
- Returns:
The file handler for each matching file in the zip file.
- Return type:
(File)
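The pattern-based extraction described above can be sketched with the standard zipfile and fnmatch modules. This is an illustration under assumptions: it only handles a local path or file-like object (no URL download), it assumes the pattern follows shell-style wildcard rules, and iter_zip_members is a hypothetical helper rather than the towhee method.

```python
import fnmatch
import io
import zipfile


def iter_zip_members(zip_src, pattern, mode="r"):
    """Yield an open file handle for each archive member matching pattern."""
    with zipfile.ZipFile(zip_src, "r") as archive:
        for name in archive.namelist():
            if fnmatch.fnmatch(name, pattern):
                with archive.open(name, mode) as handle:
                    yield handle


# Build a small in-memory archive so the example is self-contained.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("img/1.jpg", b"first")
    archive.writestr("img/2.jpg", b"second")
    archive.writestr("readme.txt", b"ignored")

contents = [handle.read() for handle in iter_zip_members(buffer, "*.jpg")]
print(contents)  # [b'first', b'second']
```

Each handle is only valid while the generator is paused on it, so it should be read before moving on to the next member.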
- register_dag(children)
Function to be called from within a function that is being added to the DAG.
- Parameters:
children (DataCollection or list) – List of child DataCollections or a single child DataCollection.
- Returns:
The resulting child DataCollections.
- Return type:
DataCollection or list
- remove(*args) → DataCollection
Remove element from DataCollection.
Examples:
>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.remove(1)
[2, 3]
- report()
Report the metric scores, and if you are using ‘confusion matrix’ metric, please use jupyter to display the matrix.
Examples:
>>> from towhee import DataCollection
>>> from towhee import Entity
>>> dc1 = DataCollection([Entity(a=a, b=b, c=c) for a, b, c in zip([0,1,1,0,0], [0,1,1,1,0], [0,1,1,0,0])])
>>> dc1.with_metrics(['accuracy', 'recall']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report()
    accuracy  recall
lr       1.0     1.0
rf       0.8     0.8
{'lr': {'accuracy': 1.0, 'recall': 1.0}, 'rf': {'accuracy': 0.8, 'recall': 0.8}}
>>> dc1.with_metrics(['confusion_matrix']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report()
<IPython.core.display.HTML object>
{'lr': {'confusion_matrix': array([[3, 0], [0, 2]])}, 'rf': {'confusion_matrix': array([[2, 1], [0, 2]])}}
>>> dc2 = DataCollection([Entity(pred=[1,6,2,7,8,3,9,10,4,5], act=[1,2,3,4,5])])
>>> dc2.with_metrics(['mean_average_precision', 'mean_hit_ratio']).evaluate['act', 'pred'](name='test').report()
      mean_average_precision  mean_hit_ratio
test                0.622222             1.0
{'test': {'mean_average_precision': 0.6222222222222221, 'mean_hit_ratio': 1.0}}
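The mean_average_precision value of 0.622222 in the last example can be checked by hand. The sketch below uses one common definition of average precision (the mean of the precision at each rank where a relevant item appears); that this is the definition towhee uses is an assumption, though it does reproduce the reported number. With a single entity, the mean over entities equals this one value.

```python
def average_precision(actual, predicted):
    """Average the precision measured at each rank where a relevant item appears."""
    relevant = set(actual)
    hits = 0
    precision_sum = 0.0
    for rank, item in enumerate(predicted, start=1):
        if item in relevant:
            hits += 1
            precision_sum += hits / rank  # precision at this rank
    return precision_sum / len(relevant) if relevant else 0.0


pred = [1, 6, 2, 7, 8, 3, 9, 10, 4, 5]
act = [1, 2, 3, 4, 5]
ap = average_precision(act, pred)
print(round(ap, 6))  # 0.622222
```

The relevant items sit at ranks 1, 3, 6, 9 and 10, so the sum is 1/1 + 2/3 + 3/6 + 4/9 + 5/10, divided by the 5 relevant items.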
- resolve(path, index, *arg, **kws)
Dispatch unknown operators.
- Parameters:
path (str) – The operator name.
index (str) – The index of data being called on.
- Returns:
The operator that corresponds to the path.
- Return type:
_OperatorLazyWrapper
- reverse(*args) → DataCollection
Reverse a DataCollection.
Examples:
>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.reverse()
[3, 2, 1]
- rolling(size: int, step: int = 1, drop_head=True, drop_tail=True)
Create rolling windows from DataCollection.
- Parameters:
size (int) – Window size.
step (int) – Offset between the starts of consecutive windows, defaults to 1.
drop_head (bool) – Drop head windows that are not full, defaults to True.
drop_tail (bool) – Drop trailing windows that are not full, defaults to True.
- Returns:
DataCollection of rolling windows.
- Return type:
DataCollection
Examples
>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_head=False)]
[[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_tail=False)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]
>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> dc.rolling(2, 2, drop_head=False, drop_tail=False)
[[0], [0, 1], [2, 3], [4]]
>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> dc.rolling(2, 4, drop_head=False, drop_tail=False)
[[0], [0, 1], [4]]
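The interaction of size, step, drop_head and drop_tail is easier to see in a plain-Python sketch. The function below is an assumption-laden reconstruction that reproduces the five examples above; it materializes its input, and its behavior for combinations not shown in those examples may differ from towhee's.

```python
def rolling(items, size, step=1, drop_head=True, drop_tail=True):
    """Build windows of up to `size` items, starting every `step` positions."""
    data = list(items)
    windows = []
    if not drop_head:
        # Head windows: growing prefixes that are shorter than a full window.
        windows.extend(data[:n] for n in range(1, min(size, len(data) + 1)))
    for start in range(0, len(data), step):
        window = data[start:start + size]
        # Short windows at the end are kept only when drop_tail is False.
        if len(window) == size or not drop_tail:
            windows.append(window)
    return windows


print(rolling(range(5), 3))
# [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
print(rolling(range(5), 2, 2, drop_head=False, drop_tail=False))
# [[0], [0, 1], [2, 3], [4]]
```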
- safe()
Shortcut for exception_safe.
- sample(ratio=1.0) → DataCollection
Sample the data collection.
- Parameters:
ratio (float) – sample ratio.
- Returns:
Sampled data collection.
- Return type:
DataCollection
Examples
>>> from towhee import DataCollection
>>> dc = DataCollection(range(10000))
>>> result = dc.sample(0.1)
>>> ratio = len(result.to_list()) / 10000.
>>> 0.09 < ratio < 0.11
True
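The example above checks the sampled fraction against a range rather than an exact count, which suggests that each element is kept at random. The sketch below assumes independent per-element sampling with probability ratio; whether towhee samples this way is an assumption, and the seed argument is a hypothetical addition for reproducibility.

```python
import random


def sample(items, ratio=1.0, seed=None):
    """Keep each item independently with probability ratio."""
    rng = random.Random(seed)  # seed is a hypothetical knob, not a towhee parameter
    return [item for item in items if rng.random() < ratio]


kept = sample(range(10000), ratio=0.1, seed=0)
print(len(kept) / 10000)  # close to 0.1, but not exactly 0.1
```

Under this scheme the result size varies from run to run, so tests should compare against a tolerance band as the example above does.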
- select_from(other)
Select data from dc with list(self).
- Parameters:
other (DataCollection) – DataCollection to select from.
Examples
>>> from towhee import DataCollection
>>> dc1 = DataCollection([0.8, 0.9, 8.1, 9.2])
>>> dc2 = DataCollection([[1, 2, 0], [2, 3, 0]])
>>> dc3 = dc2.select_from(dc1)
>>> list(dc3)
[[0.9, 8.1, 0.8], [8.1, 9.2, 0.8]]
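In the example above, each row of dc2 is treated as a list of indices into dc1. A plain-Python sketch of that lookup, using a hypothetical free function with the same name, looks like this:

```python
def select_from(index_rows, other):
    """For each row of indices, pick the matching elements of other."""
    lookup = list(other)  # materialize so elements can be indexed repeatedly
    return [[lookup[i] for i in row] for row in index_rows]


values = [0.8, 0.9, 8.1, 9.2]
indices = [[1, 2, 0], [2, 3, 0]]
selected = select_from(indices, values)
print(selected)  # [[0.9, 8.1, 0.8], [8.1, 9.2, 0.8]]
```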
- serve(path='/', app=None)
Serve the DataFrame as a RESTful API.
- Parameters:
path (str, optional) – API path. Defaults to ‘/’.
app (FastAPI, optional) – The FastAPI app to bind the API to; one is created if None.
- Returns:
The app the API is bound to.
- Return type:
FastAPI
Examples:
>>> from fastapi import FastAPI
>>> from fastapi.testclient import TestClient
>>> app = FastAPI()
>>> import towhee
>>> with towhee.api() as api:
...     app1 = (
...         api.map(lambda x: x+' -> 1')
...         .map(lambda x: x+' => 1')
...         .serve('/app1', app)
...     )
>>> with towhee.api['x']() as api:
...     app2 = (
...         api.runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 2')
...         .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 2')
...         .select['y']()
...         .serve('/app2', app)
...     )
>>> with towhee.api() as api:
...     app3 = (
...         api.parse_json()
...         .runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 3')
...         .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 3')
...         .select['y']()
...         .serve('/app3', app)
...     )
>>> client = TestClient(app)
>>> client.post('/app1', '1').text
'"1 -> 1 => 1"'
>>> client.post('/app2', '2').text
'{"y":"2 -> 2 => 2"}'
>>> client.post('/app3', '{"x": "3"}').text
'{"y":"3 -> 3 => 3"}'
- set_evaluating(state=None)
Set evaluating mode for stateful operators.
- Parameters:
state (State, optional) – Update the state storage. Defaults to None.
- Returns:
data collection itself
- Return type:
DataCollection
- set_format_priority(format_priority: List[str])
Set format priority.
- Parameters:
format_priority (List[str]) – The priority queue of formats.
- Returns:
DataCollection with format_priority set.
- Return type:
DataCollection
- set_parallel(num_worker=2, backend='thread')
Set parallel execution for following calls.
Examples:
>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = set()
>>> stage_2_thread_set = set()
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )
>>> len(stage_2_thread_set)>1
True
- set_state(state)
Set the state storage for DataCollection.
- Parameters:
state (State) – state storage
- Returns:
data collection itself
- Return type:
DataCollection
- set_training(state=None)
Set training mode for stateful operators.
- Parameters:
state (State, optional) – Update the state storage. Defaults to None.
- Returns:
data collection itself
- Return type:
DataCollection
- show(limit=5, header=None, tablefmt='html', formatter={})
Print the first n lines of a DataCollection.
- Parameters:
limit (int, optional) – The number of lines to print. Prints all if limit is negative. Defaults to 5.
header (_type_, optional) – The field names. Defaults to None.
tablefmt (str, optional) – The format of the output; supports html, plain, and grid. Defaults to ‘html’.
- shuffle() → DataCollection
Shuffle an unstreamed data collection in place.
- Returns:
Shuffled data collection.
- Return type:
DataCollection
Examples
1. Shuffle:
>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> a = dc.shuffle()
>>> tuple(a) == tuple(range(5))
False
2. Streamed data collection is not supported:
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> _ = dc.shuffle()
Traceback (most recent call last):
TypeError: shuffle is not supported for streamed data collection.
- sort(*args) → DataCollection
Sort a DataCollection.
Examples:
>>> from towhee import DataCollection
>>> dc = DataCollection([1, 4, 3])
>>> dc.sort()
[1, 3, 4]
- split(count)
Split a dataframe into multiple dataframes.
- Parameters:
count (int) – how many resulting DCs;
- Returns:
copies of DC;
- Return type:
[DataCollection, …]
Examples:
Split:
>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> a, b, c = dc.split(3)
>>> a.zip(b, c).to_list()
[(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
- split_train_test(size: list = [0.9, 0.1], **kws)
Split DataCollection into train and test data.
- Parameters:
size (list) – The proportions of the train and test sets.
Examples:
>>> from towhee.functional import DataCollection
>>> dc = DataCollection.range(10)
>>> train, test = dc.split_train_test(shuffle=False)
>>> train.to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>> test.to_list()
[9]
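A ratio-based split like the one above can be sketched in plain Python. The helper below is hypothetical: it assumes the entries of size are fractions of the whole, that shuffling happens before slicing, and that the last part absorbs any rounding remainder; towhee's actual handling may differ.

```python
import random


def split_by_ratio(items, size=(0.9, 0.1), shuffle=True, seed=None):
    """Split items into consecutive parts whose lengths follow the ratios in size."""
    data = list(items)
    if shuffle:
        random.Random(seed).shuffle(data)
    parts, start = [], 0
    for i, ratio in enumerate(size):
        # The last part takes whatever remains so no element is lost to rounding.
        end = len(data) if i == len(size) - 1 else start + int(len(data) * ratio)
        parts.append(data[start:end])
        start = end
    return parts


train, test = split_by_ratio(range(10), shuffle=False)
print(train)  # [0, 1, 2, 3, 4, 5, 6, 7, 8]
print(test)   # [9]
```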
- stream()
Create a stream data collection.
Examples:
1. Convert a data collection to streamed version
>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> dc.is_stream
False
>>> dc = dc.stream()
>>> dc.is_stream
True
- to_csv(csv_path: Union[str, Path], encoding: str = 'utf-8-sig')
Save dc as a csv file.
- Parameters:
csv_path (Union[str, Path]) – The path to save the dc to.
encoding (str) – The encoding to use in the output file.
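The default encoding, utf-8-sig, writes a UTF-8 byte-order mark at the start of the file, which some spreadsheet applications use to detect UTF-8. The sketch below shows that effect with the standard csv module; write_csv and its header argument are hypothetical and do not describe how towhee lays out its rows.

```python
import csv
import os
import tempfile


def write_csv(rows, csv_path, header=None, encoding="utf-8-sig"):
    """Write rows to csv_path; utf-8-sig prepends a byte-order mark."""
    with open(csv_path, "w", newline="", encoding=encoding) as handle:
        writer = csv.writer(handle)
        if header is not None:
            writer.writerow(header)
        writer.writerows(rows)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "out.csv")
    write_csv([(1, "a"), (2, "b")], path, header=["id", "name"])
    with open(path, "rb") as handle:
        raw = handle.read()

print(raw.startswith(b"\xef\xbb\xbf"))  # True
```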
- to_video(output_path, codec=None, rate=None, width=None, height=None, format=None, template=None, audio_src=None)
Encode a video; with audio if provided.
- Parameters:
output_path (str) – Path to output the video to.
codec (str, optional) – Which codec to use for encoding. Defaults to None.
rate (int, optional) – The frame rate of the video. Defaults to None.
width (int, optional) – The width of the video image. Defaults to None.
height (int, optional) – The height of the video image. Defaults to None.
format (str, optional) – The color format of the video. Defaults to None.
template (str, optional) – The template video stream of the output video stream. Defaults to None.
audio_src (str, optional) – Audio path to include in video. Defaults to None.
- unstream()
Create an unstreamed data collection.
Examples:
Create an unstreamed data collection:
>>> from towhee import DataCollection
>>> dc = DataCollection(iter(range(5))).unstream()
>>> dc.is_stream
False
Convert a streamed data collection to its unstreamed version:
>>> dc = DataCollection(iter(range(5)))
>>> dc.is_stream
True
>>> dc = dc.unstream()
>>> dc.is_stream
False
- zip(*others) → DataCollection
Combine multiple data collections.
- Parameters:
*others (DataCollection) – The other data collections.
- Returns:
Data collection with zipped values.
- Return type:
DataCollection
Examples
>>> from towhee import DataCollection
>>> dc1 = DataCollection([1,2,3,4])
>>> dc2 = DataCollection([1,2,3,4]).map(lambda x: x+1)
>>> dc3 = dc1.zip(dc2)
>>> list(dc3)
[(1, 2), (2, 3), (3, 4), (4, 5)]