Browse Source

1. 更新了任务状态查询函数

luoyulong 2 years ago
parent
commit
408b4f6647
4 changed files with 44 additions and 12 deletions
  1. 0 2
      app/crud/af_run.py
  2. 1 1
      app/models/af_run.py
  3. 18 4
      app/models/database.py
  4. 25 5
      app/routers/run.py

+ 0 - 2
app/crud/af_run.py

@@ -1,8 +1,6 @@
 from typing import List
 from app import models, schemas
 from sqlalchemy.orm import Session
-import time
-
 from app.crud import update_to_db
 
 

+ 1 - 1
app/models/af_run.py

@@ -12,7 +12,7 @@ class AirflowRun(BaseModel):
     job_id = Column(Integer)  # 所属任务
     af_run_id = Column(String(60))  # run id in airflow
     run_ts = Column(String(60))  # run timestamp
-    status = Column(Integer) # 0未开始  1运行中 2成功 3失败
+    status = Column(Integer) # 0队列中  1运行中 2成功 3失败
     details = Column(JSON)  # 任务执行详情
 
 

+ 18 - 4
app/models/database.py

@@ -1,17 +1,21 @@
- # database.py
+import time  # database.py
+from typing import Optional
+
+from sqlalchemy.orm import Session
 from sqlalchemy import create_engine
 from sqlalchemy.ext.declarative import declarative_base
 from sqlalchemy.orm import sessionmaker
+from sqlalchemy.orm.attributes import flag_modified
+
 from configs.logging import logger
 from configs.settings import config
 
 USER = config.get('DATABASE', 'USER')
 PWD = config.get('DATABASE', 'pwd')
 DB_NAME = config.get('DATABASE', 'DB_NAME')
-HOST =  config.get('DATABASE', 'HOST')
+HOST = config.get('DATABASE', 'HOST')
 PORT = config.get('DATABASE', 'PORT')
 
-
 SQLALCHEMY_DATABASE_URL = f'mysql+mysqlconnector://{USER}:{PWD}@{HOST}:{PORT}/{DB_NAME}?charset=utf8&auth_plugin=mysql_native_password'
 engine = create_engine(
     SQLALCHEMY_DATABASE_URL, pool_pre_ping=True
@@ -21,11 +25,21 @@ SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
 
 logger.info(f" connect to mysql success: {SQLALCHEMY_DATABASE_URL}")
 
-
 Base = declarative_base()
 
+
 class BaseModel(Base):
     __abstract__ = True
+
     def to_dict(self):
         return {c.name: getattr(self, c.name) for c in self.__table__.columns}
 
+    def update(self, db: Session, enforce_update: Optional[dict] = None):
+        for k in enforce_update or {}:
+            flag_modified(self, k)
+        if hasattr(self, "update_time"):
+            self.update_time = int(time.time())
+        # db.add(self)
+        db.commit()
+        db.flush()
+        db.refresh(self)

+ 25 - 5
app/routers/run.py

@@ -28,14 +28,34 @@ def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
     return paginate(crud.get_airflow_tasks(db), params)
 
 
-@router_af_run.get("/{run_id}/state")
+@router_af_run.get("/{run_id}/status")
 @web_try()
 @sxtimeit
 def get_airflow_run_status(run_id: int, db: Session = Depends(get_db)):
     item = crud.get_airflow_run_once(item_id=run_id, db=db)
-    uri_prefix, headers = get_airflow_api_info()
-    url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
-    return {"state": requests.get(url, headers=headers).json().get('state', None)}
+    job_item = crud.get_airflow_job_once(db=db, item_id=item.job_id)
+    if job_item.job_mode == 1:  # 常规模式
+        if item.status in [2, 3]:
+            return {"status": item.status}
+        else:
+            uri_prefix, headers = get_airflow_api_info()
+            url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
+            state = requests.get(url, headers=headers).json().get('state', None)
+            status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
+            print(f'status is {status}, with state {state}')
+            if item.status != status:  # queue or running
+                item.status = status
+                item.update(db)
+                print(f'after update {item.status}')
+            return {"status": status}
+    else:
+        uri_prefix, headers = get_airflow_api_info()
+        url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
+        state = requests.get(url, headers=headers).json().get('state', None)
+        status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
+        print(f'status is {status}, with state {state}')
+
+    return {"status": status}
 
 
 @router_af_run.post("/")
@@ -62,7 +82,7 @@ def add_airflow_run(item: Item, db: Session = Depends(get_db)):
                                            "af_run_id": item.data['af_run_id'],
                                            "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
                                                                                    "sparks": sparks_dependence}},
-                                           "status": 1},
+                                           "status": 0},
                                         )
         crud.create_airflow_run(db, item)