jm_homework.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import time
  2. from app import models, schemas
  3. from sqlalchemy.orm import Session
  4. from sqlalchemy import func
  5. import app.crud as crud
  6. from configs.globals import g
  7. from app.services.jm_job import jm_job_create_task, jm_job_update_task
  def create_jm_homework_services(db: Session, item: schemas.JmHomeworkCreate):
      """Create a homework record together with its airflow-side task.

      Args:
          db: SQLAlchemy session handed to every crud helper.
          item: Creation payload; it is converted with ``item.dict()`` and must
              provide at least the ``name``, ``tag`` and ``type`` keys read below.

      Returns:
          The result of ``to_dict()`` on the object returned by
          ``crud.create_jm_homework``.

      Raises:
          Exception: If ``crud.get_jm_homework_by_name`` finds an existing
              homework for the requested name.
      """
      payload = item.dict()

      # Refuse duplicates before any side effect takes place.
      if crud.get_jm_homework_by_name(db, payload['name']):
          raise Exception('作业名称已存在')

      # The relation list is not part of the model constructor arguments, so
      # it is split off from the payload here.
      relation_list = payload.pop('relation_list', [])

      # Register the homework tag.
      crud.find_and_update(db, '作业标签', payload['tag'])

      now: int = int(time.time())
      homework_fields = {
          **payload,
          'user_id': g.user_id,
          'project_id': g.project_id,
          'create_time': now,
          'update_time': now,
          'status': 1,
      }
      homework = models.JmHomework(**homework_fields)

      # Create the airflow-side job first ...
      af_task = jm_job_create_task(homework, relation_list, db)
      # ... then create the local job.
      homework = crud.create_jm_homework(db, homework)

      # A Dag-type homework with datasource relations gets one relation row
      # per entry.
      is_dag = payload['type'] == 'Dag'
      if is_dag and relation_list is not None:
          for relation in relation_list:
              relation_schema = schemas.JmHomeworkDatasourceRelationCreate(**relation)
              crud.create_jm_hd_relation(db, homework.id, relation_schema)

      # Create the relation record linking the homework to its airflow task.
      crud.create_relation(db, homework.id, 'task', af_task['id'])
      return homework.to_dict()
  def update_jm_homework_service(db: Session, id: int, update_item: schemas.JmHomeworkUpdate):
      """Update a homework record and its airflow-side task.

      The payload is built with ``exclude_unset=True``, so any field the caller
      did not send is absent from it. Each optional key (``name``, ``tag``,
      ``type``, ``relation_list``) is therefore probed before use instead of
      being indexed unconditionally, which previously raised ``KeyError`` on
      partial updates.

      Args:
          db: SQLAlchemy session used for the queries and the crud helpers.
          id: Primary key of the homework to update.
          update_item: Update payload; only explicitly set fields are applied.

      Returns:
          The result of ``to_dict()`` on the object returned by
          ``crud.update_jm_homework``.

      Raises:
          Exception: If no homework with ``id`` exists, or if another homework
              with status != 0 in the current project already uses the
              requested name.
      """
      jm_homework_update = update_item.dict(exclude_unset=True)

      db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()
      if not db_item:
          raise Exception('未找到该作业')

      # The uniqueness check only makes sense when a name was actually sent.
      if 'name' in jm_homework_update:
          db_name_item = (
              db.query(models.JmHomework)
              .filter(models.JmHomework.name == func.binary(jm_homework_update['name']))
              .filter(models.JmHomework.project_id == g.project_id)
              .filter(models.JmHomework.status != 0)
              .filter(models.JmHomework.id != id)
              .first()
          )
          if db_name_item:
              raise Exception('作业名称已存在')

      # NOTE(review): when relation_list is omitted it defaults to [], and
      # crud.delete_jm_relations below is still called unconditionally --
      # confirm that dropping relations on such an update is intended.
      relation_list = jm_homework_update.pop('relation_list', [])

      # Register the homework tag only when the caller supplied one.
      if 'tag' in jm_homework_update:
          crud.find_and_update(db, '作业标签', jm_homework_update['tag'])

      jm_homework_update.update({
          'user_id': g.user_id,
          'project_id': g.project_id,
          'update_time': int(time.time()),
      })
      for k, v in jm_homework_update.items():
          setattr(db_item, k, v)

      # Update the airflow-side job.
      jm_job_update_task(db_item, relation_list, db)
      # Update the local job.
      db_item = crud.update_jm_homework(db, id, db_item)

      # Rebuild the datasource relations.
      crud.delete_jm_relations(db, db_item.id)
      # Fall back to the stored type when the update did not send one.
      if 'type' in jm_homework_update:
          homework_type = jm_homework_update['type']
      else:
          homework_type = db_item.type
      if homework_type == 'Dag' and relation_list is not None:
          for relation in relation_list:
              crud.create_jm_hd_relation(
                  db,
                  db_item.id,
                  schemas.JmHomeworkDatasourceRelationCreate(**relation),
              )
      return db_item.to_dict()