towhee.runtime.dag_repr.DAGRepr
- class towhee.runtime.dag_repr.DAGRepr(nodes: Dict[str, NodeRepr], edges: Dict[int, Dict])
Bases: object
A DAGRepr represents a complete DAG.
- Parameters:
nodes (Dict[str, NodeRepr]) – All nodes in the DAG, which starts with the _input node and ends with the _output node.
edges (Dict[int, Dict]) –
The edges, which describe the data queue schema, such as:
{
  0: {'data': [(a, ColumnType.SCALAR), (b, ColumnType.SCALAR)], 'schema': {'a': SchemaRepr, 'b': SchemaRepr}},
  1: {'data': [(b, ColumnType.SCALAR), (c, ColumnType.SCALAR)], 'schema': {'b': SchemaRepr, 'c': SchemaRepr}},
  2: {'data': [(a, ColumnType.SCALAR), (c, ColumnType.SCALAR)], 'schema': {'a': SchemaRepr, 'c': SchemaRepr}}
}
Methods
check_nodes – Check that the nodes start with _input and end with _output, and that each schema is declared before it is used.
dfs_used_schema – Get the schema used behind (downstream of) the node.
from_dict – Return a DAGRepr from a DAG dictionary.
get_all_inputs – Get all the inputs of the DAG nodes, including those of the ahead and current nodes.
get_base_col
get_edge_from_schema – Return the edge built from the schema info for the node.
get_top_sort – Get the topological order of the DAG.
set_edges – Set in_edges and out_edges for each node, and return the nodes and edges.
Attributes
edges
nodes
- static check_nodes(nodes: Dict[str, NodeRepr])
Check that the nodes start with _input and end with _output, and that each schema is declared before it is used.
- Parameters:
nodes (Dict[str, NodeRepr]) – All the nodes from DAG.
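The checks described above can be illustrated with a minimal sketch. It is not towhee's implementation: nodes are modelled as plain dicts with 'inputs' and 'outputs' tuples (a hypothetical simplification of NodeRepr), the execution order is passed in explicitly, and "declared before using" is checked along that linear order only.

```python
from typing import Dict, List


def check_nodes_sketch(nodes: Dict[str, dict], order: List[str]) -> None:
    """Validate a simplified DAG description (hypothetical helper).

    `nodes` maps a node name to a dict with 'inputs' and 'outputs' tuples;
    `order` lists the node names in execution order.
    """
    if not order or order[0] != "_input":
        raise ValueError("The DAG must start with the _input node.")
    if order[-1] != "_output":
        raise ValueError("The DAG must end with the _output node.")

    # The columns fed into _input count as declared from the start.
    declared = set(nodes["_input"]["inputs"])
    for name in order:
        missing = set(nodes[name]["inputs"]) - declared
        if missing:
            raise ValueError(
                f"Node {name!r} uses undeclared schema: {sorted(missing)}"
            )
        # Whatever a node produces becomes available to later nodes.
        declared.update(nodes[name]["outputs"])


# Example data (assumed shape, for illustration only).
example_nodes = {
    "_input": {"inputs": ("a",), "outputs": ("a",)},
    "op": {"inputs": ("a",), "outputs": ("b",)},
    "_output": {"inputs": ("b",), "outputs": ("b",)},
}
example_order = ["_input", "op", "_output"]
check_nodes_sketch(example_nodes, example_order)  # passes silently
```

In this sketch a node that reads a column nobody has produced yet raises ValueError, which mirrors the "declared before using" rule in spirit.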
- static dfs_used_schema(nodes: Dict[str, NodeRepr], name: str, ahead_edge: Set)
Get the schema used behind (downstream of) the node.
- Parameters:
nodes (Dict[str, NodeRepr]) – All the nodes from DAG.
name (str) – Current node name.
ahead_edge (Set) – A set of the schema on the ahead (preceding) edge.
- Returns:
A set of the schema used behind (downstream of) the node.
- Return type:
Set
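One way to picture this traversal is the sketch below. It is an interpretation rather than towhee's code: nodes are plain dicts with 'inputs', 'outputs', and 'next_nodes' keys (hypothetical stand-ins for NodeRepr attributes), and a column counts as "used" if the starting node or any node reachable from it reads or writes it. Whether the real method also counts outputs is an assumption here.

```python
from typing import Dict, Set


def used_schema_downstream(
    nodes: Dict[str, dict], name: str, ahead_edge: Set[str]
) -> Set[str]:
    """Return the columns of `ahead_edge` touched by `name` or its successors."""
    remaining = set(ahead_edge)  # columns not yet seen in use
    used: Set[str] = set()
    stack = [name]
    visited = {name}
    # Iterative depth-first search; stop early once every column is accounted for.
    while stack and remaining:
        current = stack.pop()
        node = nodes[current]
        touched = (set(node["inputs"]) | set(node["outputs"])) & remaining
        used |= touched
        remaining -= touched
        for nxt in node["next_nodes"]:
            if nxt not in visited:
                visited.add(nxt)
                stack.append(nxt)
    return used


# Example data (assumed shape, for illustration only).
dfs_nodes = {
    "_input": {"inputs": ("a", "b"), "outputs": ("a", "b"), "next_nodes": ["op1"]},
    "op1": {"inputs": ("a",), "outputs": ("c",), "next_nodes": ["_output"]},
    "_output": {"inputs": ("c",), "outputs": ("c",), "next_nodes": []},
}
dfs_result = used_schema_downstream(dfs_nodes, "op1", {"a", "b"})
```

Here column b is never read after _input, so only a is reported as used from op1 onward.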
- static from_dict(dag: Dict[str, Any])
Return a DAGRepr from a dag dictionary.
- Parameters:
dag (Dict[str, Any]) – The DAG dictionary.
- Returns:
DAGRepr
- static get_all_inputs(nodes: Dict[str, NodeRepr], top_sort: list)
Get all the inputs of the DAG nodes, including those of the ahead and current nodes.
- Parameters:
nodes (Dict[str, NodeRepr]) – All the nodes repr from DAG.
top_sort (list) – Topological list.
- Returns:
A dict that maps each node to all of the inputs of that node.
- Return type:
Dict[str, Tuple]
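As a rough illustration of accumulating inputs along the topological order, the sketch below propagates each node's collected inputs to its successors. It reflects one plausible reading of the description, not towhee's actual logic; the dict-based node shape and the helper name are hypothetical.

```python
from typing import Dict, List, Tuple


def all_inputs_sketch(
    nodes: Dict[str, dict], top_sort: List[str]
) -> Dict[str, Tuple[str, ...]]:
    """Map each node to its own inputs plus the inputs of every node ahead of it."""
    collected: Dict[str, List[str]] = {name: [] for name in top_sort}
    for name in top_sort:
        # By now every predecessor has already pushed its inputs into this node.
        for col in nodes[name]["inputs"]:
            if col not in collected[name]:
                collected[name].append(col)
        # Hand the accumulated inputs on to the direct successors.
        for nxt in nodes[name]["next_nodes"]:
            for col in collected[name]:
                if col not in collected[nxt]:
                    collected[nxt].append(col)
    return {name: tuple(cols) for name, cols in collected.items()}


# Example data (assumed shape, for illustration only).
chain_nodes = {
    "_input": {"inputs": ("a",), "next_nodes": ["op1"]},
    "op1": {"inputs": ("a",), "next_nodes": ["op2"]},
    "op2": {"inputs": ("b",), "next_nodes": ["_output"]},
    "_output": {"inputs": ("b",), "next_nodes": []},
}
chain_inputs = all_inputs_sketch(chain_nodes, ["_input", "op1", "op2", "_output"])
```

Because the loop follows the topological order, a node's list is complete before it is passed on, so a single pass is enough.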
- static get_edge_from_schema(schema: Tuple, inputs: Tuple, outputs: Tuple, iter_type: str, ahead_edges: List) -> Dict
Return the edge built from the schema info for the node.
- Parameters:
schema (Tuple) – The out edge schema of this node.
inputs (Tuple) – The inputs of this node.
outputs (Tuple) – The outputs of this node.
iter_type (str) – The iteration type of this node.
ahead_edges (list) – A list of the ahead edges.
- Returns:
An edge that includes data and schema.
- Return type:
Dict[str, Dict]
- static get_top_sort(nodes: Dict[str, NodeRepr])
Get the topological order of the DAG.
- Parameters:
nodes (Dict[str, NodeRepr]) – All the nodes repr from DAG.
- Returns:
Topological list.
- Return type:
List
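For readers unfamiliar with topological ordering, the following sketch uses Kahn's algorithm on a dict-based graph. Which algorithm towhee uses internally is not stated here, so treat this as a generic illustration; the 'next_nodes' key and the helper name are assumptions.

```python
from collections import deque
from typing import Dict, List


def top_sort_sketch(nodes: Dict[str, dict]) -> List[str]:
    """Return node names in topological order using Kahn's algorithm."""
    # Count incoming edges for every node.
    in_degree = {name: 0 for name in nodes}
    for node in nodes.values():
        for nxt in node["next_nodes"]:
            in_degree[nxt] += 1

    # Start from the nodes that nothing points to.
    ready = deque(name for name, degree in in_degree.items() if degree == 0)
    order: List[str] = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for nxt in nodes[current]["next_nodes"]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(nodes):
        raise ValueError("The graph contains a cycle, so it is not a DAG.")
    return order


# Example data (assumed shape): a diamond-shaped graph.
diamond = {
    "_input": {"next_nodes": ["op1", "op2"]},
    "op1": {"next_nodes": ["_output"]},
    "op2": {"next_nodes": ["_output"]},
    "_output": {"next_nodes": []},
}
diamond_order = top_sort_sketch(diamond)
```

In the diamond example, _output is emitted only after both op1 and op2, since its incoming-edge count reaches zero last.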
- static set_edges(nodes: Dict[str, NodeRepr])
Set in_edges and out_edges for each node, and return the nodes and edges.
- Parameters:
nodes (Dict[str, NodeRepr]) – All the nodes repr from DAG.
- Returns:
The nodes with in_edges and out_edges updated, and the edges for the DAG.
- Return type:
Dict[str, NodeRepr], Dict[str, Dict]