API
- class towhee.runtime.runtime_pipeline.RuntimePipeline(dag: Dict | DAGRepr, max_workers: int | None = None)[source]
Bases: object
Manages the pipeline and runs it as a single instance.
- Args:
- dag (Dict | DAGRepr):
The DAG dictionary from the user pipeline (named dag_dict in the docstring).
- max_workers (int):
The maximum number of threads.
- property dag_repr
- __module__ = 'towhee.runtime.runtime_pipeline'
- __weakref__
list of weak references to the object (if defined)
- debug(*inputs, batch: bool = False, profiler: bool = False, tracer: bool = False, include: List[str] | str | None = None, exclude: List[str] | str | None = None)[source]
Run the pipeline in debug mode.
Set profiler to True to record the running time of each operator, or set tracer to True to record the data of intermediate nodes. At least one of the profiler and tracer options should be set to True. When debugging with the tracer option, you can specify which nodes to include or exclude.
- Args:
- batch (bool):
Whether to run in batch mode.
- profiler (bool):
Whether to record the performance of the pipeline.
- tracer (bool):
Whether to record the data from intermediate nodes.
- include (Union[List[str], str]):
The nodes to trace.
- exclude (Union[List[str], str]):
The nodes not to trace.
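The argument rules above can be sketched as a small helper that validates and collects keyword arguments before they are passed to debug(). This is only an illustration: build_debug_kwargs is a hypothetical name that is not part of towhee, the node name "embedding" is made up, and rejecting include/exclude when tracer is off is an assumption of this sketch rather than documented behavior.

```python
from typing import Any, Dict, List, Optional, Union

# Same shape as the include/exclude parameters in the debug() signature.
NodeFilter = Optional[Union[List[str], str]]


def build_debug_kwargs(
    batch: bool = False,
    profiler: bool = False,
    tracer: bool = False,
    include: NodeFilter = None,
    exclude: NodeFilter = None,
) -> Dict[str, Any]:
    """Hypothetical helper (not a towhee API): validate debug() options."""
    # The documentation asks for at least one of profiler/tracer to be True.
    if not (profiler or tracer):
        raise ValueError("set at least one of profiler=True or tracer=True")
    # Assumption of this sketch: node filters only make sense with the tracer.
    if (include is not None or exclude is not None) and not tracer:
        raise ValueError("include/exclude only apply when tracer=True")

    kwargs: Dict[str, Any] = {
        "batch": batch,
        "profiler": profiler,
        "tracer": tracer,
    }
    # debug() accepts either a single name or a list; normalize to a list.
    if include is not None:
        kwargs["include"] = [include] if isinstance(include, str) else list(include)
    if exclude is not None:
        kwargs["exclude"] = [exclude] if isinstance(exclude, str) else list(exclude)
    return kwargs


# Trace only the (hypothetical) node named "embedding".
opts = build_debug_kwargs(tracer=True, include="embedding")

# With an existing RuntimePipeline instance `p`, the call would look like:
#     p.debug(some_input, **opts)
```

Keeping the validation in a separate function means a misconfigured debug run fails before the pipeline executes any operator.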
- class tools.visualizer.Visualizer(result: DataQueue | List[Any] = None, time_profiler: List[Any] = None, data_queues: List[Dict[str, Any]] = None, nodes: Dict[str, Any] = None, trace_nodes: List[str] = None, node_collection: List[Dict[str, Any]] = None)[source]
Bases: object
Visualize the debug information.
- __init__(result: DataQueue | List[Any] = None, time_profiler: List[Any] = None, data_queues: List[Dict[str, Any]] = None, nodes: Dict[str, Any] = None, trace_nodes: List[str] = None, node_collection: List[Dict[str, Any]] = None)[source]
- property result
- property profiler
- property tracer
- property time_profiler
- property node_collection
- property nodes
- __module__ = 'tools.visualizer'
- __weakref__
list of weak references to the object (if defined)