DataCollection in 10 Minutes

This section is a short introduction to DataCollection, a framework for processing unstructured data provided by Towhee. More complex examples can be found in the Towhee GitHub repository.

Preparation

The latest version of towhee can be installed with pip, or with python -m pip if pip is not in your PATH:

$ pip install towhee
$ python -m pip install towhee

With the package installed, we can import towhee with the following:

>>> import towhee

Creating a DataCollection

DataCollection is an enhancement to the built-in list in Python. Creating a DataCollection from a list is as simple as:

>>> dc = towhee.dc([0, 1, 2, 3])
>>> dc
[0, 1, 2, 3]

DataCollection is designed to mimic the behavior of list, making it easy to understand for most Python users and compatible with most popular data science toolkits:

>>> dc = towhee.dc([0, 1, 2, 3])
>>> dc
[0, 1, 2, 3]

# indexing
>>> dc[1], dc[2]
(1, 2)

# slicing
>>> dc[:2]
[0, 1]

# appending
>>> dc.append(4).append(5)
[0, 1, 2, 3, 4, 5]
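
One difference from the built-in list is visible in the last example: list.append() returns None and so cannot be chained, whereas the append() call above returns the collection. The following is a minimal sketch of that idea in plain Python. ChainableList is a hypothetical class invented for illustration; it is not part of towhee and is not meant to describe how DataCollection is actually implemented.

```python
class ChainableList(list):
    """Hypothetical list subclass whose append() can be chained."""

    def append(self, item):
        super().append(item)
        return self  # returning self is what enables .append(4).append(5)


plain = [0, 1, 2, 3]
plain_result = plain.append(4)  # the built-in append() returns None

chained = ChainableList([0, 1, 2, 3]).append(4).append(5)

print(plain_result)  # None
print(chained)       # [0, 1, 2, 3, 4, 5]
```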

Viewing Data

We can take a quick look at the data with head():

>>> dc = towhee.dc([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
>>> dc.head(5)
[0, 1, 2, 3, 4]

If you are running in a Jupyter notebook, show() is recommended, as it provides a better interface:

import towhee
dc = towhee.dc([0, 1, 2, 3])
dc.show(limit=5)
0
1
2
3

Processing Data

Applying a Function

Applying a function to the elements in a DataCollection can be done with a simple map() call:

>>> towhee.dc([0, 1, 2, 3, 4]).map(lambda x: x*2)
[0, 2, 4, 6, 8]

Applying a Filter

The data in a DataCollection can be filtered with filter():

>>> towhee.dc([0, 1, 2, 3, 4]).filter(lambda x: int(x%2)==0)
[0, 2, 4]

Chaining Data Processing Steps

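DataCollection supports method-chaining style programming, which keeps the code clean and fluent. The three snippets below compute the same result: first with DataCollection, then with Python's built-in map() and filter(), and finally with a for loop.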

>>> (
...     towhee.dc([0, 1, 2, 3, 4])
...         .filter(lambda x: x%2==1)
...         .map(lambda x: x+1)
...         .map(lambda x: x*2)
... )
[4, 8]

>>> list(
...     map(
...         lambda x: x*2,
...         map(lambda x: x+1,
...             filter(lambda x: x%2==1,
...                    [0, 1, 2, 3, 4])
...         )
...     )
... )
[4, 8]

>>> result = []
>>> for x in [0, 1, 2, 3, 4]:
...     if x%2 == 1:
...         x = x+1
...         x = x*2
...         result.append(x)
>>> result
[4, 8]

The DataCollection version is more straightforward: each action returns a new DataCollection, so the processing reads as step-by-step instructions.
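
To make the "each action generates a new collection" idea concrete, here is a minimal sketch in plain Python. MiniCollection is a hypothetical stand-in written for illustration only. It assumes nothing about towhee's internals and simply shows why returning a new collection from every step makes chaining possible.

```python
class MiniCollection(list):
    """Hypothetical stand-in for a chainable collection; illustrative only."""

    def map(self, func):
        # Return a new collection so the next step can be chained onto it.
        return MiniCollection(func(x) for x in self)

    def filter(self, predicate):
        # Keep only the elements for which predicate(x) is true.
        return MiniCollection(x for x in self if predicate(x))


source = MiniCollection([0, 1, 2, 3, 4])
odds = source.filter(lambda x: x % 2 == 1)
result = odds.map(lambda x: x + 1).map(lambda x: x * 2)

print(odds)    # [1, 3]
print(result)  # [4, 8]
print(source)  # [0, 1, 2, 3, 4]
```

Because every step returns a new object, intermediate results such as odds can be kept and inspected without affecting the source data.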

Towhee Operators

Operators are the basic units of computation that can be applied to the elements within a DataCollection. There are many predefined Operators on the Towhee hub, including popular deep learning models ranging from computer vision to natural language processing.

Using Operators

We can load an Operator from the Towhee hub with the following:

>>> from towhee import ops
>>> op = ops.towhee.image_decode()
>>> img = op('./towhee_logo.png')

Here, towhee is the namespace of the operator, and image_decode is the operator name. An operator is usually referred to by its full name: namespace/name.

towhee is the namespace for official operators, and it is also the default namespace when none is specified:

>>> from towhee import ops
>>> op = ops.image_decode()
>>> img = op('./towhee_logo.png')

Custom Operators

It is also easy to define custom operators with standard Python functions:

>>> from towhee import register
>>> @register
... def add_1(x):
...     return x+1
>>> ops.add_1()(2)
3

If the operator needs additional initialization arguments, it can be defined as a class:

>>> @register
... class add_x:
...     def __init__(self, x):
...         self._x = x
...     def __call__(self, y):
...         return self._x + y

>>> ops.add_x(x=1)(2)
3
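
Both forms are used the same way: ops.name(...) builds the operator, and the result is then called on the data. The sketch below shows one way such a registry could work in plain Python. Everything in it (the _REGISTRY dict, this register function, and the _Ops class) is a hypothetical reimplementation for illustration, not towhee's actual code.

```python
import inspect

_REGISTRY = {}  # hypothetical table mapping operator name -> factory


def register(obj):
    """Hypothetical decorator that stores a factory under the object's name."""
    if inspect.isclass(obj):
        # Calling the class runs __init__ and returns a callable instance.
        factory = obj
    else:
        # A plain function needs no initialization; the factory returns it.
        def factory():
            return obj
    _REGISTRY[obj.__name__] = factory
    return obj


class _Ops:
    """Hypothetical lookup object: ops.name(...) returns a ready callable."""

    def __getattr__(self, name):
        try:
            return _REGISTRY[name]
        except KeyError:
            raise AttributeError(name) from None


ops = _Ops()


@register
def add_1(x):
    return x + 1


@register
class add_x:
    def __init__(self, x):
        self._x = x

    def __call__(self, y):
        return self._x + y


print(ops.add_1()(2))     # 3
print(ops.add_x(x=1)(2))  # 3
```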

Using Named Operators with DataCollection

When an operator is uploaded to the Towhee hub or registered with @register, we can call the operator directly on a DataCollection:

>>> @register
... def add_1(x):
...     return x+1

>>> (
...     towhee.dc([0, 1, 2, 3, 4])
...         .add_1()
... )
[1, 2, 3, 4, 5]

add_1() is an operator that was registered to towhee using a decorator. We can invoke the operator by calling it as a method of DataCollection. DataCollection will dispatch missing function calls to the registered operators.
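
Dispatching missing method calls is commonly done in Python with __getattr__, which is invoked only when normal attribute lookup fails. The following sketch illustrates the mechanism under that assumption. OPERATORS, DispatchingList, and the scale operator are hypothetical names, and the code is not taken from towhee.

```python
import functools

OPERATORS = {}  # hypothetical registry mapping name -> function


def register(func):
    """Hypothetical decorator that records a function by name."""
    OPERATORS[func.__name__] = func
    return func


class DispatchingList(list):
    """Hypothetical list that forwards unknown methods to OPERATORS."""

    def map(self, func):
        return DispatchingList(func(x) for x in self)

    def __getattr__(self, name):
        # Reached only when `name` is not a regular attribute or method.
        if name not in OPERATORS:
            raise AttributeError(name)
        op = OPERATORS[name]

        def call_operator(**kwargs):
            # Bind keyword arguments once, then apply the operator per element.
            return self.map(functools.partial(op, **kwargs))

        return call_operator


@register
def add_1(x):
    return x + 1


@register
def scale(x, factor=1):
    return x * factor


result = DispatchingList([0, 1, 2]).add_1().scale(factor=3)
print(result)  # [3, 6, 9]
```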

Such call dispatching makes the code easy to read. Here is a comparison of the same pipeline written with call dispatch, with explicit map() calls, and with a for loop:

# using call dispatch
towhee.dc(some_image_list) \
    .image_decode() \
    .towhee.image_embedding(model_name='resnet50') \
    .tensor_normalize(axis=1)

# using explicit map() calls
towhee.dc(some_image_list) \
    .map(ops.image_decode()) \
    .map(ops.towhee.image_embedding(model_name='resnet50')) \
    .map(ops.tensor_normalize(axis=1))

# using a for loop
image_decode = ops.image_decode()
image_embedding = ops.towhee.image_embedding(model_name='resnet50')
tensor_normalize = ops.tensor_normalize(axis=1)

result = []
for path in some_image_list:
    img = image_decode(path)
    embedding = image_embedding(img)
    vec = tensor_normalize(embedding)
    result.append(vec)

Stream Processing

For large-scale datasets, a list is too memory-intensive because the entire dataset must be loaded into memory. For this reason, users often opt for stream processing with Python generators, which let you act on each value as it arrives instead of waiting for every value to finish the previous step before moving on to the next one.

Towhee provides a similar streaming mechanism within DataCollection.
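
As a reminder of how plain Python generators behave, the sketch below records the order in which work happens in a two-stage generator pipeline. It uses only standard Python; read_items and double are hypothetical helper names. Each item passes through both stages before the next item is read, and nothing runs until the pipeline is consumed.

```python
events = []  # records the order in which work happens


def read_items(n):
    """Hypothetical source stage: yields 0..n-1 one item at a time."""
    for i in range(n):
        events.append(f"read {i}")
        yield i


def double(items):
    """Hypothetical processing stage: doubles each incoming item."""
    for x in items:
        events.append(f"double {x}")
        yield x * 2


pipeline = double(read_items(3))
events_before_consuming = list(events)  # still empty: nothing has run yet

results = list(pipeline)  # consuming the generator drives both stages

print(events_before_consuming)  # []
print(results)                  # [0, 2, 4]
print(events)
# ['read 0', 'double 0', 'read 1', 'double 1', 'read 2', 'double 2']
```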

Creating a Streamed DataCollection

A streamed DataCollection is created from a generator or another iterator:

>>> dc = towhee.dc(iter([0, 1, 2, 3]))
>>> dc 
<list_iterator object at ...>

We can also convert an unstreamed DataCollection into a streamed one:

>>> dc = towhee.dc([0, 1, 2, 3])
>>> dc.stream() 
<list_iterator object at ...>

Using Streamed DataCollections

Streamed DataCollections are designed to behave in the same way as unstreamed ones. One important detail is that the computation will not run until we begin consuming items from the DataCollection.

>>> def debug_print(x):
...     print(f'debug print: {x}')
...     return x

>>> dc = (
...     towhee.dc(iter([0, 1, 2, 3, 4]))
...         .map(debug_print)
...         .filter(lambda x: x%2==1)
...         .map(lambda x: x+1)
...         .map(lambda x: x*2)
... )
>>> dc
<map object at 0x...>

>>> # consume the streamed dc and collect the results into a list
>>> [x for x in dc]
debug print: 0
debug print: 1
debug print: 2
debug print: 3
debug print: 4
[4, 8]

For comparison, here is the same pipeline on an unstreamed DataCollection:

>>> def debug_print(x):
...     print(f'debug print: {x}')
...     return x

>>> dc = (
...     towhee.dc([0, 1, 2, 3, 4])
...         .map(debug_print)
...         .filter(lambda x: x%2==1)
...         .map(lambda x: x+1)
...         .map(lambda x: x*2)
... )
debug print: 0
debug print: 1
debug print: 2
debug print: 3
debug print: 4
>>> dc
[4, 8]

In the streamed example, debug_print() is not executed until we start to access the items in the DataCollection. In the unstreamed example, it is executed immediately, as soon as the pipeline is defined.

Tabular Data

This section shows how to handle structured data with DataCollection. The term tabular refers to structured data organized into columns and rows, a format widely used by data scientists and supported by most machine learning toolkits.

Creating a DataCollection with a Schema

  • We can directly read data from files:

dc = towhee.read_csv('some.csv')
dc = towhee.read_json('some.json')

  • We can also load data from a pandas DataFrame:

df = pandas.read_sql(...)
dc = towhee.from_df(df)

  • We can also convert a list of dicts into a DataCollection:

>>> dc = towhee.dc([{'a': i, 'b': i*2} for i in range(5)]).as_entity()
>>> dc.show()

We call each row of the table an Entity. Both a and b are fields within the entity.

Applying Functions/Operators According to the Schema

We can apply an operator according to the fields of the entities:

>>> @towhee.register
... def add_1(x):
...   return x+1

>>> dc.add_1['a', 'c']().show()

['a', 'c'] is the syntax for specifying the operator's input and output: field a is used as input, and field c is used as output. We can also apply a lambda function to tabular data with runas_op:

>>> dc.runas_op['b', 'd'](func=lambda x: x*2).show()
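
The effect of reading one field and writing another can be illustrated with plain dicts. In the sketch below, apply_to_field is a hypothetical helper written for this example (it is not a towhee function): it reads the src field of each row, applies a function, and stores the result in the dst field, mirroring the ['a', 'c'] and ['b', 'd'] calls above.

```python
def apply_to_field(rows, func, src, dst):
    """Hypothetical helper: read row[src], store func(row[src]) in row[dst]."""
    # Build new dicts so that the input rows are left unmodified.
    return [{**row, dst: func(row[src])} for row in rows]


rows = [{'a': i, 'b': i * 2} for i in range(3)]

with_c = apply_to_field(rows, lambda x: x + 1, 'a', 'c')    # like add_1['a', 'c']
with_d = apply_to_field(with_c, lambda x: x * 2, 'b', 'd')  # like runas_op['b', 'd']

for row in with_d:
    print(row)
# {'a': 0, 'b': 0, 'c': 1, 'd': 0}
# {'a': 1, 'b': 2, 'c': 2, 'd': 4}
# {'a': 2, 'b': 4, 'c': 3, 'd': 8}
```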

Advanced Features

DataCollection also supports advanced features such as parallel execution and distributed execution. For more details about these features, please refer to the DataCollection API documentation.