jm_homework.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. import time
  2. from app import models, schemas
  3. from sqlalchemy.orm import Session
  4. import app.crud as crud
  5. from configs.globals import g
  6. from app.services.jm_job import jm_job_create_task, jm_job_update_task
  7. def create_jm_homework_services(db: Session, item: schemas.JmHomeworkCreate):
  8. jm_homework_create = item.dict()
  9. db_item = crud.get_jm_homework_by_name(db,jm_homework_create['name'])
  10. if db_item:
  11. raise Exception('作业名称已存在')
  12. relation_list = []
  13. if 'relation_list' in jm_homework_create.keys():
  14. relation_list = jm_homework_create.pop('relation_list')
  15. tag = jm_homework_create['tag']
  16. crud.find_and_update(db, '作业标签', tag)
  17. create_time: int = int(time.time())
  18. jm_homework_dict = {}
  19. jm_homework_dict.update(jm_homework_create)
  20. jm_homework_dict.update({
  21. 'user_id': g.user_id,
  22. 'project_id': g.project_id,
  23. 'create_time': create_time,
  24. 'update_time': create_time,
  25. 'status': 1
  26. })
  27. db_item = models.JmHomework(**jm_homework_dict)
  28. # 创建airflow端作业
  29. af_task = jm_job_create_task(db_item, relation_list, db)
  30. # 创建local作业
  31. db_item = crud.create_jm_homework(db,db_item)
  32. # 若作业为dag类型并存在数据源关系,则新建数据源关系
  33. if jm_homework_create['type'] == 'Dag' and relation_list is not None:
  34. for relation in relation_list:
  35. crud.create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
  36. # 创建关系表
  37. crud.create_relation(db ,db_item.id, 'task', af_task['id'])
  38. return db_item.to_dict()
  39. def update_jm_homework_service(db: Session, id: int, update_item: schemas.JmHomeworkUpdate):
  40. jm_homework_update =update_item.dict(exclude_unset=True)
  41. db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()
  42. if not db_item:
  43. raise Exception('未找到该作业')
  44. db_name_item = db.query(models.JmHomework)\
  45. .filter(models.JmHomework.name == jm_homework_update['name'])\
  46. .filter(models.JmHomework.project_id == g.project_id)\
  47. .filter(models.JmHomework.status != 0)\
  48. .filter(models.JmHomework.id != id).first()
  49. if db_name_item:
  50. raise Exception('作业名称已存在')
  51. relation_list = []
  52. if 'relation_list' in jm_homework_update.keys():
  53. relation_list = jm_homework_update.pop('relation_list')
  54. tag = jm_homework_update['tag']
  55. crud.find_and_update(db, '作业标签', tag)
  56. jm_homework_update.update({
  57. 'user_id': g.user_id,
  58. 'project_id': g.project_id,
  59. 'update_time': int(time.time()),
  60. })
  61. for k, v in jm_homework_update.items():
  62. setattr(db_item, k, v)
  63. # 修改airflow端作业
  64. af_task = jm_job_update_task(db_item, relation_list, db)
  65. # 修改local作业
  66. db_item = crud.update_jm_homework(db, id, db_item)
  67. # 数据源关系修改
  68. crud.delete_jm_relations(db,db_item.id)
  69. if jm_homework_update['type'] == 'Dag' and relation_list is not None:
  70. for relation in relation_list:
  71. crud.create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation))
  72. return db_item.to_dict()