dag.py

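"""Debug-run helpers for DAG jobs.

These functions create or update a single-task debug job in the /af scheduler
service, submit it and wait for it to be parsed before triggering execution,
and resolve the Hive temp table that holds a node's intermediate output.
"""
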
import time

from sqlalchemy.orm import Session

from app import crud, models
from app.common.hive import hiveDs
# The wildcard import supplies the /af HTTP helpers used below: send_post,
# send_get, send_put, send_submit, send_pause, send_execute and
# get_job_last_parsed_time.
from app.utils.send_util import *

def dag_create_job(dag_uuid: str, dag_script: str, db: Session):
    """Create a new debug job (wrapping a single task) in the /af service and
    record the dag_uuid -> af_job id relation."""
    af_task = dag_create_task(dag_script)
    af_job = {
        "tasks": [af_task],
        "name": "Debug job",
        "dependence": [],
        "cron": "None",
        "desc": "Debug job",
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": 1,
        "job_mode": 2,
        "job_type": 0,
        "user_id": 0,
    }
    res = send_post('/af/af_job', af_job)
    af_job = res['data']
    crud.create_debug_relation(db, dag_uuid, 'debug', af_job['id'])
    return af_job

def dag_create_task(dag_script: str):
    """Create a single 'sparks' debug task in the /af service from the DAG script."""
    af_task = {
        "name": "Debug task",
        "file_urls": [],
        "script": dag_script,
        "cmd": "",
        "cmd_parameters": "",
        "envs": {},
        "run_image": "",
        "task_type": "sparks",
        "user_id": 0,
    }
    res = send_post('/af/af_task', af_task)
    af_task = res['data']
    return af_task

def dag_update_job(dag_uuid: str, dag_script: str, db: Session):
    """Update the existing debug job bound to this dag_uuid with a fresh script."""
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    af_job_id = relation.af_id
    res = send_get("/af/af_job/getOnce", af_job_id)
    old_af_job = res['data']
    old_af_task = old_af_job['tasks'][0]
    af_task = dag_put_task(dag_script, old_af_task)
    af_job = {
        "tasks": [af_task],
        "name": "Debug job",
        "dependence": [],
        "cron": "None",
        "desc": "Debug job",
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": 1,
    }
    res = send_put('/af/af_job', old_af_job['id'], af_job)
    af_job = res['data']
    return af_job

def dag_put_task(dag_script: str, old_af_task):
    """Overwrite the existing debug task's script, keeping its /af-side id."""
    af_task = {
        "name": "Debug task",
        "file_urls": [],
        "script": dag_script,
        "cmd": "",
        "cmd_parameters": "",
        "envs": {},
        "run_image": "",
        "task_type": "sparks",
    }
    res = send_put('/af/af_task', old_af_task['id'], af_task)
    af_task = res['data']
    return af_task

def dag_job_submit(dag_uuid: str, dag_script: str, db: Session):
    """Create or update the debug job for this DAG, submit it, and trigger a
    run once the scheduler has parsed the new definition."""
    job_relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    if job_relation is None:
        af_job = dag_create_job(dag_uuid, dag_script, db)
    else:
        af_job = dag_update_job(dag_uuid, dag_script, db)
    current_time = int(time.time())
    send_submit(af_job['id'])
    # Poll for up to ~22s: the job counts as picked up once its
    # last_parsed_time is newer than the submit timestamp.
    for _ in range(11):
        time.sleep(2)
        res = get_job_last_parsed_time(af_job['id'])
        last_parsed_time = res['data']['last_parsed_time']
        if last_parsed_time and current_time < int(last_parsed_time):
            send_pause(af_job['id'], 1)
            send_execute(af_job['id'])
            print(f"{af_job['id']}<==execution succeeded==>{last_parsed_time}")
            break
    else:
        raise Exception(f"{af_job['id']}==>execution failed")
    return af_job

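# Usage sketch for the submit flow (assumes a SQLAlchemy `db` session from
# this app and a DAG script produced by the caller; variable names are
# illustrative, not part of this module):
#
#   af_job = dag_job_submit(dag_uuid, dag_script, db)
#   # af_job['id'] is the /af-side job id recorded in the debug relation
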
def get_tmp_table_name(dag_uuid: str, node_id: str, out_pin: str, db: Session):
    """Resolve the Hive temp table holding one node's intermediate output
    from the latest debug run."""
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    job_id = relation.af_id
    af_job_run = crud.get_airflow_run_once_debug_mode(db, job_id)
    tasks = af_job_run.details['tasks'] if len(af_job_run.details['tasks']) > 0 else {}
    # Task keys have the form '<task_id>_<node_id>'; find the task that ran this node.
    task_id = None
    for task in tasks:
        t_id, n_id = task.split('_')[:2]
        if n_id == node_id:
            task_id = t_id
            break
    if task_id is None:
        raise Exception('No intermediate result exists for this node')
    table_name = f'job{job_id}_task{task_id}_subnode{node_id}_output{out_pin}_tmp'
    if table_name.lower() not in hiveDs.list_tables():
        raise Exception('No intermediate result exists for this node')
    return table_name
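
# Usage sketch for result lookup (hypothetical node_id/out_pin values; the
# table only exists once the debug run has written that node's output to Hive):
#
#   table_name = get_tmp_table_name(dag_uuid, 'node1', '0', db)
#   # query `table_name` via hiveDs or any Hive client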