towhee.functional.mixins.DCMixins

class towhee.functional.mixins.DCMixins[source]

Bases: DatasetMixin, DispatcherMixin, DisplayMixin, ParallelMixin, ComputerVisionMixin, StateMixin, MetricMixin, RayMixin, ServeMixin, MilvusMixin, DagMixin, FaissMixin, ConfigMixin, CompileMixin, RemoteMixin, ListMixin, DataProcessingMixin, SafeMixin, StreamMixin, FormatPriorityMixin

Methods

api

append

Append item to data collection.

as_function

Make the DataFrame as callable function

as_str

batch

Create small batches from data collections.

clear

Clear a DataCollection.

compile_dag

config

Set the parameters in DC.

copy

Copy a DataCollection.

count

Count an element in DataCollection.

drop_empty

Unbox Option values and drop Empty.

exception_safe

Making the data collection exception-safe by warp elements with Option.

extend

Extend a DataCollection.

fill_empty

Unbox Option values and fill Empty with default values.

filter_data

flatten

Flatten nested data collections.

from_glob

generate a file list with pattern

get_backend

get_config

Return the config in DC, such as parallel, chunksize, jit and format_priority.

get_control_plane

get_executor

get_formate_priority

get_num_worker

get_pipeline_config

Return the config in DC, such as parallel, chunksize, jit and format_priority.

get_state

Get the state storage for DataCollection

head

Get the first n lines of a DataCollection.

image_imshow

insert

Insert data into a DataCollection.

jit_resolve

mmap

Apply multiple unary_op to data collection.

netx

notify_consumed

pipeline_config

Set the parameters in DC.

pmap

Apply unary_op with parallel execution.

pop

Extend a DataCollection.

random_sample

ray_resolve

ray_start

Start the ray service.

read_audio

read_camera

read images from a camera.

read_csv

read_json

read_video

Load video as a datacollection.

read_zip

load files from url/path.

register_dag

remote

remove

Remove element from DataCollection.

report

report the metric scores

resolve

reverse

Reverse a DataCollection.

rolling

Create rolling windows from data collections.

safe

Shortcut for exception_safe

sample

Sample the data collection.

select_from

Select data from dc with list(self).

serve

Serve the DataFrame as a RESTful API

set_evaluating

Set evaluating mode for stateful operators

set_format_priority

Set format priority.

set_jit

set_parallel

Set parallel execution for following calls.

set_state

Set the state storage for DataCollection

set_training

Set training mode for stateful operators

show

Print the first n lines of a DataCollection.

shuffle

Shuffle an unstreamed data collection in place.

smap

sort

Sort a DataCollection.

split

Split a dataframe into multiple dataframes.

split_train_test

Split DataCollection to train and test data.

stream

Create a stream data collection.

to_csv

Save dc as a csv file.

to_video

Encode a video with audio if provided.

unstream

Create a unstream data collection.

with_metrics

zip

Combine two data collections.

Attributes

is_stream

Check whether the data collection is stream or unstream.

__init__() None[source]
append(*args) DataCollection

Append item to data collection.

Parameters:

item (Any) – the item to append

Returns:

self

Return type:

DataCollection

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2])
>>> dc.append(3).append(4)
[0, 1, 2, 3, 4]
as_function()

Make the DataFrame as callable function

Returns:

a callable function

Return type:

_type_

Examples:

>>> import towhee
>>> with towhee.api() as api:
...     func1 = (
...         api.map(lambda x: x+' -> 1')
...            .map(lambda x: x+' => 1')
...            .as_function()
...     )
>>> with towhee.api['x']() as api:
...     func2 = (
...         api.runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 2')
...            .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 2')
...            .select['y']()
...            .as_raw()
...            .as_function()
...     )
>>> with towhee.api() as api:
...     func3 = (
...         api.parse_json()
...            .runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 3')
...            .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 3')
...            .select['y']()
...            .as_json()
...            .as_function()
...     )
>>> func1('1')
'1 -> 1 => 1'
>>> func2('2')
'2 -> 2 => 2'
>>> func3('{"x": "3"}')
'{"y": "3 -> 3 => 3"}'
batch(size, drop_tail=False, raw=True)

Create small batches from data collections.

Parameters:
  • size (int) – Window size;

  • drop_tail (bool) – Drop tailing windows that not full, defaults to False;

  • raw (bool) – Whether to return raw data instead of DataCollection, defaults to True

