towhee.operator

class towhee.operator.Operator[source]

Bases: abc.ABC

Operator base class. It declares the abstract methods __init__ and __call__, which subclasses implement.

Examples

from typing import NamedTuple
from towhee.operator import Operator

class AddOperator(Operator):
    def __init__(self, factor: int):
        # Value added to every input.
        self._factor = factor

    def __call__(self, num) -> NamedTuple("Outputs", [("sum", int)]):
        Outputs = NamedTuple("Outputs", [("sum", int)])
        return Outputs(self._factor + num)

abstract __init__()[source]

Initializes the operator. The framework calls the operator's __init__ function before a graph starts.

Args:

Raises

An exception during __init__ can terminate the graph run.

abstract __call__()[source]

The framework calls the __call__ function repeatedly, once for every input item.

Args:

Returns:

Raises

An exception during __call__ can terminate the graph run.

__weakref__

list of weak references to the object (if defined)
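
The lifecycle described above (one __init__ call before the run, then one __call__ per input) can be sketched without towhee installed. This is only an illustration under stated assumptions: the Operator class here is a local stand-in built on abc, and run_graph is a hypothetical driver, not part of the towhee API.

```python
from abc import ABC, abstractmethod
from typing import NamedTuple


class Operator(ABC):
    """Local stand-in for towhee.operator.Operator (an assumption, not the real class)."""

    @abstractmethod
    def __init__(self):
        raise NotImplementedError

    @abstractmethod
    def __call__(self):
        raise NotImplementedError


class Outputs(NamedTuple):
    sum: int


class AddOperator(Operator):
    def __init__(self, factor: int):
        if not isinstance(factor, int):
            # A failure here happens before any input is processed.
            raise TypeError("factor must be an int")
        self._factor = factor

    def __call__(self, num: int) -> Outputs:
        return Outputs(self._factor + num)


def run_graph(op_factory, inputs):
    """Hypothetical driver: build the operator once, then call it per input."""
    op = op_factory()               # __init__ runs once, before the run starts
    return [op(x) for x in inputs]  # __call__ runs once for every input


results = run_graph(lambda: AddOperator(10), [1, 2, 3])
print([r.sum for r in results])  # prints [11, 12, 13]
```

In this sketch, passing a non-integer factor raises in __init__, so run_graph stops before any input is processed.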

class towhee.operator.NNOperator(framework: str = 'pytorch')[source]

Bases: towhee.operator.base.Operator

Neural Network related operators that involve machine learning frameworks.

Parameters

framework (str) – The framework to apply.

__init__(framework: str = 'pytorch')[source]

Initializes the operator. The framework calls the operator's __init__ function before a graph starts.

Args:

Raises

An exception during __init__ can terminate the graph run.

train(training_config=None, train_dataset=None, eval_dataset=None, resume_checkpoint_path=None, **kwargs)[source]

Start to train an operator.

Parameters
  • training_config (TrainingConfig) – The config of this trainer.

  • train_dataset (Union[Dataset, TowheeDataSet]) – Training dataset.

  • eval_dataset (Union[Dataset, TowheeDataSet]) – Evaluation dataset.

  • resume_checkpoint_path (str) – Path of the checkpoint to resume training from, if resuming.

  • **kwargs (Any) – Keyword Args.

setup_trainer(training_config=None, train_dataset=None, eval_dataset=None, train_dataloader=None, eval_dataloader=None, model_card=None)[source]

Set up the trainer instance in the operator before training and set the trainer parameters.

Parameters
  • training_config (TrainingConfig) – The config of this trainer.

  • train_dataset (Union[Dataset, TowheeDataSet]) – Training dataset.

  • eval_dataset (Union[Dataset, TowheeDataSet]) – Evaluation dataset.

  • train_dataloader – If specified, Trainer will use it to load training data. Otherwise, Trainer will build a dataloader from train_dataset.

  • eval_dataloader (Union[DataLoader, Iterable]) – If specified, Trainer will use it to load evaluation data. Otherwise, Trainer will build a dataloader from eval_dataset.

  • model_card (ModelCard) – Model card containing the training information.

Returns:
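
The dataloader fallback described for train_dataloader and eval_dataloader can be illustrated with a small, library-free sketch. The names batches and resolve_loader are hypothetical helpers, and the ValueError for a missing dataset is an assumption of this sketch. It does not show how the towhee Trainer is implemented.

```python
from typing import Iterable, Iterator, List, Optional, Sequence


def batches(dataset: Sequence, batch_size: int) -> Iterator[List]:
    """Hypothetical stand-in for building a dataloader from a dataset."""
    for start in range(0, len(dataset), batch_size):
        yield list(dataset[start:start + batch_size])


def resolve_loader(dataset: Optional[Sequence],
                   dataloader: Optional[Iterable],
                   batch_size: int = 2) -> Iterable:
    """Prefer an explicit dataloader; otherwise build one from the dataset."""
    if dataloader is not None:
        return dataloader
    if dataset is None:
        # Assumption of this sketch: with neither argument there is nothing to load.
        raise ValueError("either a dataset or a dataloader is required")
    return batches(dataset, batch_size)


train_dataset = [1, 2, 3, 4, 5]
built = list(resolve_loader(train_dataset, None))
custom = list(resolve_loader(train_dataset, [[1, 2, 3, 4, 5]]))
print(built)   # prints [[1, 2], [3, 4], [5]]
print(custom)  # prints [[1, 2, 3, 4, 5]]
```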

load(path: Optional[str] = None)[source]

Load the model checkpoint into an operator.

Parameters

path (str) – The folder path containing the model’s checkpoints.

save(path: str, overwrite: bool = True)[source]

Save the model checkpoint into the path.

Parameters
  • path (str) – The folder path containing the model’s checkpoints.

  • overwrite (bool) – If True, an existing path with the same name is overwritten.

Raises

(FileExistsError) – Raised if overwrite is False and the path already exists.
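
The overwrite behaviour documented for save can be sketched with the standard library alone. save_checkpoint and the file name model.bin are hypothetical and exist only for illustration. The sketch mirrors the documented contract (overwrite by default, FileExistsError otherwise) and makes no claim about what NNOperator.save actually writes to disk.

```python
import tempfile
from pathlib import Path


def save_checkpoint(path: str, payload: bytes, overwrite: bool = True) -> Path:
    """Hypothetical helper mirroring the documented overwrite behaviour."""
    target = Path(path)
    if target.exists() and not overwrite:
        raise FileExistsError(f"{target} already exists and overwrite is False")
    target.mkdir(parents=True, exist_ok=True)
    checkpoint = target / "model.bin"  # file name is an assumption
    checkpoint.write_bytes(payload)
    return checkpoint


with tempfile.TemporaryDirectory() as tmp:
    folder = str(Path(tmp) / "ckpt")
    save_checkpoint(folder, b"v1")  # first save creates the folder
    save_checkpoint(folder, b"v2")  # overwrite=True replaces the contents
    try:
        save_checkpoint(folder, b"v3", overwrite=False)
    except FileExistsError:
        refused = True
    else:
        refused = False
    final_payload = (Path(folder) / "model.bin").read_bytes()

print(refused, final_payload)  # prints True b'v2'
```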

class towhee.operator.PyOperator[source]

Bases: towhee.operator.base.Operator

Python function operator, no machine learning frameworks involved.

__init__()[source]

Initializes the operator. The framework calls the operator's __init__ function before a graph starts.

Args:

Raises

An exception during __init__ can terminate the graph run.

class towhee.operator.StatefulOperator(name)[source]

Bases: towhee.operator.base.Operator

Stateful operator.

Examples:

>>> from towhee import register
>>> from towhee.operator import StatefulOperator
>>> from towhee import DataCollection, State
>>> from towhee.functional.entity import Entity
>>> import numpy as np
>>> @register
... class my_normalize(StatefulOperator):
...     def __init__(self, name):
...         super().__init__(name=name)
...     def fit(self):
...         self._state._mu = np.mean(self._data[0])
...         self._state._std = np.std(self._data[0])
...     def predict(self, x):
...         return (x-self._state._mu)/self._state._std
>>> dc = (
...     DataCollection.range(10)
...         .set_training(State())
...         .map(lambda x: Entity(a=x))
...         .my_normalize['a', 'b'](name='mynorm')
... )
>>> [int(x.b*10) for x in dc.to_list()]
[-15, -12, -8, -5, -1, 1, 5, 8, 12, 15]
>>> dc._state.mynorm._mu
4.5
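
The arithmetic behind the fit and predict steps in the example above can be checked without towhee or NumPy. The sketch below uses a hypothetical Normalizer class with a hypothetical feed method, and it substitutes statistics.pstdev for np.std (both compute the population standard deviation). It is not how StatefulOperator collects data or manages state internally.

```python
from statistics import fmean, pstdev


class NormalizeState:
    """Plain container for the fitted statistics."""

    def __init__(self):
        self.mu = None
        self.std = None


class Normalizer:
    """Hypothetical stand-in, not a towhee class."""

    def __init__(self):
        self._data = []
        self._state = NormalizeState()

    def feed(self, x):
        # Collect training inputs before fitting.
        self._data.append(x)

    def fit(self):
        self._state.mu = fmean(self._data)
        self._state.std = pstdev(self._data)  # population std, like np.std

    def predict(self, x):
        return (x - self._state.mu) / self._state.std


norm = Normalizer()
for x in range(10):
    norm.feed(x)
norm.fit()
scaled = [int(norm.predict(x) * 10) for x in range(10)]
print(scaled)          # prints [-15, -12, -8, -5, -1, 1, 5, 8, 12, 15]
print(norm._state.mu)  # prints 4.5
```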

__init__(name)[source]

Initializes the operator. The framework calls the operator's __init__ function before a graph starts.

Args:

Raises

An exception during __init__ can terminate the graph run.

__call__(*arg)[source]

The framework calls the __call__ function repeatedly, once for every input item.

Args:

Returns:

Raises

An exception during __call__ can terminate the graph run.