1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import time
- from typing import List
- from app import models, schemas
- from sqlalchemy.orm import Session
- from app.crud.constant import find_and_update
def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate):
    """Insert a new ``JmJobInfo`` row built from ``item``.

    ``nodes`` and ``edges`` are stripped from the payload before the row is
    constructed and are handed back to the caller unchanged.

    Args:
        db: Session used for the duplicate check, the insert and the commit.
        item: Creation payload; must provide at least ``name`` and ``tag``.

    Returns:
        A ``(job, nodes, edges)`` tuple: the refreshed ``JmJobInfo`` row and
        the ``nodes`` / ``edges`` values taken from ``item`` (``None`` when
        the payload has no such key).

    Raises:
        Exception: A row with the same name and ``delete_status != 0``
            already exists.
    """
    payload = item.dict()
    nodes = payload.pop('nodes', None)
    edges = payload.pop('edges', None)

    # Only rows whose delete_status is not 0 count as name collisions.
    duplicate = (
        db.query(models.JmJobInfo)
        .filter(models.JmJobInfo.name == payload['name'])
        .filter(models.JmJobInfo.delete_status != 0)
        .first()
    )
    if duplicate:
        raise Exception('定时任务名称已存在')

    # Pass the tag to the shared constant helper under the '任务分类' key.
    # NOTE(review): presumably this registers the tag as a known category;
    # confirm against app.crud.constant.find_and_update.
    find_and_update(db, '任务分类', payload['tag'])

    # New rows always start with status=1 and delete_status=1.
    new_job = models.JmJobInfo(**payload, status=1, delete_status=1)
    db.add(new_job)
    db.commit()
    db.refresh(new_job)
    return new_job, nodes, edges
def get_jm_job_infos(db: Session):
    """Return all ``JmJobInfo`` rows with ``delete_status != 0``.

    Rows are ordered by ``id`` descending.
    """
    query = db.query(models.JmJobInfo)
    query = query.filter(models.JmJobInfo.delete_status != 0)
    query = query.order_by(models.JmJobInfo.id.desc())
    jobs: List[models.JmJobInfo] = query.all()
    return jobs
def get_jm_job_info(db: Session, jm_job_id: int):
    """Fetch one ``JmJobInfo`` row by id, ignoring rows with ``delete_status == 0``.

    Args:
        db: Session used for the lookup.
        jm_job_id: Primary key of the job to load.

    Returns:
        The matching ``JmJobInfo`` row.

    Raises:
        Exception: No row with that id and ``delete_status != 0`` exists.
    """
    job = (
        db.query(models.JmJobInfo)
        .filter(models.JmJobInfo.id == jm_job_id)
        .filter(models.JmJobInfo.delete_status != 0)
        .first()
    )
    if job:
        return job
    raise Exception('未找到该定时任务')
def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate):
    """Apply the explicitly-set fields of ``item`` to an existing job.

    Only fields the caller actually set are written (``exclude_unset=True``).
    ``nodes`` and ``edges`` are removed from the update and handed back to
    the caller unchanged.

    Args:
        db: Session used for the lookup and the commit.
        item: Update payload; ``id`` selects the row to modify.

    Returns:
        A ``(job, nodes, edges)`` tuple: the refreshed ``JmJobInfo`` row and
        the ``nodes`` / ``edges`` values taken from ``item`` (``None`` when
        they were not set).

    Raises:
        Exception: No row with that id and ``delete_status != 0`` exists.
    """
    changes = item.dict(exclude_unset=True)
    nodes = changes.pop('nodes', None)
    edges = changes.pop('edges', None)

    # Skip soft-deleted rows (delete_status == 0), matching get_jm_job_info
    # and update_jm_job_status, so a deleted job cannot be silently modified.
    job = (
        db.query(models.JmJobInfo)
        .filter(models.JmJobInfo.id == changes['id'])
        .filter(models.JmJobInfo.delete_status != 0)
        .first()
    )
    if not job:
        raise Exception('未找到该定时任务')

    for field, value in changes.items():
        setattr(job, field, value)

    # commit() flushes pending changes itself, so no separate flush() is needed.
    db.commit()
    db.refresh(job)
    return job, nodes, edges
def delete_jm_job_info(db: Session, jm_job_id: int):
    """Soft-delete a job by setting its ``delete_status`` to 0.

    The lookup matches on id only, so a row whose ``delete_status`` is
    already 0 is still found and returned.

    Args:
        db: Session used for the lookup and the commit.
        jm_job_id: Primary key of the job to mark as deleted.

    Returns:
        The refreshed ``JmJobInfo`` row.

    Raises:
        Exception: No row with that id exists.
    """
    id_matches = models.JmJobInfo.id == jm_job_id
    target = db.query(models.JmJobInfo).filter(id_matches).first()
    if not target:
        raise Exception('未找到该定时任务')

    target.delete_status = 0
    db.commit()
    # NOTE(review): flush() directly after commit() looks redundant; kept to
    # preserve the existing call sequence.
    db.flush()
    db.refresh(target)
    return target
def update_jm_job_status(db: Session, item: schemas.JmJobInfoStatusUpdate):
    """Set the ``status`` of a job that has ``delete_status != 0``.

    Args:
        db: Session used for the lookup and the commit.
        item: Payload carrying the job ``id`` and the new ``status`` value.

    Returns:
        The refreshed ``JmJobInfo`` row.

    Raises:
        Exception: No row with that id and ``delete_status != 0`` exists.
    """
    job = (
        db.query(models.JmJobInfo)
        .filter(models.JmJobInfo.id == item.id)
        .filter(models.JmJobInfo.delete_status != 0)
        .first()
    )
    if job:
        job.status = item.status
        db.commit()
        # NOTE(review): flush() directly after commit() looks redundant; kept
        # to preserve the existing call sequence.
        db.flush()
        db.refresh(job)
        return job
    raise Exception('未找到该定时任务')
|