import time from typing import List from app import models, schemas from sqlalchemy.orm import Session from app.utils.cron_utils import * def create_job_info(db: Session, item: schemas.JobInfoCreate): create_time: int = int(time.time()) item_dict = item.dict() cron_expression_dict = item_dict.pop('cron_expression') cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict)) cron_select_type = cron_expression_dict["cron_select_type"] item_dict.update({ 'cron_select_type': cron_select_type, 'job_cron': cron_expression, }) partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() and item_dict['partition_info'] != '' else None partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() and item_dict['partition_time'] != '' else None partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() and item_dict['partition_num'] != '' else None partition_info_str = '' if partition_info is not None and partition_time is not None and partition_num is not None: partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time elif partition_info is not None and (partition_time is None or partition_num is None): raise Exception('分区信息不完善') item_dict.update({ 'partition_info': partition_info_str, }) db_item = models.JobInfo(**item_dict, **{ 'trigger_status': 0, 'create_time': create_time, 'update_time': create_time, 'delete_status': 1, }) db.add(db_item) db.commit() db.refresh(db_item) return db_item def get_job_infos(db: Session, skip: int = 0, limit: int = 20): res: List[models.JobInfo] = db.query(models.JobInfo).filter(models.JobInfo.delete_status == 1).all() # TODO: 排序 return res def update_job_info(db: Session, id: int, update_item: schemas.JobInfoUpdate): db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first() if not db_item: raise Exception('未找到该任务') update_dict = update_item.dict(exclude_unset=True) cron_expression_dict = update_dict.pop('cron_expression') cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict)) cron_select_type = cron_expression_dict["cron_select_type"] update_dict.update({ 'cron_select_type': cron_select_type, 'job_cron': cron_expression, }) partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() and update_dict['partition_info'] != '' else None partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys() and update_dict['partition_time'] != '' else None partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys() and update_dict['partition_num'] != '' else None partition_info_str = '' if partition_info is not None and partition_time is not None and partition_num is not None: partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time elif partition_info is not None and (partition_time is None or partition_num is None): raise Exception('分区信息不完善') update_dict.update({ 'partition_info': partition_info_str, }) for k, v in update_dict.items(): setattr(db_item, k, v) db_item.update_time = int(time.time()) db.commit() db.flush() db.refresh(db_item) return db_item def update_job_trigger_status(db: Session, id: int, trigger_status: int): db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first() if not db_item: raise Exception('未找到该任务') db_item.trigger_status = trigger_status db_item.update_time = int(time.time()) db.commit() db.flush() db.refresh(db_item) return db_item def get_job_info(db: Session, id: int): db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first() if not db_item: raise Exception('未找到该任务') return db_item def delete_job_info(db: Session, job_id: int): job_item = db.query(models.JobInfo).filter(models.JobInfo.id == job_id).first() if not job_item: raise Exception('未找到该任务') if job_item.trigger_status == 1: raise Exception('该任务未停用,不能删除') job_item.delete_status = 0 db.commit() db.flush() db.refresh(job_item) return job_item