1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 |
- from sqlalchemy import Column, Integer, String, Text, JSON
- from app.models.database import BaseModel
- class AirflowJob(BaseModel):
- __tablename__ = "airflow_job"
- id = Column(Integer, primary_key=True, index=True)
- name = Column(Text)
- job_type = Column(Integer) # 任务类型:可取 1=单作业任务 ,2=多作业任务
- job_mode = Column(Integer) # 任务模式:1= 常规模式 2=调试模式
- tasks = Column(JSON) # 任务列表
- dependence = Column(JSON) # 作业间的依赖
- # 任务执行CRON
- cron = Column(Text, nullable=False)
- # 任务描述
- desc = Column(Text, nullable=False)
- # 路由策略: 对应 KubernetesPodOperator schedulername =》通过pod传给k8s集群(schedulername)
- route_strategy = Column(String(32))
- # 阻塞处理策略
- block_strategy = Column(String(32))
- # 任务超时时间, 单位分钟:对应 airflow dag的 dagrun_timeout参数
- executor_timeout = Column(Integer, nullable=False)
- # 失败重试次数
- executor_fail_retry_count = Column(Integer, nullable=False)
- # 调度状态: 0-停止 1-运行
- trigger_status = Column(Integer, nullable=False)
- # 上次调度时间
- trigger_last_time = Column(Integer)
- # 下次调度时间
- trigger_next_time = Column(Integer)
- create_time = Column(Integer) # 创建时间
- update_time = Column(Integer) # 创建时间
- user_id = Column(Integer) # 创建者user id
- def __int__(self):
- pass
- def get_job_path(self):
- dag_path = '/dags/'
- return dag_path + f'dag_{self.id}.py'
|