jm_job.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import json
  2. from turtle import update
  3. from app import crud, models
  4. from app.crud.jm_homework_datasource_relation import get_jm_relations
  5. from app.utils.send_util import *
  6. from app.utils.utils import get_cmd_parameter
  7. from sqlalchemy.orm import Session
  8. from app.common.minio import FileHandler
  # Maps a homework ``type`` value to the ``task_type`` string placed in the
  # task payload that is sent to the ``/jpt/af_task`` endpoint.
  type_dict = dict(
      Java="java",
      Python="python",
      Dag="sparks",
  )
  def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
      """Create the remote task for a homework and record the id mapping.

      Builds the task payload from ``jm_homework``, POSTs it to
      ``/jpt/af_task`` and stores a 'task' relation between the homework id
      and the id found in the response.

      Args:
          jm_homework: Homework record whose fields populate the payload.
          db: Database session, handed to the crud helpers.

      Returns:
          The ``data`` member of the POST response.

      Raises:
          KeyError: If ``jm_homework.type`` is not a key of ``type_dict``.
      """
      homework_type = jm_homework.type
      is_java = homework_type == "Java"
      is_dag = homework_type == "Dag"

      # Only Dag and Python homeworks carry an inline script; for every other
      # type (Java included) the script stays empty.
      if is_dag:
          script = red_dag_and_format(jm_homework, db)
      elif homework_type == "Python":
          script = red_python_and_format(jm_homework)
      else:
          script = ''

      payload = {
          "name": jm_homework.name,
          "file_urls": ['datax/' + jm_homework.script_file] if is_java else [],
          "script": script,
          "cmd": "" if is_dag else jm_homework.execute_command,
          "cmd_parameters": "",
          "envs": {},
          "run_image": "" if is_dag else jm_homework.image_url,
          "task_type": type_dict[homework_type],
          "user_id": 0,
      }

      created_task = send_post('/jpt/af_task', payload)['data']
      crud.create_relation(db, jm_homework.id, 'task', created_task['id'])
      return created_task
  def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
      """Push the current state of a homework to its existing remote task.

      Looks up the 'task' relation of the homework, rebuilds the task payload
      from ``jm_homework`` and PUTs it to ``/jpt/af_task`` under the related
      remote id.

      Args:
          jm_homework: Homework record whose fields populate the payload.
          db: Database session, handed to the crud helpers.

      Returns:
          The ``data`` member of the PUT response.

      Raises:
          ValueError: If no 'task' relation exists for the homework, i.e. the
              remote task was never created.
          KeyError: If ``jm_homework.type`` is not a key of ``type_dict``.
      """
      relation = crud.get_af_id(db, jm_homework.id, 'task')
      # The lookup yields None when nothing was registered (jm_job_submit
      # relies on that); fail early with a clear message instead of an
      # AttributeError on ``relation.af_id`` after the script has been read.
      if relation is None:
          raise ValueError(
              "no 'task' relation found for homework {}; "
              "the task must be created before it can be updated".format(jm_homework.id)
          )

      homework_type = jm_homework.type
      is_java = homework_type == "Java"
      is_dag = homework_type == "Dag"

      # Only Dag and Python homeworks carry an inline script; for every other
      # type (Java included) the script stays empty.
      if is_dag:
          script = red_dag_and_format(jm_homework, db)
      elif homework_type == "Python":
          script = red_python_and_format(jm_homework)
      else:
          script = ''

      af_task = {
          "name": jm_homework.name,
          "file_urls": ['datax/' + jm_homework.script_file] if is_java else [],
          "script": script,
          "cmd": "" if is_dag else jm_homework.execute_command,
          "cmd_parameters": "",
          "envs": {},
          "run_image": "" if is_dag else jm_homework.image_url,
          "task_type": type_dict[homework_type],
          "user_id": 0,
      }

      res = send_put('/jpt/af_task', relation.af_id, af_task)
      return res['data']
  def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
      """Create the remote job for ``jm_job_info`` and submit it.

      Collects the remote tasks registered for the homeworks of the job's
      nodes, turns the job's edges into pairs of remote task ids, POSTs the
      job definition to ``/jpt/af_job``, stores a 'job' relation for the id in
      the response and finally calls ``send_submit`` with that id.

      Args:
          jm_job_info: Job record supplying id, name, cron settings and status.
          db: Database session, handed to the crud helpers.

      Raises:
          KeyError: If an edge endpoint has no entry in the id mapping.
      """
      job_id = jm_job_info.id

      # Map each relation's se_id to its af_id for the homeworks of this job.
      node_homework_ids = [node.homework_id for node in crud.get_one_job_nodes(db, job_id)]
      af_id_by_se_id = {}
      for relation in crud.get_af_ids(db, node_homework_ids, 'task'):
          af_id_by_se_id[relation.se_id] = relation.af_id

      # Fetch the remote definition of every mapped task.
      remote_tasks = []
      for af_task_id in af_id_by_se_id.values():
          remote_tasks.append(send_get("/jpt/af_task/getOnce", af_task_id)['data'])

      # NOTE(review): the first endpoint is looked up with its raw value while
      # the second is wrapped in str(), and the mapping is built from homework
      # ids while edges carry node ids -- confirm both against the crud layer.
      task_pairs = []
      for edge in crud.get_one_job_edges(db, job_id):
          first = af_id_by_se_id[edge['in_node_id']]
          second = af_id_by_se_id[str(edge['out_node_id'])]
          task_pairs.append([first, second])

      # NOTE(review): 'onec' may be a typo for 'once'; kept byte-for-byte
      # because the value expected by the job API is not visible here.
      if jm_job_info.cron_type == 2:
          cron = jm_job_info.cron_expression
      else:
          cron = 'onec'

      job_payload = {
          "tasks": remote_tasks,
          "name": jm_job_info.name,
          "dependence": task_pairs,
          "cron": cron,
          "desc": jm_job_info.name,
          "route_strategy": "",
          "block_strategy": "",
          "executor_timeout": 0,
          "executor_fail_retry_count": 0,
          "trigger_status": jm_job_info.status,
          "job_mode": 1,
          "job_type": 0,
          "user_id": 0,
      }

      created_job = send_post('/jpt/af_job', job_payload)['data']
      crud.create_relation(db, job_id, 'job', created_job['id'])
      send_submit(created_job['id'])
  def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
      """Update the existing remote job of ``jm_job_info`` and submit it.

      Rebuilds the task list and the edge-derived dependency pairs, PUTs the
      job definition to ``/jpt/af_job`` under the remote id stored in the
      job's 'job' relation and then calls ``send_submit`` with the id found in
      the response.

      Args:
          jm_job_info: Job record supplying id, name, cron settings and status.
          db: Database session, handed to the crud helpers.

      Raises:
          KeyError: If an edge endpoint has no entry in the id mapping.
      """
      job_nodes = crud.get_one_job_nodes(db, jm_job_info.id)
      task_relations = crud.get_af_ids(
          db, [job_node.homework_id for job_node in job_nodes], 'task'
      )
      # se_id -> af_id for every 'task' relation of the job's homeworks.
      remote_id_of = dict((rel.se_id, rel.af_id) for rel in task_relations)

      # Remote definition of each mapped task.
      task_definitions = [
          send_get("/jpt/af_task/getOnce", remote_id)['data']
          for remote_id in remote_id_of.values()
      ]

      # NOTE(review): only the second endpoint is wrapped in str(), and the
      # mapping is keyed by homework ids while edges carry node ids --
      # confirm both against the crud layer.
      job_edges = crud.get_one_job_edges(db, jm_job_info.id)
      dependency_pairs = [
          [remote_id_of[job_edge['in_node_id']], remote_id_of[str(job_edge['out_node_id'])]]
          for job_edge in job_edges
      ]

      # NOTE(review): 'onec' may be a typo for 'once'; kept byte-for-byte
      # because the value expected by the job API is not visible here.
      uses_cron_expression = jm_job_info.cron_type == 2

      job_definition = {
          "tasks": task_definitions,
          "name": jm_job_info.name,
          "dependence": dependency_pairs,
          "cron": jm_job_info.cron_expression if uses_cron_expression else 'onec',
          "desc": jm_job_info.name,
          "route_strategy": "",
          "block_strategy": "",
          "executor_timeout": 0,
          "executor_fail_retry_count": 0,
          "trigger_status": jm_job_info.status,
      }

      # The relation is resolved only after the payload is complete, exactly
      # as before, so the order of remote and database calls is unchanged.
      existing_relation = crud.get_af_id(db, jm_job_info.id, 'job')
      response = send_put('/jpt/af_job', existing_relation.af_id, job_definition)
      send_submit(response['data']['id'])
  def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
      """Create the remote job on first submit, otherwise update it.

      The decision is based on whether a 'job' relation already exists for
      ``jm_job_info.id``.

      Args:
          jm_job_info: Job record to submit.
          db: Database session, handed to the crud helpers.
      """
      existing_relation = crud.get_af_id(db, jm_job_info.id, 'job')
      submit = jm_job_create_job if existing_relation is None else jm_job_update_job
      submit(jm_job_info, db)
  def _select_script(fields, table):
      """Build ``select <dataField,...> from <table>`` from a list of field dicts."""
      columns = ','.join(field['dataField'] for field in fields)
      # rstrip mirrors the strip(',') of the concatenation-based version, so
      # trailing empty field names still leave no comma before " from".
      return ('select ' + columns).rstrip(',') + ' from ' + table


  def _number_inputs(sources, use_counts):
      """Return the ``inputs`` mapping of one node and update ``use_counts``.

      Every source id is paired with a zero-based counter of how often that
      source has been consumed so far; ``use_counts`` is shared across nodes
      and mutated in place.
      """
      inputs = {}
      for index, source in enumerate(sources):
          use_counts[source] = use_counts.get(source, -1) + 1
          inputs['input' + str(index)] = [source, use_counts[source]]
      return inputs


  def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
      """Read a homework's DAG file and convert it to the task script format.

      Loads the JSON document at ``'./dag' + jm_homework.dag_url`` and turns
      each of its nodes into a sub-node:

      * ``datasource`` nodes become ``sql`` nodes selecting the node's
        ``input_source`` fields from the table of the node's relation;
      * ``outputsource`` nodes become ``sql`` nodes selecting the node's
        ``output_source`` fields, with their numbered inputs attached;
      * every other node keeps its ``op`` and ``script`` and gets its
        numbered inputs attached.

      Args:
          jm_homework: Homework record supplying ``id`` and ``dag_url``.
          db: Database session used to load the homework's relations.

      Returns:
          A JSON string with the keys ``sub_nodes`` and ``edges``, the latter
          being ``[source, target]`` pairs.

      Raises:
          KeyError: If a datasource/outputsource node has no relation, or a
              node that needs inputs is not the target of any edge.
      """
      relation_by_node = {
          relation.node_uuid: relation
          for relation in get_jm_relations(db, jm_homework.id)
      }

      # The context manager closes the file even when reading or JSON parsing
      # fails.
      with open('./dag' + jm_homework.dag_url) as dag_file:
          dag = json.load(dag_file)

      edges = dag['edges']
      # target node id -> source node ids, in edge order.
      sources_by_target = {}
      for edge in edges:
          sources_by_target.setdefault(edge['target'], []).append(edge['source'])

      use_counts = {}
      sub_nodes = []
      for node in dag['nodes']:
          op = node['op']
          node_id = node['id']
          if op == 'datasource':
              sub_node = {
                  "id": node_id,
                  "name": node['name'],
                  "op": 'sql',
                  "script": _select_script(
                      node['data']['input_source'], relation_by_node[node_id].table
                  ),
              }
          elif op == 'outputsource':
              script = _select_script(
                  node['data']['output_source'], relation_by_node[node_id].table
              )
              sub_node = {
                  "id": node_id,
                  "name": node['name'],
                  "op": 'sql',
                  "inputs": _number_inputs(sources_by_target[node_id], use_counts),
                  "script": script,
              }
          else:
              sub_node = {
                  "id": node_id,
                  "name": node['name'],
                  "op": op,
                  "inputs": _number_inputs(sources_by_target[node_id], use_counts),
                  "script": node['data']['script'],
              }
          sub_nodes.append(sub_node)

      return json.dumps({
          'sub_nodes': sub_nodes,
          'edges': [(edge['source'], edge['target']) for edge in edges],
      })
  def red_python_and_format(jm_homework):
      """Return the content of ``/python/test.py`` decoded as UTF-8 text.

      The file is fetched through a ``FileHandler`` constructed with
      ``"datax"``.

      Args:
          jm_homework: Not read by this function.

      Returns:
          The fetched file content as ``str``.
      """
      # NOTE(review): the path is fixed and ``jm_homework`` is ignored, so every
      # Python homework gets the same script -- confirm whether the homework's
      # own script file was meant to be used here.
      return FileHandler("datax").get_file("/python/test.py").decode("utf-8")