
1. Complete the logging feature

luoyulong committed c8372945be · 2 years ago

+ 1 - 1
app/core/airflow/job.py

@@ -17,7 +17,7 @@ class AirflowJobSubmitter:
                           "target_operator_name": f'op_{edge[1]}'})
 
         parameters = {'nodes': nodes, 'spark_nodes': spark_nodes, 'edges': edges, 'dag_id': item.name,
-                      'user_name': item.user_id}
+                      'user_name': item.user_id, 'job_id': item.id}
         env = Environment(
             loader=PackageLoader('app.core.airflow'),
             autoescape=select_autoescape()
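Note: a minimal sketch of how these parameters presumably reach the template; the render step below is an assumption, since this hunk only shows the Environment setup:

# Hypothetical continuation of submit_dag(), illustrating how the new
# 'job_id' parameter becomes the {{ job_id }} variable in the DAG template.
template = env.get_template('dag_template.py.jinja2')   # the template edited below in this diff
dag_content = template.render(**parameters)             # assumed render call; not shown in this hunk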

+ 4 - 3
app/core/airflow/task.py

@@ -1,4 +1,6 @@
-from app.core.airflow.uri import get_remote_file, spark_result_tb_name
+import json
+
+from app.core.airflow.uri import spark_result_tb_name
 from app.schemas import AirflowTask
 from jinja2 import Environment, PackageLoader, select_autoescape
 from app.common.minio import FileHandler
@@ -102,8 +104,7 @@ class SparksTaskCompiler(TaskCompiler):
         #         ("2", "3")
         #     ]
         # }
-        print(f'prepare to download {self.task.file_urls[0]}')
-        infos = get_remote_file(f'{self.task.file_urls[0]}', return_json=True)
+        infos = json.loads(self.task.script)
         sub_nodes = []
         for info in infos['sub_nodes']:
             if info['op'] == 'sql':

+ 41 - 16
app/core/airflow/templates/dag_template.py.jinja2

@@ -1,24 +1,41 @@
 from airflow import DAG
 from datetime import datetime
-from airflow.operators.bash import BashOperator
+from airflow.operators.empty import EmptyOperator
 from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
 from airflow.configuration import conf
 from airflow.utils.task_group import TaskGroup
 import requests
 
-def task_success_alert(context):
+job_id = {{ job_id }}
+
+def task_finish_alert(context):
+    print('############### task finish callback! ###################')
     url = 'http://192.168.199.109:18082/jpt/af_run/notification'
-    print('enter callback run!')
-    task_instance_key_str = context['task_instance_key_str']
-    infos = task_instance_key_str.split('__')
-    print(type(context))
-    requests.post(url,json={"data":{'dag_id':infos[0],
-                                    'task_id':infos[1],
-                                    'run_ts':context['ts_nodash'],
-                                    'task_start':type(context),
-                                    'run_id': 123
-                                    }
-                            })
+    ti = context['ti']
+    requests.post(url, json={"data": {
+        "job_id": job_id,
+        "dag_id": ti.dag_id,
+        "task_id": ti.task.task_id,
+        "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
+        "start_time": ti.start_date.timestamp(),
+        "end_time": ti.end_date.timestamp(),
+        "status": ti.current_state()
+    }})
+
+
+def dag_begin_alert(context):
+    print('############### dag begin callback!###################')
+    url = 'http://192.168.199.109:18082/jpt/af_run'
+    ti = context['ti']
+    requests.post(url, json={"data": {
+        "job_id": job_id,
+        "dag_id": ti.dag_id,
+        "task_id": ti.task.task_id,
+        "run_ts": ti.execution_date.strftime('%Y%m%dT%H%M%S'),
+        "start_time": ti.start_date.timestamp(),
+        "end_time": ti.end_date.timestamp(),
+        "status": ti.current_state()
+    }})
 
 print('enter dag run!')
 namespace = conf.get("kubernetes", "NAMESPACE")
@@ -29,8 +46,10 @@ name = "dag_user{{ user_name }}"
 
 # instantiate the DAG
 with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',dag_id="{{ dag_id }}") as dag:
+    op_start = EmptyOperator(task_id='start', on_success_callback=dag_begin_alert)
+
     {% for spark_node in spark_nodes %}
-    with TaskGroup("{{ spark_node['name'] }}", tooltip="{{ spark_node['desc'] }}") as op_{{ spark_node['id'] }}:
+    with TaskGroup("{{ spark_node['name'] }}", prefix_group_id=False, tooltip="{{ spark_node['desc'] }}") as op_{{ spark_node['id'] }}:
     {% for spark_sub_node in spark_node['sub_nodes'] %}
         op_{{ spark_sub_node['id'] }} = KubernetesPodOperator(
                                                                     task_id="{{ spark_sub_node["id"] }}",
@@ -46,7 +65,8 @@ with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',
                                                                     log_events_on_failure=True,
                                                                     cmds={{ spark_sub_node['cmds'] }},
                                                                     env_vars={{ spark_sub_node['env'] }},
-                                                                    on_success_callback=task_success_alert
+                                                                    on_success_callback=task_finish_alert,
+{#                                                                    on_failure_callback=task_finish_alert#}
                                                                     )
     {% endfor %}
         {% for edge in spark_node['edges'] %}
@@ -71,9 +91,14 @@ with DAG(start_date=datetime(2022,6,1),catchup=False,schedule_interval='@daily',
         log_events_on_failure=True,
         cmds={{ node['cmds'] }},
         env_vars={{ node['env'] }},
-        on_success_callback=task_success_alert
+        on_success_callback=task_finish_alert,
+{#        on_failure_callback=task_finish_alert#}
         )
+    op_start >> {{ node['operator_name'] }}
     {% endfor %}
+
+
+
     {% for edge in edges%}
     {{ edge['source_operator_name'] }} >> {{ edge['target_operator_name'] }}
     {% endfor %}
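Note: for reference, both callbacks above report to the backend with the same payload shape; a sketch of the contract, with illustrative values (dag_begin_alert POSTs to /jpt/af_run to create the run record, task_finish_alert POSTs to /jpt/af_run/notification to log one finished task):

# Illustrative payload built from the TaskInstance in the callback context.
payload = {"data": {
    "job_id": 42,                     # illustrative; injected via {{ job_id }}
    "dag_id": "dag_name",
    "task_id": "8_1",
    "run_ts": "20220601T000000",      # ti.execution_date, formatted %Y%m%dT%H%M%S
    "start_time": 1660000000.0,       # ti.start_date.timestamp()
    "end_time": 1660000123.0,         # ti.end_date.timestamp()
    "status": "success"               # ti.current_state()
}}
requests.post(url, json=payload)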

+ 0 - 2
app/core/airflow/templates/sql_script_template.py.jinja2

@@ -12,6 +12,4 @@ def run(outputs: list):
 
 
 if __name__ == '__main__':
-    outputs_str = os.environ["OUTPUTS"]
-    print(outputs_str)
     run(outputs={{ outputs }})

+ 5 - 0
app/crud/af_job.py

@@ -4,6 +4,7 @@ from sqlalchemy.orm import Session
 import time
 
 from app.core.airflow.job import AirflowJobSubmitter
+from app.crud import update_to_db
 
 
 def create_airflow_job(db: Session, item: schemas.AirflowJobCreate):
@@ -24,5 +25,9 @@ def get_airflow_job_once(db: Session, id: int):
     return res
 
 
+def update_airflow_job(db: Session, item_id: int, update_item: schemas.AirflowJobUpdate):
+    return update_to_db(update_item=update_item, item_id=item_id, db=db, model_cls=models.AirflowJob)
+
+
 def create_airflow_job_submit(item: schemas.AirflowJob):
     AirflowJobSubmitter.submit_dag(item)

+ 8 - 9
app/crud/af_run.py

@@ -3,21 +3,19 @@ from app import models, schemas
 from sqlalchemy.orm import Session
 import time
 
+from app.crud import update_to_db
+
 
 def create_airflow_run(db: Session, item: schemas.AirflowRunCreate):
-    db_item = models.AirflowRun(**item.dict(), **{"create_time": int(time.time()), "update_time": int(time.time())})
+    db_item = models.AirflowRun(**item.dict())
     db.add(db_item)
     db.commit()
     db.refresh(db_item)
     return db_item
 
 
-def update_airflow_run(db: Session, item: schemas.AirflowRunUpdate):
-    db_item = models.AirflowRun(**item.dict(), **{"update_time": int(time.time())})
-    db.add(db_item)
-    db.commit()
-    db.refresh(db_item)
-    return db_item
+def update_airflow_run(db: Session, item_id: int, update_item: schemas.AirflowRunUpdate):
+    return update_to_db(update_item=update_item, item_id=item_id, db=db, model_cls=models.AirflowRun)
 
 
 def get_airflow_runs(db: Session):
@@ -25,6 +23,7 @@ def get_airflow_runs(db: Session):
     return res
 
 
-def get_airflow_run_once(db: Session, run_id: str):
-    res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.run_id == run_id).first()
+def get_airflow_run_once(db: Session, run_id: str, job_id: int):
+    res: models.AirflowRun = db.query(models.AirflowRun).filter(models.AirflowRun.run_id == run_id,
+                                                                models.AirflowRun.job_id == job_id).first()
     return res

+ 5 - 0
app/crud/af_task.py

@@ -1,6 +1,7 @@
 from typing import List
 from app import models, schemas
 from sqlalchemy.orm import Session
+from app.crud.basic import update_to_db
 import time
 
 
@@ -12,6 +13,10 @@ def create_airflow_task(db: Session, item: schemas.AirflowTaskCreate):
     return db_item
 
 
+def update_airflow_task(db: Session, item_id: int, update_item: schemas.AirflowTaskUpdate):
+    return update_to_db(update_item=update_item, item_id=item_id, db=db, model_cls=models.AirflowTask)
+
+
 def get_airflow_tasks(db: Session):
     res: List[models.AirflowTask] = db.query(models.AirflowTask).all()
     return res

+ 22 - 0
app/crud/basic.py

@@ -0,0 +1,22 @@
+from typing import Type
+from sqlalchemy.orm import Session
+import time
+from sqlalchemy.orm.attributes import flag_modified
+from app.models import BaseModel
+
+
+def update_to_db(db: Session, item_id: int, update_item, model_cls: Type[BaseModel]):
+    db_item = db.query(model_cls).filter(model_cls.id == item_id).first()
+    if not db_item:
+        raise Exception('Task not found')
+    update_dict = update_item.dict(exclude_unset=True)
+    for k, v in update_dict.items():
+        setattr(db_item, k, v)
+        flag_modified(db_item, k)
+
+    if hasattr(db_item, "update_time"):
+        db_item.update_time = int(time.time())
+    db.commit()
+    db.flush()
+    db.refresh(db_item)
+    return db_item
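Note: update_to_db applies a partial update: only fields explicitly set on update_item (exclude_unset=True) are written, flag_modified forces JSON columns to be persisted, and update_time is stamped when the model has that column. A minimal usage sketch, mirroring app/crud/af_run.py (the item id is illustrative):

# `db` is an open SQLAlchemy Session; `update_run` is a schemas.AirflowRunUpdate
# with only the fields that should change set on it.
updated = update_to_db(db=db, item_id=42, update_item=update_run,
                       model_cls=models.AirflowRun)
print(updated.status)  # instance is refreshed from the database after commit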

+ 8 - 1
app/routers/job.py

@@ -26,10 +26,17 @@ def add_af_job(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
     return crud.create_airflow_job(db, item)
 
 
+@router_af_job.put("/{item_id}")
+@web_try()
+@sxtimeit
+def update_af_job(item_id: int, update_item: schemas.AirflowJobUpdate, db: Session = Depends(get_db)):
+    return crud.update_airflow_job(db=db, item_id=item_id, update_item=update_item)
+
+
 @router_af_job.post("/submit")
 @web_try()
 @sxtimeit
 def add_dag_submit(id: int, db: Session = Depends(get_db)):
-    item = crud.get_airflow_job_once(db,id)
+    item = crud.get_airflow_job_once(db, id)
     create_airflow_job_submit(schemas.AirflowJob(**item.to_dict()))
     # return crud.create_airflow_job(item)

+ 30 - 17
app/routers/run.py

@@ -1,11 +1,10 @@
+import json
 from fastapi import APIRouter, Depends
 from fastapi_pagination import paginate, Params
 from pydantic import BaseModel
 from sqlalchemy.orm import Session
-
 from app import schemas, get_db, crud
 from app.core.k8s.k8s_client import KubernetesTools
-from app.schemas import AirflowRunCreate
 from utils import web_try, sxtimeit
 
 
@@ -29,8 +28,27 @@ def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
 @router_af_run.post("/")
 @web_try()
 @sxtimeit
-def add_job(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
-    return crud.create_airflow_job(db, item)
+def add_airflow_run(item: Item, db: Session = Depends(get_db)):
+    print(item.data)
+    job_item = crud.get_airflow_job_once(db=db, id=item.data["job_id"])
+    sparks_dependence = {}
+    if job_item is not None:
+        for task in schemas.AirflowJob(**job_item.to_dict()).tasks:
+            if task.task_type == 'sparks':
+                sparks_info = json.loads(task.script)
+                dependence = []
+                for (source, sink) in sparks_info['edges']:
+                    dependence.append([f'{task.id}_{source}', f'{task.id}_{sink}'])
+                sparks_dependence[task.id] = dependence
+
+        item = schemas.AirflowRunCreate(**{"start_time": int(item.data["start_time"]),
+                                           "job_id": item.data["job_id"],
+                                           "run_id": item.data['run_ts'],
+                                           "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
+                                                                                   "sparks": sparks_dependence}},
+                                           "status": "1"},
+                                        )
+        return crud.create_airflow_run(db, item)
 
 
 @router_af_run.post("/notification")
@@ -43,18 +61,13 @@ def add_notification(item: Item, db: Session = Depends(get_db)):
               "task_id": item.data['task_id'],
               "run_ts": item.data['run_ts']}
     logs = k8s_tool.get_pod_logs(namespaces="airflow", labels=labels)
+    run = crud.get_airflow_run_once(run_id=item.data['run_ts'], job_id=item.data["job_id"], db=db)
 
-    run_id = item.data['run_id']
-
-    # run_item = crud.get_airflow_run_once(db=db,run_id=run_id)
-    # return
-    # if run_item is not None:
-    #     pass
-    # else:
-    #     item:AirflowRunCreate = AirflowRunCreate({"start_time":})
-    #     # crud.create_airflow_run(db=db,r)
-    #     # create
-    #     pass
-
-    print(f'logs are {logs}')
+    if run is not None:
+        update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": run.status})
+        update_run.details['tasks'][item.data['task_id']] = {"log": logs,
+                                                             "start_time": item.data["start_time"],
+                                                             "end_time": item.data["end_time"],
+                                                             "status": item.data['status']}
+        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
     return
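Note: taken together, add_airflow_run and add_notification build up AirflowRun.details incrementally; a sketch of the resulting structure (task ids, timestamps and log text are illustrative):

# Illustrative shape of AirflowRun.details after one task notification:
details = {
    "dependence": {
        "tasks": [...],                      # copied from AirflowJob.dependence at creation time
        "sparks": {8: [["8_0", "8_1"]]},     # per 'sparks' task: its edges prefixed with the task id
    },
    "tasks": {
        "8_1": {                             # keyed by the notifying task_id
            "log": "<kubernetes pod log text>",
            "start_time": 1660000000.0,
            "end_time": 1660000123.0,
            "status": "success",
        },
    },
}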

+ 14 - 0
app/routers/task.py

@@ -18,6 +18,20 @@ def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
     return paginate(crud.get_airflow_tasks(db), params)
 
 
+@router_af_task.get("/getOnce/{id}")
+@web_try()
+@sxtimeit
+def get_task_once(id: int, db: Session = Depends(get_db)):
+    return crud.get_airflow_task_once(db, id)
+
+
+@router_af_task.put("/{item_id}")
+@web_try()
+@sxtimeit
+def update_task(item_id: int, update_item: schemas.AirflowTaskUpdate, db: Session = Depends(get_db)):
+    return crud.update_airflow_task(db=db, item_id=item_id, update_item=update_item)
+
+
 @router_af_task.post("/")
 @web_try()
 @sxtimeit

+ 2 - 1
app/schemas/af_job.py

@@ -22,7 +22,8 @@ class AirflowJobCreate(AirflowJobBase):
 
 
 class AirflowJobUpdate(AirflowJobBase):
-    update_time: int
+    pass
+    # update_time: int
 
 
 class AirflowJob(AirflowJobBase):

+ 2 - 1
app/schemas/af_run.py

@@ -12,7 +12,8 @@ class AirflowRunCreate(AirflowRunBase):
 
 
 class AirflowRunUpdate(AirflowRunBase):
-    end_time: int
+    pass
+    # end_time: int
 
 
 class AirflowRun(AirflowRunBase):

+ 2 - 1
app/schemas/af_task.py

@@ -18,7 +18,8 @@ class AirflowTaskCreate(AirflowTaskBase):
 
 
 class AirflowTaskUpdate(AirflowTaskBase):
-    update_time: int
+    pass
+    # update_time: int
 
 
 class AirflowTask(AirflowTaskBase):

+ 0 - 0
auo_tests/airflow_tasks/__init__.py


+ 42 - 0
auo_tests/airflow_tasks/manual_dummy_task.py

@@ -0,0 +1,42 @@
+import airflow
+from airflow import DAG
+from datetime import datetime
+from airflow.operators.empty import EmptyOperator
+from airflow.configuration import conf
+import requests
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+
+namespace = conf.get("kubernetes", "NAMESPACE")
+
+# set the name that will be printed
+name = "luoyulong"
+
+
+def task_success_alert(context: airflow.utils.context.Context):
+    url = 'http://192.168.199.109:18082/jpt/af_run/notification'
+    print('############### enter success callback!')
+    # task_instance_key_str = context['task_instance_key_str']
+    # infos = task_instance_key_str.split('__')
+    print(type(context))
+    print(context.keys())
+    print(context)
+    # requests.post(url, data={"data": {
+    requests.post(url, json={"data": {'run_id': 123}})
+
+
+def task_begin_alert(context):
+    # pass
+    print('############### enter begin callback!')
+    url = 'http://192.168.199.109:18082/jpt/af_run/notification'
+    requests.post(url, json={"data": {'run_id': "begin to task exec "}})
+
+
+# instantiate the DAG
+with DAG(
+        start_date=datetime(2022, 6, 1),
+        catchup=False,
+        schedule_interval='@daily',
+        dag_id="single_empty_task"
+) as dag:
+    op_test = EmptyOperator(task_id='empty_op', on_success_callback=task_success_alert,
+                            pre_execute=task_begin_alert)

File diff suppressed because it is too large
+ 14 - 0
auo_tests/jupyters/01-update-job.ipynb


Some files were not shown because too many files changed in this diff