jm_job.py 11 KB


  1. from asyncio import current_task
  2. import json
  3. import time
  4. from turtle import update
  5. from app import crud, models, schemas
  6. from app.common import minio
  7. from app.core.datasource.datasource import DataSourceBase
  8. from app.crud.jm_homework_datasource_relation import get_jm_relations
  9. from app.utils.send_util import *
  10. from sqlalchemy.orm import Session
  11. from app.common.minio import minio_client
  12. from configs.settings import DefaultOption, config
  13. DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
  14. requirement_path = config.get('REQUIREMENTS_CONFIG', 'path')
  15. requirement_prefix = config.get('REQUIREMENTS_CONFIG', 'prefix')
  16. type_dict = {
  17. "Java": "java",
  18. "Python": "python",
  19. "Dag": "sparks"
  20. }
  21. def jm_job_create_task(jm_homework: models.JmHomework, relation_list, db: Session):
  22. content = ''
  23. envs = {}
  24. if jm_homework.type == "Dag":
  25. content = red_dag_and_format(jm_homework, relation_list, db)
  26. requirements_relation = crud.get_requirements_relation(db, jm_homework.dag_uuid)
  27. if requirements_relation:
  28. envs.update({'requirement_package_path': f'{requirement_prefix}{requirement_path}/dag_{jm_homework.dag_uuid.lower()}.zip'})
  29. elif jm_homework.type == "Python":
  30. content = red_python_and_format(jm_homework)
  31. af_task = {
  32. "name": jm_homework.name,
  33. "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
  34. "script": content if jm_homework.type != "Java" else "",
  35. "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
  36. "cmd_parameters": "",
  37. "envs": envs,
  38. "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
  39. "task_type": type_dict[jm_homework.type],
  40. "user_id": 0,
  41. }
  42. res = send_post('/af/af_task', af_task)
  43. af_task = res['data']
  44. return af_task
  45. def jm_job_update_task(jm_homework: models.JmHomework, relation_list, db: Session):
  46. relation = crud.get_af_id(db, jm_homework.id, 'task')
  47. content = ''
  48. envs = {}
  49. if jm_homework.type == "Dag":
  50. content = red_dag_and_format(jm_homework, relation_list, db)
  51. requirements_relation = crud.get_requirements_relation(db, jm_homework.dag_uuid)
  52. if requirements_relation:
  53. envs.update({'requirement_package_path': f'{requirement_prefix}{requirement_path}/dag_{jm_homework.dag_uuid.lower()}.zip'})
  54. elif jm_homework.type == "Python":
  55. content = red_python_and_format(jm_homework)
  56. af_task = {
  57. "name": jm_homework.name,
  58. "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
  59. "script": content if jm_homework.type != "Java" else "",
  60. "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
  61. "cmd_parameters": "",
  62. "envs": envs,
  63. "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
  64. "task_type": type_dict[jm_homework.type],
  65. "user_id": 0,
  66. }
  67. res = send_put('/af/af_task', relation.af_id, af_task)
  68. af_task = res['data']
  69. return af_task
  70. def jm_job_create_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
  71. homework_ids = [node['homework_id'] for node in nodes]
  72. node_uuid_to_h_id = {node['id']:node['homework_id'] for node in nodes}
  73. relations = crud.get_af_ids(db,homework_ids, 'task')
  74. se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
  75. tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
  76. dependence = [[se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]]] for edge in edges]
  77. cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
  78. cron = cron.replace('?','*')
  79. af_job = {
  80. "tasks": tasks,
  81. "name": jm_job_info.name,
  82. "dependence": dependence,
  83. "cron": cron,
  84. "desc": jm_job_info.name,
  85. "route_strategy": "",
  86. "block_strategy": "",
  87. "executor_timeout": 0,
  88. "executor_fail_retry_count": 0,
  89. "trigger_status": jm_job_info.status,
  90. "job_mode":1,
  91. "job_type": 0,
  92. "user_id": 0,
  93. }
  94. res = send_post('/af/af_job', af_job)
  95. af_job = res['data']
  96. send_submit(af_job['id'])
  97. return af_job
  98. def jm_job_update_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
  99. homework_ids = [node['homework_id'] for node in nodes]
  100. node_uuid_to_h_id = {node['id']:node['homework_id'] for node in nodes}
  101. relations = crud.get_af_ids(db,homework_ids, 'task')
  102. se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
  103. tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
  104. dependence = [[se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]]] for edge in edges]
  105. cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
  106. cron = cron.replace('?','*')
  107. af_job = {
  108. "tasks": tasks,
  109. "name": jm_job_info.name,
  110. "dependence": dependence,
  111. "cron": cron,
  112. "desc": jm_job_info.name,
  113. "route_strategy": "",
  114. "block_strategy": "",
  115. "executor_timeout": 0,
  116. "executor_fail_retry_count": 0,
  117. "trigger_status": jm_job_info.status,
  118. }
  119. job_relation = crud.get_af_id(db,jm_job_info.id,'job')
  120. res = send_put('/af/af_job', job_relation.af_id, af_job)
  121. af_job = res['data']
  122. send_submit(af_job['id'])
  123. def red_dag_and_format(jm_homework: models.JmHomework, relation_list, db: Session):
  124. node_relation_dict = { relation['node_uuid']:relation for relation in relation_list}
  125. file = minio_client.get_file(jm_homework.dag_url)
  126. result = json.loads(file)
  127. edges = result['edges']
  128. # t_s = {}
  129. # input_num = {}
  130. # for edge in edges:
  131. # if edge['target'] in t_s.keys():
  132. # t_s[edge['target']].append(edge['source'])
  133. # else:
  134. # t_s.update({edge['target']:[edge['source']]})
  135. nodes = result['nodes']
  136. sub_nodes = []
  137. for node in nodes:
  138. if node['op'] == 'datasource':
  139. fileds = node['data']['input_source']
  140. script = 'select '
  141. for filed in fileds:
  142. script += filed['dataField'] + ','
  143. script = script.strip(',')
  144. ds_id = node_relation_dict[node['id']]['datasource_id']
  145. database_name = ""
  146. if ds_id == -1:
  147. database_name = DATABASE_NAME
  148. else:
  149. data_source = crud.get_job_jdbc_datasource(db,ds_id)
  150. database_name = data_source.database_name
  151. script += ' from `' + database_name + '`.`'+node_relation_dict[node['id']]['table'] + '`'
  152. sub_node = {
  153. "id": node['id'],
  154. "name": node['name'],
  155. "op": 'sql',
  156. "script":script
  157. }
  158. sub_nodes.append(sub_node)
  159. elif node['op'] == 'outputsource':
  160. fileds = node['data']['output_source']
  161. ds_id = node_relation_dict[node['id']]['datasource_id']
  162. database_name = ""
  163. if ds_id == -1:
  164. database_name = DATABASE_NAME
  165. else:
  166. data_source = crud.get_job_jdbc_datasource(db,ds_id)
  167. database_name = data_source.database_name
  168. script = '''def main_func (input0, spark,sc):
  169. input0.write.mode("overwrite").saveAsTable("''' + database_name + '.'+node_relation_dict[node['id']]['table']+'''")'''
  170. inputs = node['data']['inputs'] if 'inputs' in node['data'].keys() else {}
  171. # index = 0
  172. # input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
  173. # for input in input_list:
  174. # if input in input_num.keys():
  175. # input_num[input]+=1
  176. # else:
  177. # input_num.update({input:0})
  178. # inputs.update({'input'+str(index):[input,input_num[input]]})
  179. # index+=1
  180. sub_node = {
  181. "id": node['id'],
  182. "name": node['name'],
  183. "op": 'pyspark',
  184. "inputs": inputs,
  185. "script":script
  186. }
  187. sub_nodes.append(sub_node)
  188. else:
  189. inputs = node['data']['inputs'] if 'inputs' in node['data'].keys() else {}
  190. # index = 0
  191. # input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
  192. # for input in input_list:
  193. # if input in input_num.keys():
  194. # input_num[input]+=1
  195. # else:
  196. # input_num.update({input:0})
  197. # inputs.update({'input'+str(index):[input,input_num[input]]})
  198. # index+=1
  199. script = node['data']['script']
  200. if node['op'] == 'sql':
  201. script = script.replace('\n', ' ')
  202. sub_node = {
  203. "id": node['id'],
  204. "name": node['name'],
  205. "op": node['op'],
  206. "inputs": inputs,
  207. "script": script,
  208. }
  209. sub_nodes.append(sub_node)
  210. res = {
  211. 'sub_nodes': sub_nodes,
  212. 'edges': [(edge['source'],edge['target']) for edge in edges]
  213. }
  214. return json.dumps(res)
  215. def red_python_and_format(jm_homework):
  216. file = minio_client.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
  217. return file.decode("utf-8")
  218. def on_off_control(af_job_id: int,status: int):
  219. for i in range(0,11):
  220. parsed_res = get_job_last_parsed_time(af_job_id)
  221. last_parsed_time = parsed_res['data']['last_parsed_time']
  222. if last_parsed_time:
  223. send_pause(af_job_id,status)
  224. print(f"{af_job_id}<==状态修改成功==>{last_parsed_time}")
  225. break
  226. if i >= 10:
  227. raise Exception(f"{af_job_id}==>状态修改失败")
  228. time.sleep(2)
  229. def execute_job(af_job_id: int):
  230. res = get_job_last_parsed_time(af_job_id)
  231. current_time = res['data']['last_parsed_time'] if 'last_parsed_time' in res['data'].keys() else None
  232. send_submit(af_job_id)
  233. for i in range(0,21):
  234. parsed_res = get_job_last_parsed_time(af_job_id)
  235. last_parsed_time = parsed_res['data']['last_parsed_time']
  236. if last_parsed_time and last_parsed_time != current_time:
  237. res = send_execute(af_job_id)
  238. print(f"{af_job_id}<==任务执行成功==>{last_parsed_time}")
  239. return res
  240. if i >= 20:
  241. raise Exception(f"{af_job_id}==>文件正在转化中")
  242. time.sleep(2)