# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
from functools import wraps
import contextvars
import contextlib
import towhee.runtime.pipeline_loader as pipe_loader
import towhee.runtime.node_config as nd_conf
from towhee.utils.log import engine_log
_AUTO_CONFIG_VAR: contextvars.ContextVar = contextvars.ContextVar('auto_config_var')
@contextlib.contextmanager
def set_config_name(name: str):
token = _AUTO_CONFIG_VAR.set(name)
yield
_AUTO_CONFIG_VAR.reset(token)
def get_config_name():
    """Return the config name bound by `set_config_name`, or None if unset.

    Returns:
        `str` | None: the active config name, or None when called outside a
        `set_config_name` context.
    """
    try:
        return _AUTO_CONFIG_VAR.get()
    except LookupError:
        # ContextVar.get() raises exactly LookupError when no value is set;
        # the original bare `except:` also swallowed KeyboardInterrupt etc.
        return None
# pylint: disable=invalid-name
class AutoConfig:
    """
    Auto configuration.

    Not instantiable; use the static factory methods such as
    `AutoConfig.LocalCPUConfig()`, or resolve a pre-defined pipeline's config
    by name with `AutoConfig.load_config(name)`.
    """

    # Maps config name -> callable producing the config object.
    _REGISTERED_CONFIG = {}
    # Serializes load_config so concurrent pipeline loads don't race.
    _lock = threading.Lock()

    def __init__(self):
        # NOTE: the original message read 'is designed to be instantiated',
        # which stated the opposite of what this guard enforces.
        raise EnvironmentError(
            'AutoConfig is not designed to be instantiated, please use the `AutoConfig.LocalCPUConfig()` etc.'
        )

    @staticmethod
    def register(config):
        """Register `config` under the currently-bound config name, if any.

        Intended for use as a decorator inside a pipeline loaded via
        `load_config`: while `set_config_name(name)` is active, the wrapped
        config is stored in `_REGISTERED_CONFIG` under that name.

        Args:
            config: the config factory (class or function) to register.

        Returns:
            A wrapper that forwards all arguments to `config`.
        """
        @wraps(config)
        def wrapper(*args, **kwargs):
            return config(*args, **kwargs)

        name = get_config_name()
        if name is not None:
            AutoConfig._REGISTERED_CONFIG[name] = wrapper
        return wrapper

    @staticmethod
    def load_config(name: str, *args, **kwargs):
        """
        Load config from pre-defined pipeline.

        Args:
            name (`str`): the name of the pre-defined pipeline whose config
                to load. Extra positional/keyword arguments are forwarded to
                the registered config factory.

        Returns:
            The config object produced by the registered factory, or None if
            no config could be found for `name`.

        Examples:
            >>> from towhee import AutoConfig
            >>> config = AutoConfig.load_config('sentence_embedding')
            SentenceSimilarityConfig(model='all-MiniLM-L6-v2', openai_api_key=None, customize_embedding_op=None, normalize_vec=True, device=-1)
        """
        with AutoConfig._lock:
            if name in AutoConfig._REGISTERED_CONFIG:
                return AutoConfig._REGISTERED_CONFIG[name](*args, **kwargs)

            # Loading the pipeline triggers `AutoConfig.register` calls; bind
            # the name so those registrations are stored under it.
            with set_config_name(name):
                pipe_loader.PipelineLoader.load_pipeline(name)

            if name in AutoConfig._REGISTERED_CONFIG:
                return AutoConfig._REGISTERED_CONFIG[name](*args, **kwargs)

        engine_log.error('Can not find config: %s', name)
        return None

    @staticmethod
    def LocalCPUConfig():
        """
        Auto configuration to run with local CPU.

        Examples:
            >>> from towhee import pipe, AutoConfig
            >>> p = (pipe.input('a')
            ...         .flat_map('a', 'b', lambda x: [y for y in x], config=AutoConfig.LocalCPUConfig())
            ...         .output('b'))
        """
        # device=-1 selects CPU (contrast with LocalGPUConfig, which passes a
        # GPU device index).
        return nd_conf.TowheeConfig.set_local_config(device=-1)

    @staticmethod
    def LocalGPUConfig(device: int = 0):
        """
        Auto configuration to run with local GPU.

        Args:
            device (`int`): the number of GPU device, defaults to 0.

        Examples:
            >>> from towhee import pipe, AutoConfig
            >>> p = (pipe.input('url')
            ...         .map('url', 'image', ops.image_decode.cv2())
            ...         .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=AutoConfig.LocalGPUConfig())
            ...         .output('vec')
            ... )
        """
        return nd_conf.TowheeConfig.set_local_config(device=device)

    @staticmethod
    def TritonCPUConfig(num_instances_per_device: int = 1,
                        max_batch_size: int = None,
                        batch_latency_micros: int = None,
                        preferred_batch_size: list = None):
        """
        Auto configuration to run with triton server(CPU).

        Args:
            num_instances_per_device(`int`):
                the number of instances per device, defaults to 1.
            max_batch_size(`int`):
                maximum batch size, defaults to None, and it will be auto-generated by triton.
            batch_latency_micros(`int`):
                time to the request, in microseconds, defaults to None, and it will be auto-generated by triton.
            preferred_batch_size(`list`):
                preferred batch sizes for dynamic batching, defaults to None, and it will be auto-generated by triton.

        Examples:
            >>> from towhee import pipe, AutoConfig
            >>> p = (pipe.input('url')
            ...         .map('url', 'image', ops.image_decode.cv2())
            ...         .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=AutoConfig.TritonCPUConfig())
            ...         .output('vec')
            ... )

            You can also set the configuration:

            >>> from towhee import pipe, AutoConfig
            >>> config = AutoConfig.TritonCPUConfig(num_instances_per_device=3,
            ...                                     max_batch_size=128,
            ...                                     batch_latency_micros=100000,
            ...                                     preferred_batch_size=[8, 16])
            >>> p = (pipe.input('url')
            ...         .map('url', 'image', ops.image_decode.cv2())
            ...         .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=config)
            ...         .output('vec')
            ... )
        """
        # device_ids=None signals CPU-only to the triton config builder.
        return nd_conf.TowheeConfig.set_triton_config(device_ids=None,
                                                      num_instances_per_device=num_instances_per_device,
                                                      max_batch_size=max_batch_size,
                                                      batch_latency_micros=batch_latency_micros,
                                                      preferred_batch_size=preferred_batch_size)

    @staticmethod
    def TritonGPUConfig(device_ids: list = None,
                        num_instances_per_device: int = 1,
                        max_batch_size: int = None,
                        batch_latency_micros: int = None,
                        preferred_batch_size: list = None):
        """
        Auto configuration to run with triton server(GPUs).

        Args:
            device_ids(`list`):
                list of GPUs, defaults to [0].
            num_instances_per_device(`int`):
                the number of instances per device, defaults to 1.
            max_batch_size(`int`):
                maximum batch size, defaults to None, and it will be auto-generated by triton.
            batch_latency_micros(`int`):
                time to the request, in microseconds, defaults to None, and it will be auto-generated by triton.
            preferred_batch_size(`list`):
                preferred batch sizes for dynamic batching, defaults to None, and it will be auto-generated by triton.

        Examples:
            >>> from towhee import pipe, AutoConfig
            >>> p = (pipe.input('url')
            ...         .map('url', 'image', ops.image_decode.cv2())
            ...         .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=AutoConfig.TritonGPUConfig())
            ...         .output('vec')
            ... )

            You can also set the configuration:

            >>> from towhee import pipe, AutoConfig
            >>> config = AutoConfig.TritonGPUConfig(device_ids=[0, 1],
            ...                                     num_instances_per_device=3,
            ...                                     max_batch_size=128,
            ...                                     batch_latency_micros=100000,
            ...                                     preferred_batch_size=[8, 16])
            >>> p = (pipe.input('url')
            ...         .map('url', 'image', ops.image_decode.cv2())
            ...         .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'), config=config)
            ...         .output('vec')
            ... )
        """
        # Mutable-default avoidance: [0] is built per call, never shared.
        if device_ids is None:
            device_ids = [0]
        return nd_conf.TowheeConfig.set_triton_config(device_ids=device_ids,
                                                      num_instances_per_device=num_instances_per_device,
                                                      max_batch_size=max_batch_size,
                                                      batch_latency_micros=batch_latency_micros,
                                                      preferred_batch_size=preferred_batch_size)
# Pre-register the built-in configs so `load_config` can resolve them by name
# without attempting a pipeline load.
AutoConfig._REGISTERED_CONFIG.update({  # pylint: disable=protected-access
    'LocalCPUConfig': AutoConfig.LocalCPUConfig,
    'LocalGPUConfig': AutoConfig.LocalGPUConfig,
    'TritonCPUConfig': AutoConfig.TritonCPUConfig,
    'TritonGPUConfig': AutoConfig.TritonGPUConfig,
})