towhee.functional.data_collection.DataFrame¶
- class towhee.functional.data_collection.DataFrame(iterable: Optional[Iterable] = None, **kws)[source]¶
Bases:
DataCollection
,DataFrameMixin
,ColumnMixin
Entity based DataCollection.
Examples
>>> from towhee import Entity >>> DataFrame([Entity(id=a) for a in [1,2,3]]) [<Entity dict_keys(['id'])>, <Entity dict_keys(['id'])>, <Entity dict_keys(['id'])>]
Methods
api
Append item to data collection.
Convert elements into Entities.
Make the DataFrame as callable function
Convert entities to json
Convert entitis into raw python values
as_str
Create small batches from data collections.
Clear a DataCollection.
chunked map
compile_dag
Set the parameters in DC.
Copy a DataCollection.
Count an element in DataCollection.
Unbox Option values and drop Empty.
Drop entities that contain some specific values.
Making the data collection exception-safe by warp elements with Option.
Extend a DataCollection.
Unbox Option values and fill Empty with default values.
When DataFrame's iterable exists of Entities and some indexes missing, fill default value for those indexes.
Filter the DataCollection data based on function.
filter_data
Flatten nested data collections.
from_arrow_table
generate a file list with pattern
get_backend
get_chunksize
Return the config in DC, such as parallel, chunksize, jit and format_priority.
get_control_plane
get_executor
get_formate_priority
get_num_worker
Return the config in DC, such as parallel, chunksize, jit and format_priority.
Get the state storage for DataCollection
Get the first n lines of a DataCollection.
image_imshow
Insert data into a DataCollection.
jit_resolve
Apply a function across all values in a DataFrame.
Apply multiple unary_op to data collection.
netx
notify_consumed
Parse string to entities.
Set the parameters in DC.
Apply unary_op with parallel execution.
Extend a DataCollection.
random_sample
Generate DataCollection with range of values.
ray_resolve
Start the ray service.
read_audio
read images from a camera.
read_csv
read_json
Load video as a datacollection.
load files from url/path.
register_dag
remote
Remove element from DataCollection.
Rename an column in DataFrame.
Replace specific attributes with given vlues.
report the metric scores
resolve
Reverse a DataCollection.
Create rolling windows from data collections.
Iterate through the DataCollections data.
Shortcut for exception_safe
Sample the data collection.
Select data from dc with list(self).
Serve the DataFrame as a RESTful API
Set chunk size for arrow
Set evaluating mode for stateful operators
Set format priority.
set_jit
Set parallel execution for following calls.
Set the state storage for DataCollection
Set training mode for stateful operators
Print the first n lines of a DataCollection.
Shuffle an unstreamed data collection in place.
smap
Sort a DataCollection.
Split a dataframe into multiple dataframes.
Split DataCollection to train and test data.
Create a stream data collection.
Convert the iterables to column-based table.
Save dc as a csv file.
Turn a DataFrame into a DataCollection.
Turn a DataCollection into a DataFrame.
Convert DataCollection to list.
Encode a video with audio if provided.
Create a unstream data collection.
with_metrics
Combine two data collections.
Attributes
df
Check whether the data collection is stream or unstream.
Storage mode of the DataFrame.
select
- class ModeFlag(value)¶
Bases:
Flag
An enumeration.
- __add__(other) DataCollection ¶
Concat two DataCollections.
- Parameters:
other (DataCollection) – The DataCollection being appended to the calling DataFrame.
- Returns:
A new DataCollection of the concated DataCollections.
- Return type:
Examples
>>> dc0 = DataCollection.range(5) >>> dc1 = DataCollection.range(5) >>> dc2 = DataCollection.range(5) >>> (dc0 + dc1 + dc2) [0, 1, 2, 3, 4, 0, ...]
- __getattr__(name) DataCollection ¶
Unknown method dispatcher.
When a unknown method is invoked on a DataCollection object, the function call will be dispatched to a method resolver. By registering function to the resolver, you are able to extend DataCollection’s API at runtime without modifying its code.
- Parameters:
name (str) – The unkown attribute.
- Returns:
- Returns a new DataCollection for the output of attribute
call.
- Return type:
Examples
>>> from towhee import register >>> dc = DataCollection([1,2,3,4]) >>> @register(name='test/add1') ... def add1(x): ... return x+1 >>> dc.test.add1().to_list() [2, 3, 4, 5]
- __getitem__(index) any ¶
Index based access of element in DataCollection.
Access the element at the given index, similar to accessing list[at_index]. Does not work with streamed DataCollections.
- Parameters:
index (int) – The index location of the element being accessed.
- Raises:
TypeError – If function called on streamed DataCollection
- Returns:
The object at index.
- Return type:
any
Examples
Usage with non-streamed:
>>> dc = DataCollection([0, 1, 2, 3, 4]) >>> dc[2] 2
Usage with streamed:
>>> dc.stream()[1] Traceback (most recent call last): TypeError: indexing is only supported for DataCollection created from list or pandas DataFrame.
- __init__(iterable: Optional[Iterable] = None, **kws) None [source]¶
Initializes a new DataFrame instance.
- Parameters:
iterable (Iterable, optional) – The data to be encapsualted by the DataFrame. Defaults to None.
- __repr__() str ¶
String representation of the DataCollection
- Returns:
String repersentation of the DataCollection.
- Return type:
str
Examples
Usage with non-streamed:
>>> DataCollection([1, 2, 3]).unstream() [1, 2, 3]
Usage with streamed:
>>> DataCollection([1, 2, 3]).stream() <list_iterator object at...>
- __setitem__(index, value)¶
Index based setting of element in DataCollection.
Assign the value of the element at the given index, similar to list[at_index]=val. Does not work with streamed DataCollections.
- Parameters:
index (int) – The index location of the element being set.
val (any) – The value to be set.
- Raises:
TypeError – If function called on streamed DataCollection
Examples
Usage with non-streamed:
>>> dc = DataCollection([0, 1, 2, 3, 4]) >>> dc[2] = 3 >>> dc.to_list() [0, 1, 3, 3, 4]
Usage with streamed:
>>> dc.stream()[1] Traceback (most recent call last): TypeError: indexing is only supported for DataCollection created from list or pandas DataFrame.
- append(*args) DataCollection ¶
Append item to data collection.
- Parameters:
item (Any) – the item to append
- Returns:
self
- Return type:
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection([0, 1, 2]) >>> dc.append(3).append(4) [0, 1, 2, 3, 4]
- as_entity(schema: Optional[List[str]] = None)¶
Convert elements into Entities.
- Parameters:
schema (Optional[List[str]]) – schema contains field names.
Examples: 1. convert dicts into entities:
>>> from towhee import DataFrame >>> ( ... DataFrame([dict(a=1, b=2), dict(a=2, b=3)]) ... .as_entity() ... .as_str() ... .to_list() ... ) ["{'a': 1, 'b': 2}", "{'a': 2, 'b': 3}"]
convert tuples into entities:
>>> from towhee import DataFrame >>> ( ... DataFrame([(1, 2), (2, 3)]) ... .as_entity(schema=['a', 'b']) ... .as_str() ... .to_list() ... ) ["{'a': 1, 'b': 2}", "{'a': 2, 'b': 3}"]
convert single value into entities:
>>> from towhee import DataFrame >>> ( ... DataFrame([1, 2]) ... .as_entity(schema=['a']) ... .as_str() ... .to_list() ... ) ["{'a': 1}", "{'a': 2}"]
- as_function()¶
Make the DataFrame as callable function
- Returns:
a callable function
- Return type:
_type_
Examples:
>>> import towhee >>> with towhee.api() as api: ... func1 = ( ... api.map(lambda x: x+' -> 1') ... .map(lambda x: x+' => 1') ... .as_function() ... )
>>> with towhee.api['x']() as api: ... func2 = ( ... api.runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 2') ... .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 2') ... .select['y']() ... .as_raw() ... .as_function() ... )
>>> with towhee.api() as api: ... func3 = ( ... api.parse_json() ... .runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 3') ... .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 3') ... .select['y']() ... .as_json() ... .as_function() ... )
>>> func1('1') '1 -> 1 => 1' >>> func2('2') '2 -> 2 => 2' >>> func3('{"x": "3"}') '{"y": "3 -> 3 => 3"}'
- as_json()¶
Convert entities to json
Examples:
>>> from towhee import DataFrame, Entity >>> ( ... DataFrame([Entity(x=1)]) ... .as_json() ... ) ['{"x": 1}']
- as_raw()¶
Convert entitis into raw python values
Examples:
unpack multiple values from entities:
>>> from towhee import DataFrame >>> ( ... DataFrame([(1, 2), (2, 3)]) ... .as_entity(schema=['a', 'b']) ... .as_raw() ... .to_list() ... ) [(1, 2), (2, 3)]
unpack single value from entities:
>>> ( ... DataFrame([1, 2]) ... .as_entity(schema=['a']) ... .as_raw() ... .to_list() ... ) [1, 2]
- batch(size, drop_tail=False, raw=True)¶
Create small batches from data collections.
- Parameters:
size (int) – Window size;
drop_tail (bool) – Drop tailing windows that not full, defaults to False;
raw (bool) – Whether to return raw data instead of DataCollection, defaults to True
- Returns:
DataCollection of batched windows or batch raw data
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection(range(10)) >>> [list(batch) for batch in dc.batch(2, raw=False)] [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
>>> dc = DataCollection(range(10)) >>> dc.batch(3) [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
>>> dc = DataCollection(range(10)) >>> dc.batch(3, drop_tail=True) [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
>>> from towhee import Entity >>> dc = DataCollection([Entity(a=a, b=b) for a,b in zip(['abc', 'vdfvcd', 'cdsc'], [1,2,3])]) >>> dc.batch(2) [<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>]
- clear(*args) DataCollection ¶
Clear a DataCollection.
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection([1, 2, 3]) >>> dc.clear() []
- cmap(unary_op)¶
chunked map
Examples:
>>> import towhee >>> dc = towhee.dc['a'](range(10)) >>> dc = dc.to_column() >>> dc = dc.runas_op['a', 'b'](func=lambda x: x+1) >>> dc.show(limit=5, tablefmt='plain') a b 0 1 1 2 2 3 3 4 4 5 >>> dc._iterable pyarrow.Table a: int64 b: int64 ---- a: [[0,1,2,3,4,5,6,7,8,9]] b: [[1,2,3,4,5,6,7,8,9,10]] >>> len(dc._iterable) 10
- config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)¶
Set the parameters in DC.
- Parameters:
parallel (int) – Set the number of parallel execution for following calls.
chunksize (int) – Set the chunk size for arrow.
jit (Union[str, dict]) – It can set to “numba”, this mode will speed up the Operator’s function, but it may also need to return to python mode due to JIT failure, which will take longer, so please set it carefully.
format_priority (List[str]) – The priority list of format.
- copy(*args) DataCollection ¶
Copy a DataCollection.
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection([1, 2, 3]) >>> dc_1 = dc.copy() >>> dc_1._iterable.append(4) >>> dc, dc_1 ([1, 2, 3], [1, 2, 3, 4])
- count(*args) int ¶
Count an element in DataCollection.
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection([1, 2, 3]) >>> dc.count(1) 1
- drop_empty(callback: Callable = None) DataCollection ¶
Unbox Option values and drop Empty.
- Parameters:
callback (Callable) – handler for empty values;
- Returns:
data collection that drops empty values;
- Return type:
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection.range(5) >>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty().to_list() [0.0, 0.5, 1.0, 2.0]
Get inputs that case exceptions:
>>> exception_inputs = [] >>> result = dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty(lambda x: exception_inputs.append(x.get().value)) >>> exception_inputs [3]
- dropna(na: Set[str] = {'', None}) Union[bool, DataFrame] ¶
Drop entities that contain some specific values.
- Parameters:
na (Set[str]) – Those entities contain values in na will be dropped.
Examples:
>>> from towhee import Entity, DataFrame >>> entities = [Entity(a=i, b=i + 1) for i in range(3)] >>> entities.append(Entity(a=3, b='')) >>> df = DataFrame(entities) >>> df [<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>]
>>> df.dropna() [<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>]
- exception_safe()¶
Making the data collection exception-safe by warp elements with Option.
Examples:
Exception breaks pipeline execution:
>>> from towhee import DataCollection >>> dc = DataCollection.range(5) >>> dc.map(lambda x: x / (0 if x == 3 else 2)).to_list() Traceback (most recent call last): ZeroDivisionError: division by zero
Exception-safe execution
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).to_list() [Some(0.0), Some(0.5), Some(1.0), Empty(), Some(2.0)]
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5).to_list() [Some(0.0), Some(0.5), Some(1.0), Empty()]
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5, drop_empty=True).to_list() [Some(0.0), Some(0.5), Some(1.0)]
- extend(*args) DataCollection ¶
Extend a DataCollection.
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection([1, 2, 3]) >>> dc.extend([4, 5]) [1, 2, 3, 4, 5]
- fill_empty(default: Any = None) DataCollection ¶
Unbox Option values and fill Empty with default values.
- Parameters:
default (Any) – default value to replace empty values;
- Returns:
data collection with empty values filled with default;
- Return type:
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection.range(5) >>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).fill_empty(-1.0).to_list() [0.0, 0.5, 1.0, -1.0, 2.0]
- fill_entity(_DefaultKVs: Optional[Dict[str, Any]] = None, _ReplaceNoneValue: bool = False, **kws)¶
When DataFrame’s iterable exists of Entities and some indexes missing, fill default value for those indexes.
- Parameters:
_ReplaceNoneValue (bool) – Whether to replace None in Entity’s value.
_DefaultKVs (Dict[str, Any]) – The key-value pairs stored in a dict.
Examples:
>>> from towhee import Entity, DataFrame >>> entities = [Entity(num=i) for i in range(3)] >>> df = DataFrame(entities) >>> df [<Entity dict_keys(['num'])>, <Entity dict_keys(['num'])>, <Entity dict_keys(['num'])>]
>>> kvs = {'foo': 'bar'} >>> df.fill_entity(kvs).fill_entity(usage='test').to_list() [<Entity dict_keys(['num', 'foo', 'usage'])>, <Entity dict_keys(['num', 'foo', 'usage'])>, <Entity dict_keys(['num', 'foo', 'usage'])>]
>>> kvs = {'FOO': None} >>> df.fill_entity(_ReplaceNoneValue=True, _DefaultKVs=kvs).to_list()[0].FOO 0
- filter(unary_op: Callable, drop_empty=False) DataCollection ¶
Filter the DataCollection data based on function.
Filters the DataCollection based on the function provided. If data is stored as an Option (see towhee.functional.option.py), drop empty will decide whether to remove the element or set it to empty.
- Parameters:
unary_op (Callable) – Function that dictates filtering.
drop_empty (bool, optional) – Whether to drop empty fields. Defaults to False.
- Returns:
Resulting DataCollection after filter.
- Return type:
- flatten() DataCollection ¶
Flatten nested data collections.
- Returns:
flattened data collection;
- Return type:
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection(range(10)) >>> nested_dc = dc.batch(2) >>> nested_dc.flatten().to_list() [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- classmethod from_glob(*args)¶
generate a file list with pattern
- get_config()¶
Return the config in DC, such as parallel, chunksize, jit and format_priority.
- get_pipeline_config()¶
Return the config in DC, such as parallel, chunksize, jit and format_priority.
- get_state()¶
Get the state storage for DataCollection
- Returns:
the state storage
- Return type:
State
- head(n: int = 5)¶
Get the first n lines of a DataCollection.
- Parameters:
n (int) – The number of lines to print. Default value is 5.
Examples:
>>> from towhee import DataCollection >>> DataCollection.range(10).head(3).to_list() [0, 1, 2]
- insert(*args) DataCollection ¶
Insert data into a DataCollection.
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection([1, 2, 3]) >>> dc.insert(0, 0) [0, 1, 2, 3]
- property is_stream¶
Check whether the data collection is stream or unstream.
Examples:
>>> from towhee import DataCollection >>> from typing import Iterable >>> dc = DataCollection([0,1,2,3,4]) >>> dc.is_stream False
>>> result = dc.map(lambda x: x+1) >>> result.is_stream False >>> result._iterable [1, 2, 3, 4, 5]
>>> dc = DataCollection(iter(range(5))) >>> dc.is_stream True
>>> result = dc.map(lambda x: x+1) >>> result.is_stream True >>> isinstance(result._iterable, Iterable) True
- map(*arg) DataFrame [source]¶
Apply a function across all values in a DataFrame.
- Parameters:
*arg (Callable) – One function to apply to the DataFrame.
- Returns:
New DataFrame containing computation results.
- Return type:
- mmap(ops: list, num_worker=None, backend=None)¶
Apply multiple unary_op to data collection. Currently supports two backends, ray and thread.
- Parameters:
unary_op (func) – the op to be mapped;
num_worker (int) – how many threads to reserve for this op;
backend (str) – whether to use ray or thread
# TODO: the test is broken with pytest # Examples:
# 1. Using mmap:
# >>> from towhee import DataCollection # >>> dc1 = DataCollection([0,1,2,’3’,4]).stream() # >>> a1, b1 = dc1.mmap([lambda x: x+1, lambda x: x*2]) # >>> c1 = a1.map(lambda x: x+1) # >>> c1.zip(b1).to_list() # [(2, 0), (3, 2), (4, 4), (Empty(), ‘33’), (6, 8)]
# 2. Using map instead of mmap:
# >>> from towhee import DataCollection # >>> dc2 = DataCollection.range(5).stream() # >>> a2, b2, c2 = dc2.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2)) # >>> d2 = a2.map(lambda x: x+1) # >>> d2.zip(b2, c2).to_list() # [(2, 0, 0), (3, 2, 0), (4, 4, 1), (5, 6, 1), (6, 8, 2)]
# 3. DAG execution:
# >>> dc3 = DataCollection.range(5).stream() # >>> a3, b3, c3 = dc3.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2)) # >>> d3 = a3.map(lambda x: x+1) # >>> d3.zip(b3, c3).map(lambda x: x[0]+x[1]+x[2]).to_list() # [2, 5, 9, 12, 16]
- property mode¶
Storage mode of the DataFrame.
Return the storage mode of the DataFrame.
- Returns:
The storage format of the Dataframe.
- Return type:
Examples
>>> from towhee import Entity, DataFrame >>> e = [Entity(a=a, b=b) for a,b in zip(range(5), range(5))] >>> df = DataFrame(e) >>> df.mode <ModeFlag.ROWBASEDFLAG: 1>
>>> df = df.to_column() >>> df.mode <ModeFlag.COLBASEDFLAG: 2>
- parse_json()¶
Parse string to entities.
Examples:
>>> from towhee import DataFrame >>> df = ( ... DataFrame(['{"x": 1}']) ... .parse_json() ... ) >>> df[0].x 1
- pipeline_config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)¶
Set the parameters in DC.
- Parameters:
parallel (int) – Set the number of parallel execution for following calls.
chunksize (int) – Set the chunk size for arrow.
jit (Union[str, dict]) – It can set to “numba”, this mode will speed up the Operator’s function, but it may also need to return to python mode due to JIT failure, which will take longer, so please set it carefully.
format_priority (List[str]) – The priority list of format.
- pmap(unary_op, num_worker=None, backend=None)¶
Apply unary_op with parallel execution. Currently supports two backends, ray and thread.
- Parameters:
unary_op (func) – the op to be mapped;
num_worker (int) – how many threads to reserve for this op;
backend (str) – whether to use ray or thread
Examples:
>>> from towhee import DataCollection >>> import threading >>> stage_1_thread_set = {threading.current_thread().ident} >>> stage_2_thread_set = {threading.current_thread().ident} >>> result = ( ... DataCollection.range(1000).stream() ... .pmap(lambda x: stage_1_thread_set.add(threading.current_thread().ident), 5) ... .pmap(lambda x: stage_2_thread_set.add(threading.current_thread().ident), 4).to_list() ... ) >>> len(stage_1_thread_set) > 1 True >>> len(stage_2_thread_set) > 1 True
- pop(*args) DataCollection ¶
Extend a DataCollection.
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection([1, 2, 3]) >>> dc.pop() [1, 2]
- static range(*arg, **kws) DataCollection ¶
Generate DataCollection with range of values.
Generate DataCollection with a range of numbers as the data. Functions in same way as Python range() function.
- Returns:
Returns a new DataCollection.
- Return type:
Examples
>>> DataCollection.range(5).to_list() [0, 1, 2, 3, 4]
- ray_start(address=None, local_packages: Optional[list] = None, pip_packages: Optional[list] = None, silence=True)¶
Start the ray service. When using a remote cluster, all dependencies for custom functions and operators defined locally will need to be sent to the ray cluster. If using ray locally, within the runtime, avoid passing in any arguments.
- Parameters:
address (str) – The address for the ray service being connected to. If using ray cluster remotely with kubectl forwarded port, the most likely address will be “ray://localhost:10001”.
local_packages (list[str]) – Whichever locally defined modules that are used within a custom function supplied to the pipeline, whether it be in lambda functions, locally registered operators, or functions themselves.
pip_packages (list[str]) – Whichever pip installed modules that are used within a custom function supplied to the pipeline, whether it be in lambda functions, locally registered operators, or functions themselves.
- classmethod read_camera(device_id=0, limit=-1)¶
read images from a camera.
- classmethod read_video(path, format='rgb24')¶
Load video as a datacollection.
- Parameters:
path – The path to the target video.
format – The format of the images loaded from video.
- classmethod read_zip(url, pattern, mode='r')¶
load files from url/path.
- Parameters:
zip_src (Union[str, path]) – The path leads to the image.
pattern (str) – The filename pattern to extract.
mode (str) – file open mode.
- Returns:
The file handler for file in the zip file.
- Return type:
(File)
- remove(*args) DataCollection ¶
Remove element from DataCollection.
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection([1, 2, 3]) >>> dc.remove(1) [2, 3]
- rename(column: Dict[str, str])¶
Rename an column in DataFrame.
- Parameters:
column (Dict[str, str]) – The columns to rename and their corresponding new name.
Examples:
>>> from towhee import Entity, DataFrame >>> entities = [Entity(a=i, b=i + 1) for i in range(3)] >>> df = DataFrame(entities) >>> df [<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>]
>>> df.rename(column={'a': 'A', 'b': 'B'}) [<Entity dict_keys(['A', 'B'])>, <Entity dict_keys(['A', 'B'])>, <Entity dict_keys(['A', 'B'])>]
- replace(**kws)¶
Replace specific attributes with given vlues.
Examples:
>>> from towhee import Entity, DataFrame
>>> entities = [Entity(num=i) for i in range(5)] >>> df = DataFrame(entities) >>> [i.num for i in df] [0, 1, 2, 3, 4]
>>> df = df.replace(num={0: 1, 1: 2, 2: 3, 3: 4, 4: 5}) >>> [i.num for i in df] [1, 2, 3, 4, 5]
- report()¶
report the metric scores
Examples:
>>> from towhee import DataCollection >>> from towhee import Entity >>> dc1 = DataCollection([Entity(a=a, b=b, c=c) for a, b, c in zip([0,1,1,0,0], [0,1,1,1,0], [0,1,1,0,0])]) >>> dc1.with_metrics(['accuracy', 'recall']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report() accuracy recall lr 1.0 1.0 rf 0.8 0.8 {'lr': {'accuracy': 1.0, 'recall': 1.0}, 'rf': {'accuracy': 0.8, 'recall': 0.8}}
>>> dc2 = DataCollection([Entity(pred=[1,6,2,7,8,3,9,10,4,5], act=[1,2,3,4,5])]) >>> dc2.with_metrics(['mean_average_precision', 'mean_hit_ratio']).evaluate['act', 'pred'](name='test').report() mean_average_precision mean_hit_ratio test 0.622222 1.0 {'test': {'mean_average_precision': 0.6222222222222221, 'mean_hit_ratio': 1.0}}
- reverse(*args) DataCollection ¶
Reverse a DataCollection.
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection([1, 2, 3]) >>> dc.reverse() [3, 2, 1]
- rolling(size: int, drop_head=True, drop_tail=True)¶
Create rolling windows from data collections.
- Parameters:
size (int) – Wndow size.
drop_head (bool) – Drop headding windows that not full.
drop_tail (bool) – Drop tailing windows that not full.
- Returns:
data collection of rolling windows;
- Return type:
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection(range(5)) >>> [list(batch) for batch in dc.rolling(3)] [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5)) >>> [list(batch) for batch in dc.rolling(3, drop_head=False)] [[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5)) >>> [list(batch) for batch in dc.rolling(3, drop_tail=False)] [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]
- run()¶
Iterate through the DataCollections data.
Stream-based DataCollections will not run if the data is not a datasink. This function is a datasink that consumes the data without any operations.
- safe()¶
Shortcut for exception_safe
- sample(ratio=1.0) DataCollection ¶
Sample the data collection.
- Parameters:
ratio (float) – sample ratio;
- Returns:
sampled data collection;
- Return type:
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection(range(10000)) >>> result = dc.sample(0.1) >>> ratio = len(result.to_list()) / 10000. >>> 0.09 < ratio < 0.11 True
- select_from(other)¶
Select data from dc with list(self).
Examples:
>>> from towhee import DataCollection >>> dc1 = DataCollection([0.8, 0.9, 8.1, 9.2]) >>> dc2 = DataCollection([[1, 2, 0], [2, 3, 0]])
>>> dc3 = dc2.select_from(dc1) >>> list(dc3) [[0.9, 8.1, 0.8], [8.1, 9.2, 0.8]]
- serve(path='/', app=None)¶
Serve the DataFrame as a RESTful API
- Parameters:
path (str, optional) – API path. Defaults to ‘/’.
app (_type_, optional) – The FastAPI app the API bind to, will create one if None.
- Returns:
the app that bind to
- Return type:
_type_
Examples:
>>> from fastapi import FastAPI >>> from fastapi.testclient import TestClient >>> app = FastAPI()
>>> import towhee >>> with towhee.api() as api: ... app1 = ( ... api.map(lambda x: x+' -> 1') ... .map(lambda x: x+' => 1') ... .serve('/app1', app) ... )
>>> with towhee.api['x']() as api: ... app2 = ( ... api.runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 2') ... .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 2') ... .select['y']() ... .serve('/app2', app) ... )
>>> with towhee.api() as api: ... app2 = ( ... api.parse_json() ... .runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 3') ... .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 3') ... .select['y']() ... .serve('/app3', app) ... )
>>> client = TestClient(app) >>> client.post('/app1', '1').text '"1 -> 1 => 1"' >>> client.post('/app2', '2').text '{"y":"2 -> 2 => 2"}' >>> client.post('/app3', '{"x": "3"}').text '{"y":"3 -> 3 => 3"}'
- set_chunksize(chunksize)¶
Set chunk size for arrow
Examples:
>>> import towhee >>> dc_1 = towhee.dc['a'](range(20)) >>> dc_1 = dc_1.set_chunksize(10) >>> dc_2 = dc_1.runas_op['a', 'b'](func=lambda x: x+1) >>> dc_1.get_chunksize(), dc_2.get_chunksize() (10, 10) >>> dc_2._iterable.chunks() [pyarrow.Table a: int64 b: int64 ---- a: [[0,1,2,3,4,5,6,7,8,9]] b: [[1,2,3,4,5,6,7,8,9,10]], pyarrow.Table a: int64 b: int64 ---- a: [[10,11,12,13,14,15,16,17,18,19]] b: [[11,12,13,14,15,16,17,18,19,20]]]
>>> dc_3 = towhee.dc['a'](range(20)).stream() >>> dc_3 = dc_3.set_chunksize(10) >>> dc_4 = dc_3.runas_op['a', 'b'](func=lambda x: x+1) >>> dc_4._iterable.chunks() [pyarrow.Table a: int64 b: int64 ---- a: [[0,1,2,3,4,5,6,7,8,9]] b: [[1,2,3,4,5,6,7,8,9,10]], pyarrow.Table a: int64 b: int64 ---- a: [[10,11,12,13,14,15,16,17,18,19]] b: [[11,12,13,14,15,16,17,18,19,20]]]
- set_evaluating(state=None)¶
Set evaluating mode for stateful operators
- Parameters:
state (State, optional) – Update the state storage. Defaults to None.
- Returns:
data collection itself
- Return type:
- set_format_priority(format_priority: List[str])¶
Set format priority.
- Parameters:
format_priority (List[str]) – The priority queue of format.
- set_parallel(num_worker=2, backend='thread')¶
Set parallel execution for following calls.
Examples:
>>> from towhee import DataCollection >>> import threading >>> stage_1_thread_set = set() >>> stage_2_thread_set = set() >>> result = ( ... DataCollection.range(1000).stream().set_parallel(4) ... .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident)) ... .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list() ... )
>>> len(stage_2_thread_set)>1 True
- set_state(state)¶
Set the state storage for DataCollection
- Parameters:
state (State) – state storage
- Returns:
data collection itself
- Return type:
- set_training(state=None)¶
Set training mode for stateful operators
- Parameters:
state (State, optional) – Update the state storage. Defaults to None.
- Returns:
data collection itself
- Return type:
- show(limit=5, header=None, tablefmt='html', formatter={})¶
Print the first n lines of a DataCollection.
- Parameters:
limit (int) – The number of lines to print. Default value is 5. Print all if limit is non-positive.
header (list of str) – The field names.
tablefmt (str) – The format of the output, support html, plain, grid.
- shuffle() DataCollection ¶
Shuffle an unstreamed data collection in place.
- Returns:
shuffled data collection;
- Return type:
Examples:
Shuffle:
>>> from towhee import DataCollection >>> dc = DataCollection([0, 1, 2, 3, 4]) >>> a = dc.shuffle() >>> tuple(a) == tuple(range(5)) False
streamed data collection is not supported:
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream() >>> _ = dc.shuffle() Traceback (most recent call last): TypeError: shuffle is not supported for streamed data collection.
- sort(*args) DataCollection ¶
Sort a DataCollection.
Examples:
>>> from towhee import DataCollection >>> dc = DataCollection([1, 4, 3]) >>> dc.sort() [1, 3, 4]
- split(count)¶
Split a dataframe into multiple dataframes.
- Parameters:
count (int) – how many resulting DCs;
- Returns:
copies of DC;
- Return type:
[DataCollection, …]
Examples:
Split:
>>> from towhee import DataCollection >>> dc = DataCollection([0, 1, 2, 3, 4]).stream() >>> a, b, c = dc.split(3) >>> a.zip(b, c).to_list() [(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
- split_train_test(size: list = [0.9, 0.1], **kws)¶
Split DataCollection to train and test data.
- Parameters:
size (list) – The size of the train and test.
Examples:
>>> from towhee.functional import DataCollection >>> dc = DataCollection.range(10) >>> train, test = dc.split_train_test(shuffle=False) >>> train.to_list() [0, 1, 2, 3, 4, 5, 6, 7, 8] >>> test.to_list() [9]
- stream()¶
Create a stream data collection.
Examples: 1. Convert a data collection to streamed version
>>> from towhee import DataCollection >>> dc = DataCollection([0, 1, 2, 3, 4]) >>> dc.is_stream False
>>> dc = dc.stream() >>> dc.is_stream True
- to_column()¶
Convert the iterables to column-based table.
Examples:
>>> from towhee import Entity, DataFrame >>> e = [Entity(a=a, b=b) for a,b in zip(['abc', 'def', 'ghi'], [1,2,3])] >>> df = DataFrame(e) >>> df [<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>] >>> df.to_column() pyarrow.Table a: string b: int64 ---- a: [["abc","def","ghi"]] b: [[1,2,3]]
- to_csv(csv_path: Union[str, Path], encoding: str = 'utf-8-sig')¶
Save dc as a csv file.
- Parameters:
csv_path (Union[str, Path]) – The path to save the dc to.
encoding (str) – The encoding to use in the output file.
- to_dc() DataCollection [source]¶
Turn a DataFrame into a DataCollection.
- Returns:
Resulting DataCollection from DataFrame
- Return type:
Examples
>>> from towhee import DataFrame, Entity >>> e = [Entity(a=a, b=b) for a,b in zip(['abc', 'def', 'ghi'], [1,2,3])] >>> df = DataFrame(e) >>> type(df) <class 'towhee.functional.data_collection.DataFrame'>
>>> type(df.to_dc()) <class 'towhee.functional.data_collection.DataCollection'>
- to_df() DataFrame ¶
Turn a DataCollection into a DataFrame.
- Returns:
Resulting converted DataFrame.
- Return type:
Examples
>>> from towhee import DataCollection, Entity >>> e = [Entity(a=a, b=b) for a,b in zip(['abc', 'def', 'ghi'], [1,2,3])] >>> dc = DataCollection(e) >>> type(dc) <class 'towhee.functional.data_collection.DataCollection'>
>>> type(dc.to_df()) <class 'towhee.functional.data_collection.DataFrame'>
- to_list() list ¶
Convert DataCollection to list.
- Returns:
List of values stored in DataCollection.
- Return type:
list
Examples
>>> DataCollection.range(5).to_list() [0, 1, 2, 3, 4]
- to_video(output_path, codec=None, rate=None, width=None, height=None, format=None, template=None, audio_src=None)¶
Encode a video with audio if provided.
- Parameters:
output_path – The path of the output video.
codec – The codec to encode and decode the video.
rate – The rate of the video.
width – The width of the video.
height – The height of the video.
format – The format of the video frame image.
template – The template video stream of the ouput video stream.
audio_src – The audio to encode with the video.
- unstream()¶
Create a unstream data collection.
Examples:
Create a unstream data collection
>>> from towhee import DataCollection >>> dc = DataCollection(iter(range(5))).unstream() >>> dc.is_stream False
Convert a streamed data collection to unstream version
>>> dc = DataCollection(iter(range(5))) >>> dc.is_stream True >>> dc = dc.unstream() >>> dc.is_stream False
- zip(*others) DataCollection ¶
Combine two data collections.
- Parameters:
*others (DataCollection) – other data collections;
- Returns:
data collection with zipped values;
- Return type:
Examples:
>>> from towhee import DataCollection >>> dc1 = DataCollection([1,2,3,4]) >>> dc2 = DataCollection([1,2,3,4]).map(lambda x: x+1) >>> dc3 = dc1.zip(dc2) >>> list(dc3) [(1, 2), (2, 3), (3, 4), (4, 5)]