|
@@ -1,8 +1,60 @@
|
|
|
-export const DagToData = (Dag: any) => {
|
|
|
- console.log('Dag:', Dag);
|
|
|
- const nodes: any = []
|
|
|
+export const DagToData = (graph: any, dagId: any) => {
|
|
|
+ const dagData = graph.toJSON()
|
|
|
const edges: any = []
|
|
|
- Dag.cells.forEach((item: any) => {
|
|
|
+ const nodes: any = []
|
|
|
+ dagData.cells.forEach((item: any) => {
|
|
|
+ if (item?.shape === 'dag-edge') {
|
|
|
+ const sourceNodeId = graph.getCellById(item.source.cell).data.nodeId
|
|
|
+ const targetNodeId = graph.getCellById(item.target.cell).data.nodeId
|
|
|
+ edges.push([sourceNodeId, targetNodeId])
|
|
|
+ } else {
|
|
|
+ switch (item.data?.type) {
|
|
|
+ case "datasource":
|
|
|
+ nodes.push({
|
|
|
+ id: item.data.nodeId,
|
|
|
+ name: item.data.nodeName,
|
|
|
+ type: 'datasource',
|
|
|
+ op: "sql",
|
|
|
+ script: datasourceToSql(item.data),
|
|
|
+ dataTableId: item.data.dataTableId,
|
|
|
+ fields: datasourceFields(item.data)
|
|
|
+ })
|
|
|
+ break;
|
|
|
+ case "outputsource":
|
|
|
+ /* nodes.push({
|
|
|
+ id: item.id,
|
|
|
+ name: item.data.nodeName,
|
|
|
+ op: "datasource",
|
|
|
+ data: {
|
|
|
+ output_source: item.data.outputSource,
|
|
|
+ }
|
|
|
+ }) */
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ nodes.push({
|
|
|
+ id: item.data.nodeId,
|
|
|
+ type: 'script',
|
|
|
+ name: item.data.nodeName,
|
|
|
+ op: item.data.label,
|
|
|
+ inputs: generateInputs(graph, item.id),
|
|
|
+ script: item.data.scriptText
|
|
|
+ })
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
+ return {
|
|
|
+ dag_id: dagId,
|
|
|
+ user_name: "XXX",
|
|
|
+ user_id: 1,
|
|
|
+ nodes_task_name: "dfs",
|
|
|
+ nodes_task_id: 123,
|
|
|
+ itermidate_data: ["hdfs://host:port/uri"],
|
|
|
+ nodes,
|
|
|
+ edges,
|
|
|
+ graph: dagData
|
|
|
+ }
|
|
|
+ /* dagData.cells.forEach((item: any) => {
|
|
|
if (item?.shape === 'dag-edge') {
|
|
|
edges.push({
|
|
|
id: item.id,
|
|
@@ -48,19 +100,40 @@ export const DagToData = (Dag: any) => {
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
- });
|
|
|
- return {
|
|
|
- user_name: "XXX",
|
|
|
- user_id: 1,
|
|
|
- nodes_task_name: "dfs",
|
|
|
- nodes_task_id: 123,
|
|
|
- itermidate_data: ["hdfs://host:port/uri"],
|
|
|
- nodes,
|
|
|
- edges,
|
|
|
- graph: Dag
|
|
|
+ }); */
|
|
|
+}
|
|
|
+
|
|
|
+const datasourceToSql = (nodeData: any) => {
|
|
|
+ const { inputSource, dataTableName } = nodeData
|
|
|
+ if (inputSource?.length > 0 && dataTableName) {
|
|
|
+ const params = inputSource.reduce((pre: any, item: any) => {
|
|
|
+ return item.dataSelect ? pre.concat(item.dataField) : pre
|
|
|
+ }, []).join(', ')
|
|
|
+ return params ? `select ${params} from ${dataTableName}` : ''
|
|
|
+ }
|
|
|
+ return ''
|
|
|
+}
|
|
|
+
|
|
|
+const datasourceFields = (nodeData: any) => {
|
|
|
+ const { inputSource, dataTableName } = nodeData
|
|
|
+ if (inputSource?.length > 0 && dataTableName) {
|
|
|
+ return inputSource.filter((item: any) => item.dataSelect)
|
|
|
}
|
|
|
+ return []
|
|
|
}
|
|
|
|
|
|
-export const DataToDag = (Data: any) => {
|
|
|
- console.log('Data:', Data);
|
|
|
+const generateInputs = (graph: any, id: any) => {
|
|
|
+ const inputsResult: any = {}
|
|
|
+ const edges = graph.getIncomingEdges(id)
|
|
|
+ if (edges?.length > 0) {
|
|
|
+ edges.forEach((item: any, index: any) => {
|
|
|
+ const sourceNode = item.getSourceNode()
|
|
|
+ const sourcePortID = item.getSourcePortId()
|
|
|
+ const sourcePorts = sourceNode.getPortsByGroup('bottom')
|
|
|
+ const sourceId = sourceNode.data.nodeId
|
|
|
+ const portNumber = sourceNode.data?.type === 'datasource' ? 0 : sourcePorts.indexOf(sourcePorts.find((item: any) => item.id === sourcePortID))
|
|
|
+ inputsResult[`input${index}`] = [sourceId, portNumber]
|
|
|
+ })
|
|
|
+ }
|
|
|
+ return inputsResult
|
|
|
}
|