dag.py

import time

from sqlalchemy.orm import Session

from app import crud, models
from app.common.hive import hiveDs
from app.utils.send_util import *
from app.utils.utils import get_cmd_parameter
from configs.settings import DefaultOption, config

# Hive database and requirements-package locations come from the ini config.
database_name = config.get('HIVE', 'DATABASE_NAME')
requirement_path = config.get('REQUIREMENTS_CONFIG', 'path')
requirement_prefix = config.get('REQUIREMENTS_CONFIG', 'prefix')
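
# A minimal sketch of the ini layout the three config.get calls above assume;
# section and key names are taken from those calls, the values are hypothetical:
#
#   [HIVE]
#   DATABASE_NAME = ailab
#
#   [REQUIREMENTS_CONFIG]
#   path = /requirements
#   prefix = hdfs://ns1
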
def dag_create_job(dag_uuid: str, dag_script: str, db: Session):
    """Create a debug job (with a single task) on the /af service and record the relation."""
    af_task = dag_create_task(dag_uuid, dag_script, db)
    af_job = {
        "tasks": [af_task],
        "name": "Debug job",
        "dependence": [],
        "cron": "None",
        "desc": "Debug job",
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": 1,
        "job_mode": 2,
        "job_type": 0,
        "user_id": 0,
    }
    res = send_post('/af/af_job', af_job)
    af_job = res['data']
    crud.create_debug_relation(db, dag_uuid, 'debug', af_job['id'])
    return af_job
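
# Hypothetical usage, assuming send_post (from app.utils.send_util) returns a
# parsed JSON envelope of the form {"code": ..., "data": {...}}; that assumption
# is why the created job is read back via res['data'] above:
#
#   af_job = dag_create_job('0123-abcd', 'print("hello")', db)
#   print(af_job['id'])  # id assigned by the /af/af_job service
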
def dag_create_task(dag_uuid: str, dag_script: str, db: Session):
    """Create a debug task on the /af service; attaches the requirements package if one exists."""
    envs = {}
    requirements_relation = crud.get_requirements_relation(db, dag_uuid)
    if requirements_relation:
        requirements = crud.get_requirements_status(db, dag_uuid)
        if requirements.status != 2:
            raise Exception('Dependencies are not installed successfully; cannot execute')
        envs.update(
            {'requirement_package_path': f'{requirement_prefix}{requirement_path}/dag_{dag_uuid.lower()}.zip'})
    af_task = {
        "name": "Debug task",
        "file_urls": [],
        "script": dag_script,
        "cmd": "",
        "cmd_parameters": "",
        "envs": envs,
        "run_image": "",
        "task_type": "sparks",
        "user_id": 0,
    }
    res = send_post('/af/af_task', af_task)
    af_task = res['data']
    return af_task
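
# Illustration of the requirement_package_path built above, with hypothetical
# config values requirement_prefix = 'hdfs://ns1' and requirement_path =
# '/requirements': dag_uuid 'AbC-123' yields
#   'hdfs://ns1/requirements/dag_abc-123.zip'
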
def dag_update_job(dag_uuid: str, dag_script: str, db: Session):
    """Update the existing debug job (and its single task) for this DAG."""
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    af_job_id = relation.af_id
    res = send_get("/af/af_job/getOnce", af_job_id)
    old_af_job = res['data']
    old_af_task = old_af_job['tasks'][0]
    af_task = dag_put_task(dag_uuid, dag_script, db, old_af_task)
    af_job = {
        "tasks": [af_task],
        "name": "Debug job",
        "dependence": [],
        "cron": "None",
        "desc": "Debug job",
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": 1,
    }
    res = send_put('/af/af_job', old_af_job['id'], af_job)
    af_job = res['data']
    return af_job
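
# Note: unlike dag_create_job, the PUT payload above omits job_mode, job_type
# and user_id, so those fields keep their server-side values (an assumption
# based on the partial payload; the /af service is expected to merge updates).
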
def dag_put_task(dag_uuid: str, dag_script: str, db: Session, old_af_task):
    """Update an existing debug task in place, refreshing the script and requirements package."""
    envs = {}
    requirements_relation = crud.get_requirements_relation(db, dag_uuid)
    if requirements_relation:
        requirements = crud.get_requirements_status(db, dag_uuid)
        if requirements.status != 2:
            raise Exception('Dependencies are not installed successfully; cannot execute')
        envs.update(
            {'requirement_package_path': f'{requirement_prefix}{requirement_path}/dag_{dag_uuid.lower()}.zip'})
    af_task = {
        "name": "Debug task",
        "file_urls": [],
        "script": dag_script,
        "cmd": "",
        "cmd_parameters": "",
        "envs": envs,
        "run_image": "",
        "task_type": "sparks",
    }
    res = send_put('/af/af_task', old_af_task['id'], af_task)
    af_task = res['data']
    return af_task
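
# dag_put_task mirrors dag_create_task but reuses old_af_task['id'] in the PUT,
# so the task keeps its identity on the /af service across debug re-runs.
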
def dag_job_submit(dag_uuid: str, dag_script: str, db: Session):
    """Create or update the debug job, submit it, and wait for the scheduler to pick it up."""
    job_relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    if job_relation is None:
        af_job = dag_create_job(dag_uuid, dag_script, db)
    else:
        af_job = dag_update_job(dag_uuid, dag_script, db)
    res = get_job_last_parsed_time(af_job['id'])
    current_time = res['data'].get('last_parsed_time')
    send_submit(af_job['id'])
    # Poll until the scheduler has re-parsed the job (last_parsed_time changes),
    # then unpause it and trigger a run; give up after the final attempt.
    for i in range(0, 21):
        time.sleep(2)
        res = get_job_last_parsed_time(af_job['id'])
        last_parsed_time = res['data'].get('last_parsed_time')
        if last_parsed_time and current_time != last_parsed_time:
            send_pause(af_job['id'], 1)
            send_execute(af_job['id'])
            print(f"{af_job['id']} <== executed successfully ==> {last_parsed_time}")
            break
        if i >= 20:
            raise Exception(f"{af_job['id']} ==> execution failed")
    return af_job
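
# Polling budget: 21 iterations x 2 s sleep gives the scheduler roughly 42 s to
# re-parse the job (detected via a changed last_parsed_time) before the submit
# is declared failed. A hypothetical end-to-end call:
#
#   af_job = dag_job_submit('0123-abcd', script, db)  # create/update, submit, run
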
def get_tmp_table_name(dag_uuid: str, node_id: str, out_pin: str, db: Session):
    """Find the Hive temp table holding the intermediate output of a debug-run node."""
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    job_id = relation.af_id
    table_name = f'job{job_id}_task*_subnode{node_id}_output{out_pin}_tmp'
    t_names = hiveDs.show_tables_like_tname(table_name)
    if len(t_names) > 0 and len(t_names[0]) > 0:
        return t_names[0][0]
    else:
        raise Exception('No intermediate result exists for this node')
    # Earlier lookup strategy, kept for reference: resolve the task id from the
    # last debug run's details, then check the exact table name against Hive.
    # af_job_run = crud.get_airflow_run_once_debug_mode(db, job_id)
    # tasks = af_job_run.details['tasks'] if len(
    #     af_job_run.details['tasks']) > 0 else {}
    # task_id = None
    # for task in tasks:
    #     t_id = task.split('_')[0]
    #     n_id = task.split('_')[1]
    #     if n_id == node_id:
    #         task_id = t_id
    #         break
    # if task_id:
    #     table_name = f'job{job_id}_task{task_id}_subnode{node_id}_output{out_pin}_tmp'
    #     t_list = hiveDs.list_tables()
    #     table_name = table_name.lower()
    #     if table_name not in t_list:
    #         raise Exception('No intermediate result exists for this node')
    #     return table_name
    # else:
    #     raise Exception('No intermediate result exists for this node')
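
# Example of the wildcard lookup above, with hypothetical values job_id=17,
# node_id='n3', out_pin='0': the LIKE pattern becomes
#   'job17_task*_subnoden3_output0_tmp'
# and the first table Hive reports for it is returned.
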
def get_transfer_table_name(project_id: str, user_id: str, name: str):
    """Build a unique, timestamped Hive table name for a data transfer."""
    current_time = int(time.time())
    return f'{database_name}.project{project_id.lower()}_user{user_id.lower()}_{name.lower()}_{current_time}'
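
# Example, with hypothetical arguments project_id='P01', user_id='U42',
# name='Result', database_name='ailab' and epoch time 1700000000:
#   'ailab.projectp01_useru42_result_1700000000'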