Returns:

DataCollection of batched windows or batch raw data

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection(range(10))
>>> [list(batch) for batch in dc.batch(2, raw=False)]
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
>>> dc = DataCollection(range(10))
>>> dc.batch(3)
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
>>> dc = DataCollection(range(10))
>>> dc.batch(3, drop_tail=True)
[[0, 1, 2], [3, 4, 5], [6, 7, 8]]
>>> from towhee import Entity
>>> dc = DataCollection([Entity(a=a, b=b) for a,b in zip(['abc', 'vdfvcd', 'cdsc'], [1,2,3])])
>>> dc.batch(2)
[<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>]
clear(*args) DataCollection

Clear a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.clear()
[]
config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)

Set the parameters in DC.

Parameters:
  • parallel (int) – Set the number of parallel execution for following calls.

  • chunksize (int) – Set the chunk size for arrow.

  • jit (Union[str, dict]) – It can set to “numba”, this mode will speed up the Operator’s function, but it may also need to return to python mode due to JIT failure, which will take longer, so please set it carefully.

  • format_priority (List[str]) – The priority list of format.

copy(*args) DataCollection

Copy a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc_1 = dc.copy()
>>> dc_1._iterable.append(4)
>>> dc, dc_1
([1, 2, 3], [1, 2, 3, 4])
count(*args) int

Count an element in DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.count(1)
1
drop_empty(callback: Callable = None) DataCollection

Unbox Option values and drop Empty.

Parameters:

callback (Callable) – handler for empty values;

Returns:

data collection that drops empty values;

Return type:

DataCollection

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection.range(5)
>>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty().to_list()
[0.0, 0.5, 1.0, 2.0]

Get inputs that case exceptions:

>>> exception_inputs = []
>>> result = dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty(lambda x: exception_inputs.append(x.get().value))
>>> exception_inputs
[3]
exception_safe()

Making the data collection exception-safe by warp elements with Option.

Examples:

  1. Exception breaks pipeline execution:

>>> from towhee import DataCollection
>>> dc = DataCollection.range(5)
>>> dc.map(lambda x: x / (0 if x == 3 else 2)).to_list()
Traceback (most recent call last):
ZeroDivisionError: division by zero
  1. Exception-safe execution

>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).to_list()
[Some(0.0), Some(0.5), Some(1.0), Empty(), Some(2.0)]
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5).to_list()
[Some(0.0), Some(0.5), Some(1.0), Empty()]
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5, drop_empty=True).to_list()
[Some(0.0), Some(0.5), Some(1.0)]
extend(*args) DataCollection

Extend a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.extend([4, 5])
[1, 2, 3, 4, 5]
fill_empty(default: Any = None) DataCollection

Unbox Option values and fill Empty with default values.

Parameters:

default (Any) – default value to replace empty values;

Returns:

data collection with empty values filled with default;

Return type:

DataCollection

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection.range(5)
>>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).fill_empty(-1.0).to_list()
[0.0, 0.5, 1.0, -1.0, 2.0]
flatten() DataCollection

Flatten nested data collections.

Returns:

flattened data collection;

Return type:

DataCollection

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection(range(10))
>>> nested_dc = dc.batch(2)
>>> nested_dc.flatten().to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
classmethod from_glob(*args)

generate a file list with pattern

get_config()

Return the config in DC, such as parallel, chunksize, jit and format_priority.

get_pipeline_config()

Return the config in DC, such as parallel, chunksize, jit and format_priority.

get_state()

Get the state storage for DataCollection

Returns:

the state storage

Return type:

State

head(n: int = 5)

Get the first n lines of a DataCollection.

Parameters:

n (int) – The number of lines to print. Default value is 5.

Examples:

>>> from towhee import DataCollection
>>> DataCollection.range(10).head(3).to_list()
[0, 1, 2]
insert(*args) DataCollection

Insert data into a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.insert(0, 0)
[0, 1, 2, 3]
property is_stream

Check whether the data collection is stream or unstream.

Examples:

>>> from towhee import DataCollection
>>> from typing import Iterable
>>> dc = DataCollection([0,1,2,3,4])
>>> dc.is_stream
False
>>> result = dc.map(lambda x: x+1)
>>> result.is_stream
False
>>> result._iterable
[1, 2, 3, 4, 5]
>>> dc = DataCollection(iter(range(5)))
>>> dc.is_stream
True
>>> result = dc.map(lambda x: x+1)
>>> result.is_stream
True
>>> isinstance(result._iterable, Iterable)
True
mmap(ops: list, num_worker=None, backend=None)

