dag.py

from asyncio import current_task
import time
from app import crud, models
from app.utils.send_util import *
from app.utils.utils import get_cmd_parameter
from sqlalchemy.orm import Session
from app.common.hive import hiveDs
from configs.settings import DefaultOption, config

database_name = config.get('HIVE', 'DATABASE_NAME')
requirement_path = config.get('REQUIREMENTS_CONFIG', 'path')

def dag_create_job(dag_uuid: str, dag_script: str, db: Session):
    """Create a one-task debug job on the Airflow side and record the dag_uuid -> af_job mapping."""
    af_task = dag_create_task(dag_uuid, dag_script, db)
    af_job = {
        "tasks": [af_task],
        "name": "Debug job",
        "dependence": [],
        "cron": "None",
        "desc": "Debug job",
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": 1,
        "job_mode": 2,
        "job_type": 0,
        "user_id": 0,
    }
    res = send_post('/af/af_job', af_job)
    af_job = res['data']
    crud.create_debug_relation(db, dag_uuid, 'debug', af_job['id'])
    return af_job
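
# Note: the 'debug' relation stored above is what dag_job_submit() later uses to
# decide between creating a new debug job and updating the existing one, so each
# DAG keeps exactly one debug job on the Airflow side.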

def dag_create_task(dag_uuid: str, dag_script: str, db: Session):
    """Create the debug task, attaching the requirements package path when one is configured."""
    envs = {}
    requirements_relation = crud.get_requirements_relation(db, dag_uuid)
    if requirements_relation:
        requirements = crud.get_requirements_status(db, dag_uuid)
        if requirements.status != 2:
            raise Exception('Requirements are not installed successfully; the job cannot run')
        envs.update({'requirement_package_path': f'{requirement_path}dag_{dag_uuid.lower()}'})
    af_task = {
        "name": "Debug task",
        "file_urls": [],
        "script": dag_script,
        "cmd": "",
        "cmd_parameters": "",
        "envs": envs,
        "run_image": "",
        "task_type": "sparks",
        "user_id": 0,
    }
    res = send_post('/af/af_task', af_task)
    af_task = res['data']
    return af_task
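
# Note: envs['requirement_package_path'] points the Spark task at the per-DAG
# requirements directory ({requirement_path}dag_<uuid>); it is set only when a
# requirements relation exists and installation has finished (status == 2).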

def dag_update_job(dag_uuid: str, dag_script: str, db: Session):
    """Update the existing debug job (and its single task) for this DAG."""
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    af_job_id = relation.af_id
    res = send_get("/af/af_job/getOnce", af_job_id)
    old_af_job = res['data']
    old_af_task = old_af_job['tasks'][0]
    af_task = dag_put_task(dag_uuid, dag_script, db, old_af_task)
    af_job = {
        "tasks": [af_task],
        "name": "Debug job",
        "dependence": [],
        "cron": "None",
        "desc": "Debug job",
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": 1,
    }
    res = send_put('/af/af_job', old_af_job['id'], af_job)
    af_job = res['data']
    return af_job

def dag_put_task(dag_uuid: str, dag_script: str, db: Session, old_af_task):
    """Update the debug task in place, refreshing the script and the requirements env."""
    envs = {}
    requirements_relation = crud.get_requirements_relation(db, dag_uuid)
    if requirements_relation:
        requirements = crud.get_requirements_status(db, dag_uuid)
        if requirements.status != 2:
            raise Exception('Requirements are not installed successfully; the job cannot run')
        envs.update({'requirement_package_path': f'{requirement_path}dag_{dag_uuid.lower()}'})
    af_task = {
        "name": "Debug task",
        "file_urls": [],
        "script": dag_script,
        "cmd": "",
        "cmd_parameters": "",
        "envs": envs,
        "run_image": "",
        "task_type": "sparks",
    }
    res = send_put('/af/af_task', old_af_task['id'], af_task)
    af_task = res['data']
    return af_task

def dag_job_submit(dag_uuid: str, dag_script: str, db: Session):
    """Create or update the debug job, submit it, and wait for Airflow to re-parse it before executing."""
    job_relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    if job_relation is None:
        af_job = dag_create_job(dag_uuid, dag_script, db)
    else:
        af_job = dag_update_job(dag_uuid, dag_script, db)
    res = get_job_last_parsed_time(af_job['id'])
    current_time = res['data'].get('last_parsed_time')
    send_submit(af_job['id'])
    # Poll until last_parsed_time changes, i.e. Airflow has picked up the submitted DAG.
    for i in range(0, 21):
        time.sleep(2)
        res = get_job_last_parsed_time(af_job['id'])
        last_parsed_time = res['data']['last_parsed_time']
        if last_parsed_time and current_time != last_parsed_time:
            send_pause(af_job['id'], 1)
            send_execute(af_job['id'])
            print(f"{af_job['id']} <== executed successfully ==> {last_parsed_time}")
            break
        if i >= 20:
            raise Exception(f"{af_job['id']} ==> execution failed")
    return af_job
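
# Minimal usage sketch (illustrative only): submit a debug run and then look up
# the intermediate table of one of its nodes. `SessionLocal` stands in for the
# project's session factory and the identifiers are hypothetical.
#
#   with SessionLocal() as db:
#       af_job = dag_job_submit('7c9e6679-7425-40de-944b-e07fc1f90ae7', dag_script, db)
#       table = get_tmp_table_name('7c9e6679-7425-40de-944b-e07fc1f90ae7',
#                                  node_id='2', out_pin='0', db=db)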

def get_tmp_table_name(dag_uuid: str, node_id: str, out_pin: str, db: Session):
    """Resolve the Hive table holding the intermediate output of one node of the debug run."""
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    job_id = relation.af_id
    af_job_run = crud.get_airflow_run_once_debug_mode(db, job_id)
    tasks = af_job_run.details['tasks'] if len(af_job_run.details['tasks']) > 0 else {}
    task_id = None
    for task in tasks:
        # Task keys are encoded as "<task_id>_<node_id>".
        t_id = task.split('_')[0]
        n_id = task.split('_')[1]
        if n_id == node_id:
            task_id = t_id
            break
    if task_id:
        table_name = f'job{job_id}_task{task_id}_subnode{node_id}_output{out_pin}_tmp'
        t_list = hiveDs.list_tables()
        table_name = table_name.lower()
        if table_name not in t_list:
            raise Exception('No intermediate result exists for this node')
        return table_name
    else:
        raise Exception('No intermediate result exists for this node')

def get_transfer_table_name(project_id: str, user_id: str, name: str):
    """Build a unique, timestamped Hive table name for a data transfer."""
    current_time = int(time.time())
    return f'{database_name}.project{project_id.lower()}_user{user_id.lower()}_{name.lower()}_{current_time}'
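
# Example of the generated name (illustrative values only): with database_name set
# to 'ailab', get_transfer_table_name('101', '7', 'Result') returns something like
# 'ailab.project101_user7_result_1700000000', where the suffix is the Unix timestamp
# at call time.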