# jm_job.py

import json
import time

from sqlalchemy.orm import Session

from app import crud, models
from app.common.minio import minio_client
from app.utils.send_util import *
from configs.settings import config

# Hive database used when a node has no explicit datasource (ds_id == -1)
DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
type_dict = {
    "Java": "java",
    "Python": "python",
    "Dag": "sparks"
}
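
# Create the AF task (the service behind the /af/ endpoints, presumably an
# Airflow-facing wrapper) for a homework. "Dag" homework gets its script
# rendered from the stored DAG JSON, "Python" homework is read from MinIO,
# and "Java" homework ships its script file URL instead of an inline script.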
def jm_job_create_task(jm_homework: models.JmHomework, relation_list, db: Session):
    content = ''
    envs = {}
    if jm_homework.type == "Dag":
        content = red_dag_and_format(jm_homework, relation_list, db)
        requirements_relation = crud.get_requirements_relation(db, jm_homework.dag_uuid)
        if requirements_relation:
            envs.update({'requirement_package_path': f'jpt/requirements/dag_{jm_homework.dag_uuid.lower()}'})
    elif jm_homework.type == "Python":
        content = red_python_and_format(jm_homework)
    af_task = {
        "name": jm_homework.name,
        "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
        "script": content if jm_homework.type != "Java" else "",
        "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
        "cmd_parameters": "",
        "envs": envs,
        "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
        "task_type": type_dict[jm_homework.type],
        "user_id": 0,
    }
    res = send_post('/af/af_task', af_task)
    af_task = res['data']
    return af_task
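
# Same payload as jm_job_create_task, but PUT against the AF task already
# bound to this homework in the relation table.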
def jm_job_update_task(jm_homework: models.JmHomework, relation_list, db: Session):
    relation = crud.get_af_id(db, jm_homework.id, 'task')
    content = ''
    envs = {}
    if jm_homework.type == "Dag":
        content = red_dag_and_format(jm_homework, relation_list, db)
        requirements_relation = crud.get_requirements_relation(db, jm_homework.dag_uuid)
        if requirements_relation:
            envs.update({'requirement_package_path': f'jpt/requirements/dag_{jm_homework.dag_uuid.lower()}'})
    elif jm_homework.type == "Python":
        content = red_python_and_format(jm_homework)
    af_task = {
        "name": jm_homework.name,
        "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
        "script": content if jm_homework.type != "Java" else "",
        "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
        "cmd_parameters": "",
        "envs": envs,
        "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
        "task_type": type_dict[jm_homework.type],
        "user_id": 0,
    }
    res = send_put('/af/af_task', relation.af_id, af_task)
    af_task = res['data']
    return af_task
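
# Build an AF job from the editor graph: each node references a homework
# whose AF task is fetched back from the service, and each edge becomes a
# [source_af_id, target_af_id] dependency pair. cron_type == 2 means a real
# schedule; otherwise the job runs once ('@once'). Quartz-style '?' is not
# valid in the target cron dialect, so it is normalized to '*'.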
def jm_job_create_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
    homework_ids = [node['homework_id'] for node in nodes]
    node_uuid_to_h_id = {node['id']: node['homework_id'] for node in nodes}
    relations = crud.get_af_ids(db, homework_ids, 'task')
    se_id_to_af_id_dict = {relation.se_id: relation.af_id for relation in relations}
    tasks = [send_get("/af/af_task/getOnce", af_id)['data'] for af_id in se_id_to_af_id_dict.values()]
    dependence = [[se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],
                   se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]]]
                  for edge in edges]
    cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
    cron = cron.replace('?', '*')
    af_job = {
        "tasks": tasks,
        "name": jm_job_info.name,
        "dependence": dependence,
        "cron": cron,
        "desc": jm_job_info.name,
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": jm_job_info.status,
        "job_mode": 1,
        "job_type": 0,
        "user_id": 0,
    }
    res = send_post('/af/af_job', af_job)
    af_job = res['data']
    send_submit(af_job['id'])
    return af_job
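
# Update variant of jm_job_create_job: rebuilds the same payload and PUTs it
# to the AF job already bound to this JmJobInfo, then re-submits it.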
def jm_job_update_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
    homework_ids = [node['homework_id'] for node in nodes]
    node_uuid_to_h_id = {node['id']: node['homework_id'] for node in nodes}
    relations = crud.get_af_ids(db, homework_ids, 'task')
    se_id_to_af_id_dict = {relation.se_id: relation.af_id for relation in relations}
    tasks = [send_get("/af/af_task/getOnce", af_id)['data'] for af_id in se_id_to_af_id_dict.values()]
    dependence = [[se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],
                   se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]]]
                  for edge in edges]
    cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
    cron = cron.replace('?', '*')
    af_job = {
        "tasks": tasks,
        "name": jm_job_info.name,
        "dependence": dependence,
        "cron": cron,
        "desc": jm_job_info.name,
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": jm_job_info.status,
    }
    job_relation = crud.get_af_id(db, jm_job_info.id, 'job')
    res = send_put('/af/af_job', job_relation.af_id, af_job)
    af_job = res['data']
    send_submit(af_job['id'])
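
# Render a homework's DAG definition (JSON stored in MinIO) into the script
# payload the AF task expects: datasource nodes become generated SQL selects,
# outputsource nodes become a PySpark saveAsTable wrapper, and other nodes
# pass their own script through (SQL flattened to a single line).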
def red_dag_and_format(jm_homework: models.JmHomework, relation_list, db: Session):
    node_relation_dict = {relation['node_uuid']: relation for relation in relation_list}
    file = minio_client.get_file(jm_homework.dag_url)
    result = json.loads(file)
    edges = result['edges']
    nodes = result['nodes']
    sub_nodes = []
    for node in nodes:
        if node['op'] == 'datasource':
            # Turn the declared input fields into a SELECT against the
            # table bound to this node
            fields = node['data']['input_source']
            script = 'select ' + ','.join(field['dataField'] for field in fields)
            ds_id = node_relation_dict[node['id']]['datasource_id']
            if ds_id == -1:
                database_name = DATABASE_NAME
            else:
                data_source = crud.get_job_jdbc_datasource(db, ds_id)
                database_name = data_source.database_name
            script += ' from ' + database_name + '.' + node_relation_dict[node['id']]['table']
            sub_node = {
                "id": node['id'],
                "name": node['name'],
                "op": 'sql',
                "script": script
            }
            sub_nodes.append(sub_node)
        elif node['op'] == 'outputsource':
            ds_id = node_relation_dict[node['id']]['datasource_id']
            if ds_id == -1:
                database_name = DATABASE_NAME
            else:
                data_source = crud.get_job_jdbc_datasource(db, ds_id)
                database_name = data_source.database_name
            # Generated PySpark snippet: overwrite the bound table with the
            # node's single input
            script = '''def main_func (input0, spark, sc):
    input0.write.mode("overwrite").saveAsTable("''' + database_name + '.' + node_relation_dict[node['id']]['table'] + '''")'''
            inputs = node['data'].get('inputs', {})
            sub_node = {
                "id": node['id'],
                "name": node['name'],
                "op": 'pyspark',
                "inputs": inputs,
                "script": script
            }
            sub_nodes.append(sub_node)
        else:
            inputs = node['data'].get('inputs', {})
            script = node['data']['script']
            if node['op'] == 'sql':
                # Flatten SQL to one line
                script = script.replace('\n', ' ')
            sub_node = {
                "id": node['id'],
                "name": node['name'],
                "op": node['op'],
                "inputs": inputs,
                "script": script,
            }
            sub_nodes.append(sub_node)
    res = {
        'sub_nodes': sub_nodes,
        'edges': [(edge['source'], edge['target']) for edge in edges]
    }
    return json.dumps(res)
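
# Read a Python homework's script body from MinIO; falls back to a bundled
# test script when no script file is set.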
def red_python_and_format(jm_homework):
    file = minio_client.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
    return file.decode("utf-8")
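
# Pausing/resuming only takes effect once the scheduler has parsed the job,
# so poll last_parsed_time (11 attempts, 2 s apart) before flipping the
# trigger status.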
def on_off_control(af_job_id: int, status: int):
    for i in range(11):
        parsed_res = get_job_last_parsed_time(af_job_id)
        last_parsed_time = parsed_res['data']['last_parsed_time']
        if last_parsed_time:
            send_pause(af_job_id, status)
            print(f"{af_job_id}<==status updated successfully==>{last_parsed_time}")
            break
        if i >= 10:
            raise Exception(f"{af_job_id}==>status update failed")
        time.sleep(2)
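
# Manually trigger a job: record the current parse time, resubmit, then wait
# (up to 21 attempts, 2 s apart) for the scheduler to pick up the regenerated
# DAG file before firing the execution.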
def execute_job(af_job_id: int):
    res = get_job_last_parsed_time(af_job_id)
    current_time = res['data'].get('last_parsed_time')
    send_submit(af_job_id)
    for i in range(21):
        parsed_res = get_job_last_parsed_time(af_job_id)
        last_parsed_time = parsed_res['data']['last_parsed_time']
        if last_parsed_time and last_parsed_time != current_time:
            res = send_execute(af_job_id)
            print(f"{af_job_id}<==job executed successfully==>{last_parsed_time}")
            return res
        if i >= 20:
            raise Exception(f"{af_job_id}==>DAG file is still being converted")
        time.sleep(2)