Source code for towhee.serve.triton.python_model_builder

# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import json
from typing import List, Tuple, Any, Dict

import towhee.serve.triton.type_gen as tygen
from towhee.serve.triton import format_utils as fmt


[docs]class PyModelBuilder: """ The base class of python model builder. Subclass of PyModelBuilder should implement the following methods: `gen_import`, `gen_init`. """ def gen_imports(self): raise NotImplementedError('gen_imports not implemented') def gen_initialize(self): raise NotImplementedError('gen_initialize not implemented') def gen_execute(self): raise NotImplementedError('gen_execute not implemented') def gen_finalize(self): raise NotImplementedError('gen_execute not implemented') def build(self, save_path: str): with open(save_path, 'wt', encoding='utf-8') as f: f.writelines(self.gen_imports()) f.writelines(self.gen_blank_lines(2)) f.writelines(self.gen_class_name()) f.writelines(self.gen_blank_lines(1)) f.writelines(fmt.intend(self.gen_initialize())) f.writelines(self.gen_blank_lines(1)) f.writelines(fmt.intend(self.gen_execute())) f.writelines(self.gen_blank_lines(1)) f.writelines(fmt.intend(self.gen_finalize())) @staticmethod def gen_class_name(): lines = ['class TritonPythonModel:'] return fmt.add_line_separator(lines) @staticmethod def gen_blank_lines(n=1): lines = [''] * n return fmt.add_line_separator(lines) @staticmethod def _from_tensor_to_obj( type_info: tygen.TypeInfo, obj_init_code: str, obj_var: str, tensor_vars: List[str] ): """ Generate the codes converting Tensor to python object """ trs_placeholders = [attr.tensor_placeholder for attr in type_info.attr_info] # check existence of placeholders for tr in trs_placeholders: if obj_init_code.find(tr) == -1: raise ValueError('Can not match placeholder %s with init function %s.' % (tr, obj_init_code)) if type_info.is_list: init_args = ['arg' + str(i) for i in range(len(trs_placeholders))] for tr, arg in zip(trs_placeholders, init_args): obj_init_code = obj_init_code.replace(tr, arg) line = obj_var + ' = [' + obj_init_code + ' for ' + ', '.join(init_args) + ' in zip(' + ', '.join(tensor_vars) + ')]' else: for tr, arg in zip(trs_placeholders, tensor_vars): obj_init_code = obj_init_code.replace(tr, arg) line = obj_var + ' = ' + obj_init_code return [line] @staticmethod def _from_obj_to_tensor( type_info: tygen.TypeInfo, obj_var: str, tensor_vars: List[str], tensor_names: List[str], obj_placeholder='$obj' ): """ Generate the codes converting python object to Tensor """ lines = [] for attr, tr_name, tr_var in zip(type_info.attr_info, tensor_names, tensor_vars): data = attr.obj_placeholder.replace(obj_placeholder, obj_var) dtype = attr.numpy_dtype if tygen.is_scalar(attr): ndarray = 'numpy.array([' + data + '], ' + dtype + ')' else: ndarray = 'numpy.array(' + data + ', ' + dtype + ')' line = tr_var + ' = pb_utils.Tensor(\'' + tr_name + '\', ' + ndarray + ')' lines.append(line) return lines
[docs]class PickledCallablePyModelBuilder(PyModelBuilder): """ Build python model from pickled """
[docs] def __init__( self, module_name: str, python_file_path: str, pickle_file_name: str, input_annotations: List[Tuple[Any, Tuple]], output_annotations: List[Tuple[Any, Tuple]] ): self.module_name = module_name self.python_file_path = python_file_path self.pickle_file_name = pickle_file_name self.input_annotations = input_annotations self.output_annotations = output_annotations
def gen_imports(self): lines = [] lines.append('import towhee') lines.append('import numpy') lines.append('from pathlib import Path') lines.append('import pickle') lines.append('import importlib') lines.append('import sys') lines.append('import triton_python_backend_utils as pb_utils') return fmt.add_line_separator(lines) def gen_initialize(self): lines = [] lines.append('def initialize(self, args):') lines.append('') lines.append('# load module') lines.append(f'module_name = "{self.module_name}"') lines.append('device = "cpu"') lines.append('if args["model_instance_kind"] == "GPU":') lines.append(fmt.intend('device = int(args["model_instance_device_id"])')) lines.append(f'path = "{self.python_file_path}"') lines.append('spec = importlib.util.spec_from_file_location(module_name, path)') lines.append('module = importlib.util.module_from_spec(spec)') lines.append('sys.modules[module_name] = module') lines.append('spec.loader.exec_module(module)') lines.append('') lines.append('# create callable object') lines.append(f'pickle_file_path = Path(__file__).parent / "{self.pickle_file_name}"') lines.append('with open(pickle_file_path, \'rb\') as f:') lines.append(fmt.intend('self.callable_obj = pickle.load(f)')) lines.append(fmt.intend('self.callable_obj._device = device')) lines.append('') lines.append('if hasattr(self.callable_obj, "to_device"):') lines.append(fmt.intend('self.callable_obj.to_device()')) lines = lines[:1] + fmt.intend(lines[1:]) return fmt.add_line_separator(lines) def gen_execute(self): lines = [] lines.append('def execute(self, requests):') lines.append('') lines.append('responses = []') lines.append('') taskloop = [] taskloop.append('for request in requests:') taskloop.append('# get input tensors from request') input_type_info = tygen.get_type_info(self.input_annotations) input_arg_init_code = tygen.get_init_code(self.input_annotations) tr_idx = 0 callable_input_args = [] for arg_idx, type_info in enumerate(input_type_info): arg = 'arg' + str(arg_idx) callable_input_args.append(arg) num_attrs = len(type_info.attr_info) tr_vars = ['in' + str(tr_idx + i) for i in range(num_attrs)] for tr in tr_vars: taskloop.append(tr + ' = pb_utils.get_input_tensor_by_name(request, \'INPUT' + str(tr_idx) + '\')') tr_idx = tr_idx + 1 taskloop.append('') taskloop.append('# create input args from tensors') l = self._from_tensor_to_obj(type_info, input_arg_init_code[arg_idx], arg, tr_vars) taskloop = taskloop + l taskloop.append('') taskloop.append('# call callable object') callable_results = ['result' + str(i) for i in range(len(self.output_annotations))] taskloop.append(', '.join(callable_results) + ' = self.callable_obj(' + ' ,'.join(callable_input_args) + ')') taskloop.append('') taskloop.append('# convert results to tensors') output_type_info = tygen.get_type_info(self.output_annotations) tr_idx = 0 tr_out = [] for result_idx, type_info in enumerate(output_type_info): num_attrs = len(type_info.attr_info) tr_vars = ['out' + str(tr_idx + i) for i in range(num_attrs)] tr_out = tr_out + tr_vars outputs = ['OUTPUT' + str(tr_idx + i) for i in range(num_attrs)] l = self._from_obj_to_tensor(type_info, callable_results[result_idx], tr_vars, outputs) taskloop = taskloop + l taskloop.append('') taskloop.append('# organize response') taskloop.append('response = pb_utils.InferenceResponse(output_tensors=[' + ', '.join(tr_out) + '])') taskloop.append('responses.append(response)') taskloop = taskloop[:1] + fmt.intend(taskloop[1:]) lines = lines + taskloop lines.append('') lines.append('return responses') lines = lines[:1] + fmt.intend(lines[1:]) return fmt.add_line_separator(lines) def gen_finalize(self): lines = [] lines.append('def finalize(self):') lines.append(fmt.intend('pass')) return fmt.add_line_separator(lines)
[docs]class OpPyModelBuilder(PyModelBuilder): """ Build python model from operator """
[docs] def __init__( self, task_name: str, op_name: str, op_init_args: Dict, input_annotations: List[Tuple[Any, Tuple]], output_annotations: List[Tuple[Any, Tuple]] ): self.task_name = task_name self.op_name = op_name self.op_init_args = op_init_args self.input_annotations = input_annotations self.output_annotations = output_annotations
def gen_imports(self): lines = [] lines.append('import towhee') lines.append('import numpy') lines.append('from towhee import ops') lines.append('import triton_python_backend_utils as pb_utils') return fmt.add_line_separator(lines) def gen_initialize(self): lines = [] lines.append('def initialize(self, args):') lines.append('') lines.append('device = "cpu"') lines.append('if args["model_instance_kind"] == "GPU":') lines.append(fmt.intend('device = int(args["model_instance_device_id"])')) lines.append('# create op instance') lines.append('task = getattr(ops, \'' + self.task_name + '\')') lines.append('init_args = ' + json.dumps(self.op_init_args)) lines.append('op_wrapper = getattr(task, \'' + self.op_name + '\')(' + '**init_args' + ')') lines.append('self.op = op_wrapper.get_op()') lines.append('self.op._device = device') lines.append('if hasattr(self.op, "to_device"):') lines.append(fmt.intend('self.op.to_device()')) lines = lines[:1] + fmt.intend(lines[1:]) return fmt.add_line_separator(lines) def gen_execute(self): lines = [] lines.append('def execute(self, requests):') lines.append('') lines.append('responses = []') lines.append('') taskloop = [] taskloop.append('for request in requests:') taskloop.append('# get input tensors from request') input_type_info = tygen.get_type_info(self.input_annotations) input_arg_init_code = tygen.get_init_code(self.input_annotations) tr_idx = 0 op_input_args = [] for arg_idx, type_info in enumerate(input_type_info): arg = 'arg' + str(arg_idx) op_input_args.append(arg) num_attrs = len(type_info.attr_info) tr_vars = ['in' + str(tr_idx + i) for i in range(num_attrs)] for tr in tr_vars: taskloop.append(tr + ' = pb_utils.get_input_tensor_by_name(request, \'INPUT' + str(tr_idx) + '\')') tr_idx = tr_idx + 1 taskloop.append('') taskloop.append('# create input args from tensors') l = self._from_tensor_to_obj(type_info, input_arg_init_code[arg_idx], arg, tr_vars) taskloop = taskloop + l taskloop.append('') taskloop.append('# call callable object') op_results = ['result' + str(i) for i in range(len(self.output_annotations))] taskloop.append(', '.join(op_results) + ' = self.op(' + ' ,'.join(op_input_args) + ')') taskloop.append('') taskloop.append('# convert results to tensors') output_type_info = tygen.get_type_info(self.output_annotations) tr_idx = 0 tr_out = [] for result_idx, type_info in enumerate(output_type_info): num_attrs = len(type_info.attr_info) tr_vars = ['out' + str(tr_idx + i) for i in range(num_attrs)] tr_out = tr_out + tr_vars outputs = ['OUTPUT' + str(tr_idx + i) for i in range(num_attrs)] l = self._from_obj_to_tensor(type_info, op_results[result_idx], tr_vars, outputs) taskloop = taskloop + l taskloop.append('') taskloop.append('# organize response') taskloop.append('response = pb_utils.InferenceResponse(output_tensors=[' + ', '.join(tr_out) + '])') taskloop.append('responses.append(response)') taskloop = taskloop[:1] + fmt.intend(taskloop[1:]) lines = lines + taskloop lines.append('') lines.append('return responses') lines = lines[:1] + fmt.intend(lines[1:]) return fmt.add_line_separator(lines) def gen_finalize(self): lines = [] lines.append('def finalize(self):') lines.append(fmt.intend('pass')) return fmt.add_line_separator(lines)
[docs]def gen_model_from_pickled_callable( save_path: str, module_name: str, python_file_path: str, pickle_file_name: str, input_annotations: List[Tuple[Any, Tuple]], output_annotations: List[Tuple[Any, Tuple]] ): builder = PickledCallablePyModelBuilder( module_name, python_file_path, pickle_file_name, input_annotations, output_annotations ) return builder.build(save_path)
[docs]def gen_model_from_op( save_path: str, task_name: str, op_name: str, op_init_args: Dict, input_annotations: List[Tuple[Any, Tuple]], output_annotations: List[Tuple[Any, Tuple]] ): builder = OpPyModelBuilder( task_name, op_name, op_init_args, input_annotations, output_annotations ) return builder.build(save_path)