|
@@ -109,11 +109,9 @@ def add_airflow_run(item: Item, db: Session = Depends(get_db)):
|
|
|
@web_try()
|
|
|
@sxtimeit
|
|
|
def add_notification(item: Item, db: Session = Depends(get_db)):
|
|
|
- k8s_tool = KubernetesTools()
|
|
|
- labels = {"dag_id": item.data['dag_id'], "task_id": item.data['task_id'], "run_ts": item.data['run_ts']}
|
|
|
- logs = k8s_tool.get_pod_logs(namespaces="airflow", labels=labels)
|
|
|
+ uri = f"dags/{item.data['dag_id']}/dagRuns/{item.data['af_run_id']}/taskInstances/{item.data['task_id']}/logs/1"
|
|
|
+ logs = call_airflow_api("get", uri, {}).text
|
|
|
job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
|
|
|
-
|
|
|
if job_item.job_mode == 1: # normal model, one job-> many runs
|
|
|
run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], db=db)
|
|
|
elif job_item.job_mode == 2: # debug model, one job-> one run
|