import time from typing import List from app import models, schemas from sqlalchemy.orm import Session from app.crud.constant import find_and_update from app.utils.cron_utils import * def create_jm_job_info(db: Session, item: schemas.JmJobInfoCreate): jm_job_info_create = item.dict() cron_expression_item = jm_job_info_create.pop('cron_expression', None) if jm_job_info_create['cron_type'] == 2 and cron_expression_item is not None: cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item)) cron_select_type = cron_expression_item["cron_select_type"] jm_job_info_create.update({ 'cron_select_type': cron_select_type, 'cron_expression': cron_expression, }) nodes = jm_job_info_create.pop('nodes', None) edges = jm_job_info_create.pop('edges', None) db_item = db.query(models.JmJobInfo).filter(models.JmJobInfo.name == jm_job_info_create['name'])\ .filter(models.JmJobInfo.delete_status != 0).first() if db_item: raise Exception('定时任务名称已存在') tag = jm_job_info_create['tag'] find_and_update(db, '任务标签', tag) jm_job_info = models.JmJobInfo(**jm_job_info_create,**{ 'status': 0, 'delete_status': 1, }) db.add(jm_job_info) db.commit() db.refresh(jm_job_info) return jm_job_info,nodes,edges def get_jm_job_infos(db: Session): res: List[models.JmJobInfo] = db.query(models.JmJobInfo)\ .filter(models.JmJobInfo.delete_status != 0)\ .order_by(models.JmJobInfo.id.desc()).all() return res def get_jm_job_info(db: Session, jm_job_id: int): item = db.query(models.JmJobInfo)\ .filter(models.JmJobInfo.id == jm_job_id)\ .filter(models.JmJobInfo.delete_status != 0).first() if not item: raise Exception('未找到该定时任务') return item def update_jm_job_info(db: Session, item: schemas.JmJobInfoUpdate): jm_job_info_update = item.dict(exclude_unset=True) cron_expression_item = jm_job_info_update.pop('cron_expression', None) if jm_job_info_update['cron_type'] == 2: cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_item)) cron_select_type = cron_expression_item["cron_select_type"] jm_job_info_update.update({ 'cron_select_type': cron_select_type, 'cron_expression': cron_expression, }) nodes = jm_job_info_update.pop('nodes', None) edges = jm_job_info_update.pop('edges', None) db_item = db.query(models.JmJobInfo)\ .filter(models.JmJobInfo.id == jm_job_info_update['id']).first() if not db_item: raise Exception('未找到该定时任务') db_name_item = db.query(models.JmJobInfo)\ .filter(models.JmJobInfo.name == jm_job_info_update['name'])\ .filter(models.JmJobInfo.delete_status != 0)\ .filter(models.JmJobInfo.id != item.id).first() if db_name_item: raise Exception('定时任务名称已存在') tag = jm_job_info_update['tag'] find_and_update(db, '任务标签', tag) for k, v in jm_job_info_update.items(): setattr(db_item, k, v) db.commit() db.flush() db.refresh(db_item) return db_item,nodes,edges def delete_jm_job_info(db: Session, jm_job_id: int): jm_job_info = db.query(models.JmJobInfo)\ .filter(models.JmJobInfo.id == jm_job_id).first() if not jm_job_info: raise Exception('未找到该定时任务') jm_job_info.delete_status = 0 db.commit() db.flush() db.refresh(jm_job_info) return jm_job_info def update_jm_job_status(db: Session, item: schemas.JmJobInfoStatusUpdate): jm_job_info = db.query(models.JmJobInfo)\ .filter(models.JmJobInfo.id == item.id)\ .filter(models.JmJobInfo.delete_status != 0).first() if not jm_job_info: raise Exception('未找到该定时任务') jm_job_info.status = item.status db.commit() db.flush() db.refresh(jm_job_info) return jm_job_info