import time from typing import Optional from fastapi import APIRouter from fastapi import Depends from sqlalchemy.orm import Session from app import models, page_help, schemas from app.common.decorators import verify_all import app.crud as crud from app.crud import job_info from app.services.datax import datax_create_job, datax_update_job, on_off_control from app.services.job_info import create_job_info_services, execute_job_services, update_job_info_services from app.utils.cron_utils import parsing_cron_expression from app.utils.send_util import * from app.utils.utils import * from utils.sx_time import sxtimeit from utils.sx_web import web_try from fastapi_pagination import Page, add_pagination, paginate, Params from app import get_db, get_page router = APIRouter( prefix="/jpt/jobinfo", tags=["jobinfo-任务管理"], ) @router.post("/", dependencies=[Depends(verify_all)]) @web_try() @sxtimeit def create_job_info(item: schemas.JobInfoCreate, db: Session = Depends(get_db)): return create_job_info_services(db,item) @router.get("/", dependencies=[Depends(verify_all)]) @web_try() @sxtimeit def get_job_infos(params: Params=Depends(get_page), db: Session = Depends(get_db)): return page_help(crud.get_job_infos(db),params['page'],params['size']) @router.get("/info", dependencies=[Depends(verify_all)]) @web_try() @sxtimeit def get_job_info(job_id: int, db: Session = Depends(get_db)): job_info = crud.get_job_info(db, job_id) job_info_dict = job_info.to_dict() cron_select_type, cron_expression = job_info_dict['cron_select_type'], job_info_dict['job_cron'] cron_expression_dict = parsing_cron_expression(cron_expression) cron_expression_dict.update({ 'cron_select_type': cron_select_type, 'cron_expression': cron_expression }) job_info_dict.update({ 'cron_expression_dict': cron_expression_dict }) partition_info_str = job_info_dict.pop('partition_info', None) if partition_info_str: partition_list = partition_info_str.split(',') job_info_dict.update({ 'partition_info': partition_list[0], 'partition_time': partition_list[2], 'partition_num': int(partition_list[1]) }) return job_info_dict @router.put("/{id}", dependencies=[Depends(verify_all)]) @web_try() @sxtimeit def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)): return update_job_info_services(db, id, update_item) @router.put("/update_trigger_status/", dependencies=[Depends(verify_all)]) @web_try() @sxtimeit def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depends(get_db)): job_info: models.JobInfo = crud.get_job_info(db, item.id) relation = crud.get_af_id(db, job_info.id, 'datax') job_info.trigger_status = item.trigger_status on_off_control(relation.af_id, item.trigger_status) job_info = crud.update_job_trigger_status(db, item.id, item.trigger_status) return job_info @router.delete("/{job_id}", dependencies=[Depends(verify_all)]) @web_try() @sxtimeit def delete_job_info(job_id: int, db: Session = Depends(get_db)): jm_job: models.JobInfo = crud.get_job_info(db, job_id) if jm_job.trigger_status == 1: raise Exception('任务未停用,不可删除') relation = crud.get_af_id(db, job_id, 'datax') send_delete('/af/af_job', relation.af_id) return crud.delete_job_info(db, job_id) @router.post("/execute", dependencies=[Depends(verify_all)]) @web_try() @sxtimeit def execute_job_info(job_id: int, db: Session = Depends(get_db)): jm_job: models.JobInfo = crud.get_job_info(db, job_id) if jm_job.trigger_status == 0: raise Exception('任务已被停用') res = execute_job_services(db, job_id) return res['data'] add_pagination(router)