Source code for towhee.serve.io.ndarray_io

# Copyright 2023 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio

import numpy as np

from .base import IOBase, IOType
from towhee.utils.serializer import to_json, from_json


# Lookup from the name of a tensor-contents field to the NumPy dtype that
# NDARRAY.from_proto passes to np.array() when rebuilding an array.
type_str_to_dtype_map = dict(
    string_values=np.object_,
    float_values=np.float32,
    double_values=np.float64,
    bool_values=np.bool_,
    int32_values=np.int32,
    int64_values=np.int64,
    uint32_values=np.uint32,
    uint64_values=np.uint64,
)


# Lookup from str(ndarray.dtype) to the name of the tensor-contents field
# that carries values of that dtype. NDARRAY.to_proto treats any dtype whose
# string form is missing from this map as unsupported.
dtype_to_type_str_map = {
    'object': 'string_values',
    'float32': 'float_values',
    'float64': 'double_values',
    'bool': 'bool_values',
    'int32': 'int32_values',
    'int64': 'int64_values',
    'uint32': 'uint32_values',
    # Was misspelled 'unit64', which made every uint64 array unsupported.
    'uint64': 'uint64_values',
}


class NDARRAY(IOBase, io_type=IOType.NDARRAY):
    """
    Numpy Ndarray

    Converts ``numpy.ndarray`` values to and from HTTP payloads and
    ``service_pb2.Content`` tensor messages.

    Example:

    .. code-block:: python

        import typing as T
        from pydantic import BaseModel
        from towhee import AutoPipes, api_service
        from towhee.serve.io import TEXT, NDARRAY

        service = api_service.APIService(desc="Welcome")
        stn = AutoPipes.pipeline('sentence_embedding')

        @service.api(path='/embedding', input_model=TEXT(), output_model=NDARRAY())
        def chat(text: str):
            return stn(text).to_list()[0][0]
    """

    _mime_type = 'application/json'

    def from_http(self, request: 'fastapi.Request') -> 'ndarray':
        """
        Read the request body and decode it with ``from_json``.

        Args:
            request: The incoming request; its ``body()`` coroutine is run
                to completion to obtain the payload.

        Returns:
            Whatever ``from_json`` produces for the body (presumably an
            ndarray written by ``to_json`` -- confirm against the serializer).
        """
        # NOTE(review): asyncio.run() raises RuntimeError when an event loop
        # is already running in the current thread; confirm this method is
        # only invoked from synchronous code.
        json_str = asyncio.run(request.body())
        return from_json(json_str)

    def to_http(self, obj: 'ndarray') -> 'Response':
        """
        Serialize ``obj`` with ``to_json`` and wrap it in a FastAPI response.

        Args:
            obj: The array to send; must be a ``numpy.ndarray``.

        Returns:
            A ``fastapi.Response`` whose media type is ``_mime_type``.
        """
        from fastapi import Response  # pylint: disable=import-outside-toplevel
        assert isinstance(obj, np.ndarray)
        content = to_json(obj)
        return Response(content=content, media_type=self._mime_type)

    def from_proto(self, content: 'service_pb2.Content') -> 'ndarray':
        """
        Rebuild an ndarray from the tensor carried by ``content``.

        Args:
            content: Message whose ``tensor`` has ``dtype`` (the name of the
                populated data field), ``shape`` and ``data``.

        Returns:
            The decoded array, reshaped to ``tensor.shape`` when a shape is
            present; otherwise the flat array.

        Raises:
            KeyError: If ``tensor.dtype`` is not a key of
                ``type_str_to_dtype_map``.
        """
        type_str = content.tensor.dtype
        shape = content.tensor.shape
        data = content.tensor.data
        array = np.array(getattr(data, type_str), type_str_to_dtype_map[type_str])
        if shape:
            # ndarray.reshape returns a new array instead of reshaping in
            # place, so the result has to be kept; discarding it returned
            # the flat array regardless of the transmitted shape.
            array = array.reshape(tuple(shape))
        return array

    def to_proto(self, obj: 'ndarray') -> 'service_pb2.Content':
        """
        Pack ``obj`` into a ``service_pb2.Content`` tensor message.

        Args:
            obj: The array to send; must be a ``numpy.ndarray``.

        Returns:
            A ``Content`` whose tensor records the field name for the dtype,
            the array shape and the flattened values.

        Raises:
            RuntimeError: If ``str(obj.dtype)`` is not a key of
                ``dtype_to_type_str_map``.
        """
        from towhee.serve.grpc import service_pb2  # pylint: disable=import-outside-toplevel
        assert isinstance(obj, np.ndarray)
        dtype = str(obj.dtype)
        if dtype not in dtype_to_type_str_map:
            raise RuntimeError('Unsupport dtype: %s' % dtype)

        return service_pb2.Content(tensor=service_pb2.Tensor(
            dtype=dtype_to_type_str_map[dtype],
            shape=obj.shape,
            data=self._to_tensorcontent(obj)
        ))

    def _to_tensorcontent(self, obj: 'ndarray'):
        """
        Build the ``TensorContents`` holding the flattened values of ``obj``.

        The values are stored in the field that ``dtype_to_type_str_map``
        names for ``str(obj.dtype)`` (``string_values`` for object arrays).

        Args:
            obj: The array whose values are stored.

        Returns:
            A ``service_pb2.TensorContents`` with exactly one field set.

        Raises:
            KeyError: If ``str(obj.dtype)`` is not a key of
                ``dtype_to_type_str_map``.
        """
        from towhee.serve.grpc import service_pb2  # pylint: disable=import-outside-toplevel
        field_name = dtype_to_type_str_map[str(obj.dtype)]
        # Flatten every dtype, object arrays included: iterating an
        # N-dimensional object array directly yields sub-arrays rather than
        # its elements, and the shape is transmitted separately by to_proto.
        return service_pb2.TensorContents(**{field_name: obj.ravel().tolist()})