jm_homework.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import time
  2. from typing import List
  3. from app import models, schemas
  4. from sqlalchemy.orm import Session
  5. from app.crud.constant import find_and_update
  6. from app.crud.jm_homework_datasource_relation import create_jm_hd_relation, delete_jm_relations, get_jm_relations
  7. from app.services.jm_job import jm_homework_submit
  8. def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
  9. jm_homework_create = item.dict()
  10. db_item = db.query(models.JmHomework).filter(models.JmHomework.name == jm_homework_create['name'])\
  11. .filter(models.JmHomework.status != 0).first()
  12. if db_item:
  13. raise Exception('作业名称已存在')
  14. relation_list = []
  15. if 'relation_list' in jm_homework_create.keys():
  16. relation_list = jm_homework_create.pop('relation_list')
  17. tag = jm_homework_create['tag']
  18. find_and_update(db, '作业标签', tag)
  19. create_time: int = int(time.time())
  20. db_item = models.JmHomework(**jm_homework_create,**{
  21. 'create_time': create_time,
  22. 'update_time': create_time,
  23. 'status': 1
  24. })
  25. db.add(db_item)
  26. db.commit()
  27. db.refresh(db_item)
  28. if jm_homework_create['type'] == 'Dag' and relation_list is not None:
  29. for relation in relation_list:
  30. create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
  31. jm_homework_submit(db_item, db)
  32. return db_item.to_dict()
  33. def get_jm_homeworks(db: Session, project_id: str):
  34. res: List[models.JmHomework] = db.query(models.JmHomework)\
  35. .filter(models.JmHomework.project_id == project_id)\
  36. .filter(models.JmHomework.status == 1)\
  37. .order_by(models.JmHomework.create_time.desc()).all()
  38. return res
  39. def get_jm_homework_info(db: Session, homework_id: int):
  40. item = db.query(models.JmHomework)\
  41. .filter(models.JmHomework.id == homework_id).first()
  42. if item.type == 'Dag':
  43. relations = get_jm_relations(db,homework_id)
  44. item.__dict__.update({"hd_relation":relations})
  45. return item
  46. def update_jm_homework(db: Session, id: int, update_item: schemas.JmHomeworkUpdate):
  47. jm_homework_update =update_item.dict(exclude_unset=True)
  48. db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()
  49. if not db_item:
  50. raise Exception('未找到该作业')
  51. db_name_item = db.query(models.JmHomework)\
  52. .filter(models.JmHomework.name == jm_homework_update['name'])\
  53. .filter(models.JmHomework.status != 0)\
  54. .filter(models.JmHomework.id != id).first()
  55. if db_name_item:
  56. raise Exception('作业名称已存在')
  57. relation_list = []
  58. if 'relation_list' in jm_homework_update.keys():
  59. relation_list = jm_homework_update.pop('relation_list')
  60. tag = jm_homework_update['tag']
  61. find_and_update(db, '作业标签', tag)
  62. for k, v in jm_homework_update.items():
  63. setattr(db_item, k, v)
  64. db_item.update_time = int(time.time())
  65. db.commit()
  66. db.flush()
  67. db.refresh(db_item)
  68. delete_jm_relations(db,db_item.id)
  69. if jm_homework_update['type'] == 'Dag' and relation_list is not None:
  70. for relation in relation_list:
  71. create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
  72. jm_homework_submit(db_item, db)
  73. return db_item.to_dict()
  74. def delete_jm_homework(db: Session, id: int):
  75. db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()
  76. if not db_item:
  77. raise Exception('未找到该作业')
  78. db_item.status = 0
  79. db.commit()
  80. db.flush()
  81. db.refresh(db_item)
  82. return db_item
  83. def get_jm_homeworks_by_ids(db: Session, ids: List[int]):
  84. res: List[models.JmHomework] = db.query(models.JmHomework)\
  85. .filter(models.JmHomework.id.in_(ids)).all()
  86. return res
  87. def get_jm_homework_by_dag_url(db: Session, dag_url: str):
  88. res: List[models.JmHomework] = db.query(models.JmHomework)\
  89. .filter(models.JmHomework.dag_url == dag_url)\
  90. .filter(models.JmHomework.status == 1).all()
  91. return res