DataCollection in 10 Minutes

This section is a short introduction to DataCollection, an unstructured data processing framework provided by towhee. More complex examples can be found in the Towhee GitHub.

Preparation

The latest version of towhee can be installed with pip, or with python -m pip if pip is not present in your PATH:

$ pip install towhee
$ python -m pip install towhee

With the package installed, we can import towhee with the following:

>>> import towhee

Creating a DataCollection

DataCollection is an enhancement to the built-in list in Python. Creating a DataCollection from a list is as simple as:

>>> dc = towhee.dc([0, 1, 2, 3])
>>> dc
[0, 1, 2, 3]

DataCollection is designed to mimic the behavior of list, which makes it easy to understand for most Python users and compatible with most popular data science toolkits:

>>> dc = towhee.dc([0, 1, 2, 3])
>>> dc
[0, 1, 2, 3]

# indexing
>>> dc[1], dc[2]
(1, 2)

# slicing
>>> dc[:2]
[0, 1]

# appending
>>> dc.append(4).append(5)
[0, 1, 2, 3, 4, 5]
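
The list-like behavior above can be illustrated with a small pure-Python sketch. MiniCollection is a hypothetical stand-in written for this example; it is not towhee's implementation and only assumes that append() returns the collection itself, as the chained call above suggests.

```python
class MiniCollection(list):
    """Hypothetical stand-in for illustration; not towhee's DataCollection."""

    def append(self, item):
        # list.append returns None; returning self is what allows chaining.
        super().append(item)
        return self

    def __getitem__(self, index):
        # Wrap slices so that a slice is still a MiniCollection.
        result = super().__getitem__(index)
        return MiniCollection(result) if isinstance(index, slice) else result


mc = MiniCollection([0, 1, 2, 3])
first_two = mc[:2]
chained = mc.append(4).append(5)

print(mc[1], mc[2])  # 1 2
print(first_two)     # [0, 1]
print(chained)       # [0, 1, 2, 3, 4, 5]
```

Because the sketch subclasses list, indexing, iteration, and len() keep working unchanged.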

Viewing Data

We can take a quick look at the data with head():

>>> dc = towhee.dc([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
>>> dc.head(5)
[0, 1, 2, 3, 4]

If you are running within a Jupyter notebook, show() is recommended, as it provides a better interface:

import towhee
dc = towhee.dc([0, 1, 2, 3])
dc.show(limit=5)

Processing Data

Applying a Function

Applying a function to the elements in a DataCollection can be done with a simple map() call:

>>> towhee.dc([0, 1, 2, 3, 4]).map(lambda x: x*2)
[0, 2, 4, 6, 8]

Applying a Filter

Filtering the data in a DataCollection:

>>> towhee.dc([0, 1, 2, 3, 4]).filter(lambda x: int(x%2)==0)
[0, 2, 4]

Chaining Data Processing Steps

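DataCollection supports method chaining, which keeps the code clean and fluent. The three snippets below compute the same result: first with DataCollection, then with Python's built-in map() and filter(), and finally with a plain for loop.

>>> (
...     towhee.dc([0, 1, 2, 3, 4])
...         .filter(lambda x: x%2==1)
...         .map(lambda x: x+1)
...         .map(lambda x: x*2)
... )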
[4, 8]

>>> list(
...     map(
...         lambda x: x*2,
...         map(lambda x: x+1,
...             filter(lambda x: x%2==1,
...                    [0, 1, 2, 3, 4])
...         )
...     )
... )
[4, 8]

>>> result = []
>>> for x in [0, 1, 2, 3, 4]:
...     if x%2 == 1:
...         x = x+1
...         x = x*2
...         result.append(x)
>>> result
[4, 8]

The code using DataCollection is more straightforward, as each action generates a new DataCollection, thus allowing step by step instructions.
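
To make the "each action generates a new collection" idea concrete, here is a minimal sketch of a chainable container. The Chain class and its to_list() method are hypothetical names invented for this example and are not part of towhee.

```python
class Chain:
    """Hypothetical chainable container; not towhee's DataCollection."""

    def __init__(self, items):
        self._items = list(items)

    def map(self, func):
        # Return a new Chain so the original stays untouched.
        return Chain(func(x) for x in self._items)

    def filter(self, predicate):
        # Keep only the items for which predicate(x) is true.
        return Chain(x for x in self._items if predicate(x))

    def to_list(self):
        return list(self._items)


source = Chain([0, 1, 2, 3, 4])
odd = source.filter(lambda x: x % 2 == 1)
plus_one = odd.map(lambda x: x + 1)
doubled = plus_one.map(lambda x: x * 2)

print(odd.to_list())       # [1, 3]
print(plus_one.to_list())  # [2, 4]
print(doubled.to_list())   # [4, 8]
print(source.to_list())    # [0, 1, 2, 3, 4]
```

Since every step returns a fresh object, intermediate results can be inspected one at a time, which is what makes the chained style easy to read and debug.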

Towhee Operators

Operators are the basic units of computation that can be applied to the elements within a DataCollection. There are many predefined Operators on the Towhee hub, including popular deep learning models ranging from computer vision to natural language processing.

Using Operators

We can load an Operator from the Towhee hub with the following:

>>> from towhee import ops
>>> op = ops.towhee.image_decode()
>>> img = op('./towhee_logo.png')

Here, towhee is the namespace of the operator and image_decode is the operator name. An operator is usually referred to by its full name: namespace/name.

towhee is the namespace for official operators, and it is also the default namespace when none is specified:

>>> from towhee import ops
>>> op = ops.image_decode()
>>> img = op('./towhee_logo.png')

Custom Operators

It is also easy to define custom operators with standard Python functions:

>>> from towhee import register
>>> @register
... def add_1(x):
...     return x+1
>>> ops.add_1()(2)
3

If the operator needs additional initialization arguments, it can be defined as a class:

>>> @register
... class add_x:
...     def __init__(self, x):
...         self._x = x
...     def __call__(self, y):
...         return self._x + y

>>> ops.add_x(x=1)(2)
3

Using Named Operators with DataCollection

When an operator is uploaded to the Towhee hub or registered with @register, we can call the operator directly on a DataCollection:

>>> @register
... def add_1(x):
...     return x+1

>>> (
...     towhee.dc([0, 1, 2, 3, 4])
...         .add_1()
... )
[1, 2, 3, 4, 5]

add_1() is an operator that was registered to towhee using a decorator. We can invoke the operator by calling it as a method of DataCollection. DataCollection will dispatch missing function calls to the registered operators.
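
Dispatching of missing method calls can be sketched in plain Python with __getattr__, which is invoked only when normal attribute lookup fails. This is a simplified illustration under that assumption, not towhee's implementation; _OPS, register_op, and DispatchingList are hypothetical names.

```python
# Hypothetical operator table for this sketch.
_OPS = {}


def register_op(func):
    """Record a function under its name so it can be dispatched later."""
    _OPS[func.__name__] = func
    return func


class DispatchingList(list):
    """Hypothetical list that forwards unknown method calls to registered ops."""

    def map(self, func):
        return DispatchingList(func(x) for x in self)

    def __getattr__(self, name):
        # Reached only when 'name' is not a regular attribute or method.
        if name not in _OPS:
            raise AttributeError(name)
        op = _OPS[name]

        def call():
            # Apply the registered operator to every element.
            return self.map(op)

        return call


@register_op
def add_1(x):
    return x + 1


dispatched = DispatchingList([0, 1, 2, 3, 4]).add_1()
print(dispatched)  # [1, 2, 3, 4, 5]
```

Raising AttributeError for unknown names keeps the usual Python behavior for typos and for protocols that probe for optional attributes.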

Such call dispatching makes the code easy to read. Here is the same pipeline written three ways: with call dispatch, with explicit map() calls, and with a plain for loop:

# with call dispatch
towhee.dc(some_image_list) \
    .image_decode() \
    .towhee.image_embedding(model_name='resnet50') \
    .tensor_normalize(axis=1)

# with explicit map() calls
towhee.dc(some_image_list) \
    .map(ops.image_decode()) \
    .map(ops.towhee.image_embedding(model_name='resnet50')) \
    .map(ops.tensor_normalize(axis=1))

# with a plain for loop
image_decode = ops.image_decode()
image_embedding = ops.towhee.image_embedding(model_name='resnet50')
tensor_normalize = ops.tensor_normalize(axis=1)

result = []
for path in some_image_list:
    img = image_decode(path)
    embedding = image_embedding(img)
    vec = tensor_normalize(embedding)
    result.append(vec)

Stream Processing

For large-scale datasets, using a list is too memory-intensive due to having to load the entire dataset into memory. Because of this, users often opt for stream processing with Python generators. These generators allow you to act on values as they come in, instead of having to wait for all the previous values to finish first before moving to the next step.

Towhee provides a similar streaming mechanism within DataCollection.
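
As a reminder of how plain Python generators behave, the sketch below uses only the standard language and no towhee APIs. The helper names read_items and double, and the produced log, are invented for this example.

```python
produced = []  # records which items have actually been generated


def read_items(n):
    """Yield 0..n-1 one at a time, logging each item when it is produced."""
    for i in range(n):
        produced.append(i)
        yield i


def double(items):
    """Lazily double each incoming item."""
    for x in items:
        yield x * 2


pipeline = double(read_items(3))
produced_before = list(produced)        # nothing has run yet

first = next(pipeline)                  # pulls exactly one item through
produced_after_first = list(produced)

rest = list(pipeline)                   # drains the remaining items

print(produced_before)       # []
print(first)                 # 0
print(produced_after_first)  # [0]
print(rest)                  # [2, 4]
```

Only one item is in flight at a time, so memory use stays flat no matter how many items the source yields.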

Creating a Streamed DataCollection

A streamed DataCollection is created from a generator or any other iterator:

>>> dc = towhee.dc(iter([0, 1, 2, 3]))
>>> dc 
<list_iterator object at ...>

We can also convert an unstreamed DataCollection into a streamed one:

>>> dc = towhee.dc([0, 1, 2, 3])
>>> dc.stream() 
<list_iterator object at ...>

Using Streamed DataCollections

Streamed DataCollections are designed to behave in the same way as unstreamed ones. One important detail is that the computation will not run until we begin consuming items from the DataCollection.

>>> def debug_print(x):
...     print(f'debug print: {x}')
...     return x

>>> dc = (
...     towhee.dc(iter([0, 1, 2, 3, 4]))
...         .map(debug_print)
...         .filter(lambda x: x%2==1)
...         .map(lambda x: x+1)
...         .map(lambda x: x*2)
... )
>>> dc
<map object at 0x...>

>>> # consume the streamed dc and collect the results into a list
>>> [x for x in dc]
debug print: 0
debug print: 1
debug print: 2
debug print: 3
debug print: 4
[4, 8]

For comparison, here is the same pipeline on an unstreamed DataCollection:

>>> def debug_print(x):
...     print(f'debug print: {x}')
...     return x

>>> dc = (
...     towhee.dc([0, 1, 2, 3, 4])
...         .map(debug_print)
...         .filter(lambda x: x%2==1)
...         .map(lambda x: x+1)
...         .map(lambda x: x*2)
... )
debug print: 0
debug print: 1
debug print: 2
debug print: 3
debug print: 4
>>> dc
[4, 8]

In the streamed example, debug_print() is not executed until we start consuming items from the DataCollection. In the unstreamed example, it is executed immediately, as soon as the pipeline is defined.

Tabular Data

This section introduces how to handle structured data with DataCollection. The term tabular refers to structured data organized into rows and columns, a format widely used by data scientists and supported by most machine learning toolkits.

Creating a DataCollection with a Schema

  • We can directly read data from files:

dc = towhee.read_csv('some.csv')
dc = towhee.read_json('some.json')

  • We can also load data from a pandas DataFrame:

df = pandas.read_sql(...)
dc = towhee.from_df(df)

  • We can also convert a list of dicts into a DataCollection:

>>> dc = towhee.dc([{'a': i, 'b': i*2} for i in range(5)]).as_entity()
>>> dc.show()

We call each row of the table an Entity. Both a and b are fields within the entity.

Applying Functions/Operators According to the Schema

We can apply an operator according to the fields of the entities:

>>> @towhee.register
... def add_1(x):
...   return x+1

>>> dc.add_1['a', 'c']().show()

['a', 'c'] is the syntax for specifying the operator's input and output: field a is used as the input, and field c receives the output. We can also apply a lambda function to tabular data with runas_op:

>>> dc.runas_op['b', 'd'](func=lambda x: x*2).show()
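
The input/output field convention can be illustrated without towhee. The sketch below treats each row as a simple attribute container and is only an approximation of the idea; apply_on_fields is a hypothetical helper, and SimpleNamespace stands in for towhee's Entity.

```python
from types import SimpleNamespace


def apply_on_fields(rows, func, input_field, output_field):
    """Read input_field from each row, apply func, store it in output_field.

    Rows are copied so the original data is left unchanged.
    """
    result = []
    for row in rows:
        new_row = SimpleNamespace(**vars(row))
        setattr(new_row, output_field, func(getattr(row, input_field)))
        result.append(new_row)
    return result


rows = [SimpleNamespace(a=i, b=i * 2) for i in range(5)]

# Analogous to the ['a', 'c'] example: read a, write c.
with_c = apply_on_fields(rows, lambda x: x + 1, 'a', 'c')
# Analogous to the ['b', 'd'] example: read b, write d.
with_d = apply_on_fields(with_c, lambda x: x * 2, 'b', 'd')

print(vars(with_d[2]))  # {'a': 2, 'b': 4, 'c': 3, 'd': 8}
```

Writing results to a new field rather than overwriting the input keeps every intermediate value available to later steps.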

Advanced Features

DataCollection also supports advanced features such as parallel execution and distributed execution. For more details about these features, please refer to the API documentation of DataCollection.