import time
from typing import List, Optional

from sqlalchemy.orm import Session

from app import models, schemas
from app.utils.send_util import get_running_status
from constants.constants import RUN_STATUS

# Value of ``Relation.type`` used for the per-DAG "requirements" rows.
_REQUIREMENTS_TYPE = "requirements"


def _add_and_refresh(db: Session, item: models.Relation) -> models.Relation:
    """Add *item* to the session, commit, and reload it from the database."""
    db.add(item)
    db.commit()
    # Reload so that database-populated columns are available on the
    # returned instance.
    db.refresh(item)
    return item


def create_relation(db: Session, se_id: int, type: str, af_id: int):
    """Insert and return a ``Relation`` row identified by ``se_id`` and ``type``.

    Args:
        db: Open SQLAlchemy session; the insert is committed before returning.
        se_id: Value stored in ``Relation.se_id``.
        type: Value stored in ``Relation.type``.
        af_id: Value stored in ``Relation.af_id``.

    Returns:
        The newly persisted, refreshed ``models.Relation`` instance.
    """
    return _add_and_refresh(
        db, models.Relation(se_id=se_id, type=type, af_id=af_id)
    )


def create_debug_relation(db: Session, dag_uuid: str, type: str, af_id: int):
    """Insert and return a ``Relation`` row identified by ``dag_uuid`` and ``type``.

    Args:
        db: Open SQLAlchemy session; the insert is committed before returning.
        dag_uuid: Value stored in ``Relation.dag_uuid``.
        type: Value stored in ``Relation.type``.
        af_id: Value stored in ``Relation.af_id``.

    Returns:
        The newly persisted, refreshed ``models.Relation`` instance.
    """
    return _add_and_refresh(
        db, models.Relation(dag_uuid=dag_uuid, type=type, af_id=af_id)
    )


def create_or_update_requirements_relation(
    db: Session, dag_uuid: str, af_run_id: int, status: int
):
    """Upsert the "requirements" relation of a DAG.

    If a relation of type ``"requirements"`` already exists for ``dag_uuid``,
    its ``af_run_id`` and ``status`` are overwritten; otherwise a new row is
    created with ``af_id`` set to ``-1``.

    Args:
        db: Open SQLAlchemy session; the change is committed before returning.
        dag_uuid: DAG identifier the relation is keyed on.
        af_run_id: Run identifier to store on the relation.
        status: Status value to store on the relation.

    Returns:
        The created or updated ``models.Relation``, refreshed from the
        database on both paths.
    """
    relation = get_requirements_relation(db, dag_uuid)
    if relation:
        relation.af_run_id = af_run_id
        relation.status = status
    else:
        relation = models.Relation(
            dag_uuid=dag_uuid,
            type=_REQUIREMENTS_TYPE,
            af_id=-1,
            af_run_id=af_run_id,
            status=status,
        )
        db.add(relation)
    db.commit()
    # Refresh on the update path as well as the create path: a flush issued
    # after commit does nothing, and would leave the updated instance expired
    # while a freshly created one came back fully loaded.
    db.refresh(relation)
    return relation


def get_requirements_relation(
    db: Session, dag_uuid: str
) -> Optional[models.Relation]:
    """Return the "requirements" relation for ``dag_uuid``, or ``None``.

    Args:
        db: Open SQLAlchemy session.
        dag_uuid: DAG identifier to look up.

    Returns:
        The first matching ``models.Relation``, or ``None`` when no row
        matches.
    """
    return (
        db.query(models.Relation)
        .filter(models.Relation.type == _REQUIREMENTS_TYPE)
        .filter(models.Relation.dag_uuid == dag_uuid)
        .first()
    )


def get_af_id(db: Session, se_id: int, type: str) -> Optional[models.Relation]:
    """Return the first relation matching ``se_id`` and ``type``, or ``None``.

    Despite the name, the whole ``models.Relation`` row is returned, not only
    its ``af_id`` column.
    """
    return (
        db.query(models.Relation)
        .filter(models.Relation.se_id == se_id)
        .filter(models.Relation.type == type)
        .first()
    )


def get_af_ids(
    db: Session, se_ids: List[int], type: str
) -> List[models.Relation]:
    """Return every relation of ``type`` whose ``se_id`` is in ``se_ids``.

    Despite the name, whole ``models.Relation`` rows are returned, not only
    their ``af_id`` columns.
    """
    return (
        db.query(models.Relation)
        .filter(models.Relation.se_id.in_(se_ids))
        .filter(models.Relation.type == type)
        .all()
    )


def get_dag_af_id(
    db: Session, dag_uuid: str, type: str
) -> Optional[models.Relation]:
    """Return the first relation matching ``dag_uuid`` and ``type``, or ``None``.

    Despite the name, the whole ``models.Relation`` row is returned, not only
    its ``af_id`` column.
    """
    return (
        db.query(models.Relation)
        .filter(models.Relation.dag_uuid == dag_uuid)
        .filter(models.Relation.type == type)
        .first()
    )


def delete_relation(db: Session, se_id: int, type: str) -> int:
    """Delete all relations matching ``se_id`` and ``type``.

    Args:
        db: Open SQLAlchemy session.
        se_id: ``Relation.se_id`` value to match.
        type: ``Relation.type`` value to match.

    Returns:
        The row count reported by ``Query.delete()``.
    """
    # NOTE(review): unlike the create/update helpers in this module, this
    # function does not commit; the delete only becomes permanent once the
    # session is committed elsewhere -- confirm that every caller does so.
    return (
        db.query(models.Relation)
        .filter(models.Relation.se_id == se_id)
        .filter(models.Relation.type == type)
        .delete()
    )


def get_requirements_status(db: Session, dag_uuid: str):
    """Re-query the run status of a DAG's requirements relation and store it.

    Looks up the "requirements" relation for ``dag_uuid``, asks
    ``get_running_status`` for the state of its recorded run, maps the
    reported status through ``RUN_STATUS`` and writes the result back.

    Args:
        db: Open SQLAlchemy session.
        dag_uuid: DAG identifier to look up.

    Returns:
        The updated ``models.Relation``, or the empty lookup result (``None``)
        when no requirements relation exists for ``dag_uuid``.

    Raises:
        KeyError: If the status response lacks ``['data']['status']`` or the
            reported status is not a key of ``RUN_STATUS``.
    """
    relation = get_requirements_relation(db, dag_uuid)
    if not relation:
        # Nothing recorded for this DAG; hand the empty lookup result back.
        return relation

    run_id = relation.af_run_id
    # The first argument is the literal string "-1", matching the af_id of -1
    # that requirements relations are created with.
    # NOTE(review): presumably a placeholder job id -- confirm against
    # get_running_status.
    response = get_running_status(str(-1), run_id)
    reported_status = response['data']['status']
    return create_or_update_requirements_relation(
        db, dag_uuid, run_id, RUN_STATUS[reported_status]
    )