|
@@ -1,4 +1,74 @@
|
|
|
+// 数据转化
|
|
|
export const DagToData = (graph: any, dagId: any) => {
|
|
|
+ const dagData = graph.toJSON()
|
|
|
+ const {edges, nodes} = getEdgesAndNodes(graph)
|
|
|
+ const {script_nodes, script_edges} = getScriptEdgesAndNodes(graph)
|
|
|
+ return {
|
|
|
+ dag_id: dagId,
|
|
|
+ user_name: "XXX",
|
|
|
+ user_id: 1,
|
|
|
+ nodes_task_name: "dfs",
|
|
|
+ nodes_task_id: 123,
|
|
|
+ itermidate_data: ["hdfs://host:port/uri"],
|
|
|
+ nodes,
|
|
|
+ edges,
|
|
|
+ dag_script: {
|
|
|
+ sub_nodes: script_nodes,
|
|
|
+ edges: script_edges,
|
|
|
+ },
|
|
|
+ graph: dagData
|
|
|
+ }
|
|
|
+ /* dagData.cells.forEach((item: any) => {
|
|
|
+ if (item?.shape === 'dag-edge') {
|
|
|
+ edges.push({
|
|
|
+ id: item.id,
|
|
|
+ source: item.source.cell,
|
|
|
+ target: item.target.cell
|
|
|
+ })
|
|
|
+ } else {
|
|
|
+ switch (item.data?.type) {
|
|
|
+ case "datasource":
|
|
|
+ nodes.push({
|
|
|
+ id: item.id,
|
|
|
+ name: item.data.nodeName,
|
|
|
+ op: "datasource",
|
|
|
+ data: {
|
|
|
+ input_source: item.data.inputSource,
|
|
|
+ input_table: item.data.dataTable,
|
|
|
+ }
|
|
|
+ })
|
|
|
+ break;
|
|
|
+ case "outputsource":
|
|
|
+ nodes.push({
|
|
|
+ id: item.id,
|
|
|
+ name: item.data.nodeName,
|
|
|
+ op: "datasource",
|
|
|
+ data: {
|
|
|
+ output_source: item.data.outputSource,
|
|
|
+ }
|
|
|
+ })
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ nodes.push({
|
|
|
+ id: item.id,
|
|
|
+ name: item.data.nodeName,
|
|
|
+ op: item.data.label,
|
|
|
+ data: {
|
|
|
+ input_number: item.data.inputNumber,
|
|
|
+ output: item.data.outputData,
|
|
|
+ script: item.data.scriptText,
|
|
|
+ param: item.data.paramText,
|
|
|
+ package: item.data.packageData
|
|
|
+ }
|
|
|
+ })
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }); */
|
|
|
+}
|
|
|
+
|
|
|
+// 获取所有Edge和Nodes
|
|
|
+const getEdgesAndNodes = (graph: any) => {
|
|
|
const dagData = graph.toJSON()
|
|
|
const edges: any = []
|
|
|
const nodes: any = []
|
|
@@ -50,6 +120,12 @@ export const DagToData = (graph: any, dagId: any) => {
|
|
|
}
|
|
|
}
|
|
|
});
|
|
|
+ return {edges, nodes}
|
|
|
+}
|
|
|
+
|
|
|
+// 获取所有脚本Edge和Nodes
|
|
|
+const getScriptEdgesAndNodes = (graph: any) => {
|
|
|
+ const dagData = graph.toJSON()
|
|
|
const script_nodes: any = []
|
|
|
const script_edges: any = []
|
|
|
dagData.cells.forEach((item: any) => {
|
|
@@ -91,69 +167,7 @@ export const DagToData = (graph: any, dagId: any) => {
|
|
|
}
|
|
|
}
|
|
|
})
|
|
|
- return {
|
|
|
- dag_id: dagId,
|
|
|
- user_name: "XXX",
|
|
|
- user_id: 1,
|
|
|
- nodes_task_name: "dfs",
|
|
|
- nodes_task_id: 123,
|
|
|
- itermidate_data: ["hdfs://host:port/uri"],
|
|
|
- nodes,
|
|
|
- edges,
|
|
|
- dag_script: {
|
|
|
- sub_nodes: script_nodes,
|
|
|
- edges: script_edges,
|
|
|
- },
|
|
|
- graph: dagData
|
|
|
- }
|
|
|
- /* dagData.cells.forEach((item: any) => {
|
|
|
- if (item?.shape === 'dag-edge') {
|
|
|
- edges.push({
|
|
|
- id: item.id,
|
|
|
- source: item.source.cell,
|
|
|
- target: item.target.cell
|
|
|
- })
|
|
|
- } else {
|
|
|
- switch (item.data?.type) {
|
|
|
- case "datasource":
|
|
|
- nodes.push({
|
|
|
- id: item.id,
|
|
|
- name: item.data.nodeName,
|
|
|
- op: "datasource",
|
|
|
- data: {
|
|
|
- input_source: item.data.inputSource,
|
|
|
- input_table: item.data.dataTable,
|
|
|
- }
|
|
|
- })
|
|
|
- break;
|
|
|
- case "outputsource":
|
|
|
- nodes.push({
|
|
|
- id: item.id,
|
|
|
- name: item.data.nodeName,
|
|
|
- op: "datasource",
|
|
|
- data: {
|
|
|
- output_source: item.data.outputSource,
|
|
|
- }
|
|
|
- })
|
|
|
- break;
|
|
|
- default:
|
|
|
- nodes.push({
|
|
|
- id: item.id,
|
|
|
- name: item.data.nodeName,
|
|
|
- op: item.data.label,
|
|
|
- data: {
|
|
|
- input_number: item.data.inputNumber,
|
|
|
- output: item.data.outputData,
|
|
|
- script: item.data.scriptText,
|
|
|
- param: item.data.paramText,
|
|
|
- package: item.data.packageData
|
|
|
- }
|
|
|
- })
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }); */
|
|
|
-
|
|
|
+ return {script_nodes, script_edges}
|
|
|
}
|
|
|
|
|
|
const datasourceToSql = (nodeData: any) => {
|
|
@@ -189,4 +203,73 @@ const generateInputs = (graph: any, id: any) => {
|
|
|
})
|
|
|
}
|
|
|
return inputsResult
|
|
|
+}
|
|
|
+
|
|
|
+// 标记跳过节点
|
|
|
+export const signNodesSkip = (graph: any, node: any, action: any) => {
|
|
|
+ const skipNodes = getSkipNodes(graph, node, action) as any
|
|
|
+ const {script_nodes, script_edges} = getScriptEdgesAndNodes(graph)
|
|
|
+ script_nodes.forEach((item: any) => {
|
|
|
+ item.skip = skipNodes[item.id]
|
|
|
+ })
|
|
|
+ return {script_nodes, script_edges}
|
|
|
+}
|
|
|
+
|
|
|
+// 获取跳过节点
|
|
|
+const getSkipNodes = (graph: any, node: any, action: any) => {
|
|
|
+ // 结果 {nodeId, skip}
|
|
|
+ const result = {} as any
|
|
|
+ // 设置默认true
|
|
|
+ const { nodes } = getEdgesAndNodes(graph)
|
|
|
+ // 获取标记了前置后置的节点
|
|
|
+ const preEndNodes = getNodesSourceAndTarget(graph)
|
|
|
+ nodes.forEach((item: any) => {
|
|
|
+ result[item.id] = true
|
|
|
+ })
|
|
|
+ switch (action) {
|
|
|
+ // 执行到此处
|
|
|
+ case 'handle_run':
|
|
|
+ result[node.data.nodeId] = false
|
|
|
+ break;
|
|
|
+ // 执行到此处
|
|
|
+ case 'handle_run_end':
|
|
|
+ traverseNodes(result, node.data.nodeId, preEndNodes, 'sourceNodes')
|
|
|
+ break
|
|
|
+ // 从此处开始执行
|
|
|
+ case 'handle_run_begin':
|
|
|
+ traverseNodes(result, node.data.nodeId, preEndNodes, 'targetNodes')
|
|
|
+ result[node.data.nodeId] = true
|
|
|
+ break
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ return result
|
|
|
+}
|
|
|
+
|
|
|
+// 遍历前后继
|
|
|
+const traverseNodes = (result: any, nodeId: any, preEndNodes: any, nodes: any) => {
|
|
|
+ result[nodeId] = false
|
|
|
+ const sourceNodes = preEndNodes[nodeId][nodes]
|
|
|
+ sourceNodes.forEach((item: any) => {
|
|
|
+ traverseNodes(result, item, preEndNodes, nodes)
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+// 获取前置后置
|
|
|
+export const getNodesSourceAndTarget = (graph: any) => {
|
|
|
+ const { nodes, edges } = getEdgesAndNodes(graph)
|
|
|
+ const resultNodes = {} as any
|
|
|
+ nodes.forEach((item: any) => {
|
|
|
+ resultNodes[item.id] = {} as any
|
|
|
+ resultNodes[item.id]['sourceNodes'] = []
|
|
|
+ resultNodes[item.id]['targetNodes'] = []
|
|
|
+ })
|
|
|
+ edges.forEach((item: any) => {
|
|
|
+ const sourceNodeId = item.source
|
|
|
+ const targetNodeId = item.target
|
|
|
+ resultNodes[sourceNodeId]['targetNodes'].push(targetNodeId)
|
|
|
+ resultNodes[targetNodeId]['sourceNodes'].push(sourceNodeId)
|
|
|
+ })
|
|
|
+ return resultNodes
|
|
|
}
|