123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- import time
- from typing import List
- from app import models, schemas
- from sqlalchemy.orm import Session
- from app.utils.cron_utils import *
- def create_job_info(db: Session, item: schemas.JobInfoCreate):
- create_time: int = int(time.time())
- item_dict = item.dict()
- cron_expression_dict = item_dict.pop('cron_expression')
- cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
- cron_select_type = cron_expression_dict["cron_select_type"]
- item_dict.update({
- 'cron_select_type': cron_select_type,
- 'job_cron': cron_expression,
- })
- partition_info = item_dict.pop('partition_info') if "partition_info" in item_dict.keys() else None
- partition_time = item_dict.pop('partition_time') if "partition_time" in item_dict.keys() else None
- partition_num = item_dict.pop('partition_num') if "partition_num" in item_dict.keys() else None
- partition_info_str = ''
- if partition_info is not None and partition_time is not None and partition_num is not None:
- partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
- elif (partition_info is not None or partition_info != '') and (partition_time is None or partition_num is None):
- raise Exception('分区信息不完善')
- item_dict.update({
- 'partition_info': partition_info_str,
- })
- db_item = models.JobInfo(**item_dict, **{
- 'trigger_status': 0,
- 'create_time': create_time,
- 'update_time': create_time,
- 'delete_status': 1,
- })
- db.add(db_item)
- db.commit()
- db.refresh(db_item)
- return db_item
- def get_job_infos(db: Session, skip: int = 0, limit: int = 20):
- res: List[models.JobInfo] = db.query(models.JobInfo).filter(models.JobInfo.delete_status == 1).all() # TODO: 排序
- return res
- def update_job_info(db: Session, id: int, update_item: schemas.JobInfoUpdate):
- db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
- if not db_item:
- raise Exception('未找到该任务')
- update_dict = update_item.dict(exclude_unset=True)
- cron_expression_dict = update_dict.pop('cron_expression')
- cron_expression = joint_cron_expression(schemas.CronExpression(**cron_expression_dict))
- cron_select_type = cron_expression_dict["cron_select_type"]
- update_dict.update({
- 'cron_select_type': cron_select_type,
- 'job_cron': cron_expression,
- })
- partition_info = update_dict.pop('partition_info') if "partition_info" in update_dict.keys() else None
- partition_time = update_dict.pop('partition_time') if "partition_time" in update_dict.keys() else None
- partition_num = update_dict.pop('partition_num') if "partition_num" in update_dict.keys() else None
- partition_info_str = ''
- if partition_info is not None and partition_time is not None and partition_num is not None:
- partition_info_str += partition_info + ',' + str(partition_num) + ',' + partition_time
- elif (partition_info is not None or partition_info != '') and (partition_time is None or partition_num is None):
- raise Exception('分区信息不完善')
- update_dict.update({
- 'partition_info': partition_info_str,
- })
- for k, v in update_dict.items():
- setattr(db_item, k, v)
- db_item.update_time = int(time.time())
- db.commit()
- db.flush()
- db.refresh(db_item)
- return db_item
- def update_job_trigger_status(db: Session, id: int, trigger_status: int):
- db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
- if not db_item:
- raise Exception('未找到该任务')
- db_item.trigger_status = trigger_status
- db_item.update_time = int(time.time())
- db.commit()
- db.flush()
- db.refresh(db_item)
- return db_item
- def get_job_info(db: Session, id: int):
- db_item = db.query(models.JobInfo).filter(models.JobInfo.id == id).first()
- if not db_item:
- raise Exception('未找到该任务')
- return db_item
- def delete_job_info(db: Session, job_id: int):
- job_item = db.query(models.JobInfo).filter(models.JobInfo.id == job_id).first()
- if not job_item:
- raise Exception('未找到该任务')
- if job_item.trigger_status == 1:
- raise Exception('该任务未停用,不能删除')
- job_item.delete_status = 0
- db.commit()
- db.flush()
- db.refresh(job_item)
- return job_item
|