1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- import time
- from typing import List
- from app import models, schemas
- from sqlalchemy.orm import Session
- from app.utils.send_util import get_running_status
- from constants.constants import RUN_STATUS
def create_relation(db: Session, se_id: int, type: str, af_id: int):
    """Insert a ``Relation`` row linking ``se_id`` to ``af_id`` and return it.

    Args:
        db: SQLAlchemy session used for the insert.
        se_id: Value stored in the row's ``se_id`` column.
        type: Value stored in the row's ``type`` column. The name shadows the
            builtin but is kept because callers may pass it by keyword.
        af_id: Value stored in the row's ``af_id`` column.

    Returns:
        The newly added ``models.Relation`` instance, reloaded from the
        database after the commit.
    """
    new_relation = models.Relation(se_id=se_id, type=type, af_id=af_id)
    db.add(new_relation)
    db.commit()
    # Reload the row's attributes from the database before handing it back.
    db.refresh(new_relation)
    return new_relation
def create_debug_relation(db: Session, dag_uuid: str, type: str, af_id: int):
    """Insert a ``Relation`` row keyed by ``dag_uuid`` and return it.

    Unlike ``create_relation``, the row is identified by ``dag_uuid`` and
    ``se_id`` is not set.

    Args:
        db: SQLAlchemy session used for the insert.
        dag_uuid: Value stored in the row's ``dag_uuid`` column.
        type: Value stored in the row's ``type`` column. The name shadows the
            builtin but is kept because callers may pass it by keyword.
        af_id: Value stored in the row's ``af_id`` column.

    Returns:
        The newly added ``models.Relation`` instance, reloaded from the
        database after the commit.
    """
    new_relation = models.Relation(dag_uuid=dag_uuid, type=type, af_id=af_id)
    db.add(new_relation)
    db.commit()
    # Reload the row's attributes from the database before handing it back.
    db.refresh(new_relation)
    return new_relation
def create_or_update_requirements_relation(db: Session, dag_uuid: str, af_run_id: int, status: int):
    """Upsert the ``"requirements"`` relation for ``dag_uuid``.

    Looks up the first ``Relation`` row whose ``type`` is ``"requirements"``
    and whose ``dag_uuid`` matches. If one exists, its ``af_run_id`` and
    ``status`` are overwritten; otherwise a new row is created with
    ``af_id`` set to ``-1``.

    Args:
        db: SQLAlchemy session used for the lookup and the write.
        dag_uuid: Identifier matched against ``Relation.dag_uuid``.
        af_run_id: Value written to the row's ``af_run_id`` column.
        status: Value written to the row's ``status`` column.

    Returns:
        The updated or newly created ``models.Relation`` instance, reloaded
        from the database after the commit.
    """
    relation = (
        db.query(models.Relation)
        .filter(models.Relation.type == "requirements")
        .filter(models.Relation.dag_uuid == dag_uuid)
        .first()
    )
    if relation:
        relation.af_run_id = af_run_id
        relation.status = status
    else:
        relation = models.Relation(
            dag_uuid=dag_uuid,
            type="requirements",
            af_id=-1,
            af_run_id=af_run_id,
            status=status,
        )
        db.add(relation)
    # commit() already flushes pending changes, so no separate flush is needed.
    db.commit()
    # Refresh on both paths so the returned instance is not left expired by
    # the commit; it stays readable even if the session is closed afterwards.
    db.refresh(relation)
    return relation
def get_requirements_relation(db: Session, dag_uuid: str):
    """Fetch the ``"requirements"`` relation for ``dag_uuid``.

    Args:
        db: SQLAlchemy session used for the query.
        dag_uuid: Identifier matched against ``Relation.dag_uuid``.

    Returns:
        The first matching ``models.Relation`` row, or ``None`` when no row
        matches.
    """
    requirements_query = (
        db.query(models.Relation)
        .filter(models.Relation.type == "requirements")
        .filter(models.Relation.dag_uuid == dag_uuid)
    )
    return requirements_query.first()
