123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- import time
- from typing import List
- from app import models, schemas
- from sqlalchemy.orm import Session
- from app.crud.constant import find_and_update
- from app.crud.jm_homework_datasource_relation import create_jm_hd_relation, delete_jm_relations, get_jm_relations
- from app.services.jm_job import jm_homework_submit
def create_jm_homework(db: Session, item: schemas.JmHomeworkCreate):
    """Create a homework row, attach its datasource relations and submit it.

    Args:
        db: SQLAlchemy session used for every query and commit.
        item: Creation payload; converted to a plain dict via ``item.dict()``.

    Returns:
        The result of ``to_dict()`` on the freshly persisted homework.

    Raises:
        Exception: If another homework with the same name exists whose
            status is not 0 (message: "homework name already exists").
    """
    payload = item.dict()

    # Names must be unique among homeworks whose status is not 0.
    name_clash = (
        db.query(models.JmHomework)
        .filter(models.JmHomework.name == payload['name'])
        .filter(models.JmHomework.status != 0)
        .first()
    )
    if name_clash:
        raise Exception('作业名称已存在')

    # The relations are not constructor arguments of the model, so they are
    # taken out of the payload before it is splatted below.
    relations = payload.pop('relation_list', [])

    # Hand the tag to the shared constant helper under the "homework tag" key.
    find_and_update(db, '作业标签', payload['tag'])

    now: int = int(time.time())
    homework = models.JmHomework(
        **payload,
        create_time=now,
        update_time=now,
        status=1,
    )
    db.add(homework)
    db.commit()
    db.refresh(homework)

    # Only 'Dag' homeworks get datasource relations.
    is_dag = payload['type'] == 'Dag'
    if is_dag and relations is not None:
        for relation in relations:
            relation_schema = schemas.JmHomeworkDatasourceRelationCreate(**relation)
            create_jm_hd_relation(db, homework.id, relation_schema)

    # NOTE(review): submit is assumed to run for every homework type, not
    # only for 'Dag' — confirm against the original indentation.
    jm_homework_submit(homework, db)
    return homework.to_dict()
def get_jm_homeworks(db: Session, project_id: str):
    """List a project's homeworks with status 1, ordered by create_time descending.

    Args:
        db: SQLAlchemy session to query with.
        project_id: Value matched against ``JmHomework.project_id``.

    Returns:
        A list of ``models.JmHomework`` rows (possibly empty).
    """
    homework = models.JmHomework
    query = db.query(homework)
    query = query.filter(homework.project_id == project_id)
    query = query.filter(homework.status == 1)
    query = query.order_by(homework.create_time.desc())
    return query.all()
def get_jm_homework_info(db: Session, homework_id: int):
    """Fetch one homework by id, with datasource relations for 'Dag' homeworks.

    Args:
        db: SQLAlchemy session to query with.
        homework_id: Primary key of the homework to load.

    Returns:
        The ``models.JmHomework`` instance. When its type is 'Dag', the
        relations returned by ``get_jm_relations`` are stored on the instance
        under the ``hd_relation`` attribute.

    Raises:
        Exception: If no homework with that id exists (message: "homework
            not found"), matching the update/delete helpers in this module.
    """
    item = db.query(models.JmHomework)\
        .filter(models.JmHomework.id == homework_id).first()
    # .first() yields None when nothing matches; fail with the module's usual
    # "not found" error instead of an AttributeError on item.type.
    if item is None:
        raise Exception('未找到该作业')
    if item.type == 'Dag':
        relations = get_jm_relations(db, homework_id)
        # Written straight into __dict__ so the extra key rides along on the
        # instance without going through attribute assignment.
        item.__dict__.update({"hd_relation": relations})
    return item
