DataCollection
- class towhee.DataCollection(iterable: Iterable)[source]
Bases: Iterable, towhee.functional.mixins.AllMixins
DataCollection is a Pythonic computation and processing framework for unstructured data in machine learning and data science. It lets a data scientist or researcher assemble a data processing pipeline, do the model work (embedding, transforming, or classification), and apply it to a business scenario (search, recommendation, or shopping) through a method-chaining API.
Examples:
Create a data collection from list or iterator:
>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> dc = DataCollection(iter([0, 1, 2, 3, 4]))
Chaining function invocations makes your code clean and fluent:
>>> (
...     dc.map(lambda x: x+1)
...       .map(lambda x: x*2)
... ).to_list()
[2, 4, 6, 8, 10]
Multi-line closures are also supported via decorator syntax:
>>> dc = DataCollection([1,2,3,4])
>>> @dc.map
... def add1(x):
...     return x+1
>>> @add1.map
... def mul2(x):
...     return x*2
>>> mul2.to_list()
[4, 6, 8, 10]
>>> dc = DataCollection([1,2,3,4])
>>> @dc.filter
... def ge3(x):
...     return x>=3
>>> ge3.to_list()
[3, 4]
DataCollection is designed to behave like a Python list or iterator. Consider the following code:
dc.map(stage1).map(stage2)
1. Iterator and stream mode: when a DataCollection object is created from an iterator, it behaves like a Python iterator and processes data as a stream:
   DataCollection takes one element at a time from the input and applies stage1 and stage2 to it sequentially.
   Since DataCollection holds no data, indexing and shuffling are not supported.
2. List and unstream mode: when a DataCollection object is created from a list, it holds all the input values and performs stage-wise computation:
   stage2 waits until stage1 has finished all of its calculations.
   A new DataCollection is created to hold the outputs of each stage. You can perform list operations on the resulting DataCollection.
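The difference in evaluation order between the two modes can be illustrated in plain Python. This is only a sketch of the idea and does not use or reproduce DataCollection's implementation; `stage1`, `stage2`, and the `trace` list are hypothetical names introduced to record the order of calls.

```python
trace = []  # records the order in which the stages run


def stage1(x):
    trace.append(f"stage1({x})")
    return x + 1


def stage2(x):
    trace.append(f"stage2({x})")
    return x * 2


# Stream-like: chained lazy maps pull one element through both stages at a time.
stream_result = list(map(stage2, map(stage1, iter([0, 1, 2]))))
stream_trace = list(trace)

trace.clear()

# List-like: stage1 finishes for every element before stage2 starts.
after_stage1 = [stage1(x) for x in [0, 1, 2]]
list_result = [stage2(x) for x in after_stage1]
list_trace = list(trace)

print(stream_trace)  # stage1 and stage2 calls alternate
print(list_trace)    # all stage1 calls come first
```

Both variants produce the same values; only the interleaving of the stage calls differs, which is why the list-like variant can support indexing on intermediate results while the stream-like one cannot.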
- __init__(iterable: Iterable) None [source]
Initializes a new DataCollection instance.
- Parameters
iterable (Iterable) – input data
- stream()[source]
Create a stream data collection.
Examples:
Convert a data collection to its streamed version:
>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> dc.is_stream
False
>>> dc = dc.stream()
>>> dc.is_stream
True
- unstream()[source]
Create an unstreamed data collection.
Examples:
Create an unstreamed data collection:
>>> dc = DataCollection(iter(range(5))).unstream()
>>> dc.is_stream
False
Convert a streamed data collection to its unstreamed version:
>>> dc = DataCollection(iter(range(5)))
>>> dc.is_stream
True
>>> dc = dc.unstream()
>>> dc.is_stream
False
- property is_stream
Check whether the data collection is streamed or unstreamed.
Examples:
>>> dc = DataCollection([0,1,2,3,4])
>>> dc.is_stream
False
>>> result = dc.map(lambda x: x+1)
>>> result.is_stream
False
>>> result._iterable
[1, 2, 3, 4, 5]
>>> dc = DataCollection(iter(range(5)))
>>> dc.is_stream
True
>>> result = dc.map(lambda x: x+1)
>>> result.is_stream
True
>>> isinstance(result._iterable, Iterable)
True
- exception_safe()[source]
Make the data collection exception-safe by wrapping elements with Option.
Examples:
An exception breaks pipeline execution:
>>> dc = DataCollection.range(5)
>>> dc.map(lambda x: x / (0 if x == 3 else 2)).to_list()
Traceback (most recent call last):
ZeroDivisionError: division by zero
Exception-safe execution:
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).to_list()
[Some(0.0), Some(0.5), Some(1.0), Empty(), Some(2.0)]
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5).to_list()
[Some(0.0), Some(0.5), Some(1.0), Empty()]
>>> dc.exception_safe().map(lambda x: x / (0 if x == 3 else 2)).filter(lambda x: x < 1.5, drop_empty=True).to_list()
[Some(0.0), Some(0.5), Some(1.0)]
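The idea of boxing each element so that a failure becomes a value rather than a raised exception can be sketched in plain Python. The `Some`, `Empty`, and `safe_map` names below are hypothetical stand-ins written for this illustration; they are not towhee's Option classes and make no claim about how towhee implements them.

```python
class Some:
    """Hypothetical box around a successfully computed value."""

    def __init__(self, value):
        self.value = value

    def __repr__(self):
        return f"Some({self.value!r})"


class Empty:
    """Hypothetical marker for a computation that raised."""

    def __init__(self, error=None):
        self.error = error

    def __repr__(self):
        return "Empty()"


def safe_map(func, options):
    """Apply func inside each box; turn exceptions into Empty."""
    for opt in options:
        if isinstance(opt, Empty):
            yield opt  # earlier failures pass through untouched
            continue
        try:
            yield Some(func(opt.value))
        except Exception as exc:
            yield Empty(exc)


boxed = [Some(x) for x in range(5)]
result = list(safe_map(lambda x: x / (0 if x == 3 else 2), boxed))
print(result)  # [Some(0.0), Some(0.5), Some(1.0), Empty(), Some(2.0)]
```

Keeping the exception on the `Empty` marker is a design choice of this sketch; it lets a later step inspect what went wrong without interrupting the rest of the pipeline.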
- select_from(other)[source]
Select data from other, using each element of self as a list of indices into other.
Examples:
>>> dc1 = DataCollection([0.8, 0.9, 8.1, 9.2])
>>> dc2 = DataCollection([[1, 2, 0], [2, 3, 0]])
>>> dc3 = dc2.select_from(dc1)
>>> list(dc3)
[[0.9, 8.1, 0.8], [8.1, 9.2, 0.8]]
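The example above behaves like an index-based gather. A minimal plain-Python equivalent, assuming each element of the calling collection is a list of integer positions, might look like the following; the free function `select_from` here is a hypothetical helper, not the towhee method.

```python
def select_from(index_lists, data):
    """For each list of indices, gather the matching items from data."""
    return [[data[i] for i in indices] for indices in index_lists]


data = [0.8, 0.9, 8.1, 9.2]
index_lists = [[1, 2, 0], [2, 3, 0]]
selected = select_from(index_lists, data)
print(selected)  # [[0.9, 8.1, 0.8], [8.1, 9.2, 0.8]]
```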
- fill_empty(default: Optional[Any] = None) towhee.functional.data_collection.DataCollection [source]
Unbox Option values and fill Empty with default values.
- Parameters
default (Any) – default value to replace empty values;
- Returns
data collection with empty values filled with default;
- Return type
DataCollection
Examples:
>>> dc = DataCollection.range(5)
>>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).fill_empty(-1.0).to_list()
[0.0, 0.5, 1.0, -1.0, 2.0]
- drop_empty(callback: Optional[Callable] = None) towhee.functional.data_collection.DataCollection [source]
Unbox Option values and drop Empty.
- Parameters
callback (Callable) – handler for empty values;
- Returns
data collection that drops empty values;
- Return type
DataCollection
Examples:
>>> dc = DataCollection.range(5)
>>> dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty().to_list()
[0.0, 0.5, 1.0, 2.0]
Get the inputs that cause exceptions:
>>> exception_inputs = []
>>> result = dc.safe().map(lambda x: x / (0 if x == 3 else 2)).drop_empty(lambda x: exception_inputs.append(x.get().value))
>>> exception_inputs
[3]
- map(*arg)[source]
Apply operator to data collection.
- Parameters
*arg (Callable) – functions/operators to apply to data collection;
- Returns
data collection that contains the computation results;
- Return type
Examples:
>>> dc = DataCollection([1,2,3,4])
>>> dc.map(lambda x: x+1).map(lambda x: x*2).to_list()
[4, 6, 8, 10]
- zip(*others) towhee.functional.data_collection.DataCollection [source]
Combine two data collections.
- Parameters
*others (DataCollection) – other data collections;
- Returns
data collection with zipped values;
- Return type
DataCollection
Examples:
>>> dc1 = DataCollection([1,2,3,4])
>>> dc2 = DataCollection([1,2,3,4]).map(lambda x: x+1)
>>> dc3 = dc1.zip(dc2)
>>> list(dc3)
[(1, 2), (2, 3), (3, 4), (4, 5)]
- filter(unary_op: Callable, drop_empty=False) towhee.functional.data_collection.DataCollection [source]
Filter data collection with unary_op.
- Parameters
unary_op (Callable) – Callable to decide whether to filter the element;
drop_empty (bool) – Drop empty values. Defaults to False.
- Returns
filtered data collection
- Return type
DataCollection
- sample(ratio=1.0) towhee.functional.data_collection.DataCollection [source]
Sample the data collection.
- Parameters
ratio (float) – sample ratio;
- Returns
sampled data collection;
- Return type
DataCollection
Examples:
>>> dc = DataCollection(range(10000))
>>> result = dc.sample(0.1)
>>> ratio = len(result.to_list()) / 10000.
>>> 0.09 < ratio < 0.11
True
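One common way to get a result whose size is only approximately `ratio` times the input, as in the check above, is to keep each element independently with probability `ratio`. The sketch below assumes that strategy; the source does not state how `sample` is implemented, and the `seed` parameter is a hypothetical addition for reproducibility.

```python
import random


def sample(iterable, ratio=1.0, seed=None):
    """Keep each element independently with probability `ratio`."""
    rng = random.Random(seed)  # private generator; seed is optional
    return [x for x in iterable if rng.random() < ratio]


picked = sample(range(10000), ratio=0.1, seed=0)
print(len(picked) / 10000)  # close to 0.1, but not exactly 0.1
```

Because every element is decided independently, this approach also works on a stream of unknown length, at the cost of an output size that varies from run to run.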
- static range(*arg, **kws)[source]
Generate data collection with ranged numbers.
Examples:
>>> DataCollection.range(5).to_list()
[0, 1, 2, 3, 4]
- batch(size, drop_tail=False, raw=True)[source]
Create small batches from data collections.
- Parameters
size (int) – window size;
drop_tail (bool) – drop trailing windows that are not full, defaults to False;
raw (bool) – whether to return raw data instead of DataCollection, defaults to True
- Returns
DataCollection of batched windows or batch raw data
Examples:
>>> dc = DataCollection(range(10))
>>> [list(batch) for batch in dc.batch(2, raw=False)]
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
>>> dc = DataCollection(range(10))
>>> dc.batch(3)
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
>>> dc = DataCollection(range(10))
>>> dc.batch(3, drop_tail=True)
[[0, 1, 2], [3, 4, 5], [6, 7, 8]]
>>> from towhee import Entity
>>> dc = DataCollection([Entity(a=a, b=b) for a,b in zip(['abc', 'vdfvcd', 'cdsc'], [1,2,3])])
>>> dc.batch(2)
[<Entity dict_keys(['a', 'b'])>, <Entity dict_keys(['a', 'b'])>]
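The effect of `size` and `drop_tail` on plain values can be reproduced with a short generator. This is a plain-Python sketch of the windowing behaviour shown in the integer examples above, not towhee's implementation, and it does not model the `raw` flag or Entity inputs.

```python
def batch(iterable, size, drop_tail=False):
    """Yield consecutive lists of up to `size` items."""
    window = []
    for item in iterable:
        window.append(item)
        if len(window) == size:
            yield window
            window = []
    # A non-empty remainder is the incomplete trailing batch.
    if window and not drop_tail:
        yield window


full = list(batch(range(10), 3))
trimmed = list(batch(range(10), 3, drop_tail=True))
print(full)     # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(trimmed)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```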
- rolling(size: int, drop_head=True, drop_tail=True)[source]
Create rolling windows from data collections.
- Parameters
size (int) – window size;
drop_head (bool) – drop leading windows that are not full;
drop_tail (bool) – drop trailing windows that are not full;
- Returns
data collection of rolling windows;
- Return type
Examples:
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_head=False)]
[[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]]
>>> dc = DataCollection(range(5))
>>> [list(batch) for batch in dc.rolling(3, drop_tail=False)]
[[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]
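The three outputs above can be reproduced with a bounded deque. The generator below is a plain-Python sketch under that assumption, not towhee's implementation; its behaviour for inputs shorter than `size` is a choice of this sketch and may differ from the library.

```python
from collections import deque


def rolling(iterable, size, drop_head=True, drop_tail=True):
    """Yield sliding windows of `size` items as lists."""
    window = deque(maxlen=size)  # oldest item falls out automatically
    for item in iterable:
        window.append(item)
        # Partial windows at the start are emitted only if drop_head is False.
        if len(window) == size or not drop_head:
            yield list(window)
    if not drop_tail:
        # Shrink the last window from the left to emit the partial tails.
        while len(window) > 1:
            window.popleft()
            yield list(window)


default = list(rolling(range(5), 3))
with_head = list(rolling(range(5), 3, drop_head=False))
with_tail = list(rolling(range(5), 3, drop_tail=False))
print(default)    # [[0, 1, 2], [1, 2, 3], [2, 3, 4]]
print(with_head)  # [[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]]
print(with_tail)  # [[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4], [4]]
```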
- flatten() towhee.functional.data_collection.DataCollection [source]
Flatten nested data collections.
- Returns
flattened data collection;
- Return type
DataCollection
Examples:
>>> dc = DataCollection(range(10))
>>> nested_dc = dc.batch(2)
>>> nested_dc.flatten().to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
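For plain nested lists, the same one-level flattening can be written with the standard library. This sketch assumes a single level of nesting, as in the example above; it says nothing about how towhee handles deeper nesting.

```python
from itertools import chain

nested = [[0, 1], [2, 3], [4, 5]]

# chain.from_iterable concatenates the inner iterables lazily, one level deep.
flat = list(chain.from_iterable(nested))
print(flat)  # [0, 1, 2, 3, 4, 5]
```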
- shuffle() towhee.functional.data_collection.DataCollection [source]
Shuffle an unstreamed data collection in place.
- Returns
shuffled data collection;
- Return type
DataCollection
Examples:
Shuffle:
>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> a = dc.shuffle()
>>> tuple(a) == tuple(range(5))
False
A streamed data collection is not supported:
>>> dc = DataCollection([0, 1, 2, 3, 4]).stream()
>>> _ = dc.shuffle()
Traceback (most recent call last):
TypeError: shuffle is not supported for streamed data collection.
- getattr(self, name)[source]
Unknown method dispatcher.
When an unknown method is invoked on a DataCollection object, the call is dispatched to a method resolver. By registering functions with the resolver, you can extend DataCollection’s API at runtime without modifying its code.
Examples:
Define two operators:
>>> class my_add:
...     def __init__(self, val):
...         self.val = val
...     def __call__(self, x):
...         return x+self.val
>>> class my_mul:
...     def __init__(self, val):
...         self.val = val
...     def __call__(self, x):
...         return x*self.val
Register the operators to DataCollection’s execution context with param_scope:
>>> from towhee import param_scope
>>> with param_scope(dispatcher={
...         'add': my_add, # register `my_add` as `dc.add`
...         'mul': my_mul  # register `my_mul` as `dc.mul`
... }):
...     dc = DataCollection([1,2,3,4])
...     dc.add(1).mul(2).to_list() # call registered operator
[4, 6, 8, 10]
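The general mechanism behind this kind of runtime extension is Python's `__getattr__` hook, which is called only when normal attribute lookup fails. The sketch below shows that mechanism with a hypothetical `Pipeline` class and a class-level `registry` dict; it does not use param_scope and is not towhee's resolver.

```python
class Pipeline:
    """Hypothetical list wrapper; not towhee's DataCollection."""

    registry = {}  # maps method names to operator factories

    def __init__(self, items):
        self._items = list(items)

    def __getattr__(self, name):
        # Reached only when `name` is not a regular attribute or method.
        try:
            factory = type(self).registry[name]
        except KeyError:
            raise AttributeError(name) from None

        def call(*args, **kwargs):
            op = factory(*args, **kwargs)  # build the operator from its arguments
            return Pipeline(op(x) for x in self._items)

        return call

    def to_list(self):
        return list(self._items)


class MyAdd:
    def __init__(self, val):
        self.val = val

    def __call__(self, x):
        return x + self.val


# Register operators under the names they should be callable as.
Pipeline.registry["add"] = MyAdd
Pipeline.registry["mul"] = lambda val: (lambda x: x * val)

result = Pipeline([1, 2, 3, 4]).add(1).mul(2).to_list()
print(result)  # [4, 6, 8, 10]
```

Raising `AttributeError` for unregistered names keeps `hasattr` and similar introspection working as expected.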
- self[index][source]
Indexing for data collection.
Examples:
>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> dc[0]
0
>>> dc.stream()[1]
Traceback (most recent call last):
TypeError: indexing is only supported for data collection created from list or pandas DataFrame.
- self[index] = value[source]
Index assignment for data collection.
Examples:
>>> dc = DataCollection([0, 1, 2, 3, 4])
>>> dc[0]
0
>>> dc[0] = 5
>>> dc._iterable[0]
5
>>> dc.stream()[0]
Traceback (most recent call last):
TypeError: indexing is only supported for data collection created from list or pandas DataFrame.
- append(item: Any) towhee.functional.data_collection.DataCollection [source]
Append item to data collection
- Parameters
item (Any) – the item to append
- Returns
self
- Return type
DataCollection
Examples:
>>> dc = DataCollection([0, 1, 2])
>>> dc.append(3).append(4)
[0, 1, 2, 3, 4]
- self >> unary_op[source]
Chain the operators with >>.
Examples:
>>> dc = DataCollection([1,2,3,4])
>>> (dc
...     >> (lambda x: x+1)
...     >> (lambda x: x*2)
... ).to_list()
[4, 6, 8, 10]
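In Python, `a >> b` is routed to `a.__rshift__(b)`, so an operator-chaining syntax like the one above can be built by forwarding `__rshift__` to a mapping method. The `Chain` class below is a hypothetical illustration of that pattern and assumes `>>` is equivalent to `map`, which matches the example output but is not confirmed by the source.

```python
class Chain:
    """Hypothetical list wrapper; not towhee's DataCollection."""

    def __init__(self, items):
        self._items = list(items)

    def map(self, func):
        return Chain(func(x) for x in self._items)

    def __rshift__(self, func):
        # `chain >> func` is dispatched here by Python.
        return self.map(func)

    def to_list(self):
        return list(self._items)


result = (
    Chain([1, 2, 3, 4])
    >> (lambda x: x + 1)
    >> (lambda x: x * 2)
).to_list()
print(result)  # [4, 6, 8, 10]
```

The lambdas need their own parentheses because `lambda` binds more loosely than `>>`.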
- self + other[source]
Concatenate two data collections.
Examples:
>>> (DataCollection.range(5) + DataCollection.range(5)).to_list()
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
>>> (DataCollection.range(5) + DataCollection.range(5) + DataCollection.range(5)).to_list()
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
- repr(self) str [source]
Return a string representation for DataCollection.
Examples:
>>> DataCollection([1, 2, 3]).unstream()
[1, 2, 3]
>>> DataCollection([1, 2, 3]).stream()
<list_iterator object at...>
- head(n: int = 5)[source]
Get the first n lines of a DataCollection.
- Parameters
n (int) – The number of lines to print. Default value is 5.
Examples:
>>> DataCollection.range(10).head(3).to_list()
[0, 1, 2]