af_job.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. from sqlalchemy import Column, Integer, String, Text, JSON
  2. from app.models.database import BaseModel
  3. class AirflowJob(BaseModel):
  4. __tablename__ = "airflow_job"
  5. id = Column(Integer, primary_key=True, index=True)
  6. name = Column(Text)
  7. job_type = Column(Integer) # 任务类型:可取 1=单作业任务 ,2=多作业任务
  8. job_mode = Column(Integer) # 任务模式:1= 常规模式 2=调试模式
  9. tasks = Column(JSON) # 任务列表
  10. dependence = Column(JSON) # 作业间的依赖
  11. # 任务执行CRON
  12. cron = Column(Text, nullable=False)
  13. # 任务描述
  14. desc = Column(Text, nullable=False)
  15. # 路由策略: 对应 KubernetesPodOperator schedulername =》通过pod传给k8s集群(schedulername)
  16. route_strategy = Column(String(32))
  17. # 阻塞处理策略
  18. block_strategy = Column(String(32))
  19. # 任务超时时间, 单位分钟:对应 airflow dag的 dagrun_timeout参数
  20. executor_timeout = Column(Integer, nullable=False)
  21. # 失败重试次数
  22. executor_fail_retry_count = Column(Integer, nullable=False)
  23. # 调度状态: 0-停止 1-运行
  24. trigger_status = Column(Integer, nullable=False)
  25. # 上次调度时间
  26. trigger_last_time = Column(Integer)
  27. # 下次调度时间
  28. trigger_next_time = Column(Integer)
  29. create_time = Column(Integer) # 创建时间
  30. update_time = Column(Integer) # 创建时间
  31. user_id = Column(Integer) # 创建者user id
  32. def __int__(self):
  33. pass
  34. def get_job_path(self):
  35. dag_path = '/dags/'
  36. return dag_path + f'dag_{self.id}.py'