towhee.runtime.dag_repr.DAGRepr

class towhee.runtime.dag_repr.DAGRepr(nodes: Dict[str, NodeRepr], edges: Dict[int, Dict])[source]

Bases: object

A DAGRepr represents a complete DAG.

Parameters:
  • nodes (Dict[str, NodeRepr]) – All nodes in the DAG, starting with the _input node and ending with the _output node.

  • edges (Dict[int, Dict]) –

    The edges, which describe the data queue schema, such as:

    {
        0: {'data': [(a, ColumnType.SCALAR), (b, ColumnType.SCALAR)], 'schema': {'a': SchemaRepr, 'b': SchemaRepr}},
        1: {'data': [(b, ColumnType.SCALAR), (c, ColumnType.SCALAR)], 'schema': {'b': SchemaRepr, 'c': SchemaRepr}},
        2: {'data': [(a, ColumnType.SCALAR), (c, ColumnType.SCALAR)], 'schema': {'a': SchemaRepr, 'c': SchemaRepr}}
    }
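As a rough illustration of the shape described above, the sketch below builds an edges dictionary from plain Python objects. ColumnType and SchemaRepr here are minimal stand-ins defined only for this example, not the classes shipped with towhee, and make_edge is a hypothetical helper; the column names a, b and c come from the example above.

```python
from dataclasses import dataclass
from enum import Enum, auto


class ColumnType(Enum):
    # Stand-in enum for illustration; the real towhee ColumnType may differ.
    SCALAR = auto()
    QUEUE = auto()


@dataclass
class SchemaRepr:
    # Stand-in for towhee's SchemaRepr: only a column name and its type.
    name: str
    type: ColumnType


def make_edge(columns):
    """Hypothetical helper: build one edge entry with 'data' and 'schema' keys."""
    return {
        "data": [(c, ColumnType.SCALAR) for c in columns],
        "schema": {c: SchemaRepr(c, ColumnType.SCALAR) for c in columns},
    }


# Edge ids are integers, matching the Dict[int, Dict] annotation.
edges = {
    0: make_edge(["a", "b"]),
    1: make_edge(["b", "c"]),
    2: make_edge(["a", "c"]),
}

for edge_id, edge in edges.items():
    # Print the column names carried by each edge.
    print(edge_id, [name for name, _ in edge["data"]])
```

Each edge keeps the ordered column list under 'data' and a per-column lookup under 'schema', so a consumer can iterate columns in order or look one up by name.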

Methods

check_nodes

Check that the nodes start with _input and end with _output, and that each schema is declared before it is used.

dfs_used_schema

Get the used schema behind the node.

from_dict

Return a DAGRepr from a dag dictionary.

get_all_inputs

Get all the inputs of the DAG nodes, including those of the nodes ahead and of the current node.

get_base_col

get_edge_from_schema

Return the edge built from the schema info for the node.

get_top_sort

Get the topological order of the DAG.

set_edges

Set in_edges and out_edges for each node, and return the nodes and edges.

Attributes

edges

nodes

__init__(nodes: Dict[str, NodeRepr], edges: Dict[int, Dict])[source]
static check_nodes(nodes: Dict[str, NodeRepr])[source]

Check that the nodes start with _input and end with _output, and that each schema is declared before it is used.

Parameters:

nodes (Dict[str, NodeRepr]) – All the nodes from the DAG.
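The checks described above can be sketched as follows. This is a simplified illustration, not towhee's implementation: it assumes each node is a plain dict with inputs and outputs tuples (rather than a NodeRepr), that the mapping preserves declaration order, and that ValueError is an acceptable error type. The name check_nodes_sketch is hypothetical.

```python
def check_nodes_sketch(nodes):
    """Validate a node mapping; raise ValueError on the first problem found."""
    names = list(nodes)
    if not names or names[0] != "_input":
        raise ValueError("the first node must be '_input'")
    if names[-1] != "_output":
        raise ValueError("the last node must be '_output'")

    # Columns declared so far, walking the nodes in declaration order.
    declared = set(nodes["_input"]["inputs"])
    for name, node in nodes.items():
        missing = [col for col in node["inputs"] if col not in declared]
        if missing:
            raise ValueError(f"node {name!r} uses undeclared columns: {missing}")
        declared.update(node["outputs"])


nodes = {
    "_input": {"inputs": ("a",), "outputs": ("a",)},
    "op1": {"inputs": ("a",), "outputs": ("b",)},
    "_output": {"inputs": ("b",), "outputs": ("b",)},
}
check_nodes_sketch(nodes)  # passes silently for a well-formed mapping
```

Walking the nodes in declaration order keeps the sketch short; a check that follows the graph structure would be stricter for DAGs with several branches.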

static dfs_used_schema(nodes: Dict[str, NodeRepr], name: str, ahead_edge: Set)[source]

Get the used schema behind the node.

Parameters:
  • nodes (Dict[str, NodeRepr]) – All the nodes from the DAG.

  • name (str) – Current node name.

  • ahead_edge (Set) – A set of the ahead edge schema.

Returns:

A set of the used schema behind the node.

Return type:

Set
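One way to picture this traversal is a depth-first walk from the named node through its successors, keeping only the columns of the ahead edge that some visited node reads. The sketch below follows that reading under stated assumptions: nodes are plain dicts with inputs and next_nodes keys, and the function name used_schema_sketch is hypothetical. The actual towhee method may apply different rules.

```python
def used_schema_sketch(nodes, name, ahead_edge):
    """Collect columns of ahead_edge read by `name` or by any node after it."""
    used = set()
    visited = set()
    stack = [name]
    while stack:
        current = stack.pop()
        if current in visited:
            continue
        visited.add(current)
        # Keep only the columns that this node reads and the ahead edge provides.
        used |= set(nodes[current]["inputs"]) & ahead_edge
        stack.extend(nodes[current]["next_nodes"])
    return used


nodes = {
    "_input": {"inputs": ("a", "b"), "next_nodes": ["op1"]},
    "op1": {"inputs": ("a",), "next_nodes": ["_output"]},
    "_output": {"inputs": ("c",), "next_nodes": []},
}
print(used_schema_sketch(nodes, "op1", {"a", "b"}))  # {'a'}
```

Here column b is carried by the ahead edge but never read at or after op1, so it is not part of the result.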

static from_dict(dag: Dict[str, Any])[source]

Return a DAGRepr from a dag dictionary.

Parameters:

dag (Dict[str, Any]) – The DAG dictionary.

Returns:

DAGRepr

static get_all_inputs(nodes: Dict[str, NodeRepr], top_sort: list)[source]

Get all the inputs of the DAG nodes, including those of the nodes ahead and of the current node.

Parameters:
  • nodes (Dict[str, NodeRepr]) – All the node representations from the DAG.

  • top_sort (list) – Topological list.

Returns:

A dict mapping each node to all of its inputs.

Return type:

Dict[str, Tuple]
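The following sketch shows one plausible reading of this description: each node's result is its own inputs plus the inputs accumulated by the nodes ahead of it, propagated in topological order. It assumes nodes are plain dicts with inputs and next_nodes keys; all_inputs_sketch is a hypothetical name and the real method may accumulate columns differently.

```python
def all_inputs_sketch(nodes, top_sort):
    """Map each node name to its own inputs plus those of the nodes ahead of it."""
    collected = {name: list(nodes[name]["inputs"]) for name in top_sort}
    # In topological order, a node's list is complete before it is pushed on.
    for name in top_sort:
        for nxt in nodes[name]["next_nodes"]:
            for col in collected[name]:
                if col not in collected[nxt]:
                    collected[nxt].append(col)
    return {name: tuple(cols) for name, cols in collected.items()}


nodes = {
    "_input": {"inputs": ("a",), "next_nodes": ["op1"]},
    "op1": {"inputs": ("a",), "next_nodes": ["_output"]},
    "_output": {"inputs": ("b",), "next_nodes": []},
}
result = all_inputs_sketch(nodes, ["_input", "op1", "_output"])
print(result["_output"])  # ('b', 'a')
```

Processing nodes in topological order is what makes a single pass sufficient: every predecessor has already contributed its columns by the time a node forwards its own list.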

static get_edge_from_schema(schema: Tuple, inputs: Tuple, outputs: Tuple, iter_type: str, ahead_edges: List) → Dict[source]

Return the edge built from the schema info for the node.

Parameters:
  • schema (Tuple) – The out edge schema of this node.

  • inputs (Tuple) – The inputs of this node.

  • outputs (Tuple) – The outputs of this node.

  • iter_type (str) – The iteration type of this node.

  • ahead_edges (List) – A list of the ahead edges.

Returns:

An edge that includes data and schema.

Return type:

Dict[str, Dict]

static get_top_sort(nodes: Dict[str, NodeRepr])[source]

Get the topological order of the DAG.

Parameters:

nodes (Dict[str, NodeRepr]) – All the node representations from the DAG.

Returns:

Topological list.

Return type:

List
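For readers unfamiliar with topological ordering, here is a minimal sketch using Kahn's algorithm. It is not taken from towhee's source: it assumes each node is a plain dict with a next_nodes list, and top_sort_sketch is a hypothetical name.

```python
from collections import deque


def top_sort_sketch(nodes):
    """Return node names in topological order using Kahn's algorithm."""
    # Count incoming edges for every node.
    in_degree = {name: 0 for name in nodes}
    for node in nodes.values():
        for nxt in node["next_nodes"]:
            in_degree[nxt] += 1

    # Start from the nodes that nothing points to.
    ready = deque(name for name, degree in in_degree.items() if degree == 0)
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for nxt in nodes[name]["next_nodes"]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(nodes):
        raise ValueError("the graph contains a cycle")
    return order


nodes = {
    "_input": {"next_nodes": ["op1", "op2"]},
    "op1": {"next_nodes": ["_output"]},
    "op2": {"next_nodes": ["_output"]},
    "_output": {"next_nodes": []},
}
print(top_sort_sketch(nodes))  # ['_input', 'op1', 'op2', '_output']
```

The final length check doubles as cycle detection: if any node never reaches in-degree zero, the input is not a DAG.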

static set_edges(nodes: Dict[str, NodeRepr])[source]

Set in_edges and out_edges for each node, and return the nodes and edges.

Parameters:

nodes (Dict[str, NodeRepr]) – All the node representations from the DAG.

Returns:

The nodes with updated in_edges and out_edges, followed by the edges for the DAG.

Return type:

Tuple[Dict[str, NodeRepr], Dict[str, Dict]]