def update_jm_homework(db: Session, id: int, update_item: schemas.JmHomeworkUpdate):
    """Apply an update to a homework, rebuild its relations and resubmit it.

    The payload is built with ``exclude_unset=True``, so any field the caller
    did not send is absent. Each field is therefore treated as optional: the
    name-uniqueness check, the tag registration and the type lookup only use
    the payload when the corresponding key is present.

    Args:
        db: SQLAlchemy session used for every query and commit.
        id: Primary key of the homework to update.
        update_item: Update payload; only explicitly set fields are applied.

    Returns:
        The result of ``to_dict()`` on the updated homework.

    Raises:
        Exception: If the homework does not exist ("homework not found"), or
            if the new name is already used by a different homework whose
            status is not 0 ("homework name already exists").
    """
    jm_homework_update = update_item.dict(exclude_unset=True)
    db_item = db.query(models.JmHomework).filter(models.JmHomework.id == id).first()
    if not db_item:
        raise Exception('未找到该作业')

    # Only check for a clash when the caller actually sent a name.
    if 'name' in jm_homework_update:
        db_name_item = db.query(models.JmHomework)\
            .filter(models.JmHomework.name == jm_homework_update['name'])\
            .filter(models.JmHomework.status != 0)\
            .filter(models.JmHomework.id != id).first()
        if db_name_item:
            raise Exception('作业名称已存在')

    # Relations are rebuilt when the caller touched either the relation list
    # or the type. Payloads that carry 'type' (every payload the previous
    # implementation could process) keep the delete-then-recreate behaviour.
    rebuild_relations = (
        'relation_list' in jm_homework_update or 'type' in jm_homework_update
    )
    # relation_list is not a model column, so remove it before the setattr loop.
    relation_list = jm_homework_update.pop('relation_list', [])

    if 'tag' in jm_homework_update:
        find_and_update(db, '作业标签', jm_homework_update['tag'])

    for k, v in jm_homework_update.items():
        setattr(db_item, k, v)
    db_item.update_time = int(time.time())
    # commit() already flushes pending changes, so no separate flush is needed.
    db.commit()
    db.refresh(db_item)

    if rebuild_relations:
        delete_jm_relations(db, db_item.id)
        # Fall back to the stored type when the payload did not include one.
        homework_type = jm_homework_update.get('type', db_item.type)
        if homework_type == 'Dag' and relation_list is not None:
            for relation in relation_list:
                create_jm_hd_relation(
                    db,
                    db_item.id,
                    schemas.JmHomeworkDatasourceRelationCreate(**relation),
                )

    # NOTE(review): submit is assumed to run for every homework type, not
    # only for 'Dag' — confirm against the original indentation.
    jm_homework_submit(db_item, db)
    return db_item.to_dict()
def delete_jm_homework(db: Session, id: int):
    """Mark a homework as deleted by setting its status to 0.

    The row itself is kept; only the ``status`` column changes.

    Args:
        db: SQLAlchemy session used for the lookup and commit.
        id: Primary key of the homework to mark.

    Returns:
        The refreshed ``models.JmHomework`` instance.

    Raises:
        Exception: If no homework with that id exists (message: "homework
            not found").
    """
    homework = (
        db.query(models.JmHomework)
        .filter(models.JmHomework.id == id)
        .first()
    )
    if not homework:
        raise Exception('未找到该作业')

    homework.status = 0
    db.commit()
    db.flush()
    db.refresh(homework)
    return homework
def get_jm_homeworks_by_ids(db: Session, ids: List[int]):
    """Return every homework whose id is contained in ``ids``.

    No status filter is applied.

    Args:
        db: SQLAlchemy session to query with.
        ids: Primary keys to look up.

    Returns:
        A list of matching ``models.JmHomework`` rows (possibly empty).
    """
    id_matches = models.JmHomework.id.in_(ids)
    query = db.query(models.JmHomework).filter(id_matches)
    return query.all()
def get_jm_homework_by_dag_url(db: Session, dag_url: str):
    """Return the homeworks with status 1 whose ``dag_url`` equals the given value.

    Args:
        db: SQLAlchemy session to query with.
        dag_url: Value matched against ``JmHomework.dag_url``.

    Returns:
        A list of matching ``models.JmHomework`` rows (possibly empty).
    """
    homework = models.JmHomework
    query = db.query(homework)
    query = query.filter(homework.dag_url == dag_url)
    query = query.filter(homework.status == 1)
    return query.all()
|