# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=unused-import
# pylint: disable=dangerous-default-value
import os
import threading
from typing import Any, Dict, List, Tuple
from towhee.dataframe import DataFrame
from towhee.pipeline_format import OutputFormat
from towhee.engine.pipeline import Pipeline
from towhee.engine.engine import Engine, start_engine
from towhee.engine.operator_loader import OperatorLoader
from towhee.hub.file_manager import FileManager
from towhee.hparam.hyperparameter import dynamic_dispatch, param_scope
from towhee.hub import preclude
from .execution.base_execution import BaseExecution
from .execution.pandas_execution import PandasExecution
from .execution.stateful_execution import StatefulExecution
from .execution.vectorized_execution import VectorizedExecution
def op(operator_src: str,
       tag: str = 'main',
       arg: List[Any] = None,
       kwargs: Dict[str, Any] = None):
    """
    Create the supplied operator.

    Entry method which takes either an operator name, a path to a python file, or a
    class (for example one defined in a notebook). An `Operator` object is created
    with the init args(kwargs).

    Args:
        operator_src (str): Operator name or python file location or class in notebook.
        tag (str, optional): Which tag to use for operators on hub. Defaults to 'main'.
        arg (List[Any], optional): Operator `args` to pass in. `None` (the default) is
            treated as an empty list.
        kwargs (Dict[str, Any], optional): Operator `kwargs` to pass in. `None` (the
            default) is treated as an empty dict.

    Returns:
        operator: The operator.
    """
    # Build fresh containers on every call instead of sharing one default object
    # between all calls of this function.
    arg = [] if arg is None else arg
    kwargs = {} if kwargs is None else kwargs

    if isinstance(operator_src, type):
        # A class was handed in directly: derive a subclass named 'operator' whose
        # class attributes are the kwargs, then allocate an instance through __new__
        # (calling __new__ directly does not run __init__).
        class_op = type('operator', (operator_src, ), kwargs)
        return class_op.__new__(class_op, **kwargs)

    loader = OperatorLoader()
    if os.path.isfile(operator_src):
        # An existing file on disk is loaded by path; the tag is not used here.
        return loader.load_operator_from_path(operator_src, arg, kwargs)
    # Anything else is treated as an operator name and resolved with the given tag.
    return loader.load_operator(operator_src, arg, kwargs, tag)
class _OperatorLazyWrapper(  #
        BaseExecution,  #
        PandasExecution,  #
        StatefulExecution,  #
        VectorizedExecution):
    """
    Operator wrapper for lazy initialization. Inherits from different execution strategies.

    The wrapped operator is not created in the constructor; it is created by
    `__check_init__` the first time it is needed (see `get_op`).

    Args:
        real_name (str): Dotted/underscored operator name; '.' is rewritten to '/' and
            '_' to '-' to form the name passed to `op`.
        index (Tuple[str]): Index made available to the operator through `param_scope`
            while it is being created.
        tag (str, optional): Tag forwarded to `op`. Defaults to 'main'.
        arg (List[Any], optional): Positional init args for the operator. `None` (the
            default) is treated as an empty list.
        kws (Dict[str, Any], optional): Keyword init args for the operator. `None` (the
            default) is treated as an empty dict. An 'op_config' entry, if present, is
            removed from the init kwargs and exposed through `op_config` instead.
    """

    def __init__(self,
                 real_name: str,
                 index: Tuple[str],
                 tag: str = 'main',
                 arg: List[Any] = None,
                 kws: Dict[str, Any] = None) -> None:
        self._name = real_name.replace('.', '/').replace('_', '-')
        self._index = index
        self._tag = tag
        self._arg = [] if arg is None else arg
        # Work on a private copy: 'op_config' is popped below, and that must not
        # alter the dict object owned by the caller.
        self._kws = {} if kws is None else dict(kws)
        self._op = None
        self._lock = threading.Lock()
        self._op_config = self._kws.pop('op_config', None)
        # TODO: (How to apply such config)

    def __check_init__(self):
        """Create the wrapped operator if it has not been created yet."""
        # The lock keeps the check-and-create step from running concurrently.
        with self._lock:
            if self._op is None:
                # Called with param scope in order to pass index in to op.
                with param_scope(index=self._index):
                    self._op = op(self._name,
                                  self._tag,
                                  arg=self._arg,
                                  kwargs=self._kws)
                    # Record that the freshly created operator provides `__vcall__`.
                    if hasattr(self._op, '__vcall__'):
                        self.__has_vcall__ = True

    def get_op(self):
        """Return the wrapped operator, creating it first when necessary."""
        self.__check_init__()
        return self._op

    @property
    def op_config(self):
        """The 'op_config' value taken from the init kwargs, or None.

        Accessing this property also triggers operator creation.
        """
        self.__check_init__()
        return self._op_config

    @property
    def function(self):
        """The normalized operator name passed to `op`."""
        return self._name

    @property
    def init_args(self):
        """The keyword init args (without 'op_config')."""
        return self._kws

    @staticmethod
    def callback(real_name: str, index: Tuple[str], *arg, **kws):
        """Build a wrapper from a name, an index and the operator's init arguments."""
        return _OperatorLazyWrapper(real_name, index, arg=arg, kws=kws)
