Browse Source

节点输入输出关系修改

liweiquan 2 years ago
parent
commit
f8a112b781
1 changed files with 28 additions and 28 deletions
  1. 28 28
      app/services/jm_job.py

+ 28 - 28
app/services/jm_job.py

@@ -122,13 +122,13 @@ def red_dag_and_format(jm_homework: models.JmHomework, relation_list, db: Sessio
     file = minio_client.get_file(jm_homework.dag_url)
     result = json.loads(file)
     edges = result['edges']
-    t_s = {}
-    input_num = {}
-    for edge in edges:
-        if edge['target'] in t_s.keys():
-            t_s[edge['target']].append(edge['source'])
-        else:
-            t_s.update({edge['target']:[edge['source']]})
+    # t_s = {}
+    # input_num = {}
+    # for edge in edges:
+    #     if edge['target'] in t_s.keys():
+    #         t_s[edge['target']].append(edge['source'])
+    #     else:
+    #         t_s.update({edge['target']:[edge['source']]})
     nodes = result['nodes']
     sub_nodes = []
     for node in nodes:
@@ -164,16 +164,16 @@ def red_dag_and_format(jm_homework: models.JmHomework, relation_list, db: Sessio
                 database_name = data_source.database_name
             script = '''def main_func (input0, spark,sc):
     input0.write.mode("overwrite").saveAsTable("''' + database_name + '.'+node_relation_dict[node['id']]['table']+'''")'''
-            inputs = {}
-            index = 0
-            input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
-            for input in input_list:
-                if input in input_num.keys():
-                    input_num[input]+=1
-                else:
-                    input_num.update({input:0})
-                inputs.update({'input'+str(index):[input,input_num[input]]})
-                index+=1
+            inputs = node['data']['inputs'] if 'inputs' in node['data'].keys() else {}
+            # index = 0
+            # input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
+            # for input in input_list:
+            #     if input in input_num.keys():
+            #         input_num[input]+=1
+            #     else:
+            #         input_num.update({input:0})
+            #     inputs.update({'input'+str(index):[input,input_num[input]]})
+            #     index+=1
             sub_node = {
                 "id": node['id'],
                 "name": node['name'],
@@ -183,16 +183,16 @@ def red_dag_and_format(jm_homework: models.JmHomework, relation_list, db: Sessio
             }
             sub_nodes.append(sub_node)
         else:
-            inputs = {}
-            index = 0
-            input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
-            for input in input_list:
-                if input in input_num.keys():
-                    input_num[input]+=1
-                else:
-                    input_num.update({input:0})
-                inputs.update({'input'+str(index):[input,input_num[input]]})
-                index+=1
+            inputs = node['data']['inputs'] if 'inputs' in node['data'].keys() else {}
+            # index = 0
+            # input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
+            # for input in input_list:
+            #     if input in input_num.keys():
+            #         input_num[input]+=1
+            #     else:
+            #         input_num.update({input:0})
+            #     inputs.update({'input'+str(index):[input,input_num[input]]})
+            #     index+=1
             script = node['data']['script']
             if node['op'] == 'sql':
                 script = script.replace('\n', ' ')
@@ -201,7 +201,7 @@ def red_dag_and_format(jm_homework: models.JmHomework, relation_list, db: Sessio
                 "name": node['name'],
                 "op": node['op'],
                 "inputs": inputs,
-                "script": node['data']['script'],
+                "script": script,
             }
             sub_nodes.append(sub_node)
     res = {