123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 |
- import time
- from app import models, schemas
- from sqlalchemy.orm import Session
- from sqlalchemy import func
- import app.crud as crud
- from configs.globals import g
- from app.services.jm_job import jm_job_create_task, jm_job_update_task
def create_jm_homework_services(db: Session, item: schemas.JmHomeworkCreate):
    """Create a homework record together with its airflow-side task.

    Args:
        db: Database session handed through to every ``crud`` call.
        item: Creation payload; its ``name``, ``tag`` and ``type`` fields are
            read here, and ``relation_list`` (if present) is split off before
            the model is built.

    Returns:
        The result of ``to_dict()`` on the object returned by
        ``crud.create_jm_homework``.

    Raises:
        Exception: If ``crud.get_jm_homework_by_name`` finds an existing
            homework with the same name.
    """
    payload = item.dict()
    if crud.get_jm_homework_by_name(db, payload['name']):
        raise Exception('作业名称已存在')

    # relation_list is not a model column, so take it out of the payload.
    relations = payload.pop('relation_list', [])

    # Register the homework tag.
    crud.find_and_update(db, '作业标签', payload['tag'])

    now: int = int(time.time())
    homework = models.JmHomework(**{
        **payload,
        'user_id': g.user_id,
        'project_id': g.project_id,
        'create_time': now,
        'update_time': now,
        'status': 1,
    })

    # Create the airflow-side task first, then persist the local record.
    airflow_task = jm_job_create_task(homework, relations, db)
    homework = crud.create_jm_homework(db, homework)

    # Datasource relations are only stored for Dag-type homework.
    if payload['type'] == 'Dag' and relations is not None:
        for rel in relations:
            rel_schema = schemas.JmHomeworkDatasourceRelationCreate(**rel)
            crud.create_jm_hd_relation(db, homework.id, rel_schema)

    # Link the local homework to the airflow task in the relation table.
    crud.create_relation(db, homework.id, 'task', airflow_task['id'])
    return homework.to_dict()
def update_jm_homework_service(db: Session, id: int, update_item: schemas.JmHomeworkUpdate):
    """Update an existing homework record and its airflow-side task.

    The payload is read with ``exclude_unset=True``, so any field may be
    absent. Optional fields (``name``, ``tag``, ``type``, ``relation_list``)
    are therefore only acted on when they were actually supplied, instead of
    being indexed unconditionally.

    Args:
        db: Database session used for queries and passed to ``crud`` helpers.
        id: Primary key of the homework to update.
        update_item: Partial update payload.

    Returns:
        The result of ``to_dict()`` on the object returned by
        ``crud.update_jm_homework``.

    Raises:
        Exception: If no homework with ``id`` exists, or if another active
            homework in the current project already uses the new name.
    """
    jm_homework_update = update_item.dict(exclude_unset=True)

    db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()
    if not db_item:
        raise Exception('未找到该作业')

    # Only check name uniqueness when the caller is actually changing the name.
    if 'name' in jm_homework_update:
        db_name_item = (
            db.query(models.JmHomework)
            .filter(models.JmHomework.name == func.binary(jm_homework_update['name']))
            .filter(models.JmHomework.project_id == g.project_id)
            .filter(models.JmHomework.status != 0)
            .filter(models.JmHomework.id != id)
            .first()
        )
        if db_name_item:
            raise Exception('作业名称已存在')

    # relation_list is not a model column, so take it out of the payload.
    relation_list = jm_homework_update.pop('relation_list', [])

    # Register the homework tag only when one was supplied.
    if 'tag' in jm_homework_update:
        crud.find_and_update(db, '作业标签', jm_homework_update['tag'])

    # Fall back to the stored type when the update does not carry one.
    if 'type' in jm_homework_update:
        homework_type = jm_homework_update['type']
    else:
        homework_type = db_item.type

    jm_homework_update.update({
        'user_id': g.user_id,
        'project_id': g.project_id,
        'update_time': int(time.time()),
    })
    for k, v in jm_homework_update.items():
        setattr(db_item, k, v)

    # Update the airflow-side task first, then persist the local record.
    jm_job_update_task(db_item, relation_list, db)
    db_item = crud.update_jm_homework(db, id, db_item)

    # Replace datasource relations: drop the old ones, then recreate.
    # NOTE(review): when relation_list is omitted from the payload the existing
    # relations are still deleted and none are recreated (unchanged from the
    # previous behavior) - confirm this is intended for partial updates.
    crud.delete_jm_relations(db, db_item.id)
    if homework_type == 'Dag' and relation_list is not None:
        for relation in relation_list:
            crud.create_jm_hd_relation(
                db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation)
            )
    return db_item.to_dict()
|