jm_job.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. import json
  2. from turtle import update
  3. from app import crud, models
  4. from app.crud.jm_homework_datasource_relation import get_jm_relations
  5. from app.utils.send_util import *
  6. from app.utils.utils import get_cmd_parameter
  7. from sqlalchemy.orm import Session
  8. from app.common.minio import FileHandler
  9. type_dict = {
  10. "Java": "java",
  11. "Python": "python",
  12. "Dag": "sparks"
  13. }
  14. def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
  15. content = ''
  16. if jm_homework.type == "Dag":
  17. content = red_dag_and_format(jm_homework, db)
  18. elif jm_homework.type == "Python":
  19. content = red_python_and_format(jm_homework)
  20. af_task = {
  21. "name": jm_homework.name,
  22. "file_urls": [] if jm_homework.type != "Java" else ['datax/'+jm_homework.script_file],
  23. "script": content if jm_homework.type != "Java" else "",
  24. "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
  25. "cmd_parameters": "",
  26. "envs": {},
  27. "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
  28. "task_type": type_dict[jm_homework.type],
  29. "user_id": 0,
  30. }
  31. res = send_post('/jpt/af_task', af_task)
  32. af_task = res['data']
  33. crud.create_relation(db ,jm_homework.id, 'task', af_task['id'])
  34. return af_task
  35. def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
  36. relation = crud.get_af_id(db, jm_homework.id, 'task')
  37. content = ''
  38. if jm_homework.type == "Dag":
  39. content = content = red_dag_and_format(jm_homework, db)
  40. elif jm_homework.type == "Python":
  41. content = red_python_and_format(jm_homework)
  42. af_task = {
  43. "name": jm_homework.name,
  44. "file_urls": [] if jm_homework.type != "Java" else ['datax/'+jm_homework.script_file],
  45. "script": content if jm_homework.type != "Java" else "",
  46. "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
  47. "cmd_parameters": "",
  48. "envs": {},
  49. "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
  50. "task_type": type_dict[jm_homework.type],
  51. "user_id": 0,
  52. }
  53. res = send_put('/jpt/af_task', relation.af_id, af_task)
  54. af_task = res['data']
  55. return af_task
  56. def jm_homework_submit(jm_homework: models.JmHomework, db: Session):
  57. task_relation = crud.get_af_id(db,jm_homework.id,'task')
  58. if task_relation is None:
  59. jm_job_create_task(jm_homework, db)
  60. else:
  61. jm_job_update_task(jm_homework, db)
  62. def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
  63. nodes = crud.get_one_job_nodes(db, jm_job_info.id)
  64. homework_ids = [node.homework_id for node in nodes]
  65. relations = crud.get_af_ids(db,homework_ids, 'task')
  66. se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
  67. tasks = [ send_get("/jpt/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
  68. edges = crud.get_one_job_edges(db, jm_job_info.id)
  69. dependence = [[se_id_to_af_id_dict[edge['in_node_id']],se_id_to_af_id_dict[str(edge['out_node_id'])]] for edge in edges]
  70. cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
  71. cron.replace('?','*')
  72. af_job = {
  73. "tasks": tasks,
  74. "name": jm_job_info.name,
  75. "dependence": dependence,
  76. "cron": cron,
  77. "desc": jm_job_info.name,
  78. "route_strategy": "",
  79. "block_strategy": "",
  80. "executor_timeout": 0,
  81. "executor_fail_retry_count": 0,
  82. "trigger_status": jm_job_info.status,
  83. "job_mode":1,
  84. "job_type": 0,
  85. "user_id": 0,
  86. }
  87. res = send_post('/jpt/af_job', af_job)
  88. af_job = res['data']
  89. crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
  90. send_submit(af_job['id'])
  91. send_pause(af_job['id'], True if jm_job_info.status == 1 else False)
  92. def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
  93. nodes = crud.get_one_job_nodes(db, jm_job_info.id)
  94. homework_ids = [node.homework_id for node in nodes]
  95. node_id_to_h_id = {node.id:node.homework_id for node in nodes}
  96. relations = crud.get_af_ids(db,homework_ids, 'task')
  97. se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
  98. tasks = [ send_get("/jpt/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
  99. edges = crud.get_one_job_edges(db, jm_job_info.id)
  100. dependence = [[se_id_to_af_id_dict[node_id_to_h_id[edge.in_node_id]],se_id_to_af_id_dict[node_id_to_h_id[edge.out_node_id]]] for edge in edges]
  101. cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
  102. cron.replace('?','*')
  103. af_job = {
  104. "tasks": tasks,
  105. "name": jm_job_info.name,
  106. "dependence": dependence,
  107. "cron": cron,
  108. "desc": jm_job_info.name,
  109. "route_strategy": "",
  110. "block_strategy": "",
  111. "executor_timeout": 0,
  112. "executor_fail_retry_count": 0,
  113. "trigger_status": jm_job_info.status,
  114. }
  115. job_relation = crud.get_af_id(db,jm_job_info.id,'job')
  116. res = send_put('/jpt/af_job', job_relation.af_id, af_job)
  117. af_job = res['data']
  118. send_submit(af_job['id'])
  119. send_pause(af_job['id'], True if jm_job_info.status == 1 else False)
  120. def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
  121. job_relation = crud.get_af_id(db,jm_job_info.id,'job')
  122. if job_relation is None:
  123. jm_job_create_job(jm_job_info, db)
  124. else:
  125. jm_job_update_job(jm_job_info, db)
  126. def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
  127. relations = get_jm_relations(db,jm_homework.id)
  128. node_relation_dict = { relation.node_uuid:relation for relation in relations}
  129. f = open('./dag' + jm_homework.dag_url)
  130. lines = f.read()
  131. result = json.loads(lines)
  132. f.close()
  133. edges = result['edges']
  134. t_s = {}
  135. input_num = {}
  136. for edge in edges:
  137. if edge['target'] in t_s.keys():
  138. t_s[edge['target']].append(edge['source'])
  139. else:
  140. t_s.update({edge['target']:[edge['source']]})
  141. nodes = result['nodes']
  142. sub_nodes = []
  143. for node in nodes:
  144. if node['op'] == 'datasource':
  145. fileds = node['data']['input_source']
  146. script = 'select '
  147. for filed in fileds:
  148. script += filed['dataField'] + ','
  149. script = script.strip(',')
  150. script += ' from ' + node_relation_dict[node['id']].table
  151. sub_node = {
  152. "id": node['id'],
  153. "name": node['name'],
  154. "op": 'sql',
  155. "script":script
  156. }
  157. sub_nodes.append(sub_node)
  158. elif node['op'] == 'outputsource':
  159. fileds = node['data']['output_source']
  160. script = 'select '
  161. for filed in fileds:
  162. script += filed['dataField'] + ','
  163. script = script.strip(',')
  164. script += ' from ' + node_relation_dict[node['id']].table
  165. inputs = {}
  166. index = 0
  167. input_list = t_s[node['id']]
  168. for input in input_list:
  169. if input in input_num.keys():
  170. input_num[input]+=1
  171. else:
  172. input_num.update({input:0})
  173. inputs.update({'input'+str(index):[input,input_num[input]]})
  174. index+=1
  175. sub_node = {
  176. "id": node['id'],
  177. "name": node['name'],
  178. "op": 'sql',
  179. "inputs": inputs,
  180. "script":script
  181. }
  182. sub_nodes.append(sub_node)
  183. else:
  184. inputs = {}
  185. index = 0
  186. input_list = t_s[node['id']]
  187. for input in input_list:
  188. if input in input_num.keys():
  189. input_num[input]+=1
  190. else:
  191. input_num.update({input:0})
  192. inputs.update({'input'+str(index):[input,input_num[input]]})
  193. index+=1
  194. sub_node = {
  195. "id": node['id'],
  196. "name": node['name'],
  197. "op": node['op'],
  198. "inputs": inputs,
  199. "script": node['data']['script'],
  200. }
  201. sub_nodes.append(sub_node)
  202. res = {
  203. 'sub_nodes': sub_nodes,
  204. 'edges': [(edge['source'],edge['target']) for edge in edges]
  205. }
  206. return json.dumps(res)
  207. def red_python_and_format(jm_homework):
  208. file_handler = FileHandler("datax")
  209. file = file_handler.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
  210. return file.decode("utf-8")