API

class towhee.runtime.runtime_pipeline.RuntimePipeline(dag: Dict | DAGRepr, max_workers: int | None = None)[source]

Bases: object

Manages the pipeline and runs it as a single instance.

Args:

dag (Dict | DAGRepr):

The DAG dictionary from the user pipeline.

max_workers (int | None):

The maximum number of threads.

__init__(dag: Dict | DAGRepr, max_workers: int | None = None)[source]
preload()[source]

Preload the operators.

__call__(*inputs)[source]

Returns the output, ordered to match the input DataQueue.

batch(batch_inputs)[source]
flush()[source]

Call the flush interface of ops.

property dag_repr

debug(*inputs, batch: bool = False, profiler: bool = False, tracer: bool = False, include: List[str] | str | None = None, exclude: List[str] | str | None = None)[source]

Run pipeline in debug mode.

Set profiler to True to record the running time of each operator, or set tracer to True to record the data of intermediate nodes. At least one of profiler and tracer must be set to True. When debugging with the tracer option, include and exclude specify which nodes to trace.

Args:
batch (bool):

Whether to run in batch mode.

profiler (bool):

Whether to record the performance of the pipeline.

tracer (bool):

Whether to record the data from intermediate nodes.

include (Union[List[str], str]):

The nodes to trace.

exclude (Union[List[str], str]):

The nodes not to trace.
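
The option rules for debug can be illustrated with a small standalone sketch. It is not towhee's implementation: the helpers `check_debug_options` and `select_trace_nodes` are hypothetical, the node names are made up, and it assumes exact name matching, with include naming the nodes to trace and exclude naming the nodes to skip, as the description of debug suggests.

```python
from typing import List, Optional, Union

# Same shape as the include/exclude parameters of debug().
NameFilter = Optional[Union[List[str], str]]


def check_debug_options(profiler: bool = False, tracer: bool = False) -> None:
    """Hypothetical check: at least one of profiler and tracer must be True."""
    if not (profiler or tracer):
        raise ValueError("Set at least one of profiler or tracer to True.")


def select_trace_nodes(
    all_nodes: List[str],
    include: NameFilter = None,
    exclude: NameFilter = None,
) -> List[str]:
    """Hypothetical helper: resolve include/exclude into the nodes to trace.

    Assumes exact name matching; the real library may match differently.
    """

    def as_list(value: NameFilter) -> List[str]:
        # Accept None, a single name, or a list of names.
        if value is None:
            return []
        return [value] if isinstance(value, str) else list(value)

    included = as_list(include)
    excluded = set(as_list(exclude))
    # With no include filter, every node is a candidate.
    candidates = [n for n in all_nodes if not included or n in included]
    # Excluded names are dropped, preserving pipeline order.
    return [n for n in candidates if n not in excluded]


if __name__ == "__main__":
    nodes = ["_input", "decode", "embed", "_output"]  # made-up node names
    check_debug_options(tracer=True)
    print(select_trace_nodes(nodes, include=["decode", "embed"], exclude="embed"))
```

In this sketch an exclude entry wins over an include entry for the same node, which is one reasonable reading of the options, not a documented guarantee.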

class tools.visualizer.Visualizer(result: DataQueue | List[Any] = None, time_profiler: List[Any] = None, data_queues: List[Dict[str, Any]] = None, nodes: Dict[str, Any] = None, trace_nodes: List[str] = None, node_collection: List[Dict[str, Any]] = None)[source]

Bases: object

Visualize the debug information.

__init__(result: DataQueue | List[Any] = None, time_profiler: List[Any] = None, data_queues: List[Dict[str, Any]] = None, nodes: Dict[str, Any] = None, trace_nodes: List[str] = None, node_collection: List[Dict[str, Any]] = None)[source]
property result
property profiler
property tracer
property time_profiler
property node_collection
property nodes
to_json(**kws)[source]
static from_json(info)[source]