import time
from typing import List
# NOTE: import order is kept as in the original file; the wildcard import
# below may shadow earlier names, so reordering could change behavior.
from app import models, schemas
from sqlalchemy.orm import Session
from app.utils.cron_utils import *
from configs.globals import g


def create_job_info(db: Session, item: models.JobInfo) -> models.JobInfo:
    """Persist a new job row and return it refreshed from the database.

    Args:
        db: Session used to add and commit the row.
        item: The ``JobInfo`` instance to insert.

    Returns:
        The same ``item`` object after ``db.refresh``.
    """
    db.add(item)
    db.commit()
    db.refresh(item)
    return item


def get_job_infos(db: Session) -> List[models.JobInfo]:
    """Return the current project's jobs, newest first.

    Only rows whose ``project_id`` equals ``g.project_id`` and whose
    ``delete_status`` is 1 are returned (``delete_job_info`` sets the flag
    to 0), ordered by ``create_time`` descending.
    """
    # TODO: ordering
    res: List[models.JobInfo] = (
        db.query(models.JobInfo)
        .filter(models.JobInfo.project_id == g.project_id)
        .filter(models.JobInfo.delete_status == 1)
        .order_by(models.JobInfo.create_time.desc())
        .all()
    )
    return res


def update_job_info(
    db: Session, id: int, update_item: models.JobInfo
) -> models.JobInfo:
    """Commit pending changes and return ``update_item`` refreshed.

    Args:
        db: Session to commit.
        id: Unused by this function; kept so existing callers keep working.
        update_item: The instance to refresh after the commit.
            NOTE(review): presumably already modified by the caller and
            attached to ``db`` -- confirm against call sites.

    Returns:
        ``update_item`` after ``db.refresh``.
    """
    # commit() flushes pending state itself, so no separate flush is needed.
    db.commit()
    db.refresh(update_item)
    return update_item


def update_job_trigger_status(
    db: Session, id: int, trigger_status: int
) -> models.JobInfo:
    """Set a job's ``trigger_status`` and stamp ``update_time``.

    Args:
        db: Session used for the lookup and commit.
        id: Primary key of the job to update.
        trigger_status: New value stored in ``trigger_status``.

    Returns:
        The updated job, refreshed from the database.

    Raises:
        Exception: If no job with the given ``id`` exists.
    """
    db_item = get_job_info(db, id)
    db_item.trigger_status = trigger_status
    # update_time is stored as whole seconds since the epoch.
    db_item.update_time = int(time.time())
    db.commit()
    db.refresh(db_item)
    return db_item


def get_job_info(db: Session, id: int) -> models.JobInfo:
    """Fetch a single job by primary key.

    Args:
        db: Session used for the query.
        id: Primary key of the job.

    Returns:
        The first ``JobInfo`` row matching ``id``.

    Raises:
        Exception: If no such row exists.
    """
    db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
    if not db_item:
        raise Exception('未找到该任务')
    return db_item


def delete_job_info(db: Session, job_id: int) -> models.JobInfo:
    """Soft-delete a job by setting its ``delete_status`` to 0.

    Args:
        db: Session used for the lookup and commit.
        job_id: Primary key of the job to delete.

    Returns:
        The job after the flag change, refreshed from the database.

    Raises:
        Exception: If the job does not exist, or if its ``trigger_status``
            is 1 (per the error message, the job has not been disabled).
    """
    job_item = get_job_info(db, job_id)
    if job_item.trigger_status == 1:
        raise Exception('该任务未停用,不能删除')
    # The row is kept; get_job_infos() only lists rows with delete_status == 1.
    job_item.delete_status = 0
    db.commit()
    db.refresh(job_item)
    return job_item