Apply multiple unary_op to data collection. Currently supports two backends, ray and thread.

Parameters:
  • unary_op (func) – the op to be mapped;

  • num_worker (int) – how many threads to reserve for this op;

  • backend (str) – whether to use ray or thread

# TODO: the test is broken with pytest # Examples:

# 1. Using mmap:

# >>> from towhee import DataCollection # >>> dc1 = DataCollection([0,1,2,’3’,4]).stream() # >>> a1, b1 = dc1.mmap([lambda x: x+1, lambda x: x*2]) # >>> c1 = a1.map(lambda x: x+1) # >>> c1.zip(b1).to_list() # [(2, 0), (3, 2), (4, 4), (Empty(), ‘33’), (6, 8)]

# 2. Using map instead of mmap:

# >>> from towhee import DataCollection # >>> dc2 = DataCollection.range(5).stream() # >>> a2, b2, c2 = dc2.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2)) # >>> d2 = a2.map(lambda x: x+1) # >>> d2.zip(b2, c2).to_list() # [(2, 0, 0), (3, 2, 0), (4, 4, 1), (5, 6, 1), (6, 8, 2)]

# 3. DAG execution:

# >>> dc3 = DataCollection.range(5).stream() # >>> a3, b3, c3 = dc3.map(lambda x: x+1, lambda x: x*2, lambda x: int(x/2)) # >>> d3 = a3.map(lambda x: x+1) # >>> d3.zip(b3, c3).map(lambda x: x[0]+x[1]+x[2]).to_list() # [2, 5, 9, 12, 16]

pipeline_config(parallel: Optional[int] = None, chunksize: Optional[int] = None, jit: Optional[Union[str, dict]] = None, format_priority: Optional[List[str]] = None)

Set the parameters in DC.

Parameters:
  • parallel (int) – Set the number of parallel execution for following calls.

  • chunksize (int) – Set the chunk size for arrow.

  • jit (Union[str, dict]) – It can set to “numba”, this mode will speed up the Operator’s function, but it may also need to return to python mode due to JIT failure, which will take longer, so please set it carefully.

  • format_priority (List[str]) – The priority list of format.

pmap(unary_op, num_worker=None, backend=None)

Apply unary_op with parallel execution. Currently supports two backends, ray and thread.

Parameters:
  • unary_op (func) – the op to be mapped;

  • num_worker (int) – how many threads to reserve for this op;

  • backend (str) – whether to use ray or thread

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = {threading.current_thread().ident}
>>> stage_2_thread_set = {threading.current_thread().ident}
>>> result = (
...     DataCollection.range(1000).stream()
...     .pmap(lambda x: stage_1_thread_set.add(threading.current_thread().ident), 5)
...     .pmap(lambda x: stage_2_thread_set.add(threading.current_thread().ident), 4).to_list()
... )
>>> len(stage_1_thread_set) > 1
True
>>> len(stage_2_thread_set) > 1
True
pop(*args) DataCollection

Extend a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.pop()
[1, 2]
ray_start(address=None, local_packages: Optional[list] = None, pip_packages: Optional[list] = None, silence=True)

Start the ray service. When using a remote cluster, all dependencies for custom functions and operators defined locally will need to be sent to the ray cluster. If using ray locally, within the runtime, avoid passing in any arguments.

Parameters:
  • address (str) – The address for the ray service being connected to. If using ray cluster remotely with kubectl forwarded port, the most likely address will be “ray://localhost:10001”.

  • local_packages (list[str]) – Whichever locally defined modules that are used within a custom function supplied to the pipeline, whether it be in lambda functions, locally registered operators, or functions themselves.

  • pip_packages (list[str]) – Whichever pip installed modules that are used within a custom function supplied to the pipeline, whether it be in lambda functions, locally registered operators, or functions themselves.

classmethod read_camera(device_id=0, limit=-1)

read images from a camera.

classmethod read_video(path, format='rgb24')

Load video as a datacollection.

Parameters:
  • path – The path to the target video.

  • format – The format of the images loaded from video.

classmethod read_zip(url, pattern, mode='r')

load files from url/path.

Parameters:
  • zip_src (Union[str, path]) – The path leads to the image.

  • pattern (str) – The filename pattern to extract.

  • mode (str) – file open mode.