# TODO: move to different location
# Short task names and the pipeline repo each one stands for; names that are not
# listed here are used as the repo name unchanged by `pipeline()`.
DEFAULT_PIPELINES = dict((
    ('image-embedding', 'towhee/image-embedding-resnet50'),
    ('image-encoding', 'towhee/image-embedding-resnet50'),  # TODO: add encoders
    ('music-embedding', 'towhee/music-embedding-vggish'),
    ('music-encoding', 'towhee/music-embedding-clmr'),  # TODO: clmr -> encoder
))
class _PipelineWrapper:
"""
A wrapper class around `Pipeline`.
The class prevents users from having to create `DataFrame` instances by hand.
Args:
pipeline_ (towhee.Pipeline): Base `Pipeline` instance for which this object will provide a wrapper for.
"""
def __init__(self, pipeline_: Pipeline):
self._pipeline = pipeline_
def __call__(self, *args) -> List[Tuple]:
"""
Wrap the input arguments around a `Dataframe` for Pipeline.__call__().
"""
if not args:
raise RuntimeError('Input data is empty')
cols = []
vargs = []
for i, arg in enumerate(args):
vtype = type(arg).__name__
cols.append(('Col_' + str(i), str(vtype)))
vargs.append(arg)
vargs = tuple(vargs)
# Process the data through the pipeline.
in_df = DataFrame('_in_df', cols)
in_df.put(vargs)
out_df = self._pipeline(in_df)
format_handler = OutputFormat.get_format_handler(
self._pipeline.pipeline_type)
return format_handler(out_df)
def __repr__(self) -> str:
return repr(self._pipeline)
@property
def pipeline(self) -> Pipeline:
return self._pipeline
def pipeline(pipeline_src: str,
             tag: str = 'main',
             install_reqs: bool = True,
             **kwargs):
    """
    Build a `Pipeline` from a task name, a pipeline name or a YAML file path.

    The engine is started first. The pipeline is then created from the resolved YAML
    file and added to the `Engine`, unless the scoped `towhee.dry_run` hyperparameter
    is set.

    Args:
        pipeline_src (str): Pipeline name or YAML file location to use.
        tag (str, optional): Which tag to use for operators/pipelines on hub. Defaults to 'main'.
        install_reqs (bool, optional): Whether to download the python packages if a
            requirements.txt file is included in the repo. Defaults to True.
        **kwargs: Only `from_ops` is read (defaults to False); it is forwarded to
            `FileManager.get_pipeline`.

    Returns:
        _PipelineWrapper: The `Pipeline` output.
    """
    from_ops = kwargs.get('from_ops', False)
    start_engine()

    if not os.path.isfile(pipeline_src):
        # Not a local file: map known task names to their default repo, otherwise
        # use the given name as the repo, and let the file manager locate the YAML.
        repo = DEFAULT_PIPELINES.get(pipeline_src, pipeline_src)
        yaml_path = FileManager().get_pipeline(repo, tag, install_reqs, from_ops)
    else:
        yaml_path = pipeline_src

    engine = Engine()
    pipeline_ = Pipeline(str(yaml_path))
    with param_scope() as hp:
        dry_run = hp().towhee.dry_run(False)
        if not dry_run:
            engine.add_pipeline(pipeline_)
    return _PipelineWrapper(pipeline_)
