import time from app import models, schemas from sqlalchemy.orm import Session from sqlalchemy import func import app.crud as crud from configs.globals import g from app.services.jm_job import jm_job_create_task, jm_job_update_task def create_jm_homework_services(db: Session, item: schemas.JmHomeworkCreate): jm_homework_create = item.dict() db_item = crud.get_jm_homework_by_name(db,jm_homework_create['name']) if db_item: raise Exception('作业名称已存在') relation_list = [] if 'relation_list' in jm_homework_create.keys(): relation_list = jm_homework_create.pop('relation_list') tag = jm_homework_create['tag'] crud.find_and_update(db, '作业标签', tag) create_time: int = int(time.time()) jm_homework_dict = {} jm_homework_dict.update(jm_homework_create) jm_homework_dict.update({ 'user_id': g.user_id, 'project_id': g.project_id, 'create_time': create_time, 'update_time': create_time, 'status': 1 }) db_item = models.JmHomework(**jm_homework_dict) # 创建airflow端作业 af_task = jm_job_create_task(db_item, relation_list, db) # 创建local作业 db_item = crud.create_jm_homework(db,db_item) # 若作业为dag类型并存在数据源关系,则新建数据源关系 if jm_homework_create['type'] == 'Dag' and relation_list is not None: for relation in relation_list: crud.create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation)) # 创建关系表 crud.create_relation(db ,db_item.id, 'task', af_task['id']) return db_item.to_dict() def update_jm_homework_service(db: Session, id: int, update_item: schemas.JmHomeworkUpdate): jm_homework_update =update_item.dict(exclude_unset=True) db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first() if not db_item: raise Exception('未找到该作业') db_name_item = db.query(models.JmHomework)\ .filter(models.JmHomework.name == func.binary(jm_homework_update['name']))\ .filter(models.JmHomework.project_id == g.project_id)\ .filter(models.JmHomework.status != 0)\ .filter(models.JmHomework.id != id).first() if db_name_item: raise Exception('作业名称已存在') relation_list = [] if 'relation_list' in jm_homework_update.keys(): relation_list = jm_homework_update.pop('relation_list') tag = jm_homework_update['tag'] crud.find_and_update(db, '作业标签', tag) jm_homework_update.update({ 'user_id': g.user_id, 'project_id': g.project_id, 'update_time': int(time.time()), }) for k, v in jm_homework_update.items(): setattr(db_item, k, v) # 修改airflow端作业 af_task = jm_job_update_task(db_item, relation_list, db) # 修改local作业 db_item = crud.update_jm_homework(db, id, db_item) # 数据源关系修改 crud.delete_jm_relations(db,db_item.id) if jm_homework_update['type'] == 'Dag' and relation_list is not None: for relation in relation_list: crud.create_jm_hd_relation(db, db_item.id, schemas.JmHomeworkDatasourceRelationCreate(**relation)) return db_item.to_dict()