def get_af_id(db: Session, se_id: int, type: str):
    """Fetch the relation row for a given ``se_id`` and ``type``.

    Despite the name, the whole ``Relation`` row is returned, not just its
    ``af_id`` column.

    Args:
        db: SQLAlchemy session used for the query.
        se_id: Value matched against ``Relation.se_id``.
        type: Value matched against ``Relation.type``.

    Returns:
        The first matching ``models.Relation`` row, or ``None`` when no row
        matches.
    """
    matching = (
        db.query(models.Relation)
        .filter(models.Relation.se_id == se_id)
        .filter(models.Relation.type == type)
    )
    return matching.first()
def get_af_ids(db: Session, se_ids: List[int], type: str):
    """Fetch all relation rows of a given ``type`` for several ``se_id`` values.

    Despite the name, whole ``Relation`` rows are returned, not just their
    ``af_id`` columns.

    Args:
        db: SQLAlchemy session used for the query.
        se_ids: Values matched against ``Relation.se_id`` with an ``IN`` clause.
        type: Value matched against ``Relation.type``.

    Returns:
        A list of every matching ``models.Relation`` row (empty when nothing
        matches).
    """
    matching = (
        db.query(models.Relation)
        .filter(models.Relation.se_id.in_(se_ids))
        .filter(models.Relation.type == type)
    )
    return matching.all()
def get_dag_af_id(db: Session, dag_uuid: str, type: str):
    """Fetch the relation row for a given ``dag_uuid`` and ``type``.

    Despite the name, the whole ``Relation`` row is returned, not just its
    ``af_id`` column.

    Args:
        db: SQLAlchemy session used for the query.
        dag_uuid: Value matched against ``Relation.dag_uuid``. Annotated as
            ``str`` to agree with every other function in this module that
            takes a ``dag_uuid``.
        type: Value matched against ``Relation.type``.

    Returns:
        The first matching ``models.Relation`` row, or ``None`` when no row
        matches.
    """
    return (
        db.query(models.Relation)
        .filter(models.Relation.dag_uuid == dag_uuid)
        .filter(models.Relation.type == type)
        .first()
    )
def delete_relation(db: Session, se_id: int, type: str) -> int:
    """Delete the relation rows matching ``se_id`` and ``type``.

    Args:
        db: SQLAlchemy session used for the bulk delete.
        se_id: Value matched against ``Relation.se_id``.
        type: Value matched against ``Relation.type``.

    Returns:
        The number of rows matched by the delete, as reported by
        ``Query.delete()``.
    """
    # NOTE(review): no commit is issued here, unlike the create_* helpers in
    # this module; presumably the caller commits the session. Confirm.
    deleted_count: int = (
        db.query(models.Relation)
        .filter(models.Relation.se_id == se_id)
        .filter(models.Relation.type == type)
        .delete()
    )
    return deleted_count
def get_requirements_status(db: Session, dag_uuid: str):
    """Refresh and return the stored ``"requirements"`` relation for ``dag_uuid``.

    Loads the existing requirements relation, asks ``get_running_status`` for
    the state of its ``af_run_id``, maps that state through ``RUN_STATUS`` and
    writes the result back via ``create_or_update_requirements_relation``.

    Args:
        db: SQLAlchemy session used for the lookup and the update.
        dag_uuid: Identifier of the requirements relation to refresh.

    Returns:
        The updated ``models.Relation`` row, or the falsy lookup result
        (``None`` when no relation exists) unchanged.
    """
    existing = get_requirements_relation(db, dag_uuid)
    if not existing:
        # Nothing stored yet for this dag_uuid; hand the empty result back.
        return existing

    run_id = existing.af_run_id
    # The first argument is always the string "-1" here.
    # NOTE(review): presumably a placeholder job id matching af_id=-1 on
    # requirements rows; confirm against get_running_status.
    status_key = get_running_status(str(-1), run_id)['data']['status']
    return create_or_update_requirements_relation(
        db, dag_uuid, run_id, RUN_STATUS[status_key]
    )
|