# jm_homework.py
  1. import time
  2. from app import models, schemas
  3. from sqlalchemy.orm import Session
  4. import app.crud as crud
  5. from app.services.jm_job import jm_job_create_task, jm_job_update_task
  6. def create_jm_homework_services(db: Session, item: schemas.JmHomeworkCreate):
  7. jm_homework_create = item.dict()
  8. db_item = crud.get_jm_homework_by_name(db,jm_homework_create['name'])
  9. if db_item:
  10. raise Exception('作业名称已存在')
  11. relation_list = []
  12. if 'relation_list' in jm_homework_create.keys():
  13. relation_list = jm_homework_create.pop('relation_list')
  14. tag = jm_homework_create['tag']
  15. crud.find_and_update(db, '作业标签', tag)
  16. create_time: int = int(time.time())
  17. db_item = models.JmHomework(**jm_homework_create,**{
  18. 'create_time': create_time,
  19. 'update_time': create_time,
  20. 'status': 1
  21. })
  22. # 创建airflow端作业
  23. af_task = jm_job_create_task(db_item, relation_list, db)
  24. # 创建local作业
  25. db_item = crud.create_jm_homework(db,db_item)
  26. # 若作业为dag类型并存在数据源关系,则新建数据源关系
  27. if jm_homework_create['type'] == 'Dag' and relation_list is not None:
  28. for relation in relation_list:
  29. crud.create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
  30. # 创建关系表
  31. crud.create_relation(db ,db_item.id, 'task', af_task['id'])
  32. return db_item.to_dict()
  33. def update_jm_homework_service(db: Session, id: int, update_item: schemas.JmHomeworkUpdate):
  34. jm_homework_update =update_item.dict(exclude_unset=True)
  35. db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()
  36. if not db_item:
  37. raise Exception('未找到该作业')
  38. db_name_item = db.query(models.JmHomework)\
  39. .filter(models.JmHomework.name == jm_homework_update['name'])\
  40. .filter(models.JmHomework.status != 0)\
  41. .filter(models.JmHomework.id != id).first()
  42. if db_name_item:
  43. raise Exception('作业名称已存在')
  44. relation_list = []
  45. if 'relation_list' in jm_homework_update.keys():
  46. relation_list = jm_homework_update.pop('relation_list')
  47. tag = jm_homework_update['tag']
  48. crud.find_and_update(db, '作业标签', tag)
  49. for k, v in jm_homework_update.items():
  50. setattr(db_item, k, v)
  51. db_item.update_time = int(time.time())
  52. # 修改airflow端作业
  53. af_task = jm_job_update_task(db_item, relation_list, db)
  54. # 修改local作业
  55. db_item = crud.update_jm_homework(db, id, db_item)
  56. # 数据源关系修改
  57. crud.delete_jm_relations(db,db_item.id)
  58. if jm_homework_update['type'] == 'Dag' and relation_list is not None:
  59. for relation in relation_list:
  60. crud.create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
  61. return db_item.to_dict()