# jm_job.py
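"""Sync JM homeworks and jobs to the af scheduler side.

Tasks and jobs are created/updated through the HTTP helpers in
app.utils.send_util; Dag and Python homework scripts are materialized
from MinIO before being sent.
"""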

import json
import time

from sqlalchemy.orm import Session

from app import crud, models, schemas
from app.common.minio import minio_client
from app.core.datasource.datasource import DataSourceBase
from app.crud.jm_homework_datasource_relation import get_jm_relations
from app.utils.send_util import *
from configs.settings import DefaultOption, config

DATABASE_NAME = config.get('HIVE', 'DATABASE_NAME')
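
# Map JM homework types to af task types; "Dag" homeworks run as Spark tasks.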
type_dict = {
    "Java": "java",
    "Python": "python",
    "Dag": "sparks"
}
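
# Create the af-side task for a JM homework and return the created record.
# Dag homeworks send a rendered sub-node script, Python homeworks send the
# script text, and Java homeworks ship their script file via file_urls.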
def jm_job_create_task(jm_homework: models.JmHomework, relation_list, db: Session):
    content = ''
    if jm_homework.type == "Dag":
        content = red_dag_and_format(jm_homework, relation_list, db)
    elif jm_homework.type == "Python":
        content = red_python_and_format(jm_homework)
    af_task = {
        "name": jm_homework.name,
        "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
        "script": content if jm_homework.type != "Java" else "",
        "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
        "cmd_parameters": "",
        "envs": {},
        "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
        "task_type": type_dict[jm_homework.type],
        "user_id": 0,
    }
    res = send_post('/af/af_task', af_task)
    af_task = res['data']
    return af_task
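
# Update the af-side task mapped to this JM homework (looked up through the
# relation table); the payload mirrors jm_job_create_task.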
def jm_job_update_task(jm_homework: models.JmHomework, relation_list, db: Session):
    relation = crud.get_af_id(db, jm_homework.id, 'task')
    content = ''
    if jm_homework.type == "Dag":
        content = red_dag_and_format(jm_homework, relation_list, db)
    elif jm_homework.type == "Python":
        content = red_python_and_format(jm_homework)
    af_task = {
        "name": jm_homework.name,
        "file_urls": [] if jm_homework.type != "Java" else [jm_homework.script_file],
        "script": content if jm_homework.type != "Java" else "",
        "cmd": jm_homework.execute_command if jm_homework.type != "Dag" else "",
        "cmd_parameters": "",
        "envs": {},
        "run_image": jm_homework.image_url if jm_homework.type != "Dag" else "",
        "task_type": type_dict[jm_homework.type],
        "user_id": 0,
    }
    res = send_put('/af/af_task', relation.af_id, af_task)
    af_task = res['data']
    return af_task
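
# Build an af-side job from the JM job graph: fetch the af task for each
# homework node, translate edges into task dependencies, then create and
# submit the job.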
def jm_job_create_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
    homework_ids = [node['homework_id'] for node in nodes]
    relations = crud.get_af_ids(db, homework_ids, 'task')
    se_id_to_af_id_dict = {relation.se_id: relation.af_id for relation in relations}
    tasks = [send_get("/af/af_task/getOnce", af_id)['data'] for af_id in se_id_to_af_id_dict.values()]
    dependence = [[se_id_to_af_id_dict[edge['source']], se_id_to_af_id_dict[str(edge['target'])]] for edge in edges]
    cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
    cron = cron.replace('?', '*')  # normalize Quartz-style '?' fields to '*'
    af_job = {
        "tasks": tasks,
        "name": jm_job_info.name,
        "dependence": dependence,
        "cron": cron,
        "desc": jm_job_info.name,
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": jm_job_info.status,
        "job_mode": 1,
        "job_type": 0,
        "user_id": 0,
    }
    res = send_post('/af/af_job', af_job)
    af_job = res['data']
    send_submit(af_job['id'])
    return af_job
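
# Update the af-side job mapped to this JM job and resubmit it; the payload
# mirrors jm_job_create_job.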
def jm_job_update_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
    homework_ids = [node['homework_id'] for node in nodes]
    relations = crud.get_af_ids(db, homework_ids, 'task')
    se_id_to_af_id_dict = {relation.se_id: relation.af_id for relation in relations}
    tasks = [send_get("/af/af_task/getOnce", af_id)['data'] for af_id in se_id_to_af_id_dict.values()]
    dependence = [[se_id_to_af_id_dict[edge['source']], se_id_to_af_id_dict[str(edge['target'])]] for edge in edges]
    cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
    cron = cron.replace('?', '*')  # normalize Quartz-style '?' fields to '*'
    af_job = {
        "tasks": tasks,
        "name": jm_job_info.name,
        "dependence": dependence,
        "cron": cron,
        "desc": jm_job_info.name,
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": jm_job_info.status,
    }
    job_relation = crud.get_af_id(db, jm_job_info.id, 'job')
    res = send_put('/af/af_job', job_relation.af_id, af_job)
    af_job = res['data']
    send_submit(af_job['id'])
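
# Render a Dag homework into the af script format: read the DAG JSON from
# MinIO, rewrite datasource nodes as SQL selects and outputsource nodes as
# pyspark table writers, wire each node's inputs, and return the result as JSON.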
def red_dag_and_format(jm_homework: models.JmHomework, relation_list, db: Session):
    node_relation_dict = {relation['node_uuid']: relation for relation in relation_list}
    file = minio_client.get_file(jm_homework.dag_url)
    result = json.loads(file)
    edges = result['edges']
    # t_s maps each target node id to the list of its source node ids.
    t_s = {}
    input_num = {}
    for edge in edges:
        if edge['target'] in t_s.keys():
            t_s[edge['target']].append(edge['source'])
        else:
            t_s.update({edge['target']: [edge['source']]})
    nodes = result['nodes']
    sub_nodes = []
    for node in nodes:
        if node['op'] == 'datasource':
            fields = node['data']['input_source']
            script = 'select '
            for field in fields:
                script += field['dataField'] + ','
            script = script.strip(',')
            ds_id = node_relation_dict[node['id']]['datasource_id']
            database_name = ""
            if ds_id == -1:
                database_name = DATABASE_NAME
            else:
                data_source = crud.get_job_jdbc_datasource(db, ds_id)
                database_name = data_source.database_name
            script += ' from ' + database_name + '.' + node_relation_dict[node['id']]['table']
            sub_node = {
                "id": node['id'],
                "name": node['name'],
                "op": 'sql',
                "script": script
            }
            sub_nodes.append(sub_node)
        elif node['op'] == 'outputsource':
            fields = node['data']['output_source']
            ds_id = node_relation_dict[node['id']]['datasource_id']
            database_name = ""
            if ds_id == -1:
                database_name = DATABASE_NAME
            else:
                data_source = crud.get_job_jdbc_datasource(db, ds_id)
                database_name = data_source.database_name
            script = '''def main_func (input0, spark, sc):
    input0.write.mode("overwrite").saveAsTable("''' + database_name + '.' + node_relation_dict[node['id']]['table'] + '''")'''
            inputs = {}
            index = 0
            input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
            for source in input_list:
                if source in input_num.keys():
                    input_num[source] += 1
                else:
                    input_num.update({source: 0})
                inputs.update({'input' + str(index): [source, input_num[source]]})
                index += 1
            sub_node = {
                "id": node['id'],
                "name": node['name'],
                "op": 'pyspark',
                "inputs": inputs,
                "script": script
            }
            sub_nodes.append(sub_node)
        else:
            inputs = {}
            index = 0
            input_list = t_s[node['id']] if node['id'] in t_s.keys() else []
            for source in input_list:
                if source in input_num.keys():
                    input_num[source] += 1
                else:
                    input_num.update({source: 0})
                inputs.update({'input' + str(index): [source, input_num[source]]})
                index += 1
            sub_node = {
                "id": node['id'],
                "name": node['name'],
                "op": node['op'],
                "inputs": inputs,
                "script": node['data']['script'],
            }
            sub_nodes.append(sub_node)
    res = {
        'sub_nodes': sub_nodes,
        'edges': [(edge['source'], edge['target']) for edge in edges]
    }
    return json.dumps(res)
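
# Read a Python homework's script from MinIO and return it as text.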
def red_python_and_format(jm_homework):
    file = minio_client.get_file(jm_homework.script_file if jm_homework.script_file else "/python/test.py")
    return file.decode("utf-8")
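
# Toggle a job's trigger status. Poll until the scheduler reports a
# last_parsed_time for the job (roughly 20 s at most) before sending the
# pause/unpause request.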
def on_off_control(af_job_id: int, status: int):
    for i in range(0, 11):
        parsed_res = get_job_last_parsed_time(af_job_id)
        last_parsed_time = parsed_res['data']['last_parsed_time']
        if last_parsed_time:
            send_pause(af_job_id, status)
            print(f"{af_job_id}<==status changed successfully==>{last_parsed_time}")
            break
        if i >= 10:
            raise Exception(f"{af_job_id}==>execution failed")
        time.sleep(2)
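
# Trigger a manual run: resubmit the job, poll until the scheduler reports a
# parse newer than the submit time (roughly 40 s at most), then send the
# execute request.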
def execute_job(af_job_id: int):
    current_time = int(time.time())
    send_submit(af_job_id)
    for i in range(0, 21):
        parsed_res = get_job_last_parsed_time(af_job_id)
        last_parsed_time = parsed_res['data']['last_parsed_time']
        if last_parsed_time and int(last_parsed_time) > current_time:
            res = send_execute(af_job_id)
            print(f"{af_job_id}<==job executed successfully==>{last_parsed_time}")
            return res
        if i >= 20:
            raise Exception(f"{af_job_id}==>the file is still being converted")
        time.sleep(2)