from typing import List, Optional

from pydantic import BaseModel

from app.schemas.cron_expression import CronExpression
from app.schemas.jm_job_node import JmJobEdge, JmJobNode


# Fields shared by the job-info create and update schemas below.
# NOTE: classes in this module are documented with `#` comments rather than
# docstrings so the generated JSON schema descriptions stay unchanged.
class JmJobInfoBase(BaseModel):
    # Job name
    name: str
    # Job type
    type: str
    # Job category
    tag: str
    # Schedule type (1: one-off; 2: recurring)
    cron_type: int
    # Schedule (cron) expression
    cron_expression: Optional[CronExpression] = None
    # Graph information; the default is spelled out to match the other
    # optional field above.
    json_str: Optional[str] = None
    # User name
    user_name: str


# JmJobInfoBase plus the job's graph: a required list of nodes and an
# optional list of edges.
class JmJobInfoCreate(JmJobInfoBase):
    nodes: List[JmJobNode]
    edges: Optional[List[JmJobEdge]] = None

    class Config:
        # Example payload attached to the generated schema.
        schema_extra = {
            "example": {
                "name": "example",
                "type": "单作业离线任务",
                "tag": "业务预测",
                # Integer, matching the declared type of `cron_type`.
                "cron_type": 2,
                "cron_expression": {
                    "cron_select_type": 3,
                    "cron_expression": "",
                    "minute": 0,
                    "hour": 0,
                    "day": 1,
                    "week": 3,
                    "month": 2,
                },
                # Backslashes are escaped explicitly: "\{" is an invalid
                # escape sequence. The runtime value is unchanged.
                "json_str": "\\{图形信息\\}",
                "user_name": "test",
                "nodes": [
                    {
                        "id": 1,
                        "homework_id": 0,
                        "homework_name": "开始",
                        "start_point": 0,
                    },
                    {
                        "id": 2,
                        "homework_id": 1,
                        "homework_name": "test",
                        "start_point": 1,
                    },
                ],
                "edges": [
                    {
                        "source": 1,
                        "target": 2,
                    },
                ],
            }
        }


# Same shape as JmJobInfoCreate with an additional required integer `id`.
class JmJobInfoUpdate(JmJobInfoBase):
    id: int
    nodes: List[JmJobNode]
    edges: Optional[List[JmJobEdge]] = None

    class Config:
        # Example payload attached to the generated schema.
        schema_extra = {
            "example": {
                "id": 11,
                "name": "example",
                "type": "单作业离线任务",
                "tag": "业务预测",
                # Integer, matching the declared type of `cron_type`.
                "cron_type": 2,
                "cron_expression": {
                    "cron_select_type": 3,
                    "cron_expression": "* * * * *",
                    "minute": 0,
                    "hour": 0,
                    "day": 1,
                    "week": 3,
                    "month": 2,
                },
                # Backslashes are escaped explicitly: "\{" is an invalid
                # escape sequence. The runtime value is unchanged.
                "json_str": "\\{图形信息\\}",
                "user_name": "test",
                "nodes": [
                    {
                        "id": 1,
                        "homework_id": 0,
                        "homework_name": "开始",
                        "start_point": 0,
                    },
                    {
                        "id": 2,
                        "homework_id": 1,
                        "homework_name": "test",
                        "start_point": 1,
                    },
                ],
                "edges": [
                    {
                        "source": 1,
                        "target": 2,
                    },
                ],
            }
        }


# Schema carrying only an integer `id` and an integer `status`.
class JmJobInfoStatusUpdate(BaseModel):
    id: int
    status: int

    class Config:
        # Example payload attached to the generated schema.
        schema_extra = {
            "example": {
                "id": 1,
                "status": 0,
            }
        }