towhee.functional.data_collection.DataFrame

class towhee.functional.data_collection.DataFrame(iterable: Optional[Iterable] = None, **kws)[source]

Bases: DataCollection, DataFrameMixin, ColumnMixin

Entity based DataCollection.

Examples

>>> from towhee import Entity
>>> DataFrame([Entity(id=a) for a in [1,2,3]])
[<Entity dict_keys(['id'])>, <Entity dict_keys(['id'])>, <Entity dict_keys(['id'])>]

Methods

api

append

Append item to data collection.

as_entity

Convert elements into Entities.

as_function

Make the DataFrame a callable function.

as_json

Convert entities to JSON.

as_raw

Convert entities into raw Python values.

as_str

batch

Create batches from the DataCollection.

clear

Clear a DataCollection.

cmap

Chunked map.

combine

Combine dataframes to be able to access schemas from separate DF chains.

compile_dag

Compile the dag.

config

Set the parameters for the DC.

copy

Copy a DataCollection.

count

Count an element in DataCollection.

create_arrow_table

Convert kwargs to Table.

drop_empty

Unbox Option values and drop Empty.

dropna

Drop entities that contain some specific values.

exception_safe

Make the data collection exception-safe by wrapping elements with Option.

extend

Extend a DataCollection.

fill_empty

Unbox Option values and fill Empty with default values.

fill_entity

When the DataFrame's iterable consists of Entities and some indexes are missing, fill in default values for those indexes.

filter

Filter the DataCollection data based on function.

flatten

Flatten nested data within DataCollection.

from_df

from_glob

Generate a file list with pattern.

get_backend

get_chunksize

get_config

Return the config of the DC, including parameters such as parallel, chunksize, jit and format_priority.

get_executor

get_formate_priority

get_num_worker

get_pipeline_config

Return the config of the DC, including parameters such as parallel, chunksize, jit and format_priority.

get_state

Get the state storage for DataCollection

group_by

Merge columns in DataCollection.

head

Return the first n values of a DataCollection.

image_imshow

Produce a CV2 imshow window.

insert

Insert data into a DataCollection.

jit_resolve

map

Apply a function across all values in a DataFrame.

mmap

Apply multiple unary_op to data collection.

netx

Show dags' relations.

notify_consumed

Notify that a DataCollection was consumed.

parse_json

Parse string to entities.

pipeline_config

Set the parameters in DC.

pmap

Apply unary_op with parallel execution.

pop

Remove the last element from a DataCollection.

range

Generate DataCollection with range of values.

ray_resolve

ray_start

Start the ray service.

read_audio

read_camera

Read images from a camera.

read_csv

read_json

read_video

Load a video as a DataCollection.

read_zip

Load files from url/path.

register_dag

Function that can be called from within the function being added to the DAG.

remove

Remove element from DataCollection.

rename

Rename columns in a DataFrame.

replace

Replace specific attributes with given values.

report

Report the metric scores. If you use the 'confusion matrix' metric, run in Jupyter to display the matrix.

resolve

Dispatch unknown operators.

reverse

Reverse a DataCollection.

rolling

Create rolling windows from DataCollection.

run

Iterate through the DataCollection's data.

safe

Shortcut for exception_safe.

sample

Sample the data collection.

select_from

Select data from another DataCollection, using the elements of this one as indices.

serve

Serve the DataFrame as a RESTful API

set_chunksize

Set chunk size for arrow

set_evaluating

Set evaluating mode for stateful operators

set_format_priority

Set format priority.

set_jit

set_parallel

Set parallel execution for following calls.

set_state

Set the state storage for DataCollection

set_training

Set training mode for stateful operators

show

Print the first n lines of a DataCollection.

shuffle

Shuffle an unstreamed data collection in place.

smap

sort

Sort a DataCollection.

split

Split a dataframe into multiple dataframes.

split_train_test

Split DataCollection to train and test data.

stream

Create a stream data collection.

to_column

Convert the DataCollection to column-based table DataCollection.

to_csv

Save dc as a csv file.

to_dc

Turn a DataFrame into a DataCollection.

to_df

Turn a DataCollection into a DataFrame.

to_list

Convert DataCollection to list.

to_video

Encode a video; with audio if provided.

unstream

Create an unstreamed data collection.

with_metrics

zip

Combine multiple data collections.

Attributes

control_plane

df

is_stream

Check whether the data collection is stream or unstream.

mode

Storage mode of the DataFrame.

select

Select columns from a DC.

class ModeFlag(value)

Bases: Flag

An enumeration.

__add__(other) DataCollection

Concatenate two DataCollections.

Parameters:

other (DataCollection) – The DataCollection being appended to the calling DataFrame.

Returns:

A new DataCollection containing the concatenated DataCollections.

Return type:

DataCollection

Examples

>>> dc0 = DataCollection.range(5)
>>> dc1 = DataCollection.range(5)
>>> dc2 = DataCollection.range(5)
>>> (dc0 + dc1 + dc2)
[0, 1, 2, 3, 4, 0, ...]
__getattr__(name) DataCollection

Unknown method dispatcher.

When an unknown method is invoked on a DataCollection object, the function call is dispatched to a method resolver. By registering a function with the resolver, you can extend DataCollection’s API at runtime without modifying its code.

Parameters:

name (str) – The unknown attribute.

Returns:

Returns a new DataCollection for the output of the attribute call.

Return type:

DataCollection

Examples

>>> from towhee import register
>>> dc = DataCollection([1,2,3,4])
>>> @register(name='test/add1')
... def add1(x):
...     return x+1
>>> dc.test.add1().to_list()
[2, 3, 4, 5]
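
The dispatch idea described above can be sketched in plain Python. This is only an illustration of the general pattern, not towhee's resolver: `REGISTRY`, `register`, and `MiniCollection` are hypothetical names invented for the sketch, and the registered function is simply mapped over the data.

```python
# Hypothetical names throughout: REGISTRY, register, MiniCollection.
REGISTRY = {}


def register(name):
    """Store a function in the registry under `name`."""
    def decorator(func):
        REGISTRY[name] = func
        return func
    return decorator


class MiniCollection:
    def __init__(self, data):
        self._data = list(data)

    def __getattr__(self, name):
        # Python calls __getattr__ only when normal attribute lookup fails.
        if name.startswith('_') or name not in REGISTRY:
            raise AttributeError(name)
        func = REGISTRY[name]
        # Return a callable that maps the registered function over the data.
        return lambda: MiniCollection(func(x) for x in self._data)

    def to_list(self):
        return list(self._data)


@register('add1')
def add1(x):
    return x + 1


result = MiniCollection([1, 2, 3, 4]).add1().to_list()
print(result)  # [2, 3, 4, 5]
```

Because the lookup happens at call time, functions registered after the collection was created are still found.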
__getitem__(index) any

Index based access of element in DataCollection.

Access the element at the given index, similar to accessing list[at_index]. Does not work with streamed DataCollections.

Parameters:

index (int) – The index location of the element being accessed.

Raises:

TypeError – If called on a streamed DataCollection.

Returns:

The object at index.

Return type:

any

Examples

  1. Usage with non-streamed:

    >>> dc = DataCollection([0, 1, 2, 3, 4])
    >>> dc[2]
    2
    
  2. Usage with streamed:

    >>> dc.stream()[1] 
    Traceback (most recent call last):
    TypeError: indexing is only supported for DataCollection created from list
        or pandas DataFrame.
    
__init__(iterable: Optional[Iterable] = None, **kws) None[source]

Initializes a new DataFrame instance.

Parameters:

iterable (Iterable, optional) – The data to be encapsulated by the DataFrame. Defaults to None.

__repr__() str

String representation of the DataCollection

Returns:

String representation of the DataCollection.

Return type:

str

Examples

  1. Usage with non-streamed:

    >>> DataCollection([1, 2, 3]).unstream()
    [1, 2, 3]
    
  2. Usage with streamed:

    >>> DataCollection([1, 2, 3]).stream() 
    <list_iterator object at...>
    
__setitem__(index, value)

Index based setting of element in DataCollection.

Assign the value of the element at the given index, similar to list[at_index]=val. Does not work with streamed DataCollections.

Parameters:
  • index (int) – The index location of the element being set.

  • value (any) – The value to be set.

Raises:

TypeError – If called on a streamed DataCollection.

Examples

  1. Usage with non-streamed:

    >>> dc = DataCollection([0, 1, 2, 3, 4])
    >>> dc[2] = 3
    >>> dc.to_list()
    [0, 1, 3, 3, 4]
    
  2. Usage with streamed:

    >>> dc.stream()[1] 
    Traceback (most recent call last):
    TypeError: indexing is only supported for DataCollection created from list
        or pandas DataFrame.
    
append(*args) DataCollection

Append item to data collection.

Parameters:

item (Any) – the item to append

Returns:

self

Return type:

DataCollection

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2])
>>> dc.append(3).append(4)
[0, 1, 2, 3, 4]
as_entity(schema: Optional[List[str]] = None)

Convert elements into Entities.

Parameters:

schema (Optional[List[str]]) – schema contains field names.

Examples:

  1. convert dicts into entities:

>>> from towhee import DataFrame
>>> (
...     DataFrame([dict(a=1, b=2), dict(a=2, b=3)])
...         .as_entity()
...         .as_str()
...         .to_list()
... )
["{'a': 1, 'b': 2}", "{'a': 2, 'b': 3}"]
  2. convert tuples into entities:

>>> from towhee import DataFrame
>>> (
...     DataFrame([(1, 2), (2, 3)])
...         .as_entity(schema=['a', 'b'])
...         .as_str()
...         .to_list()
... )
["{'a': 1, 'b': 2}", "{'a': 2, 'b': 3}"]
  3. convert single value into entities:

>>> from towhee import DataFrame
>>> (
...     DataFrame([1, 2])
...         .as_entity(schema=['a'])
...         .as_str()
...         .to_list()
... )
["{'a': 1}", "{'a': 2}"]
as_function()

Make the DataFrame a callable function.

Returns:

a callable function

Return type:

_type_

Examples:

>>> import towhee
>>> with towhee.api() as api:
...     func1 = (
...         api.map(lambda x: x+' -> 1')
...            .map(lambda x: x+' => 1')
...            .as_function()
...     )
>>> with towhee.api['x']() as api:
...     func2 = (
...         api.runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 2')
...            .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 2')
...            .select['y']()
...            .as_raw()
...            .as_function()
...     )
>>> with towhee.api() as api:
...     func3 = (
...         api.parse_json()
...            .runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 3')
...            .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 3')
...            .select['y']()
...            .as_json()
...            .as_function()
...     )
>>> func1('1')
'1 -> 1 => 1'
>>> func2('2')
'2 -> 2 => 2'
>>> func3('{"x": "3"}')
'{"y": "3 -> 3 => 3"}'
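
Conceptually, turning a chain of map steps into a function amounts to function composition. The sketch below shows that idea in plain Python for the first example only; `compose` is a hypothetical helper, and nothing here reflects how towhee builds the callable internally.

```python
from functools import reduce


def compose(*steps):
    """Chain unary functions left to right into one callable."""
    return lambda value: reduce(lambda acc, step: step(acc), steps, value)


func1 = compose(lambda x: x + ' -> 1', lambda x: x + ' => 1')
print(func1('1'))  # 1 -> 1 => 1
```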
as_json()

Convert entities to JSON.

Examples:

>>> from towhee import DataFrame, Entity
>>> (
...     DataFrame([Entity(x=1)])
...         .as_json()
... )
['{"x": 1}']
as_raw()

Convert entities into raw Python values.

Examples:

  1. unpack multiple values from entities:

>>> from towhee import DataFrame
>>> (
...     DataFrame([(1, 2), (2, 3)])
...         .as_entity(schema=['a', 'b'])
...         .as_raw()
...         .to_list()
... )
[(1, 2), (2, 3)]
  2. unpack single value from entities:

>>> (
...     DataFrame([1, 2])
...         .as_entity(schema=['a'])
...         .as_raw()
...         .to_list()
... )
[1, 2]
batch(size, drop_tail=False)

Create batches from the DataCollection.

Parameters:
  • size (int) – Window size.

  • drop_tail (bool) – Drop trailing window that is not full, defaults to False.

Returns:

Batched DataCollection.

Return type:

DataCollection

Examples

>>> from towhee import DataCollection
>>> dc = DataCollection(range(10))
>>> [list(batch) for batch in dc.batch(2)]
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
>>> dc = DataCollection(range(10))
>>> dc.batch(3)
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
>>> dc = DataCollection(range(10))
>>> dc.batch(3, drop_tail=True)
[[0, 1, 2], [3, 4, 5], [6, 7, 8]]
>>> from towhee import Entity
>>> dc = DataCollection([Entity(a=a, b=b) for a,b in zip(['abc', 'vdfvcd', 'cdsc'], [1,2,3])])
>>> dc.batch(2)
[[<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>], [<Entity dict_keys(['a', 'b'])>]]
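
The effect of size and drop_tail can be reproduced with a few lines of plain Python. This is a sketch of the semantics shown in the examples, written as a hypothetical free function `batch`; it is not towhee's implementation.

```python
def batch(items, size, drop_tail=False):
    """Group `items` into lists of `size`; optionally drop a short final batch."""
    batches, current = [], []
    for item in items:
        current.append(item)
        if len(current) == size:
            batches.append(current)
            current = []
    # Whatever is left over is the trailing, not-full batch.
    if current and not drop_tail:
        batches.append(current)
    return batches


print(batch(range(10), 3))                  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(batch(range(10), 3, drop_tail=True))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```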
clear(*args) DataCollection

Clear a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.clear()
[]
cmap(unary_op)

Chunked map.

Parameters:

unary_op (callable) – The operation to map.

Returns:

A new DataCollection after mapping.

Return type:

DataCollection

Examples

>>> import towhee
>>> dc = towhee.dc['a'](range(10))
>>> dc = dc.to_column()
>>> dc = dc.runas_op['a', 'b'](func=lambda x: x+1)
>>> dc.show(limit=5, tablefmt='plain')
  a    b
  0    1
  1    2
  2    3
  3    4
  4    5
>>> dc._iterable
pyarrow.Table
a: int64
b: int64
----
a: [[0,1,2,3,4,5,6,7,8,9]]
b: [[1,2,3,4,5,6,7,8,9,10]]
>>> len(dc._iterable)
10
classmethod combine(*datacollections)

Combine dataframes to be able to access schemas from separate DF chains.

Parameters:

datacollections (DataFrame) – DataFrames to combine.

Examples

>>> import towhee
>>> a = towhee.range['a'](1,5)
>>> b = towhee.range['b'](5,10)
>>> c = towhee.range['c'](10, 15)
>>> z = towhee.DataFrame.combine(a, b, c)
>>> z.as_raw().to_list()
[(1, 5, 10), (2, 6, 11), (3, 7, 12), (4, 8, 13)]
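
The output above has four rows although b and c each hold five values, which is consistent with row-wise zipping that stops at the shortest input. That reading is inferred from the example rather than stated by the documentation; the plain-Python equivalent would be:

```python
a = range(1, 5)    # 4 values
b = range(5, 10)   # 5 values
c = range(10, 15)  # 5 values

# zip stops at the shortest input, so the fifth values of b and c are dropped.
rows = list(zip(a, b, c))
print(rows)  # [(1, 5, 10), (2, 6, 11), (3, 7, 12), (4, 8, 13)]
```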
compile_dag()

Compile the dag.

Runs a series of commands that remove unnecessary steps and clean up the DAG.

Returns:

The compiled DAG.

Return type:

dict

config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)

Set the parameters for the DC.

Parameters:
  • parallel (int, optional) – Set the number of parallel executions for the following calls, defaults to None.

  • chunksize (int, optional) – Set the chunk size for arrow, defaults to None.

  • jit (Union[str, dict], optional) – Can be set to “numba”. This mode speeds up the Operator’s function, but execution may have to fall back to Python mode if JIT compilation fails, which takes longer, so set it with care. Defaults to None.

  • format_priority (List[str], optional) – The priority list of formats, defaults to None.

Returns:

Self.

Return type:

DataCollection

copy(*args) DataCollection

Copy a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc_1 = dc.copy()
>>> dc_1._iterable.append(4)
>>> dc, dc_1
([1, 2, 3], [1, 2, 3, 4])
count(*args) int

Count an element in DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.count(1)
1
classmethod create_arrow_table(**kws)

Convert kwargs to Table.

Returns:

The Table from the kwargs.

Return type:

pyarrow.Table

drop_empty(callback: Callable = None) DataCollection

Unbox Option values and drop Empty.

Parameters:

callback (Callable) – Handler for empty values.

Returns:

A DataCollection that drops empty values.

Return type:

DataCollection

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection.range(5)
>>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty().to_list()
[0.0, 0.5, 1.0, 2.0]

Get inputs that cause exceptions:

>>> exception_inputs = []
>>> result = dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty(lambda x: exception_inputs.append(x.get().value))
>>> exception_inputs
[3]
dropna(na: Set[str] = {'', None}) Union[bool, DataFrame]

Drop entities that contain some specific values.

Parameters:

na (Set[str]) – Entities that contain any value in na will be dropped.

Examples:

>>> from towhee import Entity, DataFrame
>>> entities = [Entity(a=i, b=i + 1) for i in range(3)]
>>> entities.append(Entity(a=3, b=''))
>>> df = DataFrame(entities)
>>> df
[<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>]
>>> df.dropna()
[<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>]
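
The same filtering rule can be written out with plain dicts standing in for entities. This is a sketch under that assumption, not towhee's code; it also assumes all field values are hashable, since membership in a set is tested.

```python
records = [{'a': i, 'b': i + 1} for i in range(3)]
records.append({'a': 3, 'b': ''})

na = {'', None}  # same default as the signature above

# Keep a record only if none of its values is listed in `na`.
kept = [r for r in records if not any(v in na for v in r.values())]
print(len(records), len(kept))  # 4 3
```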
exception_safe()

Make the data collection exception-safe by wrapping elements with Option.

Examples:

  1. Exception breaks pipeline execution:

>>> from towhee import DataCollection
>>> dc = DataCollection.range(5)
>>> dc.map(lambda x: x / (0 if x == 3 else 2)).to_list()
Traceback (most recent call last):
ZeroDivisionError: division by zero
  2. Exception-safe execution:

>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).to_list()
[Some(0.0), Some(0.5), Some(1.0), Empty(), Some(2.0)]
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5).to_list()
[Some(0.0), Some(0.5), Some(1.0), Empty()]
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5, drop_empty=True).to_list()
[Some(0.0), Some(0.5), Some(1.0)]
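
To make the Option wrapping more concrete, here is a minimal plain-Python sketch of the pattern. The `Some` and `Empty` classes and the `safe_map` helper are simplified stand-ins written for this illustration; towhee's own Option types may differ in detail.

```python
class Some:
    """Holds a successfully computed value."""
    def __init__(self, value):
        self.value = value

    def __repr__(self):
        return f'Some({self.value!r})'


class Empty:
    """Marks an element whose computation raised an exception."""
    def __repr__(self):
        return 'Empty()'


def safe_map(func, options):
    """Apply `func` inside each Some; turn exceptions into Empty."""
    out = []
    for opt in options:
        if isinstance(opt, Empty):
            out.append(opt)  # already failed, pass it through unchanged
            continue
        try:
            out.append(Some(func(opt.value)))
        except Exception:
            out.append(Empty())
    return out


wrapped = [Some(x) for x in range(5)]
mapped = safe_map(lambda x: x / (0 if x == 3 else 2), wrapped)
print(mapped)  # [Some(0.0), Some(0.5), Some(1.0), Empty(), Some(2.0)]

# Unboxing, in the spirit of drop_empty() and fill_empty(-1.0).
dropped = [o.value for o in mapped if isinstance(o, Some)]
filled = [o.value if isinstance(o, Some) else -1.0 for o in mapped]
print(dropped)  # [0.0, 0.5, 1.0, 2.0]
print(filled)   # [0.0, 0.5, 1.0, -1.0, 2.0]
```

The failing element stays in place as Empty, so later stages can decide whether to drop it or substitute a default.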
extend(*args) DataCollection

Extend a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.extend([4, 5])
[1, 2, 3, 4, 5]
fill_empty(default: Any = None) DataCollection

Unbox Option values and fill Empty with default values.

Parameters:

default (Any) – Default value to replace empty values.

Returns:

Data collection with empty values filled with default.

Return type:

DataCollection

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection.range(5)
>>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).fill_empty(-1.0).to_list()
[0.0, 0.5, 1.0, -1.0, 2.0]
fill_entity(_DefaultKVs: Optional[Dict[str, Any]] = None, _ReplaceNoneValue: bool = False, **kws)

When the DataFrame’s iterable consists of Entities and some indexes are missing, fill in default values for those indexes.

Parameters:
  • _ReplaceNoneValue (bool) – Whether to replace None in Entity’s value.

  • _DefaultKVs (Dict[str, Any]) – The key-value pairs stored in a dict.

Examples:

>>> from towhee import Entity, DataFrame
>>> entities = [Entity(num=i) for i in range(3)]
>>> df = DataFrame(entities)
>>> df
[<Entity dict_keys(['num'])>, <Entity dict_keys(['num'])>, <Entity dict_keys(['num'])>]
>>> kvs = {'foo': 'bar'}
>>> df.fill_entity(kvs).fill_entity(usage='test').to_list()
[<Entity dict_keys(['num', 'foo', 'usage'])>, <Entity dict_keys(['num', 'foo', 'usage'])>, <Entity dict_keys(['num', 'foo', 'usage'])>]
>>> kvs = {'FOO': None}
>>> df.fill_entity(_ReplaceNoneValue=True, _DefaultKVs=kvs).to_list()[0].FOO
0
filter(unary_op: Callable, drop_empty=False) DataCollection

Filter the DataCollection data based on function.

Filters the DataCollection based on the function provided. If the data is stored as an Option (see towhee.functional.option.py), drop_empty decides whether Empty elements are removed or kept.

Parameters:
  • unary_op (Callable) – Function that dictates filtering.

  • drop_empty (bool, optional) – Whether to drop empty fields. Defaults to False.

Returns:

Resulting DataCollection after filter.

Return type:

DataCollection
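
The exception_safe examples in this reference show how filter treats empty elements: they survive the filter unless drop_empty is set. A plain-Python sketch of that rule follows; it uses None as a stand-in for an Empty option and a hypothetical helper `filter_options`, so it illustrates the behaviour rather than towhee's implementation.

```python
EMPTY = None  # stand-in for an Empty option (assumption for this sketch)


def filter_options(values, predicate, drop_empty=False):
    kept = []
    for v in values:
        if v is EMPTY:
            # Empty elements bypass the predicate entirely.
            if not drop_empty:
                kept.append(v)
        elif predicate(v):
            kept.append(v)
    return kept


values = [0.0, 0.5, 1.0, EMPTY, 2.0]
print(filter_options(values, lambda x: x < 1.5))                   # [0.0, 0.5, 1.0, None]
print(filter_options(values, lambda x: x < 1.5, drop_empty=True))  # [0.0, 0.5, 1.0]
```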

flatten(*args) DataCollection

Flatten nested data within DataCollection.

Returns:

Flattened DataCollection.

Return type:

DataCollection

Examples

>>> from towhee import DataCollection, Entity
>>> dc = DataCollection(range(10))
>>> nested_dc = dc.batch(2)
>>> nested_dc.flatten().to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> g = (i for i in range(3))
>>> e = Entity(a=1, b=2, c=g)
>>> dc = DataCollection([e]).flatten('c')
>>> [str(i) for i in dc]
["{'a': 1, 'b': 2, 'c': 0}", "{'a': 1, 'b': 2, 'c': 1}", "{'a': 1, 'b': 2, 'c': 2}"]
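
Both forms of flattening shown above have simple plain-Python counterparts. The sketch below uses nested lists and a dict in place of towhee objects, which is an assumption made purely for illustration.

```python
from itertools import chain

# Flattening one level of nesting.
nested = [[0, 1], [2, 3], [4, 5]]
flat = list(chain.from_iterable(nested))
print(flat)  # [0, 1, 2, 3, 4, 5]

# Flattening one field: emit one record per value found in that field.
record = {'a': 1, 'b': 2, 'c': range(3)}
expanded = [{**record, 'c': value} for value in record['c']]
print(expanded)
# [{'a': 1, 'b': 2, 'c': 0}, {'a': 1, 'b': 2, 'c': 1}, {'a': 1, 'b': 2, 'c': 2}]
```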
classmethod from_glob(*args)

Generate a file list with pattern.

get_config()

Return the config of the DC, including parameters such as parallel, chunksize, jit and format_priority.

Returns:

A dict of config parameters.

Return type:

dict

get_pipeline_config()

Return the config of the DC, including parameters such as parallel, chunksize, jit and format_priority.

Returns:

A dict of config parameters.

Return type:

dict

get_state()

Get the state storage for DataCollection

Returns:

the state storage

Return type:

State

group_by(index) DataCollection

Merge columns in DataCollection. Unstreamed data only.

Examples

>>> import towhee
>>> dc = towhee.dc['a']([1,1,2,2,3,3])
>>> [i.a for i in dc]
[1, 1, 2, 2, 3, 3]
>>> dc = dc.group_by('a')
>>> [i.a for i in dc]
[1, 2, 3]
head(n: int = 5)

Return the first n values of a DataCollection.

Parameters:

n (int, optional) – The amount to select, defaults to 5.

Returns:

DataCollection with the selected values.

Return type:

DataCollection

image_imshow(title='image')

Produce a CV2 imshow window.

Parameters:

title (str, optional) – The title for the image. Defaults to ‘image’.

insert(*args) DataCollection

Insert data into a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.insert(0, 0)
[0, 1, 2, 3]
property is_stream

Check whether the data collection is stream or unstream.

Examples:

>>> from towhee import DataCollection
>>> from typing import Iterable
>>> dc = DataCollection([0,1,2,3,4])
>>> dc.is_stream
False
>>> result = dc.map(lambda x: x+1)
>>> result.is_stream
False
>>> result._iterable
[1, 2, 3, 4, 5]
>>> dc = DataCollection(iter(range(5)))
>>> dc.is_stream
True
>>> result = dc.map(lambda x: x+1)
>>> result.is_stream
True
>>> isinstance(result._iterable, Iterable)
True
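
The distinction in the examples above mirrors Python's own split between containers and one-shot iterators. The check below is a sketch of that idea; whether towhee performs exactly this test is an assumption, and `is_stream` here is a hypothetical free function.

```python
from collections.abc import Iterator


def is_stream(data):
    """Treat one-shot iterators as streamed; lists and other containers as unstreamed."""
    return isinstance(data, Iterator)


print(is_stream([0, 1, 2]))       # False
print(is_stream(iter(range(3))))  # True
print(is_stream(map(abs, [1])))   # True: map() returns a lazy iterator
```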
map(*arg) DataFrame[source]

Apply a function across all values in a DataFrame.

Parameters:

*arg (Callable) – One function to apply to the DataFrame.

Returns:

New DataFrame containing computation results.

Return type:

DataFrame

mmap(ops: list, num_worker=None, backend=None)

Apply multiple unary_op to data collection. Currently supports two backends, ray and thread.

Parameters:
  • ops (list) – the ops to be mapped.

  • num_worker (int) – how many threads to reserve for these ops.

  • backend (str) – whether to use ray or thread.

# TODO: the test is broken with pytest
# Examples:

# 1. Using mmap:

# >>> from towhee import DataCollection
# >>> dc1 = DataCollection([0,1,2,'3',4]).stream()
# >>> a1, b1 = dc1.mmap([lambda x: x+1, lambda x: x*2])
# >>> c1 = a1.map(lambda x: x+1)
# >>> c1.zip(b1).to_list()
# [(2, 0), (3, 2), (4, 4), (Empty(), '33'), (6, 8)]

# 2. Using map instead of mmap:

# >>> from towhee import DataCollection
# >>> dc2 = DataCollection.range(5).stream()
# >>> a2, b2, c2 = dc2.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2))
# >>> d2 = a2.map(lambda x: x+1)
# >>> d2.zip(b2, c2).to_list()
# [(2, 0, 0), (3, 2, 0), (4, 4, 1), (5, 6, 1), (6, 8, 2)]

# 3. DAG execution:

# >>> dc3 = DataCollection.range(5).stream()
# >>> a3, b3, c3 = dc3.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2))
# >>> d3 = a3.map(lambda x: x+1)
# >>> d3.zip(b3, c3).map(lambda x: x[0]+x[1]+x[2]).to_list()
# [2, 5, 9, 12, 16]

property mode

Storage mode of the DataFrame.

Return the storage mode of the DataFrame.

Returns:

The storage format of the DataFrame.

Return type:

ModeFlag

Examples

>>> from towhee import Entity, DataFrame
>>> e = [Entity(a=a, b=b) for a,b in zip(range(5), range(5))]
>>> df = DataFrame(e)
>>> df.mode
<ModeFlag.ROWBASEDFLAG: 1>
>>> df = df.to_column()
>>> df.mode
<ModeFlag.COLBASEDFLAG: 2>
netx()

Show dags’ relations.

Returns:

The dags’ relations

Return type:

image

notify_consumed(new_id)

Notify that a DataCollection was consumed.

When a DataCollection is consumed by a call on another DataCollection, the DAG needs to be made aware of it. Any function that consumes more than the DataCollection it is called on, e.g. zip(), needs to call this function.

Parameters:

new_id (str) – The ID of the DataCollection that did the consuming.

parse_json()

Parse string to entities.

Examples:

>>> from towhee import DataFrame
>>> df = (
...     DataFrame(['{"x": 1}'])
...         .parse_json()
... )
>>> df[0].x
1
pipeline_config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)

Set the parameters in DC.

Parameters:
  • parallel (int, optional) – Set the number of parallel executions for the following calls, defaults to None.

  • chunksize (int, optional) – Set the chunk size for arrow, defaults to None.

  • jit (Union[str, dict], optional) – Can be set to “numba”. This mode speeds up the Operator’s function, but execution may have to fall back to Python mode if JIT compilation fails, which takes longer, so set it with care. Defaults to None.

  • format_priority (List[str], optional) – The priority list of format, defaults to None.

Returns:

Self

Return type:

DataCollection

pmap(unary_op, num_worker=None, backend=None)

Apply unary_op with parallel execution. Currently supports two backends, ray and thread.

Parameters:
  • unary_op (func) – the op to be mapped.

  • num_worker (int) – how many threads to reserve for this op.

  • backend (str) – whether to use ray or thread.

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream()
...     .pmap(lambda x: stage_1_thread_set.add(threading.current_thread().ident), 5)
...     .pmap(lambda x: stage_2_thread_set.add(threading.current_thread().ident), 4).to_list()
... )
>>> len(stage_1_thread_set) > 1
True
>>> len(stage_2_thread_set) > 1
True
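
For readers who want the thread-backed idea in isolation, the sketch below applies a function with a standard-library thread pool. It is a generic illustration using `concurrent.futures`, with a hypothetical free function `pmap`; it does not describe how towhee schedules its workers or how the ray backend behaves.

```python
from concurrent.futures import ThreadPoolExecutor


def pmap(func, items, num_worker=4):
    """Apply `func` to every item using a thread pool; results keep input order."""
    with ThreadPoolExecutor(max_workers=num_worker) as pool:
        return list(pool.map(func, items))


squares = pmap(lambda x: x * x, range(10), num_worker=5)
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Threads help most when the mapped function waits on I/O; CPU-bound pure-Python functions gain little because of the interpreter lock.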
pop(*args) DataCollection

Remove the last element from a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.pop()
[1, 2]
static range(*arg, **kws) DataCollection

Generate DataCollection with range of values.

Generate a DataCollection with a range of numbers as the data. Works the same way as Python's range() function.

Returns:

Returns a new DataCollection.

Return type:

DataCollection

Examples

>>> DataCollection.range(5).to_list()
[0, 1, 2, 3, 4]
ray_start(address=None, local_packages: Optional[list] = None, pip_packages: Optional[list] = None, silence=True)

Start the ray service. When using a remote cluster, all dependencies for custom functions and operators defined locally will need to be sent to the ray cluster. If using ray locally, within the runtime, avoid passing in any arguments.

Parameters:
  • address (str) – The address for the ray service being connected to. If using ray cluster remotely with kubectl forwarded port, the most likely address will be “ray://localhost:10001”.

  • local_packages (list[str]) – Locally defined modules that are used within a custom function supplied to the pipeline, whether in lambda functions, locally registered operators, or the functions themselves.

  • pip_packages (list[str]) – Pip-installed modules that are used within a custom function supplied to the pipeline, whether in lambda functions, locally registered operators, or the functions themselves.

classmethod read_camera(device_id=0, limit=-1)

Read images from a camera.

Parameters:
  • device_id (int, optional) – The camera device ID. Defaults to 0.

  • limit (int, optional) – The amount of images to capture. Defaults to -1.

Returns:

Collection with images.

Return type:

DataCollection

classmethod read_video(path, format='rgb24')

Load a video as a DataCollection.

Parameters:
  • path (str) – The path of the video.

  • format (str, optional) – The color format of video. Defaults to ‘rgb24’.

Returns:

DataCollection with the video.

Return type:

DataCollection

classmethod read_zip(url, pattern, mode='r')

Load files from url/path.

Parameters:
  • url (Union[str, path]) – The URL or path of the zip file.

  • pattern (str) – The filename pattern to extract.

  • mode (str) – file open mode.

Returns:

The file handler for file in the zip file.

Return type:

(File)

register_dag(children)

Function that can be called from within the function being added to the DAG.

Parameters:

children (DataCollection or list) – List of child DataCollections or a single child DataCollection.

Returns:

The resulting child DataCollections.

Return type:

DataCollection or list

remove(*args) DataCollection

Remove element from DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.remove(1)
[2, 3]
rename(column: Dict[str, str])

Rename columns in a DataFrame.

Parameters:

column (Dict[str, str]) – The columns to rename and their corresponding new name.

Examples:

>>> from towhee import Entity, DataFrame
>>> entities = [Entity(a=i, b=i + 1) for i in range(3)]
>>> df = DataFrame(entities)
>>> df
[<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>]
>>> df.rename(column={'a': 'A', 'b': 'B'})
[<Entity dict_keys(['A', 'B'])>, <Entity dict_keys(['A', 'B'])>, <Entity dict_keys(['A', 'B'])>]
replace(**kws)

Replace specific attributes with given values.

Examples:

>>> from towhee import Entity, DataFrame
>>> entities = [Entity(num=i) for i in range(5)]
>>> df = DataFrame(entities)
>>> [i.num for i in df]
[0, 1, 2, 3, 4]
>>> df = df.replace(num={0: 1, 1: 2, 2: 3, 3: 4, 4: 5})
>>> [i.num for i in df]
[1, 2, 3, 4, 5]
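
The per-attribute mapping used above can be sketched with plain dicts in place of entities. The assumption that values missing from the mapping are left unchanged is mine and is not confirmed by the example.

```python
records = [{'num': i} for i in range(5)]
mapping = {'num': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5}}

# For each field, look up the old value in that field's mapping;
# values without an entry are kept as they are (assumption).
replaced = [
    {key: mapping.get(key, {}).get(value, value) for key, value in rec.items()}
    for rec in records
]
print([r['num'] for r in replaced])  # [1, 2, 3, 4, 5]
```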
report()

Report the metric scores. If you use the ‘confusion matrix’ metric, run in Jupyter to display the matrix.

Examples:

>>> from towhee import DataCollection
>>> from towhee import Entity
>>> dc1 = DataCollection([Entity(a=a, b=b, c=c) for a, b, c in zip([0,1,1,0,0], [0,1,1,1,0], [0,1,1,0,0])])
>>> dc1.with_metrics(['accuracy', 'recall']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report()
    accuracy  recall
lr       1.0     1.0
rf       0.8     0.8
{'lr': {'accuracy': 1.0, 'recall': 1.0}, 'rf': {'accuracy': 0.8, 'recall': 0.8}}
>>> dc1.with_metrics(['confusion_matrix']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report() 
<IPython.core.display.HTML object>
{'lr': {'confusion_matrix': array([[3, 0],
       [0, 2]])}, 'rf': {'confusion_matrix': array([[2, 1],
       [0, 2]])}}
>>> dc2 = DataCollection([Entity(pred=[1,6,2,7,8,3,9,10,4,5], act=[1,2,3,4,5])])
>>> dc2.with_metrics(['mean_average_precision', 'mean_hit_ratio']).evaluate['act', 'pred'](name='test').report()
      mean_average_precision  mean_hit_ratio
test                0.622222             1.0
{'test': {'mean_average_precision': 0.6222222222222221, 'mean_hit_ratio': 1.0}}
resolve(path, index, *arg, **kws)

Dispatch unknown operators.

Parameters:
  • path (str) – The operator name.

  • index (str) – The index of data being called on.

Returns:

The operator that corresponds to the path.

Return type:

_OperatorLazyWrapper

reverse(*args) DataCollection

Reverse a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.reverse()
[3, 2, 1]
rolling(size: int, step: int = 1, drop_head=True, drop_tail=True)

Create rolling windows from DataCollection.

Parameters:
  • size (int) – Window size.

  • drop_head (bool) – Drop head windows that are not full.

  • drop_tail (bool) – Drop trailing windows that are not full.

Returns:

DataCollection of rolling windows.

Return type:

DataCollection

Examples

>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_head=False)]
[[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_tail=False)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]
>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> dc.rolling(2, 2, drop_head=False, drop_tail=False)
[[0], [0, 1], [2, 3], [4]]
>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> dc.rolling(2, 4, drop_head=False, drop_tail=False)
[[0], [0, 1], [4]]
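
The drop_head and drop_tail flags are easiest to follow with step fixed at 1. The sketch below reproduces the first three examples in plain Python; it deliberately does not model the step parameter, and `rolling` here is a hypothetical free function rather than towhee's code.

```python
from collections import deque


def rolling(items, size, drop_head=True, drop_tail=True):
    """Sliding windows with step 1 (illustrative helper only)."""
    window = deque(maxlen=size)
    windows = []
    for item in items:
        window.append(item)
        # A window that is still filling up counts as a "head" window.
        if len(window) == size or not drop_head:
            windows.append(list(window))
    if not drop_tail:
        # Shrink the last window from the left to produce the "tail" windows.
        tail = list(window)[1:]
        while tail:
            windows.append(tail)
            tail = tail[1:]
    return windows


print(rolling(range(5), 3))                   # [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
print(rolling(range(5), 3, drop_head=False))  # [[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]]
print(rolling(range(5), 3, drop_tail=False))  # [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]
```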
run()

Iterate through the DataCollection's data.

Stream-based DataCollections do not execute until a data sink consumes them. This function is a data sink that consumes the data without performing any operations.
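
Python generators show the same behaviour: nothing executes until something consumes the stream. The sketch below uses a generator with a visible side effect and drains it without keeping any results; it illustrates the concept only, and `stage`, `seen`, and `before` are names made up for the example.

```python
from collections import deque

seen = []


def stage(items):
    for item in items:
        seen.append(item)  # side effect we want to trigger
        yield item


pipeline = stage(iter(range(3)))
before = list(seen)
print(before)  # []: building the pipeline has not run anything yet

deque(pipeline, maxlen=0)  # consume every element and discard it
print(seen)  # [0, 1, 2]
```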

safe()

Shortcut for exception_safe.

sample(ratio=1.0) DataCollection

Sample the data collection.

Parameters:

ratio (float) – sample ratio.

Returns:

Sampled data collection.

Return type:

DataCollection

Examples

>>> from towhee import DataCollection
>>> dc = DataCollection(range(10000))
>>> result = dc.sample(0.1)
>>> ratio = len(result.to_list()) / 10000.
>>> 0.09 < ratio < 0.11
True
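
The example above only checks that the sampled fraction is close to the ratio, which is what independent per-element selection would produce. The sketch below shows that approach; it is an assumption about the general technique, not a description of towhee's sampler, and the `seed` parameter is added here only to make the sketch repeatable.

```python
import random


def sample(items, ratio=1.0, seed=None):
    """Keep each item independently with probability `ratio`."""
    rng = random.Random(seed)
    return [item for item in items if rng.random() < ratio]


picked = sample(range(10000), ratio=0.1, seed=0)
print(len(picked) / 10000)  # close to 0.1, but generally not exactly 0.1
```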
property select

Select columns from a DC.

Examples:

>>> from towhee import Entity, DataFrame
>>> entities = [Entity(a=i, b=i, c=i) for i in range(3)]
>>> dc = DataFrame(entities)
>>> dc.select('a')
[<Entity dict_keys(['a'])>, <Entity dict_keys(['a'])>, <Entity dict_keys(['a'])>]
select_from(other)

Select data from other, using each element of self as a list of indices into other.

Parameters:

other (DataCollection) – DataCollection to select from.

Examples

>>> from towhee import DataCollection
>>> dc1 = DataCollection([0.8, 0.9, 8.1, 9.2])
>>> dc2 = DataCollection([[1, 2, 0], [2, 3, 0]])
>>> dc3 = dc2.select_from(dc1)
>>> list(dc3)
[[0.9, 8.1, 0.8], [8.1, 9.2, 0.8]]
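
The lookup performed in the example above can be written out in plain Python. This is a sketch of the documented result only; `select_from_sketch` is a hypothetical helper, not the library's code.

```python
def select_from_sketch(index_lists, data):
    """For each list of indices, pick the matching elements from `data`."""
    data = list(data)
    return [[data[i] for i in indices] for indices in index_lists]


result = select_from_sketch([[1, 2, 0], [2, 3, 0]], [0.8, 0.9, 8.1, 9.2])
```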
serve(path='/', app=None)

Serve the DataFrame as a RESTful API.

Parameters:
  • path (str, optional) – API path. Defaults to ‘/’.

  • app (FastAPI, optional) – The FastAPI app to bind the API to. A new app is created if None.

Returns:

The app that the API is bound to.

Return type:

FastAPI

Examples:

>>> from fastapi import FastAPI
>>> from fastapi.testclient import TestClient
>>> app = FastAPI()
>>> import towhee
>>> with towhee.api() as api:
...     app1 = (
...         api.map(lambda x: x+' -> 1')
...            .map(lambda x: x+' => 1')
...            .serve('/app1', app)
...     )
>>> with towhee.api['x']() as api:
...     app2 = (
...         api.runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 2')
...            .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 2')
...            .select['y']()
...            .serve('/app2', app)
...     )
>>> with towhee.api() as api:
...     app3 = (
...         api.parse_json()
...            .runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 3')
...            .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 3')
...            .select['y']()
...            .serve('/app3', app)
...     )
>>> client = TestClient(app)
>>> client.post('/app1', '1').text
'"1 -> 1 => 1"'
>>> client.post('/app2', '2').text
'{"y":"2 -> 2 => 2"}'
>>> client.post('/app3', '{"x": "3"}').text
'{"y":"3 -> 3 => 3"}'
set_chunksize(chunksize)

Set the chunk size for the Arrow table.

Parameters:

chunksize (int) – How many rows per chunk.

Returns:

New DataCollection converted to Table.

Return type:

DataCollection

Examples

>>> import towhee
>>> dc_1 = towhee.dc['a'](range(20))
>>> dc_1 = dc_1.set_chunksize(10)
>>> dc_2 = dc_1.runas_op['a', 'b'](func=lambda x: x+1)
>>> dc_1.get_chunksize(), dc_2.get_chunksize()
(10, 10)
>>> dc_2._iterable.chunks()
[pyarrow.Table
a: int64
b: int64
----
a: [[0,1,2,3,4,5,6,7,8,9]]
b: [[1,2,3,4,5,6,7,8,9,10]], pyarrow.Table
a: int64
b: int64
----
a: [[10,11,12,13,14,15,16,17,18,19]]
b: [[11,12,13,14,15,16,17,18,19,20]]]
>>> dc_3 = towhee.dc['a'](range(20)).stream()
>>> dc_3 = dc_3.set_chunksize(10)
>>> dc_4 = dc_3.runas_op['a', 'b'](func=lambda x: x+1)
>>> dc_4._iterable.chunks()
[pyarrow.Table
a: int64
b: int64
----
a: [[0,1,2,3,4,5,6,7,8,9]]
b: [[1,2,3,4,5,6,7,8,9,10]], pyarrow.Table
a: int64
b: int64
----
a: [[10,11,12,13,14,15,16,17,18,19]]
b: [[11,12,13,14,15,16,17,18,19,20]]]
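
In the output above, 20 rows are stored as two tables of 10 rows each. The same grouping can be sketched without pyarrow; `chunk_rows` is a hypothetical helper, and the handling of a final short chunk is an assumption.

```python
def chunk_rows(rows, chunksize):
    """Group rows into consecutive chunks of at most `chunksize` rows."""
    rows = list(rows)
    return [rows[i:i + chunksize] for i in range(0, len(rows), chunksize)]


chunks = chunk_rows(range(20), 10)
```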
set_evaluating(state=None)

Set evaluating mode for stateful operators.

Parameters:

state (State, optional) – Update the state storage. Defaults to None.

Returns:

The data collection itself.

Return type:

DataCollection

set_format_priority(format_priority: List[str])

Set format priority.

Parameters:

format_priority (List[str]) – The priority queue of formats.

Returns:

DataCollection with format_priority set.

Return type:

DataCollection

set_parallel(num_worker=2, backend='thread')

Set parallel execution for following calls.

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = set()
>>> stage_2_thread_set = set()
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )
>>> len(stage_2_thread_set)>1
True
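
For comparison, a thread-based parallel map can be written with the standard library. This is an analogy for `backend='thread'` with four workers, not a description of how towhee schedules work; `work` is a hypothetical function.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

thread_ids = set()


def work(x):
    """Record which worker thread handled the element, then transform it."""
    thread_ids.add(threading.current_thread().ident)
    return x * 2


with ThreadPoolExecutor(max_workers=4) as pool:
    # Executor.map returns results in input order, regardless of which
    # thread processed each element.
    results = list(pool.map(work, range(1000)))
```

How many distinct threads actually pick up work depends on scheduling, so only the upper bound of four is guaranteed here.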
set_state(state)

Set the state storage for DataCollection

Parameters:

state (State) – state storage

Returns:

The data collection itself.

Return type:

DataCollection

set_training(state=None)

Set training mode for stateful operators

Parameters:

state (State, optional) – Update the state storage. Defaults to None.

Returns:

The data collection itself.

Return type:

DataCollection

show(limit=5, header=None, tablefmt='html', formatter={})

Print the first n lines of a DataCollection.

Parameters:
  • limit (int, optional) – The number of lines to print. Prints all if limit is negative. Defaults to 5.

  • header (_type_, optional) – The field names. Defaults to None.

  • tablefmt (str, optional) – The format of the output, such as html, plain, or grid. Defaults to ‘html’.

shuffle() → DataCollection

Shuffle an unstreamed data collection in place.

Returns:

Shuffled data collection.

Return type:

DataCollection

Examples

  1. Shuffle:

>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> a = dc.shuffle()
>>> tuple(a) == tuple(range(5))
False

  2. Streamed data collection is not supported:

>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> _ = dc.shuffle()
Traceback (most recent call last):
TypeError: shuffle is not supported for streamed data collection.

sort(*args) → DataCollection

Sort a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 4, 3])
>>> dc.sort()
[1, 3, 4]
split(count)

Split a dataframe into multiple dataframes.

Parameters:

count (int) – The number of resulting DCs.

Returns:

Copies of the DC.

Return type:

[DataCollection, …]

Examples:

  1. Split:

>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> a, b, c = dc.split(3)
>>> a.zip(b, c).to_list()
[(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
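
The example above splits one stream into three copies that each yield every element. The standard library offers a comparable tool in `itertools.tee`; the sketch below is an analogy and makes no claim about how `split` is implemented.

```python
import itertools

stream = iter([0, 1, 2, 3, 4])

# tee() returns independent iterators that each replay the full stream.
a, b, c = itertools.tee(stream, 3)
zipped = list(zip(a, b, c))
```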
split_train_test(size: list = [0.9, 0.1], **kws)

Split a DataCollection into train and test data.

Parameters:

size (list) – The fractions of the data assigned to the train and test sets. Defaults to [0.9, 0.1].

Examples:

>>> from towhee.functional import DataCollection
>>> dc = DataCollection.range(10)
>>> train, test = dc.split_train_test(shuffle=False)
>>> train.to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>> test.to_list()
[9]
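
With `shuffle=False`, the example above amounts to cutting the data at 90% of its length. A minimal sketch of that unshuffled case follows; `split_train_test_sketch` is hypothetical, and both the rounding rule and giving the remainder to the test set are assumptions.

```python
def split_train_test_sketch(items, size=(0.9, 0.1)):
    """Unshuffled split: the first fraction is train, the remainder is test."""
    items = list(items)
    # Assumption: the cut point is the rounded train fraction; size[1] is
    # implied by the remainder and is not used directly.
    cut = round(len(items) * size[0])
    return items[:cut], items[cut:]


train, test = split_train_test_sketch(range(10))
```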
stream()

Create a streamed data collection.

Examples:

  1. Convert a data collection to a streamed version:

>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> dc.is_stream
False
>>> dc = dc.stream()
>>> dc.is_stream
True
to_column()

Convert the DataCollection to a column-based table DataCollection.

Returns:

The current DC converted to Table DC.

Return type:

DataCollection

Examples

>>> from towhee import Entity, DataFrame
>>> e = [Entity(a=a, b=b) for a,b in zip(['abc', 'def', 'ghi'], [1,2,3])]
>>> df = DataFrame(e)
>>> df
[<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>]
>>> df.to_column()
pyarrow.Table
a: string
b: int64
----
a: [["abc","def","ghi"]]
b: [[1,2,3]]
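
The change from row-based entities to a column-based table can be pictured with plain dictionaries. This sketch uses dicts in place of Entity objects and a dict of lists in place of a pyarrow Table, so it illustrates only the layout, not the library's types.

```python
# Row-based layout: one record per entity.
rows = [{'a': 'abc', 'b': 1}, {'a': 'def', 'b': 2}, {'a': 'ghi', 'b': 3}]

# Column-based layout: one list of values per field.
columns = {key: [row[key] for row in rows] for key in rows[0]}
```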
to_csv(csv_path: Union[str, Path], encoding: str = 'utf-8-sig')

Save the DC as a CSV file.

Parameters:
  • csv_path (Union[str, Path]) – The path to save the dc to.

  • encoding (str) – The encoding to use in the output file.
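
The default encoding, 'utf-8-sig', is a standard Python codec that writes a UTF-8 byte order mark at the start of the file. The sketch below shows this with the standard `csv` module; it is not the implementation of `to_csv`, and the one-column-per-field layout is an assumption.

```python
import csv
import os
import tempfile

rows = [{'a': 'abc', 'b': 1}, {'a': 'def', 'b': 2}]

with tempfile.TemporaryDirectory() as tmp:
    csv_path = os.path.join(tmp, 'out.csv')
    # newline='' lets the csv module control line endings itself.
    with open(csv_path, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.DictWriter(f, fieldnames=['a', 'b'])
        writer.writeheader()
        writer.writerows(rows)
    # Read the raw bytes back to inspect the byte order mark.
    with open(csv_path, 'rb') as f:
        raw = f.read()

has_bom = raw.startswith(b'\xef\xbb\xbf')
```

Some spreadsheet applications use the byte order mark to detect UTF-8; pass a different `encoding` if the consumer of the file does not expect one.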

to_dc() → DataCollection

Turn a DataFrame into a DataCollection.

Returns:

Resulting DataCollection from DataFrame

Return type:

DataCollection

Examples

>>> from towhee import DataFrame, Entity
>>> e = [Entity(a=a, b=b) for a,b in zip(['abc', 'def', 'ghi'], [1,2,3])]
>>> df = DataFrame(e)
>>> type(df)
<class 'towhee.functional.data_collection.DataFrame'>
>>> type(df.to_dc())
<class 'towhee.functional.data_collection.DataCollection'>
to_df() → DataFrame

Turn a DataCollection into a DataFrame.

Returns:

Resulting converted DataFrame.

Return type:

DataFrame

Examples

>>> from towhee import DataCollection, Entity
>>> e = [Entity(a=a, b=b) for a,b in zip(['abc', 'def', 'ghi'], [1,2,3])]
>>> dc = DataCollection(e)
>>> type(dc)
<class 'towhee.functional.data_collection.DataCollection'>
>>> type(dc.to_df())
<class 'towhee.functional.data_collection.DataFrame'>
to_list() → list

Convert DataCollection to list.

Returns:

List of values stored in DataCollection.

Return type:

list

Examples

>>> from towhee import DataCollection
>>> DataCollection.range(5).to_list()
[0, 1, 2, 3, 4]
to_video(output_path, codec=None, rate=None, width=None, height=None, format=None, template=None, audio_src=None)

Encode a video; with audio if provided.

Parameters:
  • output_path (str) – Path to output the video to.

  • codec (str, optional) – Which codec to use for encoding. Defaults to None.

  • rate (int, optional) – The frame rate of the video. Defaults to None.

  • width (int, optional) – The width of the video image. Defaults to None.

  • height (int, optional) – The height of the video image. Defaults to None.

  • format (str, optional) – The color format of the video. Defaults to None.

  • template (str, optional) – The template video stream of the output video stream. Defaults to None.

  • audio_src (str, optional) – Audio path to include in video. Defaults to None.

unstream()

Create an unstreamed data collection.

Examples:

  1. Create an unstreamed data collection:

>>> from towhee import DataCollection
>>> dc = DataCollection(iter(range(5))).unstream()
>>> dc.is_stream
False

  2. Convert a streamed data collection to an unstreamed version:

>>> dc = DataCollection(iter(range(5)))
>>> dc.is_stream
True
>>> dc = dc.unstream()
>>> dc.is_stream
False
zip(*others) → DataCollection

Combine multiple data collections.

Parameters:

*others (DataCollection) – The other data collections.

Returns:

Data collection with zipped values.

Return type:

DataCollection

Examples

>>> from towhee import DataCollection
>>> dc1 = DataCollection([1,2,3,4])
>>> dc2 = DataCollection([1,2,3,4]).map(lambda x: x+1)
>>> dc3 = dc1.zip(dc2)
>>> list(dc3)
[(1, 2), (2, 3), (3, 4), (4, 5)]