Returns:

The file handler for file in the zip file.

Return type:

(File)

remove(*args) DataCollection

Remove element from DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.remove(1)
[2, 3]
report()

report the metric scores

Examples:

>>> from towhee import DataCollection
>>> from towhee import Entity
>>> dc1 = DataCollection([Entity(a=a, b=b, c=c) for a, b, c in zip([0,1,1,0,0], [0,1,1,1,0], [0,1,1,0,0])])
>>> dc1.with_metrics(['accuracy', 'recall']).evaluate['a', 'c'](name='lr').evaluate['a', 'b'](name='rf').report()
    accuracy  recall
lr       1.0     1.0
rf       0.8     0.8
{'lr': {'accuracy': 1.0, 'recall': 1.0}, 'rf': {'accuracy': 0.8, 'recall': 0.8}}
>>> dc2 = DataCollection([Entity(pred=[1,6,2,7,8,3,9,10,4,5], act=[1,2,3,4,5])])
>>> dc2.with_metrics(['mean_average_precision', 'mean_hit_ratio']).evaluate['act', 'pred'](name='test').report()
      mean_average_precision  mean_hit_ratio
test                0.622222             1.0
{'test': {'mean_average_precision': 0.6222222222222221, 'mean_hit_ratio': 1.0}}
reverse(*args) DataCollection

Reverse a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 2, 3])
>>> dc.reverse()
[3, 2, 1]
rolling(size: int, drop_head=True, drop_tail=True)

Create rolling windows from data collections.

Parameters:
  • size (int) – Wndow size.

  • drop_head (bool) – Drop headding windows that not full.

  • drop_tail (bool) – Drop tailing windows that not full.

Returns:

data collection of rolling windows;

Return type:

DataCollection

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_head=False)]
[[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_tail=False)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]
safe()

Shortcut for exception_safe

sample(ratio=1.0) DataCollection

Sample the data collection.

Parameters:

ratio (float) – sample ratio;

Returns:

sampled data collection;

Return type:

DataCollection

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection(range(10000))
>>> result = dc.sample(0.1)
>>> ratio = len(result.to_list()) / 10000.
>>> 0.09 < ratio < 0.11
True
select_from(other)

Select data from dc with list(self).

Examples:

>>> from towhee import DataCollection
>>> dc1 = DataCollection([0.8, 0.9, 8.1, 9.2])
>>> dc2 = DataCollection([[1, 2, 0], [2, 3, 0]])
>>> dc3 = dc2.select_from(dc1)
>>> list(dc3)
[[0.9, 8.1, 0.8], [8.1, 9.2, 0.8]]
serve(path='/', app=None)

Serve the DataFrame as a RESTful API

Parameters:
  • path (str, optional) – API path. Defaults to ‘/’.

  • app (_type_, optional) – The FastAPI app the API bind to, will create one if None.

Returns:

the app that bind to

Return type:

_type_

Examples:

>>> from fastapi import FastAPI
>>> from fastapi.testclient import TestClient
>>> app = FastAPI()
>>> import towhee
>>> with towhee.api() as api:
...     app1 = (
...         api.map(lambda x: x+' -> 1')
...            .map(lambda x: x+' => 1')
...            .serve('/app1', app)
...     )
>>> with towhee.api['x']() as api:
...     app2 = (
...         api.runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 2')
...            .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 2')
...            .select['y']()
...            .serve('/app2', app)
...     )
>>> with towhee.api() as api:
...     app2 = (
...         api.parse_json()
...            .runas_op['x', 'x_plus_1'](func=lambda x: x+' -> 3')
...            .runas_op['x_plus_1', 'y'](func=lambda x: x+' => 3')
...            .select['y']()
...            .serve('/app3', app)
...     )
>>> client = TestClient(app)
>>> client.post('/app1', '1').text
'"1 -> 1 => 1"'
>>> client.post('/app2', '2').text
'{"y":"2 -> 2 => 2"}'
>>> client.post('/app3', '{"x": "3"}').text
'{"y":"3 -> 3 => 3"}'
set_evaluating(state=None)

Set evaluating mode for stateful operators

Parameters:

state (State, optional) – Update the state storage. Defaults to None.

Returns:

data collection itself

Return type:

DataCollection

set_format_priority(format_priority: List[str])

Set format priority.

Parameters:

format_priority (List[str]) – The priority queue of format.