class _PipelineBuilder:
"""
Build a pipeline with template variables.
A pipeline template is a yaml file contains `template variables`,
which will be replaced by `variable values` when createing pipeline instance.
Examples:
```yaml
name: template_name
variables: <<-- define variables and default values
template_variable_1: default_value_1
template_variable_2: default_value_2
....
operator:
function: {template_variable_1} <<-- refer to the variable by name
```
You can specialize template variable values with the following code:
```python
pipe = _PipelineBuilder(template_variable_1='new_value').pipeline('pipeline_name')
```
"""
def __init__(self, **kws) -> None:
self._kws = kws
def pipeline(self, *arg, **kws):
with param_scope() as hp:
hp().variables = self._kws
return pipeline(*arg, **kws)
@staticmethod
def callback(name, index, *arg, **kws):
name = name.replace('.', '/').replace('_', '-')
_ = index
return _PipelineBuilder(**kws).pipeline(name, *arg, from_ops=True)
@dynamic_dispatch
def ops(*arg, **kws):
    """
    Create operator instance.

    Entry point for creating operator instances, for example:

    >>> op_instance = ops.my_namespace.my_repo_name(init_arg1=xxx, init_arg2=xxx)

    The name and index are read from the current param scope and handed, together
    with the init arguments, to `_OperatorLazyWrapper.callback`.
    """
    # pylint: disable=protected-access
    with param_scope() as scope:
        dispatched_name, dispatched_index = scope._name, scope._index
    return _OperatorLazyWrapper.callback(dispatched_name, dispatched_index, *arg, **kws)
@dynamic_dispatch
def pipes(*arg, **kws):
    """
    Create pipeline instance.

    Entry point for creating pipeline instances, for example:

    >>> pipe_instance = pipes.my_namespace.my_repo_name(init_arg1=xxx, init_arg2=xxx)

    The name and index are read from the current param scope and handed, together
    with the init arguments, to `_PipelineBuilder.callback`.
    """
    # pylint: disable=protected-access
    with param_scope() as scope:
        dispatched_name, dispatched_index = scope._name, scope._index
    return _PipelineBuilder.callback(dispatched_name, dispatched_index, *arg, **kws)
def create_op(func,
              name: str = 'tmp',
              index: Tuple[str] = None,
              arg: List[Any] = None,
              kws: Dict[str, Any] = None) -> '_OperatorLazyWrapper':
    """
    Wrap an existing object as an operator.

    A `_OperatorLazyWrapper` is built from the given name, index and init arguments,
    and `func` is installed as its operator directly, so the wrapper has nothing
    left to create lazily.

    Args:
        func: The object to install as the wrapper's operator.
        name (str, optional): Name given to the wrapper. Defaults to 'tmp'.
        index (Tuple[str], optional): Index given to the wrapper. Defaults to None.
        arg (List[Any], optional): Init `args` recorded on the wrapper. `None` (the
            default) is treated as an empty list.
        kws (Dict[str, Any], optional): Init `kwargs` recorded on the wrapper. `None`
            (the default) is treated as an empty dict.

    Returns:
        _OperatorLazyWrapper: The wrapper holding `func`.
    """
    # Fresh containers per call; a shared default object would be visible to every
    # wrapper created without explicit arguments.
    arg = [] if arg is None else arg
    kws = {} if kws is None else kws

    # pylint: disable=protected-access
    operator = _OperatorLazyWrapper(name, index, arg=arg, kws=kws)
    operator._op = func
    return operator