HubOps

class towhee.runtime.hub_ops.image_embedding.ImageEmbedding[source]

Bases: object

image_embedding is a task that attempts to comprehend an entire image as a whole and encode the image’s semantics into a real vector. It is a fundamental task type that can be used in a variety of applications, including but not limited to reverse image search and image deduplication.

timm: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

timm operator extracts features for image with pre-trained models provided by Timm. Visual models like VGG, Resnet, ViT and so on are all available here.

__init__(self, model_name: str = None, num_classes: int = 1000, skip_preprocess: bool = False, device: str = None, checkpoint_path: str = None)
model_name(str):

The model name in string. The default value is resnet34.

num_classes(int):

The number of classes. The default value is 1000.

skip_preprocess(bool):

Whether to skip image pre-process, Default value is False.

device (str):

Device id: cpu/cuda:{GPUID}, if not set, will try to find an available GPU device.

checkpoint_path(str):

Local weights path, if not set, will download remote weights.

__call__(self, data: Union[List[‘towhee.types.Image’], ‘towhee.types.Image’]) -> Union[List[ndarray], ndarray]:
data(‘towhee.types.Image’):

decode by ops.image_decode.cv2 or ops.image_decode.nvjpeg

Examples:

from towhee import pipe, ops, DataCollection

p = (
   pipe.input('path')
         .map('path', 'img', ops.image_decode())
         .map('img', 'vec', ops.image_embedding.timm(model_name='resnet50'))
         .output('img', 'vec')
)

DataCollection(p('towhee.jpeg')).show()
isc: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

The isc operator extracts image features with top-ranked models from the Image Similarity Challenge 2021 - Descriptor Track. The default pretrained model weights are from The 1st Place Solution of ISC21 Descriptor Track.

__init__(self, timm_backbone: str = 'tf_efficientnetv2_m_in21ft1k', img_size: int = 512, checkpoint_path: str = None, skip_preprocess: bool = False, device: str = None)

timm_backbone(str):

backbone of model, the default value is tf_efficientnetv2_m_in21ft1k.

img_size(int):

Resize image, default is 512.

checkpoint_path(str):

If not set, will download remote weights.

skip_preprocess(bool):

Whether to skip image pre-process, Default value is False.

device (str):

Device id: cpu/cuda:{GPUID}, if not set, will try to find an available GPU device.

__call__(self, data: Union[List[‘towhee.types.Image’], ‘towhee.types.Image’]) -> Union[List[ndarray], ndarray]
data(‘towhee.types.Image’):

decode by ops.image_decode.cv2 or ops.image_decode.nvjpeg
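
Example (a minimal sketch following the same pipeline pattern as the timm operator above; the image path is illustrative):

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('path')
    .map('path', 'img', ops.image_decode())
    .map('img', 'vec', ops.image_embedding.isc())
    .output('img', 'vec')
)

DataCollection(p('towhee.jpeg')).show()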

data2vec: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

data2vec operator extracts features for image with data2vec. The core idea is to predict latent representations of the full input data based on a masked view of the input in a self-distillation setup using a standard Transformer architecture.

__init__(self, model_name='facebook/data2vec-vision-base-ft1k')
model_name(str):

The model name in string. The default value is “facebook/data2vec-vision-base-ft1k”. Supported model names:

facebook/data2vec-vision-base-ft1k

facebook/data2vec-vision-large-ft1k

__call__(self, img: 'towhee.types.Image') -> numpy.ndarray:
img('towhee.types.Image'):

decode by ops.image_decode.cv2 or ops.image_decode.nvjpeg

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('path')
    .map('path', 'img', ops.image_decode())
    .map('img', 'vec', ops.image_embedding.data2vec(model_name='facebook/data2vec-vision-base-ft1k'))
    .output('img', 'vec')
)

DataCollection(p('towhee.jpeg')).show()
mpvit: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

mpvit extracts features for images with Multi-Path Vision Transformer (MPViT), which can generate embeddings for images. MPViT embeds features of the same size (i.e., sequence length) with patches of different scales simultaneously by using overlapping convolutional patch embedding. Tokens of different scales are then independently fed into the Transformer encoders via multiple paths, and the resulting features are aggregated, enabling both fine and coarse feature representations at the same feature level.

__init__(self, model_name, num_classes: int = 1000, weights_path: str = None, device: str = None, skip_preprocess: bool = False)
model_name(str):

Pretrained model name; one of mpvit_tiny, mpvit_xsmall, mpvit_small or mpvit_base, all of which are pretrained on the ImageNet-1K dataset. For more information, please refer to the original MPViT GitHub page.

weights_path(str):

Your local weights path, default is None, which means using the pretrained model weights.

device (str):

Device id: cpu/cuda:{GPUID}, if not set, will try to find an available GPU device.

num_classes(int):

The number of classes. The default value is 1000.

skip_preprocess(bool):

Whether to skip image pre-process, Default value is False.

__call__(self, data: Union[List[‘towhee.types.Image’], ‘towhee.types.Image’]) -> Union[List[ndarray], ndarray]
data(‘towhee.types.Image’):

decode by ops.image_decode.cv2 or ops.image_decode.nvjpeg

Example:

from towhee import pipe, ops, DataCollection

p = (
   pipe.input('path')
      .map('path', 'img', ops.image_decode())
      .map('img', 'vec', ops.image_embedding.mpvit(model_name='mpvit_base'))
      .output('img', 'vec')
)

DataCollection(p('towhee.jpeg')).show()
swag: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

swag extracts features for image with pretrained SWAG models from Torch Hub. SWAG implements models from the paper Revisiting Weakly Supervised Pre-Training of Visual Perception Models. To achieve higher accuracy in image classification, SWAG uses hashtags to perform weakly supervised learning instead of fully supervised pretraining with image class labels.

__init__(self, model_name: str, skip_preprocess: bool = False)
model_name(str):

The model name in string. The default value is “vit_b16_in1k”. Supported model names: vit_b16_in1k, vit_l16_in1k, vit_h14_in1k, regnety_16gf_in1k, regnety_32gf_in1k, regnety_128gf_in1k

skip_preprocess(bool):

Whether to skip image pre-process, Default value is False.

__call__(self, img: 'towhee.types.Image') -> ndarray:
img('towhee.types.Image'):

decode by ops.image_decode.cv2 or ops.image_decode.nvjpeg

Example:

from towhee import pipe, ops, DataCollection

p = (
   pipe.input('path')
      .map('path', 'img', ops.image_decode())
      .map('img', 'vec', ops.image_embedding.swag(model_name='vit_b16_in1k'))
      .output('img', 'vec')
)

DataCollection(p('towhee.jpeg')).show()
class towhee.runtime.hub_ops.image_text_embedding.ImageTextEmbedding[source]

Bases: object

image_text_embedding is a task that attempts to comprehend images and texts, and encode both the image’s and the text’s semantics into the same embedding space. It is a fundamental task type that can be used in a variety of applications, such as cross-modal retrieval.

clip: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

This operator extracts features for image or text with CLIP which can generate embeddings for text and image by jointly training an image encoder and text encoder to maximize the cosine similarity.

__init__(self, model_name: str, modality: str, device: str = ‘cpu’, checkpoint_path: str = None)
model_name(str):

The model name of CLIP. Available model names: clip_vit_base_patch16, clip_vit_base_patch32, clip_vit_large_patch14, clip_vit_large_patch14_336

modality(str):

Which modality(image or text) is used to generate the embedding.

device(str):

Device id: cpu/cuda:{GPUID}, default is cpu.

checkpoint_path(str):

The path to local checkpoint, defaults to None. If None, the operator will download and load pretrained model by model_name from Huggingface transformers.

__call__(self, data: List[Union[str, towhee.types.Image]]) -> Union[List[ndarray], ndarray]:
data(List[Union[str, towhee.types.Image]])

The data (image or text based on specified modality) to generate embedding.

Example:

from towhee import pipe, ops, DataCollection

img_pipe = (
    pipe.input('url')
    .map('url', 'img', ops.image_decode.cv2('rgb'))
    .map('img', 'vec', ops.image_text_embedding.clip(model_name='clip_vit_base_patch16', modality='image'))
    .output('img', 'vec')
)

text_pipe = (
    pipe.input('text')
    .map('text', 'vec', ops.image_text_embedding.clip(model_name='clip_vit_base_patch16', modality='text'))
    .output('text', 'vec')
)

DataCollection(img_pipe('./teddy.jpg')).show()
DataCollection(text_pipe('A teddybear on a skateboard in Times Square.')).show()
taiyi: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Chinese clip: taiyi extracts features for image or text, which can generate embeddings for text and image by jointly training an image encoder and text encoder to maximize the cosine similarity. This method is developed by IDEA-CCNL.

__init__(self, model_name: str, modality: str, clip_checkpoint_path: str=None, text_checkpoint_path: str=None, device: str=None)
model_name(str):

The model name of Taiyi. Available model names: taiyi-clip-roberta-102m-chinese, taiyi-clip-roberta-large-326m-chinese

modality(str):

Which modality(image or text) is used to generate the embedding.

clip_checkpoint_path(str):

The weight path to load for the clip branch.

text_checkpoint_path(str):

The weight path to load for the text branch.

device(str):

The device in string, defaults to None. If None, it will enable “cuda” automatically when cuda is available.

__call__(self, data: List[Union[str, towhee.types.Image]]) -> Union[List[ndarray], ndarray]:
data(List[Union[str, towhee.types.Image]])

The data (image or text based on specified modality) to generate embedding.

Example:

from towhee import pipe, ops, DataCollection

img_pipe = (
    pipe.input('url')
    .map('url', 'img', ops.image_decode.cv2_rgb())
    .map('img', 'vec', ops.image_text_embedding.taiyi(model_name='taiyi-clip-roberta-102m-chinese', modality='image'))
    .output('img', 'vec')
)

text_pipe = (
    pipe.input('text')
    .map('text', 'vec', ops.image_text_embedding.taiyi(model_name='taiyi-clip-roberta-102m-chinese', modality='text'))
    .output('text', 'vec')
)

DataCollection(img_pipe('./dog.jpg')).show()
DataCollection(text_pipe('一只小狗')).show()
blip: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

This operator extracts features for image or text with BLIP, which can generate embeddings for text and image by jointly training an image encoder and text encoder to maximize the cosine similarity. This is an adaptation from salesforce/BLIP.

__init__(self, model_name: str, modality: str, device:str = ‘cpu’, checkpoint_path: str = None)
model_name(str):

The model name of BLIP. Available model names: blip_itm_base_coco, blip_itm_large_coco, blip_itm_base_flickr, blip_itm_large_flickr

modality(str):

Which modality(image or text) is used to generate the embedding.

device(str):

Device id: cpu/cuda:{GPUID}, default is cpu.

checkpoint_path(str):

The path to local checkpoint, defaults to None. If None, the operator will download and load pretrained model by model_name from Huggingface transformers.

__call__(self, data: List[Union[str, towhee.types.Image]]) -> Union[List[ndarray], ndarray]:
data(List[Union[str, towhee.types.Image]])

The data (image or text based on specified modality) to generate embedding.

Example:

from towhee import pipe, ops, DataCollection

img_pipe = (
    pipe.input('url')
    .map('url', 'img', ops.image_decode.cv2('rgb'))
    .map('img', 'vec', ops.image_text_embedding.blip(model_name='blip_itm_base_coco', modality='image'))
    .output('img', 'vec')
)

text_pipe = (
    pipe.input('text')
    .map('text', 'vec', ops.image_text_embedding.blip(model_name='blip_itm_base_coco', modality='text'))
    .output('text', 'vec')
)

DataCollection(img_pipe('./teddy.jpg')).show()
DataCollection(text_pipe('A teddybear on a skateboard in Times Square.')).show()
class towhee.runtime.hub_ops.audio_embedding.AudioEmbedding[source]

Bases: object

Audio embedding is a task that encodes audio’s semantics into a set of real vectors. It is a fundamental task type that can be used in a variety of applications, including but not limited to reverse audio search and audio deduplication.

nnfp: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

The audio embedding operator converts an input audio into a dense vector which can be used to represent the audio clip’s semantics. Each vector represents an audio clip with a fixed length of around 1s. This operator generates audio embeddings with the fingerprinting method introduced by Neural Audio Fingerprint. The model is implemented in PyTorch. We’ve also trained the nnfp model with the FMA dataset (and some noise audio) and shared the weights in this operator. The nnfp operator is suitable for audio fingerprinting.

__init__(self, model_name: str = ‘nnfp_default’, model_path: str = None, framework: str = ‘pytorch’, device: str = None)
model_name(str):

Model name to create nnfp model with different parameters, available model names: “nnfp_default”, “nnfp_hop25”, “nnfp_distill”.

model_path(str):

The path to model. If None, it will load default model weights.

framework(str):

Default value is pytorch. (Legacy parameter)

device(str):

Device id: cpu/cuda:{GPUID}, if not set, will try to find an available GPU device.

__call__(self, data: List[towhee.types.AudioFrame]) -> numpy.ndarray
data(List[towhee.types.AudioFrame]):

Input audio data is a list of towhee audio frames. Created by ops.audio_decode.ffmpeg(). The audio input should be at least 1s long.

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('path')
        .map('path', 'frame', ops.audio_decode.ffmpeg())
        .map('frame', 'vecs', ops.audio_embedding.nnfp(device='cpu'))
        .output('path', 'vecs')
)

DataCollection(p('test.wav')).show()
vggish: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

vggish converts an input audio into a dense vector which can be used to represent the audio clip’s semantics. Each vector represents an audio clip with a fixed length of around 0.9s. This operator is built on top of VGGish with PyTorch. The model is a VGG variant pre-trained on the large-scale audio dataset AudioSet. As suggested, it is suitable for extracting high-level features or warming up a larger model.

__init__(self, weights_path: str = None, framework: str = ‘pytorch’)
weights_path(str):

The path to model weights. If None, it will load default model weights.

framework(str):

Default value is pytorch. (Legacy parameter)

__call__(self, data: List[towhee.types.AudioFrame]) -> numpy.ndarray
data(List[towhee.types.AudioFrame]):

Input audio data is a list of towhee audio frames. Created by ops.audio_decode.ffmpeg(). The input data should represent an audio clip longer than 0.9s.

Example:

from towhee import pipe, ops

p = (
    pipe.input('path')
        .map('path', 'frame', ops.audio_decode.ffmpeg())
        .map('frame', 'vecs', ops.audio_embedding.vggish())
        .output('vecs')
)

p('test.wav').get()[0]
clmr: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

clmr converts an input audio into a dense vector which can be used to represent the audio clip’s semantics. Each vector represents an audio clip with a fixed length of around 2s. This operator is built on top of the original implementation of CLMR. The default model weights provided are pretrained on the MagnaTagATune dataset with SampleCNN.

__init__(self, framework=”pytorch”):
framework(str):

Default value is pytorch. (Legacy parameter)

__call__(self, data: List[towhee.types.AudioFrame]) -> numpy.ndarray
data(List[towhee.types.AudioFrame]):

Input audio data is a list of towhee audio frames. Created by ops.audio_decode.ffmpeg(). The input data should represent an audio clip longer than 3s.

Returns:

Audio embeddings in shape (num_clips, 512). Each embedding stands for features of an audio clip with length of 2.7s.

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('path')
        .map('path', 'frame', ops.audio_decode.ffmpeg())
        .map('frame', 'vecs', ops.audio_embedding.clmr())
        .output('path', 'vecs')
)

DataCollection(p('./test.wav')).show()
data2vec: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

data2vec extracts features for audio with data2vec. The core idea is to predict latent representations of the full input data based on a masked view of the input in a self-distillation setup using a standard Transformer architecture.

__init__(self, model_name = “facebook/data2vec-audio-base-960h”)
model_name(str):

Default value is “facebook/data2vec-audio-base-960h”. Available models: facebook/data2vec-audio-base-960h, facebook/data2vec-audio-large-960h, facebook/data2vec-audio-base, facebook/data2vec-audio-base-100h, facebook/data2vec-audio-base-10m, facebook/data2vec-audio-large, facebook/data2vec-audio-large-100h, facebook/data2vec-audio-large-10m

__call__(self, data: List[towhee.types.AudioFrame]) -> numpy.ndarray
data(List[towhee.types.AudioFrame]):

Input audio data is a list of towhee audio frames. Created by ops.audio_decode.ffmpeg().

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('path')
        .map('path', 'frame', ops.audio_decode.ffmpeg(sample_rate=16000))
        .map('frame', 'vecs', ops.audio_embedding.data2vec(model_name='facebook/data2vec-audio-base-960h'))
        .output('path', 'vecs')
)

DataCollection(p('test.wav')).show()
class towhee.runtime.hub_ops.sentence_embedding.SentenceEmbedding[source]

Bases: object

Sentence embedding is the extension of word or token embedding. Instead of generating a vector for each token or word, it represents semantic information of the whole text as a vector.

transformers: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

transformers generates one embedding vector in ndarray for each input text. The embedding represents the semantic information of the whole input text as one vector. This operator is implemented with pre-trained models from Huggingface Transformers.

__init__(self, model_name: str = None, checkpoint_path: str = None, tokenizer: object = None, device: str = None):
model_name(str):

The model name in string, defaults to None. If None, the operator will be initialized without specified model. Supported model names: NLP transformers models listed in Huggingface Models. Please note that only models listed in supported_model_names are tested. You can refer to Towhee Pipeline for model performance.

checkpoint_path(str):

The path to local checkpoint, defaults to None. If None, the operator will download and load pretrained model by model_name from Huggingface transformers.

tokenizer(any):

The method to tokenize input text, defaults to None. If None, the operator will use default tokenizer by model_name from Huggingface transformers.

device (str):

Device id: cpu/cuda:{GPUID}, if not set, will try to find an available GPU device.

__call__(self, data: Union[str, List[str]]) -> Union[ndarray, List[ndarray]]:
data(Union[str, List[str]]):

sentences

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('text')
        .map('text', 'vec',
            ops.sentence_embedding.transformers(model_name='sentence-transformers/paraphrase-albert-small-v2'))
        .output('text', 'vec')
)

DataCollection(p('Hello, world.')).show()
sbert: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

sbert takes a sentence or a list of sentences in string as input. It generates an embedding vector in numpy.ndarray for each sentence, which captures the input sentence’s core semantic elements. This operator is implemented with pre-trained models from Sentence Transformers.

__init__(self, model_name: str = None, device: str = None):
model_name(str):

The model name in string; the default is all-MiniLM-L12-v2. Refer to the SBert Doc. Please note that only models listed in supported_model_names are tested. You can refer to Towhee Pipeline for model performance.

device (str):

Device id: cpu/cuda:{GPUID}, if not set, will try to find an available GPU device.

__call__(self, data: Union[str, List[str]]) -> Union[ndarray, List[ndarray]]:
data(Union[str, List[str]]):

sentences

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('sentence')
    .map('sentence', 'vec', ops.sentence_embedding.sbert(model_name='all-MiniLM-L12-v2'))
    .output('sentence', 'vec')
)

DataCollection(p('This is a sentence.')).show()
openai: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

openai generates one embedding vector in ndarray for each input text. The embedding represents the semantic information of the whole input text as one vector. This operator is implemented with embedding models from OpenAI. Please note you need an OpenAI API key to access OpenAI.

__init__(self, model_name=’text-embedding-ada-002’, api_key=None):
model_name(str):

The model name in string, defaults to ‘text-embedding-ada-002’

api_key(str):

The OpenAI API key in string, defaults to None.

__call__(self, data: str) -> List:
data(str):

sentences

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('text')
        .map('text', 'vec',
            ops.sentence_embedding.openai(model_name='text-embedding-ada-002', api_key=OPENAI_API_KEY))
        .output('text', 'vec')
)

DataCollection(p('Hello, world.')).show()
class towhee.runtime.hub_ops.image_decode.ImageDecode[source]

Bases: object

Image decode operators convert an encoded image back to its uncompressed format. In most cases, image decoding is the first step of an image processing pipeline.

cv2: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

cv2 is an image decode operator implemented with OpenCV. It can decode images from local files, HTTP URLs, and image bytes.

__init__(self, mode: str = ‘BGR’):
mode(str):

BGR or RGB, default is BGR

__call__(self, image_data: Union[str, bytes]) -> ndarray:
image_data(Union[str, bytes]):

file_path/HTTP_urls or image_bytes.

Examples:

from towhee import pipe, ops, DataCollection

# decode image, in bgr channel order
p = (
  pipe.input('url')
  .map('url', 'image', ops.image_decode.cv2())
  .output('image')
)

# decode image, in rgb channel order
p2 = (
  pipe.input('url')
  .map('url', 'image', ops.image_decode.cv2('rgb'))
  .output('image')
)

# decode from path
DataCollection(p('./src_dog.jpg')).show()

# decode from bytes
with open('./src_dog.jpg', 'rb') as f:
   DataCollection(p2(f.read())).show()
nvjpeg: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

nvjpeg is an image decode operator implemented with OpenCV and nvjpeg. In a CPU environment it uses OpenCV; in a GPU environment it uses nvjpeg to decode JPEG files.

It can decode images from local files, HTTP URLs, and image bytes.

__init__(self, device: int=0)
device(int):

GPU ID, default is 0.

__call__(self, image_data: Union[str, bytes]) -> ndarray:
image_data(Union[str, bytes]):

file_path/HTTP_urls or image_bytes.

Examples:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('url')
    .map('url', 'image', ops.image_decode.nvjpeg())
    .output('image')
)

DataCollection(p('./src_dog.jpg')).show()
class towhee.runtime.hub_ops.audio_decode.AudioDecode[source]

Bases: object

Audio Decode converts the encoded audio back to uncompressed audio frames. In most cases, audio decoding is the first step of an audio processing pipeline.

ffmpeg: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Decode audio by ffmpeg lib.

__init__(self, batch_size=-1, sample_rate=None, layout=None) -> None:
batch_size(int):

If set, the __call__ function returns Generator[List[AudioFrame]]; otherwise it returns Generator[AudioFrame].

sample_rate(int):

The target sample rate.

layout(str):

The target layout, e.g. ‘stereo’, ‘mono’

__call__(self, audio_path: str) -> Generator[Union[towhee.types.AudioFrame, List[towhee.types.AudioFrame]]]
audio_path(str):

Audio file path.

Return(Generator[Union[towhee.types.AudioFrame, List[towhee.types.AudioFrame]]]):

A generator over audio frames of type towhee.types.AudioFrame.

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('audio_file')
    .flat_map('audio_file', 'frame', ops.audio_decode.ffmpeg())
    .output('frame')
)

DataCollection(p('./music.mp3')).show(limit=1)
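
When batch_size is set, each item produced by the decoder is a list of AudioFrame objects rather than a single frame. A minimal sketch (the batch size and file name are illustrative):

from towhee import pipe, ops

p = (
    pipe.input('audio_file')
    .flat_map('audio_file', 'frames', ops.audio_decode.ffmpeg(batch_size=10))
    .output('frames')
)

# each 'frames' row is a List[towhee.types.AudioFrame] with up to 10 frames
p('./music.mp3')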
class towhee.runtime.hub_ops.video_decode.VideoDecode[source]

Bases: object

Video decode. In most cases, video decoding is the first step of a video processing pipeline.

ffmpeg: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Decode videos by ffmpeg lib.

__init__(self, start_time=None, end_time=None, sample_type=None, args=None)
start_time(float):

Decode the video starting from start_time, defaulting to decoding from the beginning.

end_time(float):

Decode until end_time, defaulting to decoding to the end.

sample_type(str):

Sampling type, uniform_temporal_subsample or time_step_sample; no sampling by default.

args(dict):

If sample_type is uniform_temporal_subsample:

num_samples(int): the total number of frames to sample.

If sample_type is time_step_sample:

time_step(int): the time interval between samples.

__call__(self, video_path: str) -> Generator[towhee.types.VideoFrame]
video_path(str)

Supports local paths and http/https URLs.

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('video_file')
    .flat_map('video_file', 'frame',
              ops.video_decode.ffmpeg(start_time=10.0, end_time=15.0, sample_type='time_step_sample', args={'time_step': 1})
    )
    .output('frame')
)

DataCollection(p('./video.mp4')).show(limit=1)
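
A sketch of the uniform_temporal_subsample mode (the sample count and file name are illustrative):

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('video_file')
    .flat_map('video_file', 'frame',
              ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', args={'num_samples': 32})
    )
    .output('frame')
)

DataCollection(p('./video.mp4')).show(limit=1)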
VPF: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

GPU video decoder, using https://github.com/NVIDIA/VideoProcessingFramework. Users need to install the VPF package themselves. GPU decoding only supports H.264, H.265 and VP9; other codecs fall back to CPU decoding. Decoded results differ from CPU decoding by about 4%.

__init__(self, gpu_id, start_time=None, end_time=None, sample_type=None, args=None)
gpu_id(int):

GPU id

start_time(float):

Decode the video starting from start_time, defaulting to decoding from the beginning.

end_time(float):

Decode until end_time, defaulting to decoding to the end.

sample_type(str):

Sampling type, uniform_temporal_subsample or time_step_sample; no sampling by default.

args(dict):

If sample_type is uniform_temporal_subsample:

num_samples(int): the total number of frames to sample.

If sample_type is time_step_sample:

time_step(int): the time interval between samples.

__call__(self, video_path: str) -> Generator[towhee.types.VideoFrame]
video_path(str)

Supports local paths and http/https URLs.

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('video_file')
    .flat_map('video_file', 'frame',
              ops.video_decode.VPF(start_time=10.0, end_time=15.0, sample_type='time_step_sample', args={'time_step': 1})
    )
    .output('frame')
)

DataCollection(p('./video.mp4')).show(limit=1)
class towhee.runtime.hub_ops.data_source.DataSource[source]

Bases: object

data_source loads data from many different sources. It works with DataLoader.

glob: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

glob is a wrapper of Python glob.glob. It returns a list of paths matching a pathname pattern. The pattern may contain simple shell-style wildcards a la fnmatch. However, unlike fnmatch, filenames starting with a dot are special cases that are not matched by ‘*’ and ‘?’ patterns.

If recursive is true, the pattern ‘**’ will match any files and zero or more directories and subdirectories.

__init__(self, pathname, *, recursive=False):
pathname(str):

path pattern.

recursive(bool):

Default is False

Example:

from towhee import DataLoader, pipe, ops
p = (
    pipe.input('image_path')
    .map('image_path', 'image', ops.image_decode.cv2())
    .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'))
    .output('vec')

)

for data in DataLoader(ops.data_source.glob('./*.jpg')):
    print(p(data).to_list(kv_format=True))

# batch
for data in DataLoader(ops.data_source.glob('./*.jpg'), batch_size=10):
    p.batch(data)
csv_reader: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

csv_reader is a wrapper of the Python csv module: https://docs.python.org/3.8/library/csv.html.

__init__(self, f_path: str, dialect=’excel’, **fmtparams):
f_path(str):

The path to the CSV file.

Example:

from towhee import DataLoader, pipe, ops
p = (
    pipe.input('image_path')
    .map('image_path', 'image', ops.image_decode.cv2())
    .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'))
    .output('vec')

)

# csv data format: id,image_path,label
for data in DataLoader(ops.data_source.csv_reader('./reverse_image_search.csv'), parser=lambda x: x[1]):
    print(p(data).to_list(kv_format=True))

# batch
for data in DataLoader(ops.data_source.csv_reader('./reverse_image_search.csv'), parser=lambda x: x[1], batch_size=10):
    p.batch(data)
sql: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

sql (https://towhee.io/data-source/sql) reads data from SQLite or MySQL.

__init__(self, sql_url: str, table_name:str, cols: str = ‘*’, where: str = None, limit: int = 500):
sql_url(str):

The URL of the SQL database, such as '<db_type>+<db_driver>://<user>:<password>@<host>:<port>/<db_name>'. For example:

sqlite: sqlite:///./sqlite.db

mysql: mysql+pymysql://root:123456@127.0.0.1:3306/mysql

table_name(str):

The table name.

cols(str):

The columns to be queried; defaults to *, indicating all columns. To query specific columns, list the column names separated by ',', such as 'id,image_path,label'.

where(str):

Where conditional statement, for example: id > 100

limit(int):

The default value is 500. If set to None, all data will be returned.

Example:

from towhee import DataLoader, pipe, ops
p = (
    pipe.input('image_path')
    .map('image_path', 'image', ops.image_decode.cv2())
    .map('image', 'vec', ops.image_embedding.timm(model_name='resnet50'))
    .output('vec')

)

# table cols: id, image_path, label

for data in DataLoader(ops.data_source.sql('sqlite:///./sqlite.db', 'image_table'), parser=lambda x: x[1]):
    print(p(data).to_list(kv_format=True))

# batch
for data in DataLoader(ops.data_source.sql('sqlite:///./sqlite.db', 'image_table'), parser=lambda x: x[1], batch_size=10):
    p.batch(data)
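
Continuing the pipeline p above, a sketch of the cols, where and limit parameters (the column names and condition are illustrative):

for data in DataLoader(ops.data_source.sql('sqlite:///./sqlite.db', 'image_table',
                                           cols='id,image_path,label', where='id > 100', limit=1000),
                       parser=lambda x: x[1]):
    print(p(data).to_list(kv_format=True))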
readthedocs: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

readthedocs gets the list of documents for a single Read the Docs project.

__init__(self, page_prefix: str, index_page: str = None, include: Union[List[str], str] = ‘’, exclude: Union[List[str], str] = None):
page_prefix(str):

The root path of the pages. Generally, the crawled links are relative paths, so the complete URL is obtained by joining the root path and the relative path.

index_page(str):

The main page containing links to all other pages; if None, page_prefix is used. Example: https://towhee.readthedocs.io/en/latest/

include(Union[List[str], str]):

Only contains URLs that meet this condition.

exclude(Union[List[str], str]):

Filter out URLs that meet this condition.

Example:

from towhee import DataLoader, pipe, ops
p = (
    pipe.input('url')
    .map('url', 'text', ops.text_loader())
    .flat_map('text', 'sentence', ops.text_splitter())
    .map('sentence', 'embedding', ops.sentence_embedding.transformers(model_name='all-MiniLM-L6-v2'))
    .map('embedding', 'embedding', ops.towhee.np_normalize())
    .output('embedding')
)



for data in DataLoader(ops.data_source.readthedocs('https://towhee.readthedocs.io/en/latest/',
                                                   include='html', exclude='index.html')):
    print(p(data).to_list(kv_format=True))

# batch
for data in DataLoader(ops.data_source.readthedocs('https://towhee.readthedocs.io/en/latest/',
                                                   include='html', exclude='index.html'), batch_size=10):
    p.batch(data)
class towhee.runtime.hub_ops.ann_insert.AnnInsert[source]

Bases: object

The ANN Insert Operator is used to insert embeddings and create ANN indexes for fast similarity searches.

faiss_index: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Insert data into Faiss. Only for local testing. If you want to use a vector database in a production environment, you can use Milvus (https://github.com/milvus-io/milvus).

__init__(self, data_dir: str, dimension: int = None):
data_dir(str):

Path to store data.

dimension(int):

The dimension of embedding.

__call__(self, vec: ‘ndarray’, *args):
vec(ndarray):

embedding

*args(Any):

meta data.

Example:

from glob import glob
from towhee import ops, pipe

p = (
    pipe.input('file_name')
    .map('file_name', 'img', ops.image_decode.cv2())
    .map('img', 'vec', ops.image_text_embedding.clip(model_name='clip_vit_base_patch32', modality='image'))
    .map('vec', 'vec', ops.towhee.np_normalize())
    .map(('vec', 'file_name'), (), ops.ann_insert.faiss_index('./faiss', 512))
    .output()
)

fs = glob('./images/*.jpg')

for f in fs:
    p(f)

# Ensure data is written to disk.
p.flush()
milvus_client: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Insert data into Milvus collections. Please make sure you have created Milvus Collection before loading the data.

__init__(self, host: str, port: int, collection_name: str, user: str = None, password: str = None):
host(str):

The host for Milvus.

port(str):

The port for Milvus.

collection_name(str):

The collection name for Milvus.

user(str):

The user for Zilliz Cloud, defaults to None.

password(str):

The password for Zilliz Cloud, defaults to None.

__call__(self, *data) -> ‘pymilvus.MutationResult’:
data(list)

The data to insert into milvus.

Example:

import towhee

from towhee import ops

p = (
    towhee.pipe.input('vec')
    .map('vec', (), ops.ann_insert.milvus_client(host='127.0.0.1', port='19530', collection_name='test_collection'))
    .output()
)

# vec is the embedding (ndarray) to insert
p(vec)
milvus_multi_collections: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

milvus_multi_collections is a client that can access multiple collections.

__init__(self, host: str, port: int, user: str = None, password: str = None):
host(str):

The host for Milvus.

port(str):

The port for Milvus.

user(str):

The user for Zilliz Cloud, defaults to None.

password(str):

The password for Zilliz Cloud, defaults to None.

__call__(self, collection_name: str, *data) -> ‘pymilvus.MutationResult’:
collection_name(str):

collection_name

data(list):

The data to insert into milvus.

Example:

from towhee import ops, pipe

p = (
    pipe.input('collection_name', 'vec')
    .map(('collection_name', 'vec'), (), ops.ann_insert.milvus_multi_collections(host='127.0.0.1', port='19530'))
    .output()
)

# collection_name is the target collection, vec is the embedding to insert
p(collection_name, vec)
class towhee.runtime.hub_ops.ann_search.AnnSearch[source]

Bases: object

The ANN search operator is used to find the closest (or most similar) point to a given point in a given set, i.e. find similar embeddings.

faiss_index: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Only for local testing. If you want to use a vector database in a production environment, you can use Milvus (https://github.com/milvus-io/milvus).

__init__(self, data_dir: str, top_k: int = 5)
data_dir(str):

Path to store data.

top_k(int):

The number of most similar results to return.

__call__(self, query: 'ndarray') -> List[Tuple[id: int, score: float, meta: dict]]
query(ndarray):

query embedding

Example:

from towhee import pipe, ops

p = (
        pipe.input('vec')
        .flat_map('vec', 'rows', ops.ann_search.faiss_index('./data_dir', 5))
        .map('rows', ('id', 'score'), lambda x: (x[0], x[1]))
        .output('id', 'score')
    )

p(<your-vector>)
milvus_client: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Search embedding in Milvus, please make sure you have inserted data to Milvus Collection.

__init__(self, host: str = 'localhost', port: int = 19530, collection_name: str = None, user: str = None, password: str = None, **kwargs)

host(str):

The host for Milvus.

port(str):

The port for Milvus.

collection_name(str):

The collection name for Milvus.

user(str)

The user for Zilliz Cloud, defaults to None.

password(str):

The password for Zilliz Cloud, defaults to None.

kwargs(dict):

The same with pymilvus search: https://milvus.io/docs/search.md

__call__(self, query: ‘ndarray’) -> List[Tuple]
query(ndarray):

query embedding

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('text')
    .map('text', 'vec', ops.sentence_embedding.transformers(model_name='all-MiniLM-L12-v2'))
    .flat_map('vec', 'rows', ops.ann_search.milvus_client(host='127.0.0.1', port='19530',
                                                          collection_name='text_db2', **{'output_fields': ['text']}))
    .map('rows', ('id', 'score', 'text'), lambda x: (x[0], x[1], x[2]))
    .output('id', 'score', 'text')
)

DataCollection(p('cat')).show()
milvus_multi_collections: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

milvus_multi_collections is a client that can access multiple collections.

__init__(self, host: str = 'localhost', port: int = 19530, user: str = None, password: str = None, **kwargs):

host(str):

The host for Milvus.

port(str):

The port for Milvus.

user(str)

The user for Zilliz Cloud, defaults to None.

password(str):

The password for Zilliz Cloud, defaults to None.

kwargs(dict):

The same with pymilvus search: https://milvus.io/docs/search.md

__call__(self, collection_name: str, query: ‘ndarray’) -> List[Tuple]
collection_name(str):

The collection name for Milvus.

query(ndarray):

query embedding

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('text')
    .map('text', 'vec', ops.sentence_embedding.transformers(model_name='all-MiniLM-L12-v2'))
    .flat_map('vec', 'rows', ops.ann_search.milvus_multi_collections(host='127.0.0.1', port='19530', **{'output_fields': ['text']}))
    .map('rows', ('id', 'score', 'text'), lambda x: (x[0], x[1], x[2]))
    .output('id', 'score', 'text')
)

DataCollection(p('cat')).show()
class towhee.runtime.hub_ops.object_detection.ObjectDetection[source]

Bases: object

Object detection is a computer vision technique that locates and identifies people, items, or other objects in an image. Object detection has applications in many areas of computer vision, including image retrieval, image annotation, vehicle counting, object tracking, etc.

yolov5: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

This operator uses PyTorch yolov5 to detect objects in an image.

__init__(self)

__call__(self, img: numpy.ndarray) -> Tuple[boxes, classes, scores]
img(ndarray):

Image data in ndarray format.

Returns:
boxes(List[List[(int, int, int, int)]]):

A list of bounding boxes. Each bounding box is represented by its top-left and bottom-right points, i.e. (x1, y1, x2, y2).

classes(List[str]):

A list of prediction labels

scores(List[float]):

A list of confidence scores.

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('path')
        .map('path', 'img', ops.image_decode())
        .map('img', ('box', 'class', 'score'), ops.object_detection.yolov5())
        .map(('img', 'box'), 'object', ops.image_crop(clamp=True))
        .output('img', 'object', 'class')
)

DataCollection(p('./test.png')).show()
yolov8: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)
__init__(self, model_name: str)
model_name(str)

Available models: yolov8n.pt, yolov8s.pt, yolov8m.pt, yolov8l.pt

__call__(self, img: numpy.ndarray) -> Tuple[boxes, classes, scores]
img(ndarray):

Image data in ndarray format.

Returns:
boxes(List[List[(int, int, int, int)]]):

A list of bounding boxes. Each bounding box is represented by its top-left and bottom-right points, i.e. (x1, y1, x2, y2).

classes(List[str]):

A list of prediction labels

scores(List[float]):

A list of confidence scores.

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('path')
        .map('path', 'img', ops.image_decode())
        .map('img', ('box', 'class', 'score'), ops.object_detection.yolov8())
        .map(('img', 'box'), 'object', ops.image_crop(clamp=True))
        .output('img', 'object', 'class')
)

DataCollection(p('./test.png')).show()
detectron2: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

This operator uses Facebook’s Detectron2 library to compute bounding boxes, class labels, and class scores for detected objects in a given image.

__init__(self, model_name: str = 'faster_rcnn_resnet50_c4', thresh: float = 0.5):
model_name(str):

A string indicating which model to use. Available options: faster_rcnn_resnet50_c4, faster_rcnn_resnet50_dc5, faster_rcnn_resnet50_fpn, faster_rcnn_resnet101_c4, faster_rcnn_resnet101_dc5, faster_rcnn_resnet101_fpn, faster_rcnn_resnext101, retinanet_resnet50, retinanet_resnet101

thresh(float):

The threshold value for which an object is detected (default value: 0.5). Set this value lower to detect more objects at the expense of accuracy, or higher to reduce the total number of detections but increase the quality of detected objects.

__call__(self, image: ‘towhee.types.Image’) -> Tuple[List]
image(towhee.types.Image):

Image data wrapped as a Towhee Image.

Returns:
boxes(List[List[(int, int, int, int)]]):

A list of bounding boxes. Each bounding box is represented by its top-left and bottom-right points, i.e. (x1, y1, x2, y2).

classes(List[str]):

A list of prediction labels

scores(List[float]):

A list of confidence scores.

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('path')
        .map('path', 'img', ops.image_decode())
        .map('img', ('boxes', 'classes', 'scores'), ops.object_detection.detectron2(model_name='retinanet_resnet50'))
        .output('img', 'boxes', 'classes', 'scores')
)

DataCollection(p('./example.jpg')).show()
class towhee.runtime.hub_ops.utils.Utils[source]

Bases: object

Some utils.

np_normalize: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Convert the ndarray to a unit vector.

__init__(self, axis=0)

If axis is an integer, the vector norm is computed along that axis of x. If axis is a 2-tuple, the matrix norms of the specified matrices are computed. If axis is None, either a vector norm (when x is 1-D) or a matrix norm (when x is 2-D) is returned.

__call__(self, x: ndarray)
x(ndarray)

input ndarray

Example:

import numpy as np
from towhee import pipe, ops

p = (
    pipe.input('vec')
    .map('vec', 'vec', ops.utils.np_normalize())
    .output('vec')
    )

p(np.random.rand(20)).to_list()
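
A sketch for a 2-D input, assuming the norm is computed along the given axis so that axis=1 normalizes each row of a batch of embeddings:

import numpy as np
from towhee import pipe, ops

p2 = (
    pipe.input('mat')
    .map('mat', 'mat', ops.utils.np_normalize(axis=1))
    .output('mat')
)

# each of the 4 rows is normalized to unit length
p2(np.random.rand(4, 20)).to_list()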
image_crop: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

An image crop operator implementation with OpenCV.

__init__(self, clamp = False)
clamp(bool):

If set to True, the coordinates of bounding boxes will be clamped to the image size.

__call__(self, img: np.ndarray, bboxes: List[Tuple]) -> List[ndarray]
img(ndarray):

The image to be cropped.

bboxes(List[Tuple[int, int, int, int]]):

An n x 4 array of n bounding boxes to crop; each row is formatted as (x1, y1, x2, y2).

Example:

from towhee import pipe, ops, DataCollection

p = (
    pipe.input('path')
        .map('path', 'img', ops.image_decode.cv2('rgb'))
        .map('img', ('box','score'), ops.face_detection.retinaface())
        .map(('img', 'box'), 'crop', ops.utils.image_crop(clamp = True))
        .output('img', 'crop')
)

DataCollection(p('./avengers.jpg')).show()
class towhee.runtime.hub_ops.rerank.ReRank[source]

Bases: object

Re-rank the search results based on relevance.

cross_encoder: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

The Rerank operator is used to reorder the list of relevant documents for a query. It uses the MS MARCO Cross-Encoders (https://www.sbert.net/docs/pretrained_cross-encoders.html#ms-marco) model to get the relevance scores and then reorders the documents.

__init__(self, model_name: str = ‘cross-encoder/ms-marco-MiniLM-L-6-v2’, threshold: float = 0.6, device: str = None, max_length=512, checkpoint_path=None):

model_name(str):

The model name of CrossEncoder, you can set it according to the Model List.

threshold(float):

The score threshold used to filter the documents.

device(str):

Device id: cpu/cuda:{GPUID}, if not set, will try to find an available GPU device.

max_length(int):

Model max sequence length.

checkpoint_path(str):

You can use local model weights to initialize the operator; otherwise the default weights are downloaded from Huggingface.

__call__(self, query: str, docs: List) -> Tuple[List[str], List[float]]
query(str):

The query content.

docs(Union[List[str], str]):

Sentences to check the correlation with the query content.

Return(List[str], List[float])

docs and scores. The list of documents after rerank and the list of corresponding scores.

Example:

from towhee import ops, pipe, DataCollection

p = (pipe.input('query', 'doc')
        .map(('query', 'doc'), ('doc', 'score'), ops.rerank.cross_encoder(threshold=0.9))
        .flat_map(('doc', 'score'), ('doc', 'score'), lambda x, y: [(i, j) for i, j in zip(x, y)])
        .output('query', 'doc', 'score')
    )

DataCollection(p('What is Towhee?',
                ['Towhee is a cutting-edge framework to deal with unstructured data.',
                'I do not know about towhee', 'Towhee has many powerful operators.',
                'The weather is good' ])
            ).show()
class towhee.runtime.hub_ops.llm.LLM[source]

Bases: object

The LLM ops are designed to provide a standard interface for all supported large language model services.

OpenAI: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

This operator is implemented with Chat Completion method from OpenAI. Please note you need an OpenAI API key to access OpenAI.

__init__(self, model_name: str = ‘gpt-3.5-turbo’, api_key: str = None, **kwargs)
model_name(str):

The model name in string, defaults to ‘gpt-3.5-turbo’.

api_key(str):

The OpenAI API key in string, defaults to None.

kwargs(dict):

Other OpenAI parameters such as max_tokens, stream, temperature, etc.

__call__(self, messages: List[Dict]) -> str
messages(List[Dict]):

A list of messages to set up chat. Must be a list of dictionaries with keys from “system”, “question”, “answer”. For example, [{“question”: “a past question?”, “answer”: “a past answer.”}, {“question”: “current question?”}]

Returns:

The next answer generated by role “assistant”.

Example:

from towhee import pipe, ops

p = (
    pipe.input('messages')
        .map('messages', 'answer', ops.LLM.OpenAI(api_key=OPENAI_API_KEY))
        .output('messages', 'answer')
)

messages=[
        {'question': 'Who won the world series in 2020?', 'answer': 'The Los Angeles Dodgers won the World Series in 2020.'},
        {'question': 'Where was it played?'}
    ]
answer = p(messages).get()[0]
Ernie: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

This operator is implemented with Ernie Bot from Baidu. Please note you will need an Ernie API key & Secret key to access the service.

__init__(self, api_key: str = None, secret_key: str = None, **kwargs)
api_key(str):

The Ernie API key in string, defaults to None. If None, it will use the environment variable ERNIE_API_KEY.

secret_key(str):

The Ernie Secret key in string, defaults to None. If None, it will use the environment variable ERNIE_SECRET_KEY.

kwargs(Dict):

Other Ernie parameters such as temperature, etc.

__call__(self, messages: List[Dict]) -> str
messages(List[Dict]):

A list of messages to set up chat. Must be a list of dictionaries with keys from “system”, “question”, “answer”. For example, [{“question”: “a past question?”, “answer”: “a past answer.”}, {“question”: “current question?”}]

Returns:

The next answer generated by role “assistant”.

Example:

from towhee import pipe, ops

p = (
    pipe.input('messages')
        .map('messages', 'answer', ops.LLM.Ernie(api_key=ERNIE_API_KEY, secret_key=ERNIE_SECRET_KEY, temperature=0.5))
        .output('answer')
)

messages=[
        {'question': 'Zilliz Cloud 是什么?', 'answer': 'Zilliz Cloud 是一种全托管的向量检索服务。'},
        {'question': '它和 Milvus 的关系是什么?'}
    ]
answer = p(messages).get()[0]
MiniMax: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

This operator is implemented with MiniMax. Please note you will need a MiniMax API Key & Group ID to access the service.

__init__(self, api_key: str = None, group_id: str = None, model: str = ‘abab5-chat’, **kwargs):
api_key(str):

The MiniMax API key in string, defaults to None. If None, it will use the environment variable MINIMAX_API_KEY.

group_id(str):

The MiniMax group id in string, defaults to None. If None, it will use the environment variable MINIMAX_GROUP_ID.

model(str):

The model used in MiniMax service, defaults to ‘abab5-chat’. Visit MiniMax documentation for supported models.

kwargs(Dict):

Other MiniMax parameters such as temperature, etc.

__call__(self, messages: List[Dict]) -> str
messages(List[Dict]):

A list of messages to set up chat. Must be a list of dictionaries with keys from “system”, “question”, “answer”. For example, [{“question”: “a past question?”, “answer”: “a past answer.”}, {“question”: “current question?”}]. It also accepts the original MiniMax message format like [{“sender_type”: “USER”, “text”: “a question?”}, {“sender_type”: “BOT”, “text”: “an answer.”}]

Returns:

The next answer generated by role “BOT”.

Example:

from towhee import pipe, ops

p = (
    pipe.input('messages')
        .map('messages', 'answer', ops.LLM.MiniMax(
            api_key=MINIMAX_API_KEY,
            group_id=MINIMAX_GROUP_ID,
            temperature=0.5,
            max_tokens=50,
            role_meta={
                'user_name': '我',
                'bot_name': '工程师'
            },
            ))
        .output('answer')
)

messages=[
    {'system': '你是一个资深的软件工程师,善于回答关于科技项目的问题。'},
    {'question': 'Zilliz Cloud 是什么?', 'answer': 'Zilliz Cloud 是一种全托管的向量检索服务。'},
    {'question': '它和 Milvus 的关系是什么?'}
    ]
answer = p(messages).get()[0]
DashScope: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

This operator is implemented with Tongyi Qianwen from DashScope at Alibaba. Please note you will need a DashScope API Key to access the service.

__init__(self, api_key: str = None, model: str = ‘qwen-v1’, **kwargs):
api_key(str):

The DashScope API key in string, defaults to None. If None, it will use the environment variable DashScope_API_KEY.

model(str):

The model used in DashScope service, defaults to ‘qwen-v1’. Visit DashScope Documentation for supported models.

kwargs(Dict):

Other DashScope model parameters such as temperature, etc.

__call__(self, messages: List[Dict]) -> str
messages(List[Dict]):

A list of messages to set up chat. Must be a list of dictionaries with keys from “system”, “question”, “answer”. For example, [{“question”: “a past question?”, “answer”: “a past answer.”}, {“question”: “current question?”}]. It also accepts the original DashScope message format like [{“user”: “a past question?”, “bot”: “a past answer”}, {“user”: “current question?”}]

Example:

from towhee import pipe, ops

p = (
    pipe.input('messages')
        .map('messages', 'answer', ops.LLM.DashScope(
            api_key=DASHSCOPE_API_KEY,
            temperature=0.5,
            ))
        .output('answer')
)

messages=[
    {'question': 'Zilliz Cloud 是什么?', 'answer': 'Zilliz Cloud 是一种全托管的向量检索服务。'},
    {'question': '它和 Milvus 的关系是什么?'}
    ]
answer = p(messages).get()[0]
SkyChat: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

This operator is implemented with SkyChat from Singularity AI. Please note you will need the SkyChat app key & app secret to access the service.

__init__(self, app_key: str = None, app_secret: str = None, api_host: str = ‘sky-api.singularity-ai.com’, model: str = ‘sky-chat-3.5’, **kwargs):
app_key(str):

The SkyChat app key in string, defaults to None. If None, it will use the environment variable SKYCHAT_APP_KEY.

app_secret(str):

The SkyChat app secret in string, defaults to None. If None, it will use the environment variable SKYCHAT_APP_SECRET.

api_host(str):

Default is sky-api.singularity-ai.com.

model(str):

The SkyChat model name, defaults to ‘sky-chat-3.5’.

kwargs(Dict):

Other SkyChat parameters such as temperature, etc.

__call__(self, messages: List[Dict]) -> str
messages(List[Dict]):

A list of messages to set up chat. Must be a list of dictionaries with keys from “question”, “answer”. For example, [{“question”: “a past question?”, “answer”: “a past answer.”}, {“question”: “current question?”}]. It also accepts the original SkyChat message format like [{“role”: “user”, “content”: “a question?”}, {“role”: “bot”, “content”: “an answer.”}]

Example:

from towhee import pipe, ops

p = (
    pipe.input('messages')
        .map('messages', 'answer', ops.LLM.SkyChat(app_key=SKYCHAT_APP_KEY, app_secret=SKYCHAT_APP_SECRET, temperature=0.5))
        .output('answer')
)

messages=[
        {'question': 'Zilliz Cloud 是什么?', 'answer': 'Zilliz Cloud 是一种全托管的向量检索服务。'},
        {'question': '它和 Milvus 的关系是什么?'}
    ]
answer = p(messages).get()[0]
ZhipuAI: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

This operator is implemented with ChatGLM services from Zhipu AI. Please note you will need an API Key to access the service.

__init__(self, model_name: str = ‘chatglm_130b’, api_key: str = None, **kwargs):
model_name(str):

The model used in the Zhipu AI service, defaults to ‘chatglm_130b’. Visit Zhipu AI documentation for supported models.

api_key(str):

The Zhipu AI API key in string, defaults to None. If None, it will use the environment variable ZHIPUAI_API_KEY.

kwargs(Dict):

Other ChatGLM parameters such as temperature, etc.

__call__(self, messages: List[Dict]) -> str
messages(List[Dict]):

A list of messages to set up chat. Must be a list of dictionaries with keys chosen from “system”, “question”, “answer”. For example, [{“question”: “a past question?”, “answer”: “a past answer.”}, {“question”: “current question?”}]. It also accepts the original ChatGLM message format like [{“role”: “user”, “content”: “a question?”}, {“role”: “assistant”, “content”: “an answer.”}]

Example:

from towhee import pipe, ops

p = (
    pipe.input('messages')
        .map('messages', 'answer', ops.LLM.ZhipuAI(
            api_key=ZHIPUAI_API_KEY,
            model_name='chatglm_130b',  # or 'chatglm_6b'
            temperature=0.5,
            max_tokens=50,
            ))
        .output('answer')
)

messages=[
    {'system': '你是一个资深的软件工程师,善于回答关于科技项目的问题。'},
    {'question': 'Zilliz Cloud 是什么?', 'answer': 'Zilliz Cloud 是一种全托管的向量检索服务。'},
    {'question': '它和 Milvus 的关系是什么?'}
    ]
answer = p(messages).get()[0]
Dolly: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

This operator uses a pretrained Dolly to generate response. It will download model from HuggingFace Models.

__init__(self, model_name: str = ‘databricks/dolly-v2-12b’, **kwargs):
model_name(str):

The model name in string, defaults to ‘databricks/dolly-v2-12b’. Supported model names: databricks/dolly-v2-12b, databricks/dolly-v2-7b, databricks/dolly-v2-3b, databricks/dolly-v1-6b

kwargs(Dict):

Other Dolly model parameters such as device_map.

__call__(self, messages: List[Dict]) -> str
messages(List[Dict]):

A list of messages to set up chat. Must be a list of dictionaries with keys chosen from “system”, “question”, “answer”. For example, [{“question”: “a past question?”, “answer”: “a past answer.”}, {“question”: “current question?”}]

Example:

from towhee import pipe, ops

p = (
    pipe.input('question', 'docs', 'history')
        .map(('question', 'docs', 'history'), 'prompt', ops.prompt.question_answer(llm_name='dolly'))
        .map('prompt', 'answer', ops.LLM.Dolly())
        .output('answer')
)

history=[('Who won the world series in 2020?', 'The Los Angeles Dodgers won the World Series in 2020.')]
question = 'Where was it played?'
answer = p(question, [], history).get()[0]
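
The extra kwargs are forwarded to the underlying model. Below is a minimal sketch of loading a smaller checkpoint with automatic device placement; the model_name and device_map values are illustrative assumptions, not requirements, and messages are passed to the operator directly instead of via a prompt operator:

from towhee import pipe, ops

# Sketch: use a smaller Dolly checkpoint and pass device_map through as a model kwarg.
p = (
    pipe.input('messages')
        .map('messages', 'answer', ops.LLM.Dolly(
            model_name='databricks/dolly-v2-3b',
            device_map='auto',
            ))
        .output('answer')
)

answer = p([{'question': 'What is Towhee?'}]).get()[0]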
class towhee.runtime.hub_ops.prompt.Prompt[source]

Bases: object

QA prompt.

question_answer: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)
__init__(self, temp: str = None, llm_name: str = None):
temp(str):

User-defined prompt; it must contain {context} and {question}.

llm_name(str):

Pre-defined prompt; currently supports openai, ernie and dolly. The openai prompt is used by default.

__call__(self, question: str, context: str, history: Optional[List[Tuple]] = None) -> List[Dict[str, str]]:
question(str):

query string

context(str):

context string

history(List[Tuple]):

history of chat, [(question1, answer1), (question2, answer2)]

Return:

A list of messages to set up chat. Must be a list of dictionaries with keys chosen from “system”, “question”, “answer”. For example, [{“question”: “a past question?”, “answer”: “a past answer.”}, {“question”: “current question?”}]

Example:

from towhee import ops, pipe
import requests

towhee_docs = requests.get('https://raw.githubusercontent.com/towhee-io/towhee/main/README.md').content


p = (
    pipe.input('question', 'docs', 'history')
    .map('docs', 'docs', lambda x: x[:2000])
    .map(('question', 'docs', 'history'), 'prompt', ops.prompt.question_answer())
    .map('prompt', 'answer', ops.LLM.OpenAI())
    .output('answer')
)

an1 = p('Tell me something about Towhee', towhee_docs, []).get()[0]
print(an1)

an2 = p('How to use it', towhee_docs, [('Tell me something about Towhee', an1)]).get()[0]
print(an2)
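
A user-defined template can be passed through temp instead of the pre-defined prompts; it must contain the {context} and {question} placeholders. A minimal sketch follows; the template wording is an illustrative assumption:

from towhee import ops, pipe
import requests

towhee_docs = requests.get('https://raw.githubusercontent.com/towhee-io/towhee/main/README.md').content

# Sketch: a custom template containing the required {context} and {question} slots.
custom_temp = '''Use the following context to answer the question.

Context:
{context}

Question: {question}
'''

p = (
    pipe.input('question', 'docs', 'history')
    .map('docs', 'docs', lambda x: x[:2000])
    .map(('question', 'docs', 'history'), 'prompt', ops.prompt.question_answer(temp=custom_temp))
    .map('prompt', 'answer', ops.LLM.OpenAI())
    .output('answer')
)

print(p('Tell me something about Towhee', towhee_docs, []).get()[0])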
template: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Prompt Template.

__init__(self, temp: str, keys: List[str], sys_msg: str = None):
temp(str):

A template to create a prompt as the last user message.

keys(List[str]):

A list of keys used in template.

sys_msg(str):

A system message, defaults to None. If None, it will not pass any system message.

__call__(self, *args) -> List[Dict[str, str]]:
args:

Depends on the template defined by the user

Return:

A list of messages to set up chat. Must be a list of dictionaries with keys chosen from “system”, “question”, “answer”. For example, [{“question”: “a past question?”, “answer”: “a past answer.”}, {“question”: “current question?”}]

Example:

from towhee import ops, pipe
import requests

towhee_docs = requests.get('https://raw.githubusercontent.com/towhee-io/towhee/main/README.md').content

temp = '''{question}

input:
{context}
'''
sys_message = 'Your name is TowheeChat.'

p = (
    pipe.input('question', 'doc', 'history')
    .map('doc', 'doc', lambda x: x[:2000])
    .map(('question', 'doc', 'history'), 'prompt', ops.prompt.template(temp, ['question', 'context'], sys_message))
    .map('prompt', 'answer', ops.LLM.OpenAI())
    .output('answer')
)

an1 = p('What is your name?', [], []).get()[0]
print(an1)

an2 = p('Tell me something about Towhee', towhee_docs, []).get()[0]
print(an2)

an3 = p('How to use it', towhee_docs, [('Tell me something about Towhee', an2)]).get()[0]
print(an3)
class towhee.runtime.hub_ops.data_loader.DataLoader[source]

Bases: object

The data loader Operator is used to load data from various sources.

doc_loader: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Load text from doc/docx files.

__init__(self)

__call__(self, path: str):
path(str):

The path to the doc/docx file.

Example:

from glob import glob
from towhee import ops, pipe

p = (
    pipe.input('path')
        .map('path', 'text', ops.data_loader.doc_loader())
        .output('text')
)

files = glob('./doc/*.doc*')

for file in files:
    p(file)
excel_loader: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Load text from excel files.

__init__(self)

__call__(self, path: str):
path(str):

The path to the excel file.

Example:

from glob import glob
from towhee import ops, pipe

p = (
    pipe.input('path')
        .map('path', 'text', ops.data_loader.excel_loader())
        .output('text')
)

files = glob('./excel/*.xls*')

for file in files:
    p(file)
markdown_loader: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Load text from markdown files.

__init__(self)

__call__(self, path: str):
path(str):

The path to the markdown file.

Example:

from glob import glob
from towhee import ops, pipe

p = (
    pipe.input('path')
        .map('path', 'text', ops.data_loader.markdown_loader())
        .output('text')
)

files = glob('./markdown/*.md')

for file in files:
    p(file)
pdf_loader: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Load text from pdf files.

__init__(self)

__call__(self, path: str, password: Optional[Union[str, bytes]] = None):
path(str):

The path to the pdf file.

password(Optional[Union[str, bytes]]):

The password for the pdf file if required.

Example:

from glob import glob
from towhee import ops, pipe

p = (
    pipe.input('path')
        .map('path', 'text', ops.data_loader.pdf_loader())
        .output('text')
)

files = glob('./pdf/*.pdf')

for file in files:
    p(file)
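
For an encrypted pdf, the password documented above can be supplied as a second input to the operator call. A minimal sketch; the file path and password are placeholders:

from towhee import ops, pipe

# Sketch: feed both the file path and its password into pdf_loader.
p = (
    pipe.input('path', 'password')
        .map(('path', 'password'), 'text', ops.data_loader.pdf_loader())
        .output('text')
)

text = p('./pdf/protected.pdf', 'my-password').get()[0]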
text_loader: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Load text from pure text files, e.g. .txt, .py, .csv, etc.

__init__(self, encoding: str = None):
encoding(str):

The file encoding to use.

__call__(self, path: str):
path(str):

The path to the text file.

Example:

from glob import glob
from towhee import ops, pipe

p = (
    pipe.input('path')
        .map('path', 'text', ops.data_loader.text_loader(encoding='utf-8'))
        .output('text')
)

files = glob('./text/*.txt')
files.extend(glob('./python/*.py'))

for file in files:
    p(file)
html_loader: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Load text from html, either from local files or url.

__init__(self, encoding: str = None):
encoding(str):

The html encoding to use.

__call__(self, path: str, encoding: str = None):
path(str):

The local path or url of the html file.

encoding(str):

The html encoding to use.

Example:

from glob import glob
from towhee import ops, pipe

p = (
    pipe.input('path')
        .map('path', 'text', ops.data_loader.html_loader(encoding='utf-8'))
        .output('text')
)

files = glob('./html/*.html')

for file in files:
    p(file)
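
Since html_loader also accepts a url, a remote page can be loaded directly. A minimal sketch; the url is illustrative:

from towhee import ops, pipe

# Sketch: pass a url instead of a local path.
p = (
    pipe.input('url')
        .map('url', 'text', ops.data_loader.html_loader())
        .output('text')
)

text = p('https://towhee.io').get()[0]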
notebook_loader: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Load text from jupyter notebook files.

__init__(self, max_output_length: int = None):
max_output_length(int):

The max length of cell outputs.

__call__(self, path: str):
path(str):

The path to the notebook file.

Example:

from glob import glob
from towhee import ops, pipe

p = (
    pipe.input('path')
        .map('path', 'text', ops.data_loader.notebook_loader(max_output_length=100))
        .output('text')
)

files = glob('./notebook/*.ipynb')

for file in files:
    p(file)
powerpoint_loader: functools.partial(<class 'towhee.runtime.factory.OpsParser'>, <function _OperatorWrapper.callback at 0x7f34a8ac2f70>)

Load text from powerpoint files.

__init__(self)

__call__(self, path: str):
path(str):

The path to the powerpoint file.

Example:

from glob import glob
from towhee import ops, pipe

p = (
    pipe.input('path')
        .map('path', 'text', ops.data_loader.powerpoint_loader())
        .output('text')
)

files = glob('./powerpoint/*.pptx')

for file in files:
    p(file)