DataCollection in 10 Minutes

This section is a short introduction to DataCollection, an unstructured data processing framework provided by towhee. More complex examples can be found in the Towhee GitHub repository.

Preparation

The latest version of towhee can be installed with pip, or with python -m pip if pip is not on your PATH:

$ pip install towhee
$ python -m pip install towhee

With the package installed, we can import towhee with the following:

>>> import towhee

Creating a DataCollection

DataCollection is an enhancement to the built-in list in Python. Creating a DataCollection from a list is as simple as:

>>> dc = towhee.dc([0, 1, 2, 3])
>>> dc
[0, 1, 2, 3]

DataCollection is designed to behave like list, which makes it easy to understand for most Python users and compatible with most popular data science toolkits:

>>> dc = towhee.dc([0, 1, 2, 3])
>>> dc
[0, 1, 2, 3]

# indexing
>>> dc[1], dc[2]
(1, 2)

# slicing
>>> dc[:2]
[0, 1]

# appending
>>> dc.append(4).append(5)
[0, 1, 2, 3, 4, 5]
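To make the list-like behavior concrete, here is a minimal sketch of a list subclass whose slices keep the same type and whose append() returns the collection, so that calls can be chained as above. The class name ListLikeDC is hypothetical; this illustrates the behavior only and is not Towhee's implementation.

```python
class ListLikeDC(list):
    """Hypothetical list subclass mimicking the DataCollection behavior above."""

    def __getitem__(self, index):
        item = super().__getitem__(index)
        # list slicing returns a plain list; wrap it so the result keeps
        # the same type and can be chained further.
        if isinstance(index, slice):
            return ListLikeDC(item)
        return item

    def append(self, item):
        # list.append returns None; returning self allows .append(4).append(5).
        super().append(item)
        return self


dc = ListLikeDC([0, 1, 2, 3])
print(dc[1], dc[2])            # 1 2
print(dc[:2])                  # [0, 1]
print(dc.append(4).append(5))  # [0, 1, 2, 3, 4, 5]
```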

Viewing Data

We can take a quick look at the data with head():

>>> dc = towhee.dc([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
>>> dc.head(5)
[0, 1, 2, 3, 4]
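Since head() only needs the first few items, the idea can be sketched with itertools.islice, which works on lists and iterators alike. The helper head() below is a hypothetical stand-in, not Towhee's code; note that on an iterator it consumes the items it returns.

```python
from itertools import islice


def head(data, n=5):
    """Return the first n items of a list or any other iterable (sketch)."""
    # islice stops after n items, so a long or even endless iterable is
    # never read past the items that are actually needed.
    return list(islice(data, n))


print(head([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 5))  # [0, 1, 2, 3, 4]
print(head(iter(range(10**9)), 3))              # [0, 1, 2]
```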

If you are running within a Jupyter notebook, show() is recommended because it provides a richer display:

import towhee
dc = towhee.dc([0, 1, 2, 3])
dc.show(limit=5)
0
1
2
3

Processing Data

Applying a Function

Applying a function to the elements in a DataCollection can be done with a simple map() call:

>>> towhee.dc([0, 1, 2, 3, 4]).map(lambda x: x*2)
[0, 2, 4, 6, 8]

Applying a Filter

Filtering the data in a DataCollection:

>>> towhee.dc([0, 1, 2, 3, 4]).filter(lambda x: x%2==0)
[0, 2, 4]

Chaining Data Processing Steps

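DataCollection supports method-chaining style programming, which keeps the code clean and fluent. Compare the same computation written with DataCollection, with nested built-in map() and filter() calls, and with a plain for loop: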

>>> (
...     towhee.dc([0, 1, 2, 3, 4])
...         .filter(lambda x: x%2==1)
...         .map(lambda x: x+1)
...         .map(lambda x: x*2)
... )
[4, 8]

>>> list(
...     map(
...         lambda x: x*2,
...         map(lambda x: x+1,
...             filter(lambda x: x%2==1,
...                    [0, 1, 2, 3, 4])
...         )
...     )
... )
[4, 8]

>>> result = []
>>> for x in [0, 1, 2, 3, 4]:
...     if x%2 == 1:
...         x = x+1
...         x = x*2
...         result.append(x)
>>> result
[4, 8]

The DataCollection version is the most straightforward: each step returns a new DataCollection, so the steps read top to bottom in the order they are applied.
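The "each step returns a new collection" idea can be sketched with a list subclass whose map() and filter() build new instances instead of modifying the original. ChainableDC is a hypothetical name used only for illustration, not Towhee's class.

```python
class ChainableDC(list):
    """Hypothetical list subclass whose map/filter return new collections."""

    def map(self, func):
        # Build a new collection rather than mutating self, so every step
        # in a chain leaves the previous step's data untouched.
        return ChainableDC(func(x) for x in self)

    def filter(self, pred):
        return ChainableDC(x for x in self if pred(x))


source = ChainableDC([0, 1, 2, 3, 4])
result = (
    source
    .filter(lambda x: x % 2 == 1)
    .map(lambda x: x + 1)
    .map(lambda x: x * 2)
)
print(result)  # [4, 8]
print(source)  # [0, 1, 2, 3, 4]
```

Because no step mutates its input, any intermediate result can be inspected or reused on its own.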

Towhee Operators

Operators are the basic units of computation that can be applied to the elements within a DataCollection. There are many predefined Operators on the Towhee hub, including popular deep learning models ranging from computer vision to natural language processing.

Using Operators

We can load an Operator from the Towhee hub with the following:

>>> from towhee import ops
>>> op = ops.towhee.image_decode()
>>> img = op('./towhee_logo.png')

Here, towhee is the namespace of the operator, and image_decode is the operator name. An operator is usually referred to by its full name: namespace/name.

towhee is the namespace for official operators and is also the default namespace when none is specified:

>>> from towhee import ops
>>> op = ops.image_decode()
>>> img = op('./towhee_logo.png')
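The naming convention can be sketched as a small resolver that turns an attribute path such as ops.towhee.image_decode into a full namespace/name string, falling back to the towhee namespace. The function resolve_op_name is hypothetical, and converting underscores to hyphens is an assumption about how hub names are spelled, not something this page documents.

```python
DEFAULT_NAMESPACE = "towhee"


def resolve_op_name(*path):
    """Map ('towhee', 'image_decode') or ('image_decode',) to a full name.

    Hypothetical sketch of the namespace/name convention.
    """
    if len(path) == 1:
        # No namespace given: use the default one.
        namespace, name = DEFAULT_NAMESPACE, path[0]
    elif len(path) == 2:
        namespace, name = path
    else:
        raise ValueError(f"expected [namespace.]name, got {path!r}")
    # Assumption: attribute names use underscores, hub names use hyphens.
    return f"{namespace}/{name.replace('_', '-')}"


print(resolve_op_name("towhee", "image_decode"))  # towhee/image-decode
print(resolve_op_name("image_decode"))            # towhee/image-decode
```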

Custom Operators

It is also easy to define custom operators with standard Python functions:

>>> from towhee import register
>>> @register
... def add_1(x):
...     return x+1
>>> ops.add_1()(2)
3

If the operator needs additional initialization arguments, it can be defined as a class:

>>> @register
... class add_x:
...     def __init__(self, x):
...         self._x = x
...     def __call__(self, y):
...         return self._x + y

>>> ops.add_x(x=1)(2)
3
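A registry of this kind can be sketched with a dictionary and a decorator. In this sketch, calling the factory for a plain function with no arguments returns the function itself, while the factory for a class passes its arguments to __init__, matching the two examples above. The names _REGISTRY, register, Ops, and ops are hypothetical stand-ins for towhee.register and towhee.ops, not their actual implementation.

```python
# Hypothetical stand-ins for towhee.register and towhee.ops (not Towhee's code).
_REGISTRY = {}


def register(obj):
    """Record a function or class under its own name and return it unchanged."""
    _REGISTRY[obj.__name__] = obj
    return obj


class Ops:
    """Resolve ops.<name>(...) to a callable operator from the registry."""

    def __getattr__(self, name):
        try:
            target = _REGISTRY[name]
        except KeyError:
            raise AttributeError(f"no operator named {name!r}") from None

        def factory(*args, **kwargs):
            if isinstance(target, type):
                # Class operator: initialization arguments go to __init__.
                return target(*args, **kwargs)
            if args or kwargs:
                raise TypeError(f"function operator {name!r} takes no init arguments")
            # Function operator: the function itself is the operator.
            return target

        return factory


ops = Ops()


@register
def add_1(x):
    return x + 1


@register
class add_x:
    def __init__(self, x):
        self._x = x

    def __call__(self, y):
        return self._x + y


print(ops.add_1()(2))     # 3
print(ops.add_x(x=1)(2))  # 3
```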

Using Named Operators with DataCollection

When an operator is uploaded to the Towhee hub or registered with @register, we can call the operator directly on a DataCollection:

>>> @register
... def add_1(x):
...     return x+1

>>> (
...     towhee.dc([0, 1, 2, 3, 4])
...         .add_1()
... )
[1, 2, 3, 4, 5]

add_1() is an operator registered with towhee through the @register decorator. We can invoke it by calling it as a method of DataCollection, because DataCollection dispatches calls to methods it does not define to the registered operator with that name.
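This kind of fallback can be sketched with Python's __getattr__ hook, which is called only when normal attribute lookup fails. In the hypothetical sketch below, an unknown method name is looked up in a registry and the resulting operator is mapped over the elements. DispatchDC, _REGISTRY, and register are illustrative names, not Towhee's internals.

```python
# Hypothetical sketch of method-call dispatch (not Towhee's implementation).
_REGISTRY = {}


def register(obj):
    _REGISTRY[obj.__name__] = obj
    return obj


class DispatchDC(list):
    """List subclass that forwards unknown method calls to registered operators."""

    def map(self, func):
        return DispatchDC(func(x) for x in self)

    def __getattr__(self, name):
        # Python calls __getattr__ only after normal lookup fails, so real
        # methods such as append() and map() are never overridden here.
        if name not in _REGISTRY:
            raise AttributeError(name)
        target = _REGISTRY[name]

        def method(*args, **kwargs):
            # Class operators are built from the call arguments; function
            # operators are used as they are. Either way, map element-wise.
            op = target(*args, **kwargs) if isinstance(target, type) else target
            return self.map(op)

        return method


@register
def add_1(x):
    return x + 1


print(DispatchDC([0, 1, 2, 3, 4]).add_1())  # [1, 2, 3, 4, 5]
```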

Such call dispatching makes the code easy to read. Here is the same pipeline written with call dispatch, with explicit map() calls, and with a plain Python loop:

# With call dispatch
towhee.dc(some_image_list) \
    .image_decode() \
    .towhee.image_embedding(model_name='resnet50') \
    .tensor_normalize(axis=1)

# With explicit map() calls
towhee.dc(some_image_list) \
    .map(ops.image_decode()) \
    .map(ops.towhee.image_embedding(model_name='resnet50')) \
    .map(ops.tensor_normalize(axis=1))

# With a plain Python loop
image_decode = ops.image_decode()
image_embedding = ops.towhee.image_embedding(model_name='resnet50')
tensor_normalize = ops.tensor_normalize(axis=1)

result = []
for path in some_image_list:
    img = image_decode(path)
    embedding = image_embedding(img)
    vec = tensor_normalize(embedding)
    result.append(vec)

Stream Processing

For large-scale datasets, a list can be too memory-intensive because the entire dataset has to be loaded into memory. For this reason, users often opt for stream processing with Python generators. A generator produces values one at a time as they are requested, so each value can move through the processing steps as soon as it arrives instead of waiting for the whole dataset to be loaded first.

Towhee provides a similar streaming mechanism within DataCollection.
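The memory argument can be illustrated with plain Python generators: each stage below pulls one value at a time from the previous stage, so only a handful of values exist at any moment regardless of input size. This is a standard-library sketch of the idea, not Towhee's streaming machinery; read_records is a hypothetical stand-in for a large data source.

```python
def read_records(n):
    """Hypothetical data source that yields one record at a time."""
    for i in range(n):
        yield i


def pipeline(records):
    # Generator expressions are lazy: each stage pulls one value from the
    # previous stage only when the next stage asks for it.
    odd = (x for x in records if x % 2 == 1)
    shifted = (x + 1 for x in odd)
    return (x * 2 for x in shifted)


# A million records flow through the pipeline without building any list.
total = sum(pipeline(read_records(1_000_000)))
print(total)  # 500001000000
```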

Creating a Streamed DataCollection

A streamed DataCollection is created from an iterator or a generator:

>>> dc = towhee.dc(iter([0, 1, 2, 3]))
>>> dc 
<list_iterator object at ...>

We can also convert an unstreamed DataCollection into a streamed one:

>>> dc = towhee.dc([0, 1, 2, 3])
>>> dc.stream() 
<list_iterator object at ...>

Using Streamed DataCollections

Streamed DataCollections are designed to behave in the same way as unstreamed ones. One important detail is that the computation does not run until we begin consuming items from the DataCollection.

>>> def debug_print(x):
...     print(f'debug print: {x}')
...     return x

>>> dc = (
...     towhee.dc(iter([0, 1, 2, 3, 4]))
...         .map(debug_print)
...         .filter(lambda x: x%2==1)
...         .map(lambda x: x+1)
...         .map(lambda x: x*2)
... )
>>> dc
<map object at 0x...>

>>> # consume the streamed dc and collect the results into a list
>>> [x for x in dc]
debug print: 0
debug print: 1
debug print: 2
debug print: 3
debug print: 4
[4, 8]

>>> def debug_print(x):
...     print(f'debug print: {x}')
...     return x

>>> dc = (
...     towhee.dc([0, 1, 2, 3, 4])
...         .map(debug_print)
...         .filter(lambda x: x%2==1)
...         .map(lambda x: x+1)
...         .map(lambda x: x*2)
... )
debug print: 0
debug print: 1
debug print: 2
debug print: 3
debug print: 4
>>> dc
[4, 8]

In the streamed example, debug_print() is not executed until we start accessing the items in the DataCollection. For the unstreamed DataCollection, it is executed immediately.
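The same difference can be reproduced with Python's built-ins alone: map() and filter() return lazy iterators (the streamed example above shows a map object), while a list comprehension runs its function right away. This standard-library sketch records calls in a list instead of printing them, so the evaluation order is easy to check; debug_record is a hypothetical helper.

```python
calls = []


def debug_record(x):
    # Record each call instead of printing it, so the order can be inspected.
    calls.append(x)
    return x


data = [0, 1, 2, 3, 4]

# Eager: a list comprehension calls debug_record on every item immediately.
eager = [debug_record(x) for x in data]
print(calls)  # [0, 1, 2, 3, 4]

calls.clear()

# Lazy: map() and filter() only build iterators; nothing has run yet.
lazy = map(lambda x: x * 2,
           map(lambda x: x + 1,
               filter(lambda x: x % 2 == 1,
                      map(debug_record, data))))
print(calls)       # []
print(list(lazy))  # [4, 8]
print(calls)       # [0, 1, 2, 3, 4]
```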

Tabular Data

This section introduces how to handle structured data with DataCollection. The term tabular refers to structured data organized into rows and columns, a format widely used by data scientists and supported by most machine learning toolkits.

Creating a DataCollection with a Schema

  • We can directly read data from files:

dc = towhee.read_csv('some.csv')
dc = towhee.read_json('some.json')
  • We can also load data from a pandas DataFrame:

import pandas
df = pandas.read_sql(...)
dc = towhee.from_df(df)
  • We can also convert a list of dicts into a DataCollection:

>>> dc = towhee.dc([{'a': i, 'b': i*2} for i in range(5)]).as_entity()
>>> dc.show()

We call each row of the table an Entity. Both a and b are fields within the entity.
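Conceptually, each dict becomes one entity (a row) and each key becomes a field. The sketch below uses types.SimpleNamespace purely to illustrate that mapping with attribute access; it is not Towhee's Entity class.

```python
from types import SimpleNamespace

rows = [{'a': i, 'b': i * 2} for i in range(5)]

# Each dict becomes one entity (row); its keys become fields.
entities = [SimpleNamespace(**row) for row in rows]

print(entities[2].a, entities[2].b)  # 2 4
print([e.b for e in entities])       # [0, 2, 4, 6, 8]
```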

Applying Functions/Operators According to the Schema

We can apply an operator according to the fields of the entities:

>>> @towhee.register
... def add_1(x):
...   return x+1

>>> dc.add_1['a', 'c']().show()

['a', 'c'] is the syntax for specifying operator input and output: field a is used as the input, and field c is used as the output. We can also apply a lambda function to tabular data with runas_op:

>>> dc.runas_op['b', 'd'](func=lambda x: x*2).show()
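The input/output field selection can be sketched as a helper that reads one field from each row, applies a function, and writes the result to another field. apply_on_fields is a hypothetical name, and rows are plain dicts here for brevity. The sketch mirrors what the ['a', 'c'] and ['b', 'd'] selections are described as doing, not how Towhee implements them, and it chains the two steps for illustration.

```python
def apply_on_fields(rows, func, src, dst):
    """Return new rows with dst = func(row[src]); input rows stay unchanged."""
    return [{**row, dst: func(row[src])} for row in rows]


def add_1(x):
    return x + 1


rows = [{'a': i, 'b': i * 2} for i in range(3)]
with_c = apply_on_fields(rows, add_1, 'a', 'c')               # like add_1['a', 'c']
with_d = apply_on_fields(with_c, lambda x: x * 2, 'b', 'd')   # like runas_op['b', 'd']
print(with_d)
# [{'a': 0, 'b': 0, 'c': 1, 'd': 0}, {'a': 1, 'b': 2, 'c': 2, 'd': 4}, {'a': 2, 'b': 4, 'c': 3, 'd': 8}]
```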

Advanced Features

DataCollection also supports advanced features such as parallel and distributed execution. For details on these features, please refer to the DataCollection API documentation.