from sqlalchemy import Column, Integer, String, Text, JSON from app.models.database import BaseModel class AirflowJob(BaseModel): __tablename__ = "airflow_job" id = Column(Integer, primary_key=True, index=True) name = Column(Text) job_type = Column(Integer) # 任务类型:可取 1=单作业任务 ,2=多作业任务 job_mode = Column(Integer) # 任务模式:1= 常规模式 2=调试模式 tasks = Column(JSON) # 任务列表 dependence = Column(JSON) # 作业间的依赖 # 任务执行CRON cron = Column(Text, nullable=False) # 任务描述 desc = Column(Text, nullable=False) # 路由策略: 对应 KubernetesPodOperator schedulername =》通过pod传给k8s集群(schedulername) route_strategy = Column(String(32)) # 阻塞处理策略 block_strategy = Column(String(32)) # 任务超时时间, 单位分钟:对应 airflow dag的 dagrun_timeout参数 executor_timeout = Column(Integer, nullable=False) # 失败重试次数 executor_fail_retry_count = Column(Integer, nullable=False) # 调度状态: 0-停止 1-运行 trigger_status = Column(Integer, nullable=False) # 上次调度时间 trigger_last_time = Column(Integer) # 下次调度时间 trigger_next_time = Column(Integer) create_time = Column(Integer) # 创建时间 update_time = Column(Integer) # 创建时间 user_id = Column(Integer) # 创建者user id def __int__(self): pass def get_job_path(self): dag_path = '/dags/' return dag_path + f'dag_{self.id}.py'