jm_job.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. from asyncio import current_task
  2. import json
  3. import time
  4. from turtle import update
  5. from app import crud, models
  6. from app.common import minio
  7. from app.core.datasource.datasource import DataSourceBase
  8. from app.crud.jm_homework_datasource_relation import get_jm_relations
  9. from app.utils.send_util import *
  10. from sqlalchemy.orm import Session
  11. from app.common.minio import minio_client
  12. from configs.settings import DefaultOption, config
# Default HIVE database name; used when a DAG node references datasource id -1
# (see red_dag_and_format below).
DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
# Maps a JM homework `type` to the Airflow-side `task_type` string.
type_dict = {
    "Java": "java",
    "Python": "python",
    "Dag": "sparks"
}
  19. def jm_job_create_task(jm_homework: models.JmHomework, db: Session):
  20. content = ''
  21. if jm_homework.type == "Dag":
  22. content = red_dag_and_format(jm_homework, db)
  23. elif jm_homework.type == "Python":
  24. content = red_python_and_format(jm_homework)
  25. af_task = {
  26. "name": jm_homework.name,
  27. "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
  28. "script": content if jm_homework.type != "Java" else "",
  29. "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
  30. "cmd_parameters": "",
  31. "envs": {},
  32. "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
  33. "task_type": type_dict[jm_homework.type],
  34. "user_id": 0,
  35. }
  36. res = send_post('/af/af_task', af_task)
  37. af_task = res['data']
  38. crud.create_relation(db ,jm_homework.id, 'task', af_task['id'])
  39. return af_task
  40. def jm_job_update_task(jm_homework: models.JmHomework, db: Session):
  41. relation = crud.get_af_id(db, jm_homework.id, 'task')
  42. content = ''
  43. if jm_homework.type == "Dag":
  44. content = content = red_dag_and_format(jm_homework, db)
  45. elif jm_homework.type == "Python":
  46. content = red_python_and_format(jm_homework)
  47. af_task = {
  48. "name": jm_homework.name,
  49. "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
  50. "script": content if jm_homework.type != "Java" else "",
  51. "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
  52. "cmd_parameters": "",
  53. "envs": {},
  54. "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
  55. "task_type": type_dict[jm_homework.type],
  56. "user_id": 0,
  57. }
  58. res = send_put('/af/af_task', relation.af_id, af_task)
  59. af_task = res['data']
  60. return af_task
  61. def jm_homework_submit(jm_homework: models.JmHomework, db: Session):
  62. task_relation = crud.get_af_id(db,jm_homework.id,'task')
  63. if task_relation is None:
  64. jm_job_create_task(jm_homework, db)
  65. else:
  66. jm_job_update_task(jm_homework, db)
  67. def jm_job_create_job(jm_job_info: models.JmJobInfo, db: Session):
  68. nodes = crud.get_one_job_nodes(db, jm_job_info.id)
  69. homework_ids = [node.homework_id for node in nodes]
  70. relations = crud.get_af_ids(db,homework_ids, 'task')
  71. se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
  72. tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
  73. edges = crud.get_one_job_edges(db, jm_job_info.id)
  74. dependence = [[se_id_to_af_id_dict[edge['in_node_id']],se_id_to_af_id_dict[str(edge['out_node_id'])]] for edge in edges]
  75. cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
  76. cron.replace('?','*')
  77. af_job = {
  78. "tasks": tasks,
  79. "name": jm_job_info.name,
  80. "dependence": dependence,
  81. "cron": cron,
  82. "desc": jm_job_info.name,
  83. "route_strategy": "",
  84. "block_strategy": "",
  85. "executor_timeout": 0,
  86. "executor_fail_retry_count": 0,
  87. "trigger_status": jm_job_info.status,
  88. "job_mode":1,
  89. "job_type": 0,
  90. "user_id": 0,
  91. }
  92. res = send_post('/af/af_job', af_job)
  93. af_job = res['data']
  94. crud.create_relation(db, jm_job_info.id,'job', af_job['id'])
  95. send_submit(af_job['id'])
  96. # on_off_control(af_job['id'],jm_job_info.status)
  97. def jm_job_update_job(jm_job_info: models.JmJobInfo, db: Session):
  98. nodes = crud.get_one_job_nodes(db, jm_job_info.id)
  99. homework_ids = [node.homework_id for node in nodes]
  100. node_id_to_h_id = {node.id:node.homework_id for node in nodes}
  101. relations = crud.get_af_ids(db,homework_ids, 'task')
  102. se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
  103. tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
  104. edges = crud.get_one_job_edges(db, jm_job_info.id)
  105. dependence = [[se_id_to_af_id_dict[node_id_to_h_id[edge.in_node_id]],se_id_to_af_id_dict[node_id_to_h_id[edge.out_node_id]]] for edge in edges]
  106. cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
  107. cron.replace('?','*')
  108. af_job = {
  109. "tasks": tasks,
  110. "name": jm_job_info.name,
  111. "dependence": dependence,
  112. "cron": cron,
  113. "desc": jm_job_info.name,
  114. "route_strategy": "",
  115. "block_strategy": "",
  116. "executor_timeout": 0,
  117. "executor_fail_retry_count": 0,
  118. "trigger_status": jm_job_info.status,
  119. }
  120. job_relation = crud.get_af_id(db,jm_job_info.id,'job')
  121. res = send_put('/af/af_job', job_relation.af_id, af_job)
  122. af_job = res['data']
  123. send_submit(af_job['id'])
  124. on_off_control(af_job['id'],jm_job_info.status)
  125. def jm_job_submit(jm_job_info: models.JmJobInfo, db: Session):
  126. job_relation = crud.get_af_id(db,jm_job_info.id,'job')
  127. if job_relation is None:
  128. jm_job_create_job(jm_job_info, db)
  129. else:
  130. jm_job_update_job(jm_job_info, db)
  131. def red_dag_and_format(jm_homework: models.JmHomework, db: Session):
  132. relations = get_jm_relations(db,jm_homework.id)
  133. node_relation_dict = { relation.node_uuid:relation for relation in relations}
  134. file = minio_client.get_file(jm_homework.dag_url)
  135. result = json.loads(file)
  136. edges = result['edges']
  137. t_s = {}
  138. input_num = {}
  139. for edge in edges:
  140. if edge['target'] in t_s.keys():
  141. t_s[edge['target']].append(edge['source'])
  142. else:
  143. t_s.update({edge['target']:[edge['source']]})
  144. nodes = result['nodes']
  145. sub_nodes = []
  146. for node in nodes:
  147. if node['op'] == 'datasource':
  148. fileds = node['data']['input_source']
  149. script = 'select '
  150. for filed in fileds:
  151. script += filed['dataField'] + ','
  152. script = script.strip(',')
  153. ds_id = node_relation_dict[node['id']].datasource_id
  154. database_name = ""
  155. if ds_id == -1:
  156. database_name = DATABASE_NAME
  157. else:
  158. data_source = crud.get_job_jdbc_datasource(db,ds_id)
  159. database_name = data_source.database_name
  160. script += ' from ' + database_name + '.'+node_relation_dict[node['id']].table
  161. sub_node = {
  162. "id": node['id'],
  163. "name": node['name'],
  164. "op": 'sql',
  165. "script":script
  166. }
  167. sub_nodes.append(sub_node)
  168. elif node['op'] == 'outputsource':
  169. fileds = node['data']['output_source']
  170. ds_id = node_relation_dict[node['id']].datasource_id
  171. database_name = ""
  172. if ds_id == -1:
  173. database_name = DATABASE_NAME
  174. else:
  175. data_source = crud.get_job_jdbc_datasource(db,ds_id)
  176. database_name = data_source.database_name
  177. script = '''def main_func (input0, spark,sc):
  178. input0.write.mode("overwrite").saveAsTable("''' + database_name + '.'+node_relation_dict[node['id']].table+'''")'''
  179. inputs = {}
  180. index = 0
  181. input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
  182. for input in input_list:
  183. if input in input_num.keys():
  184. input_num[input]+=1
  185. else:
  186. input_num.update({input:0})
  187. inputs.update({'input'+str(index):[input,input_num[input]]})
  188. index+=1
  189. sub_node = {
  190. "id": node['id'],
  191. "name": node['name'],
  192. "op": 'pyspark',
  193. "inputs": inputs,
  194. "script":script
  195. }
  196. sub_nodes.append(sub_node)
  197. else:
  198. inputs = {}
  199. index = 0
  200. input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
  201. for input in input_list:
  202. if input in input_num.keys():
  203. input_num[input]+=1
  204. else:
  205. input_num.update({input:0})
  206. inputs.update({'input'+str(index):[input,input_num[input]]})
  207. index+=1
  208. sub_node = {
  209. "id": node['id'],
  210. "name": node['name'],
  211. "op": node['op'],
  212. "inputs": inputs,
  213. "script": node['data']['script'],
  214. }
  215. sub_nodes.append(sub_node)
  216. res = {
  217. 'sub_nodes': sub_nodes,
  218. 'edges': [(edge['source'],edge['target']) for edge in edges]
  219. }
  220. return json.dumps(res)
  221. def red_python_and_format(jm_homework):
  222. file = minio_client.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
  223. return file.decode("utf-8")
  224. def on_off_control(af_job_id: int,status: int):
  225. for i in range(0,11):
  226. parsed_res = get_job_last_parsed_time(af_job_id)
  227. last_parsed_time = parsed_res['data']['last_parsed_time']
  228. if last_parsed_time:
  229. send_pause(af_job_id,status)
  230. print(f"{af_job_id}<==状态修改成功==>{last_parsed_time}")
  231. break
  232. if i >= 10:
  233. raise Exception(f"{af_job_id}==>执行失败")
  234. time.sleep(2)