jm_job.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. from asyncio import current_task
  2. import json
  3. import time
  4. from turtle import update
  5. from app import crud, models
  6. from app.common import minio
  7. from app.crud.jm_homework_datasource_relation import get_jm_relations
  8. from app.utils.send_util import *
  9. from sqlalchemy.orm import Session
  10. from app.common.minio import minio_client
  11. type_dict = {
  12. "Java": "java",
  13. "Python": "python",
  14. "Dag": "sparks"
  15. }
  16. def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
  17. content = ''
  18. if jm_homework.type == "Dag":
  19. content = red_dag_and_format(jm_homework, db)
  20. elif jm_homework.type == "Python":
  21. content = red_python_and_format(jm_homework)
  22. af_task = {
  23. "name": jm_homework.name,
  24. "file_urls": [] if jm_homework.type != "Java" else ['datax/'+jm_homework.script_file],
  25. "script": content if jm_homework.type != "Java" else "",
  26. "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
  27. "cmd_parameters": "",
  28. "envs": {},
  29. "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
  30. "task_type": type_dict[jm_homework.type],
  31. "user_id": 0,
  32. }
  33. res = send_post('/jpt/af_task', af_task)
  34. af_task = res['data']
  35. crud.create_relation(db ,jm_homework.id, 'task', af_task['id'])
  36. return af_task
  37. def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
  38. relation = crud.get_af_id(db, jm_homework.id, 'task')
  39. content = ''
  40. if jm_homework.type == "Dag":
  41. content = content = red_dag_and_format(jm_homework, db)
  42. elif jm_homework.type == "Python":
  43. content = red_python_and_format(jm_homework)
  44. af_task = {
  45. "name": jm_homework.name,
  46. "file_urls": [] if jm_homework.type != "Java" else ['datax/'+jm_homework.script_file],
  47. "script": content if jm_homework.type != "Java" else "",
  48. "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
  49. "cmd_parameters": "",
  50. "envs": {},
  51. "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
  52. "task_type": type_dict[jm_homework.type],
  53. "user_id": 0,
  54. }
  55. res = send_put('/jpt/af_task', relation.af_id, af_task)
  56. af_task = res['data']
  57. return af_task
  58. def jm_homework_submit(jm_homework: models.JmHomework, db: Session):
  59. task_relation = crud.get_af_id(db,jm_homework.id,'task')
  60. if task_relation is None:
  61. jm_job_create_task(jm_homework, db)
  62. else:
  63. jm_job_update_task(jm_homework, db)
  64. def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
  65. nodes = crud.get_one_job_nodes(db, jm_job_info.id)
  66. homework_ids = [node.homework_id for node in nodes]
  67. relations = crud.get_af_ids(db,homework_ids, 'task')
  68. se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
  69. tasks = [ send_get("/jpt/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
  70. edges = crud.get_one_job_edges(db, jm_job_info.id)
  71. dependence = [[se_id_to_af_id_dict[edge['in_node_id']],se_id_to_af_id_dict[str(edge['out_node_id'])]] for edge in edges]
  72. cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
  73. cron.replace('?','*')
  74. af_job = {
  75. "tasks": tasks,
  76. "name": jm_job_info.name,
  77. "dependence": dependence,
  78. "cron": cron,
  79. "desc": jm_job_info.name,
  80. "route_strategy": "",
  81. "block_strategy": "",
  82. "executor_timeout": 0,
  83. "executor_fail_retry_count": 0,
  84. "trigger_status": jm_job_info.status,
  85. "job_mode":1,
  86. "job_type": 0,
  87. "user_id": 0,
  88. }
  89. res = send_post('/jpt/af_job', af_job)
  90. af_job = res['data']
  91. crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
  92. send_submit(af_job['id'])
  93. # on_off_control(af_job['id'],jm_job_info.status)
  94. def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
  95. nodes = crud.get_one_job_nodes(db, jm_job_info.id)
  96. homework_ids = [node.homework_id for node in nodes]
  97. node_id_to_h_id = {node.id:node.homework_id for node in nodes}
  98. relations = crud.get_af_ids(db,homework_ids, 'task')
  99. se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
  100. tasks = [ send_get("/jpt/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
  101. edges = crud.get_one_job_edges(db, jm_job_info.id)
  102. dependence = [[se_id_to_af_id_dict[node_id_to_h_id[edge.in_node_id]],se_id_to_af_id_dict[node_id_to_h_id[edge.out_node_id]]] for edge in edges]
  103. cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
  104. cron.replace('?','*')
  105. af_job = {
  106. "tasks": tasks,
  107. "name": jm_job_info.name,
  108. "dependence": dependence,
  109. "cron": cron,
  110. "desc": jm_job_info.name,
  111. "route_strategy": "",
  112. "block_strategy": "",
  113. "executor_timeout": 0,
  114. "executor_fail_retry_count": 0,
  115. "trigger_status": jm_job_info.status,
  116. }
  117. job_relation = crud.get_af_id(db,jm_job_info.id,'job')
  118. res = send_put('/jpt/af_job', job_relation.af_id, af_job)
  119. af_job = res['data']
  120. send_submit(af_job['id'])
  121. on_off_control(af_job['id'],jm_job_info.status)
  122. def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
  123. job_relation = crud.get_af_id(db,jm_job_info.id,'job')
  124. if job_relation is None:
  125. jm_job_create_job(jm_job_info, db)
  126. else:
  127. jm_job_update_job(jm_job_info, db)
  128. def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
  129. relations = get_jm_relations(db,jm_homework.id)
  130. node_relation_dict = { relation.node_uuid:relation for relation in relations}
  131. file = minio_client.get_file(jm_homework.dag_url)
  132. result = json.loads(file)
  133. edges = result['edges']
  134. t_s = {}
  135. input_num = {}
  136. for edge in edges:
  137. if edge['target'] in t_s.keys():
  138. t_s[edge['target']].append(edge['source'])
  139. else:
  140. t_s.update({edge['target']:[edge['source']]})
  141. nodes = result['nodes']
  142. sub_nodes = []
  143. for node in nodes:
  144. if node['op'] == 'datasource':
  145. fileds = node['data']['input_source']
  146. script = 'select '
  147. for filed in fileds:
  148. script += filed['dataField'] + ','
  149. script = script.strip(',')
  150. script += ' from ' + node_relation_dict[node['id']].table
  151. sub_node = {
  152. "id": node['id'],
  153. "name": node['name'],
  154. "op": 'sql',
  155. "script":script
  156. }
  157. sub_nodes.append(sub_node)
  158. elif node['op'] == 'outputsource':
  159. fileds = node['data']['output_source']
  160. script = '''def main_func (input0, spark,sc):
  161. input0.write.mode("overwrite").saveAsTable("'''+node_relation_dict[node['id']].table+'''")'''
  162. inputs = {}
  163. index = 0
  164. input_list = t_s[node['id']]
  165. for input in input_list:
  166. if input in input_num.keys():
  167. input_num[input]+=1
  168. else:
  169. input_num.update({input:0})
  170. inputs.update({'input'+str(index):[input,input_num[input]]})
  171. index+=1
  172. sub_node = {
  173. "id": node['id'],
  174. "name": node['name'],
  175. "op": 'pyspark',
  176. "inputs": inputs,
  177. "script":script
  178. }
  179. sub_nodes.append(sub_node)
  180. else:
  181. inputs = {}
  182. index = 0
  183. input_list = t_s[node['id']]
  184. for input in input_list:
  185. if input in input_num.keys():
  186. input_num[input]+=1
  187. else:
  188. input_num.update({input:0})
  189. inputs.update({'input'+str(index):[input,input_num[input]]})
  190. index+=1
  191. sub_node = {
  192. "id": node['id'],
  193. "name": node['name'],
  194. "op": node['op'],
  195. "inputs": inputs,
  196. "script": node['data']['script'],
  197. }
  198. sub_nodes.append(sub_node)
  199. res = {
  200. 'sub_nodes': sub_nodes,
  201. 'edges': [(edge['source'],edge['target']) for edge in edges]
  202. }
  203. return json.dumps(res)
  204. def red_python_and_format(jm_homework):
  205. file = minio_client.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
  206. return file.decode("utf-8")
  207. def on_off_control(af_job_id: int,status: int):
  208. for i in range(0,11):
  209. parsed_res = get_job_last_parsed_time(af_job_id)
  210. last_parsed_time = parsed_res['data']['last_parsed_time']
  211. if last_parsed_time:
  212. send_pause(af_job_id,status)
  213. print(f"{af_job_id}<==状态修改成功==>{last_parsed_time}")
  214. break
  215. if i >= 10:
  216. raise Exception(f"{af_job_id}==>执行失败")
  217. time.sleep(2)