set_parallel(num_worker=2, backend='thread')

Set parallel execution for following calls.

Examples:

>>> from towhee import DataCollection
>>> import threading
>>> stage_1_thread_set = set()
>>> stage_2_thread_set = set()
>>> result = (
...     DataCollection.range(1000).stream().set_parallel(4)
...     .map(lambda x: stage_1_thread_set.add(threading.current_thread().ident))
...     .map(lambda x: stage_2_thread_set.add(threading.current_thread().ident)).to_list()
... )
>>> len(stage_2_thread_set)>1
True
set_state(state)

Set the state storage for DataCollection

Parameters:

state (State) – state storage

Returns:

data collection itself

Return type:

DataCollection

set_training(state=None)

Set training mode for stateful operators

Parameters:

state (State, optional) – Update the state storage. Defaults to None.

Returns:

data collection itself

Return type:

DataCollection

show(limit=5, header=None, tablefmt='html', formatter={})

Print the first n lines of a DataCollection.

Parameters:
  • limit (int) – The number of lines to print. Default value is 5. Print all if limit is non-positive.

  • header (list of str) – The field names.

  • tablefmt (str) – The format of the output, support html, plain, grid.

shuffle() DataCollection

Shuffle an unstreamed data collection in place.

Returns:

shuffled data collection;

Return type:

DataCollection

Examples:

  1. Shuffle:

>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> a = dc.shuffle()
>>> tuple(a) == tuple(range(5))
False
  1. streamed data collection is not supported:

>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> _ = dc.shuffle()
Traceback (most recent call last):
TypeError: shuffle is not supported for streamed data collection.
sort(*args) DataCollection

Sort a DataCollection.

Examples:

>>> from towhee import DataCollection
>>> dc = DataCollection([1, 4, 3])
>>> dc.sort()
[1, 3, 4]
split(count)

Split a dataframe into multiple dataframes.

Parameters:

count (int) – how many resulting DCs;

Returns:

copies of DC;

Return type:

[DataCollection, …]

Examples:

  1. Split:

>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> a, b, c = dc.split(3)
>>> a.zip(b, c).to_list()
[(0, 0, 0), (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4)]
split_train_test(size: list = [0.9, 0.1], **kws)

Split DataCollection to train and test data.

Parameters:

size (list) – The size of the train and test.

Examples:

>>> from towhee.functional import DataCollection
>>> dc = DataCollection.range(10)
>>> train, test = dc.split_train_test(shuffle=False)
>>> train.to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8]
>>> test.to_list()
[9]
stream()

Create a stream data collection.

Examples: 1. Convert a data collection to streamed version

>>> from towhee import DataCollection
>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> dc.is_stream
False
>>> dc = dc.stream()
>>> dc.is_stream
True
to_csv(csv_path: Union[str, Path], encoding: str = 'utf-8-sig')

Save dc as a csv file.

Parameters:
  • csv_path (Union[str, Path]) – The path to save the dc to.

  • encoding (str) – The encoding to use in the output file.

to_video(output_path, codec=None, rate=None, width=None, height=None, format=None, template=None, audio_src=None)

Encode a video with audio if provided.

Parameters:
  • output_path – The path of the output video.

  • codec – The codec to encode and decode the video.

  • rate – The rate of the video.

  • width – The width of the video.

  • height – The height of the video.

  • format – The format of the video frame image.

  • template – The template video stream of the ouput video stream.

  • audio_src – The audio to encode with the video.

unstream()

Create a unstream data collection.

Examples:

  1. Create a unstream data collection

>>> from towhee import DataCollection
>>> dc = DataCollection(iter(range(5))).unstream()
>>> dc.is_stream
False
  1. Convert a streamed data collection to unstream version

>>> dc = DataCollection(iter(range(5)))
>>> dc.is_stream
True
>>> dc = dc.unstream()
>>> dc.is_stream
False
zip(*others) DataCollection

Combine two data collections.

Parameters:

*others (DataCollection) – other data collections;

Returns:

data collection with zipped values;

Return type:

DataCollection

Examples:

>>> from towhee import DataCollection
>>> dc1 = DataCollection([1,2,3,4])
>>> dc2 = DataCollection([1,2,3,4]).map(lambda x: x+1)
>>> dc3 = dc1.zip(dc2)
>>> list(dc3)
[(1, 2), (2, 3), (3, 4), (4, 5)]