relation.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import time
  2. from typing import List
  3. from app import models, schemas
  4. from sqlalchemy.orm import Session
  5. from app.utils.send_util import get_running_status
  6. from constants.constants import RUN_STATUS
  7. def create_relation(db: Session, se_id: int, type: str, af_id: int):
  8. db_item = models.Relation(**{"se_id": se_id,
  9. "type": type,
  10. "af_id": af_id})
  11. db.add(db_item)
  12. db.commit()
  13. db.refresh(db_item)
  14. return db_item
  15. def create_debug_relation(db: Session, dag_uuid: str, type: str, af_id: int):
  16. db_item = models.Relation(**{"dag_uuid": dag_uuid,
  17. "type": type,
  18. "af_id": af_id})
  19. db.add(db_item)
  20. db.commit()
  21. db.refresh(db_item)
  22. return db_item
  23. def create_or_update_requirements_relation(db: Session, dag_uuid: str, af_run_id: int, status: int):
  24. relation: models.Relation = db.query(models.Relation).filter(models.Relation.type == "requirements")\
  25. .filter(models.Relation.dag_uuid == dag_uuid).first()
  26. if relation:
  27. relation.af_run_id = af_run_id
  28. relation.status = status
  29. db.commit()
  30. db.flush()
  31. else:
  32. relation = models.Relation(**{"dag_uuid": dag_uuid,
  33. "type": "requirements",
  34. "af_id": -1,
  35. "af_run_id": af_run_id,
  36. "status": status})
  37. db.add(relation)
  38. db.commit()
  39. db.refresh(relation)
  40. return relation
  41. def get_requirements_relation(db: Session, dag_uuid: str):
  42. relation: models.Relation = db.query(models.Relation).filter(models.Relation.type == "requirements")\
  43. .filter(models.Relation.dag_uuid == dag_uuid).first()
  44. return relation
  45. def get_af_id(db: Session, se_id: int, type: str):
  46. res: models.Relation = db.query(models.Relation)\
  47. .filter(models.Relation.se_id == se_id)\
  48. .filter(models.Relation.type == type).first()
  49. return res
  50. def get_af_ids(db: Session, se_ids: List[int], type: str):
  51. res: List[models.Relation] = db.query(models.Relation)\
  52. .filter(models.Relation.se_id.in_(se_ids))\
  53. .filter(models.Relation.type == type).all()
  54. return res
  55. def get_dag_af_id(db: Session, dag_uuid: int, type: str):
  56. res: models.Relation = db.query(models.Relation)\
  57. .filter(models.Relation.dag_uuid == dag_uuid)\
  58. .filter(models.Relation.type == type).first()
  59. return res
  60. def delete_relation(db: Session, se_id: int, type: str):
  61. res: models.Relation = db.query(models.Relation)\
  62. .filter(models.Relation.se_id == se_id)\
  63. .filter(models.Relation.type == type).delete()
  64. return res
  65. def get_requirements_status(db: Session, dag_uuid: str):
  66. relation = get_requirements_relation(db,dag_uuid)
  67. if not relation:
  68. return relation
  69. af_run_id = relation.af_run_id
  70. running_res = get_running_status(str(-1), af_run_id)
  71. running_status = running_res['data']['status']
  72. relation = create_or_update_requirements_relation(db,dag_uuid, af_run_id, RUN_STATUS[running_status])
  73. return relation