jm_job_log.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import json
  2. from typing import List, Optional
  3. from fastapi import APIRouter
  4. from fastapi import Depends, Query
  5. from sqlalchemy.orm import Session
  6. from app import page_help, schemas
  7. from app.common.security.auth import verify_users
  8. import app.crud as crud
  9. from app.utils.send_util import get_job_run_status, get_task_log
  10. from constants.constants import RUN_STATUS
  11. from utils.sx_time import sxtimeit
  12. from utils.sx_web import web_try
  13. from fastapi_pagination import Page, add_pagination, paginate, Params
  14. from app import get_db, get_page
  15. router = APIRouter(
  16. prefix="/jpt/jm_job_log",
  17. tags=["jm_job_log-定时任务日志管理"],
  18. )
  19. @router.get("/")
  20. @web_try()
  21. @sxtimeit
  22. def get_job_logs(job_id: int = None, params: Params=Depends(get_page), token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  23. jm_job_list = []
  24. # 是否有任务筛选
  25. if job_id is not None:
  26. jm_job_list = [crud.get_jm_job_info(db, job_id)]
  27. else:
  28. jm_job_list = crud.get_jm_job_infos(db)
  29. id_to_job = {job.id:job for job in jm_job_list}
  30. relations = crud.get_af_ids(db,id_to_job.keys(), 'job')
  31. af_to_datax = {relation.af_id:relation.se_id for relation in relations}
  32. # 获取任务运行记录
  33. af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys(),(params['page'] - 1) * params['size'],params['page'] * params['size'])
  34. total = crud.count_airflow_runs_by_job_ids(db, af_to_datax.keys())
  35. res = []
  36. for af_job_run in af_job_runs:
  37. job_id = af_to_datax[int(af_job_run.job_id)]
  38. execute_result = None
  39. # 若该记录未运行完成,获取运行的状态
  40. if af_job_run.status <= 1:
  41. run_status = get_job_run_status(af_job_run.id)
  42. execute_result = run_status['data']['status']
  43. log = {
  44. "id": af_job_run.id,
  45. "job_id": job_id,
  46. "job_name": id_to_job[job_id].name,
  47. "job_type": id_to_job[job_id].type,
  48. "job_tag": id_to_job[job_id].tag,
  49. "af_job_id": int(af_job_run.job_id),
  50. "run_id": af_job_run.af_run_id,
  51. "start_time": af_job_run.start_time,
  52. "result": execute_result if execute_result else af_job_run.status,
  53. }
  54. res.append(log)
  55. return page_help(res,params['page'],params['size'],total)
  56. @router.get("/all_task")
  57. @web_try()
  58. @sxtimeit
  59. def get_job_all_task(run_id: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  60. af_job_run = crud.get_airflow_run_once(db, run_id)
  61. af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
  62. tasks = list(af_job.tasks) if len(list(af_job.tasks))>0 else []
  63. res = []
  64. for task in tasks:
  65. if task['task_type'] == 'sparks':
  66. task_script = json.loads(task['script'])
  67. for node in task_script['sub_nodes']:
  68. task_id = str(task['id'])+'_'+node['id']
  69. log ={
  70. 'name':task['name']+'-'+node['name'],
  71. 'job_id':af_job.id,
  72. 'af_run_id':af_job_run.af_run_id,
  73. 'task_id': task_id,
  74. }
  75. task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
  76. task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
  77. if task_log:
  78. log.update({
  79. 'execute_result': RUN_STATUS[task_log['status']] if task_log['status'] else 0,
  80. 'execute_time':task_log['start_time'],
  81. 'log': task_log['log']
  82. })
  83. res.append(log)
  84. else:
  85. log ={
  86. 'name':task['name'],
  87. 'job_id':af_job.id,
  88. 'af_run_id':af_job_run.af_run_id,
  89. 'task_id':task['id'],
  90. }
  91. task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task['id'])
  92. task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
  93. if task_log:
  94. log.update({
  95. 'execute_result':RUN_STATUS[task_log['status']] if task_log['status'] else 0,
  96. 'execute_time':task_log['start_time'],
  97. 'log': task_log['log']
  98. })
  99. res.append(log)
  100. return res
  101. @router.get("/task_log/{job_id}/{af_run_id}/{task_id}")
  102. @web_try()
  103. @sxtimeit
  104. def get_job_task_log(job_id: str, af_run_id: str, task_id: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  105. res = get_task_log(job_id, af_run_id, task_id)
  106. log = res['data'] if 'data' in res else None
  107. if log:
  108. log['status'] = RUN_STATUS[log['status']] if log['status'] else 0
  109. return log
  110. @router.get("/logs_status/{ids}")
  111. @web_try()
  112. @sxtimeit
  113. def get_job_log_status(ids: str, token_data: schemas.TokenData = Depends(verify_users)):
  114. run_ids = ids.split(',')
  115. id_to_status = {}
  116. for run_id in run_ids:
  117. res = get_job_run_status(run_id)
  118. # id_to_status.update({run_id:RUN_STATUS[res['data']['status']]})
  119. id_to_status.update({run_id:res['data']['status']})
  120. return id_to_status