DataCollection

class towhee.DataCollection(iterable: Iterable)[source]

Bases: Iterable, towhee.functional.mixins.AllMixins

DataCollection is a Pythonic computation and processing framework for unstructured data in machine learning and data science. It lets a data scientist or researcher assemble a data processing pipeline, do the model work (embedding, transforming, or classification), and apply it to a business task (search, recommendation, or shopping) through a method-chaining API.

Examples:

  1. Create a data collection from a list or an iterator:

>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> dc = DataCollection(iter([0, 1, 2, 3, 4]))
  2. Chaining function invocations makes your code clean and fluent:

>>> (
...    dc.map(lambda x: x+1)
...      .map(lambda x: x*2)
... ).to_list()
[2, 4, 6, 8, 10]
  3. Multi-line closures are also supported via decorator syntax:

>>> dc = DataCollection([1,2,3,4])
>>> @dc.map
... def add1(x):
...     return x+1
>>> @add1.map
... def mul2(x):
...     return x *2
>>> mul2.to_list()
[4, 6, 8, 10]
>>> dc = DataCollection([1,2,3,4])
>>> @dc.filter
... def ge3(x):
...     return x>=3
>>> ge3.to_list()
[3, 4]

DataCollection is designed to behave like a Python list or iterator. Consider the following code:

(
    dc.map(stage1)
      .map(stage2)
)

1. iterator and stream mode: When a DataCollection object is created from an iterator, it behaves as a python iterator and performs stream-wise data processing:

  1. DataCollection takes one element from the input and applies stage1 and stage2 sequentially;

  2. Since DataCollection holds no data, indexing or shuffle is not supported;

2. list and unstream mode: If a DataCollection object is created from a list, it will hold all the input values, and perform stage-wise computations:

  1. stage2 will wait until all the calculations are done in stage1;

  2. A new DataCollection is created to hold all the outputs of each stage. You can perform list operations on the resulting DataCollection.
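
The difference in evaluation order between the two modes can be sketched in plain Python, without towhee. This is an illustration only: `stage1`, `stage2`, and the `trace` list are hypothetical names, and the generator and list comprehensions merely stand in for stream and unstream mode rather than reproducing DataCollection's internals.

```python
trace = []  # records the order in which the stages touch each element


def stage1(x):
    trace.append(("stage1", x))
    return x + 1


def stage2(x):
    trace.append(("stage2", x))
    return x * 2


# Stream-like: lazy generators pull one element through both stages at a time.
streamed = (stage2(y) for y in (stage1(x) for x in iter([0, 1, 2])))
stream_result = list(streamed)
stream_trace = list(trace)

# Unstream-like: stage1 finishes over the whole list before stage2 starts.
trace.clear()
after_stage1 = [stage1(x) for x in [0, 1, 2]]
unstream_result = [stage2(y) for y in after_stage1]
unstream_trace = list(trace)

print(stream_trace)    # stage1 and stage2 alternate per element
print(unstream_trace)  # all stage1 calls, then all stage2 calls
```

Both variants produce the same values; only the interleaving of the stages differs, which is why the list-backed form can support indexing and shuffling while the lazy form cannot.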

__init__(iterable: Iterable) → None[source]

Initializes a new DataCollection instance.

Parameters

iterable (Iterable) – input data

stream()[source]

Create a stream data collection.

Examples:

  1. Convert a data collection to a streamed version:

>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> dc.is_stream
False
>>> dc = dc.stream()
>>> dc.is_stream
True
unstream()[source]

Create an unstreamed data collection.

Examples:

  1. Create an unstreamed data collection:

>>> dc = DataCollection(iter(range(5))).unstream()
>>> dc.is_stream
False
  2. Convert a streamed data collection to its unstreamed version:

>>> dc = DataCollection(iter(range(5)))
>>> dc.is_stream
True
>>> dc = dc.unstream()
>>> dc.is_stream
False
property is_stream

Check whether the data collection is in stream or unstream mode.

Examples:

>>> dc = DataCollection([0,1,2,3,4])
>>> dc.is_stream
False
>>> result = dc.map(lambda x: x+1)
>>> result.is_stream
False
>>> result._iterable
[1, 2, 3, 4, 5]
>>> dc = DataCollection(iter(range(5)))
>>> dc.is_stream
True
>>> result = dc.map(lambda x: x+1)
>>> result.is_stream
True
>>> isinstance(result._iterable, Iterable)
True
exception_safe()[source]

Make the data collection exception-safe by wrapping its elements with Option.

Examples:

  1. Exception breaks pipeline execution:

>>> dc = DataCollection.range(5)
>>> dc.map(lambda x: x / (0 if x == 3 else 2)).to_list()
Traceback (most recent call last):
ZeroDivisionError: division by zero
  2. Exception-safe execution:

>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).to_list()
[Some(0.0), Some(0.5), Some(1.0), Empty(), Some(2.0)]
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5).to_list()
[Some(0.0), Some(0.5), Some(1.0), Empty()]
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5, drop_empty=True).to_list()
[Some(0.0), Some(0.5), Some(1.0)]
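
The Option behaviour shown above can be approximated in a few lines of plain Python. The sketch below is not towhee's implementation: `Some`, `Empty`, `safe_map`, and `safe_filter` are hypothetical stand-ins written only to mirror the documented outputs, including the way `Empty` values pass through a filter unless `drop_empty=True`.

```python
class Some:
    """Holds a successfully computed value."""

    def __init__(self, value):
        self.value = value

    def __repr__(self):
        return f"Some({self.value!r})"


class Empty:
    """Marks an element whose computation raised an exception."""

    def __repr__(self):
        return "Empty()"


def safe_map(func, options):
    """Apply func inside each Some; turn exceptions into Empty."""
    out = []
    for opt in options:
        if isinstance(opt, Empty):
            out.append(opt)  # already failed upstream, pass it along
            continue
        try:
            out.append(Some(func(opt.value)))
        except Exception:
            out.append(Empty())
    return out


def safe_filter(pred, options, drop_empty=False):
    """Filter Some values by pred; keep Empty unless drop_empty is set."""
    out = []
    for opt in options:
        if isinstance(opt, Empty):
            if not drop_empty:
                out.append(opt)
        elif pred(opt.value):
            out.append(opt)
    return out


wrapped = [Some(x) for x in range(5)]
halved = safe_map(lambda x: x / (0 if x == 3 else 2), wrapped)
print(halved)
# [Some(0.0), Some(0.5), Some(1.0), Empty(), Some(2.0)]
print(safe_filter(lambda x: x < 1.5, halved))
# [Some(0.0), Some(0.5), Some(1.0), Empty()]
print(safe_filter(lambda x: x < 1.5, halved, drop_empty=True))
# [Some(0.0), Some(0.5), Some(1.0)]
```
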
safe()[source]

Shortcut for exception_safe.

select_from(other)[source]

Select data from other, using each element of self as a list of indices into list(other).

Examples:

>>> dc1 = DataCollection([0.8, 0.9, 8.1, 9.2])
>>> dc2 = DataCollection([[1, 2, 0], [2, 3, 0]])
>>> dc3 = dc2.select_from(dc1)
>>> list(dc3)
[[0.9, 8.1, 0.8], [8.1, 9.2, 0.8]]
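
Read as plain Python, the example above amounts to an index lookup. The sketch below uses ordinary lists (the names `values`, `index_rows`, and `selected` are illustrative) and reflects only what the documented output implies, not towhee's code.

```python
values = [0.8, 0.9, 8.1, 9.2]        # plays the role of dc1 (other)
index_rows = [[1, 2, 0], [2, 3, 0]]  # plays the role of dc2 (self)

# For every row of indices, look each index up in the value list.
selected = [[values[i] for i in row] for row in index_rows]
print(selected)  # [[0.9, 8.1, 0.8], [8.1, 9.2, 0.8]]
```
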
fill_empty(default: Optional[Any] = None) → towhee.functional.data_collection.DataCollection[source]

Unbox Option values and fill Empty with default values.

Parameters

default (Any) – default value to replace empty values;

Returns

data collection with empty values filled with default;

Return type

DataCollection

Examples:

>>> dc = DataCollection.range(5)
>>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).fill_empty(-1.0).to_list()
[0.0, 0.5, 1.0, -1.0, 2.0]
drop_empty(callback: Optional[Callable] = None) → towhee.functional.data_collection.DataCollection[source]

Unbox Option values and drop Empty.

Parameters

callback (Callable) – handler for empty values;

Returns

data collection that drops empty values;

Return type

DataCollection

Examples:

>>> dc = DataCollection.range(5)
>>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty().to_list()
[0.0, 0.5, 1.0, 2.0]

Get the inputs that cause exceptions:

>>> exception_inputs = []
>>> result = dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty(lambda x: exception_inputs.append(x.get().value))
>>> exception_inputs
[3]
map(*arg)[source]

Apply operator to data collection.

Parameters

*arg (Callable) – functions/operators to apply to data collection;

Returns

data collection that contains the computation results;

Return type

DataCollection

Examples:

>>> dc = DataCollection([1,2,3,4])
>>> dc.map(lambda x: x+1).map(lambda x: x*2).to_list()
[4, 6, 8, 10]
zip(*others) → towhee.functional.data_collection.DataCollection[source]

Combine two data collections.

Parameters

*others (DataCollection) – other data collections;

Returns

data collection with zipped values;

Return type

DataCollection

Examples:

>>> dc1 = DataCollection([1,2,3,4])
>>> dc2 = DataCollection([1,2,3,4]).map(lambda x: x+1)
>>> dc3 = dc1.zip(dc2)
>>> list(dc3)
[(1, 2), (2, 3), (3, 4), (4, 5)]
filter(unary_op: Callable, drop_empty=False) → towhee.functional.data_collection.DataCollection[source]

Filter data collection with unary_op.

Parameters
  • unary_op (Callable) – Callable to decide whether to filter the element;

  • drop_empty (bool) – Drop empty values. Defaults to False.

Returns

filtered data collection

Return type

DataCollection

sample(ratio=1.0) → towhee.functional.data_collection.DataCollection[source]

Sample the data collection.

Parameters

ratio (float) – sample ratio;

Returns

sampled data collection;

Return type

DataCollection

Examples:

>>> dc = DataCollection(range(10000))
>>> result = dc.sample(0.1)
>>> ratio = len(result.to_list()) / 10000.
>>> 0.09 < ratio < 0.11
True
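
One simple way to obtain ratio-based sampling like this is to keep each element independently with probability `ratio`. The sketch below assumes that strategy; the page does not state how towhee implements `sample`, and the helper name and its `rng` parameter are hypothetical.

```python
import random


def sample(items, ratio=1.0, rng=None):
    """Keep each item independently with probability `ratio` (assumed strategy)."""
    if rng is None:
        rng = random.Random()
    # rng.random() is in [0.0, 1.0), so ratio=1.0 keeps everything.
    return [x for x in items if rng.random() < ratio]


picked = sample(range(10000), 0.1, rng=random.Random(0))
ratio = len(picked) / 10000
print(ratio)  # close to 0.1, but not exactly 0.1
```

Because every element is decided independently, the realized ratio only approximates the requested one, which is why the doctest above checks a range rather than an exact count.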
static range(*arg, **kws)[source]

Generate data collection with ranged numbers.

Examples:

>>> DataCollection.range(5).to_list()
[0, 1, 2, 3, 4]
batch(size, drop_tail=False, raw=True)[source]

Create small batches from data collections.

Parameters
  • size (int) – window size;

  • drop_tail (bool) – drop the trailing window if it is not full, defaults to False;

  • raw (bool) – whether to return raw data instead of DataCollection, defaults to True

Returns

DataCollection of batched windows or batch raw data

Examples:

>>> dc = DataCollection(range(10))
>>> [list(batch) for batch in dc.batch(2, raw=False)]
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
>>> dc = DataCollection(range(10))
>>> dc.batch(3)
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
>>> dc = DataCollection(range(10))
>>> dc.batch(3, drop_tail=True)
[[0, 1, 2], [3, 4, 5], [6, 7, 8]]
>>> from towhee import Entity
>>> dc = DataCollection([Entity(a=a, b=b) for a,b in zip(['abc', 'vdfvcd', 'cdsc'], [1,2,3])])
>>> dc.batch(2)
[<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>]
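
The `size` and `drop_tail` semantics for plain values can be sketched with list slicing. This hypothetical helper is not towhee's implementation and ignores the `raw` option and Entity handling; it only reproduces the integer examples above.

```python
def batch(items, size, drop_tail=False):
    """Split items into consecutive chunks of length `size`."""
    items = list(items)
    batches = [items[i:i + size] for i in range(0, len(items), size)]
    # Only the last chunk can be short; drop it on request.
    if drop_tail and batches and len(batches[-1]) < size:
        batches.pop()
    return batches


print(batch(range(10), 3))                  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(batch(range(10), 3, drop_tail=True))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```
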
rolling(size: int, drop_head=True, drop_tail=True)[source]

Create rolling windows from data collections.

Parameters
  • size (int) – window size;

  • drop_head (bool) – drop leading windows that are not full;

  • drop_tail (bool) – drop trailing windows that are not full;

Returns

data collection of rolling windows;

Return type

DataCollection

Examples:

>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_head=False)]
[[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_tail=False)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]
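
The interaction of `drop_head` and `drop_tail` is easier to see in a standalone sketch. The function below is a hypothetical re-creation that matches the three examples above. It assumes the input has at least `size` elements, since the documented examples do not cover shorter inputs.

```python
def rolling(items, size, drop_head=True, drop_tail=True):
    """Return rolling windows over items (assumes len(items) >= size)."""
    items = list(items)
    n = len(items)
    windows = []
    if not drop_head:
        # Partial leading windows that grow toward the full size.
        windows += [items[:end] for end in range(1, size)]
    # Full windows, advancing one element at a time.
    windows += [items[start:start + size] for start in range(n - size + 1)]
    if not drop_tail:
        # Partial trailing windows that shrink as the data runs out.
        windows += [items[start:] for start in range(n - size + 1, n)]
    return windows


print(rolling(range(5), 3))
# [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
print(rolling(range(5), 3, drop_head=False))
# [[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]]
print(rolling(range(5), 3, drop_tail=False))
# [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]
```
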
flatten() → towhee.functional.data_collection.DataCollection[source]

Flatten nested data collections.

Returns

flattened data collection;

Return type

DataCollection

Examples:

>>> dc = DataCollection(range(10))
>>> nested_dc = dc.batch(2)
>>> nested_dc.flatten().to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
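
For plain nested lists, the same one-level flattening can be written with the standard library. This is an analogy in ordinary Python, not a description of how `flatten` is implemented.

```python
from itertools import chain

nested = [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

# chain.from_iterable concatenates the inner lists, removing one level of nesting.
flat = list(chain.from_iterable(nested))
print(flat)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```
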
shuffle() → towhee.functional.data_collection.DataCollection[source]

Shuffle an unstreamed data collection in place.

Returns

shuffled data collection;

Return type

DataCollection

Examples:

  1. Shuffle:

>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> a = dc.shuffle()
>>> tuple(a) == tuple(range(5))
False
  2. Streamed data collections are not supported:

>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> _ = dc.shuffle()
Traceback (most recent call last):
TypeError: shuffle is not supported for streamed data collection.
getattr(self, name)[source]

Unknown method dispatcher.

When an unknown method is invoked on a DataCollection object, the call is dispatched to a method resolver. By registering functions with the resolver, you can extend DataCollection’s API at runtime without modifying its code.

Examples:

  1. Define two operators:

>>> class my_add:
...     def __init__(self, val):
...         self.val = val
...     def __call__(self, x):
...         return x+self.val
>>> class my_mul:
...     def __init__(self, val):
...         self.val = val
...     def __call__(self, x):
...         return x*self.val
  2. Register the operators to DataCollection’s execution context with param_scope:

>>> from towhee import param_scope
>>> with param_scope(dispatcher={
...         'add': my_add, # register `my_add` as `dc.add`
...         'mul': my_mul  # register `my_mul` as `dc.mul`
... }):
...     dc = DataCollection([1,2,3,4])
...     dc.add(1).mul(2).to_list() # call registered operator
[4, 6, 8, 10]
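
The dispatch mechanism relies on Python's `__getattr__` hook, which runs only when normal attribute lookup fails. The toy class below sketches that idea with an explicit registry passed to the constructor. `MiniCollection`, `make_add`, `make_mul`, and `registry` are hypothetical names, and towhee's actual resolver (configured through param_scope) may work differently.

```python
class MiniCollection:
    """Toy stand-in for DataCollection with a method resolver."""

    def __init__(self, items, dispatcher):
        self._items = list(items)
        self._dispatcher = dispatcher  # maps method name -> operator factory

    def __getattr__(self, name):
        # Private names are never dispatched; this also avoids recursion
        # if _dispatcher itself is looked up before it has been set.
        if name.startswith("_") or name not in self._dispatcher:
            raise AttributeError(name)
        factory = self._dispatcher[name]

        def call(*args, **kwargs):
            op = factory(*args, **kwargs)  # e.g. make_add(1)
            mapped = [op(x) for x in self._items]
            return MiniCollection(mapped, self._dispatcher)

        return call

    def to_list(self):
        return list(self._items)


def make_add(val):
    return lambda x: x + val


def make_mul(val):
    return lambda x: x * val


registry = {"add": make_add, "mul": make_mul}
result = MiniCollection([1, 2, 3, 4], registry).add(1).mul(2).to_list()
print(result)  # [4, 6, 8, 10]
```
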
self[index][source]

Indexing for data collection.

Examples:

>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> dc[0]
0
>>> dc.stream()[1]
Traceback (most recent call last):
TypeError: indexing is only supported for data collection created from list or pandas DataFrame.
self[index] = value[source]

Index assignment for data collection.

Examples:

>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> dc[0]
0
>>> dc[0] = 5
>>> dc._iterable[0]
5
>>> dc.stream()[0]
Traceback (most recent call last):
TypeError: indexing is only supported for data collection created from list or pandas DataFrame.
append(item: Any) → towhee.functional.data_collection.DataCollection[source]

Append item to data collection.

Parameters

item (Any) – the item to append

Returns

self

Return type

DataCollection

Examples:

>>> dc = DataCollection([0, 1, 2])
>>> dc.append(3).append(4)
[0, 1, 2, 3, 4]
self >> unary_op[source]

Chain the operators with >>.

Examples:

>>> dc = DataCollection([1,2,3,4])
>>> (dc
...     >> (lambda x: x+1)
...     >> (lambda x: x*2)
... ).to_list()
[4, 6, 8, 10]
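
In Python, `>>` is provided by the `__rshift__` special method. The sketch below assumes `>>` simply forwards to `map`, which matches the output of the example above but is not confirmed by this page. `Pipe` is a hypothetical class used only for illustration.

```python
class Pipe:
    """Minimal collection whose >> operator applies a function to every item."""

    def __init__(self, items):
        self._items = list(items)

    def map(self, func):
        return Pipe(func(x) for x in self._items)

    def __rshift__(self, func):
        # `pipe >> func` is shorthand for `pipe.map(func)` in this sketch.
        return self.map(func)

    def to_list(self):
        return list(self._items)


result = (Pipe([1, 2, 3, 4]) >> (lambda x: x + 1) >> (lambda x: x * 2)).to_list()
print(result)  # [4, 6, 8, 10]
```

The parentheses around each lambda matter: without them, the lambda body would absorb the rest of the `>>` chain.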
self + other[source]

Concatenate two data collections.

Examples:

>>> (DataCollection.range(5) + DataCollection.range(5)).to_list()
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
>>> (DataCollection.range(5) + DataCollection.range(5) + DataCollection.range(5)).to_list()
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
repr(self) → str[source]

Return a string representation for DataCollection.

Examples:

>>> DataCollection([1, 2, 3]).unstream()
[1, 2, 3]
>>> DataCollection([1, 2, 3]).stream() 
<list_iterator object at...>
head(n: int = 5)[source]

Get the first n elements of a DataCollection.

Parameters

n (int) – The number of elements to return. Defaults to 5.

Examples:

>>> DataCollection.range(10).head(3).to_list()
[0, 1, 2]