single_task.py

from datetime import datetime

from airflow import DAG
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
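
# run the pod in the same namespace that Airflow itself is deployed in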
namespace = conf.get("kubernetes", "NAMESPACE")

# set the name that will be printed (currently unused in this DAG)
name = "luoyulong"

# instantiate the DAG
with DAG(
    start_date=datetime(2022, 6, 1),
    catchup=False,
    schedule_interval='@daily',
    dag_id="single_task",
) as dag:
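    # a single task that runs the inline Python script below in its own pod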
    op_6aeed233_0951_4637_99e8_4736262edc94 = KubernetesPodOperator(
        task_id="6aeed233-0951-4637-99e8-4736262edc94",
        image="SXKJ:32775/pod_python:1.1",
        in_cluster=True,
        namespace=namespace,
        name="PythonNode1",
        random_name_suffix=True,
        labels={'app': 'backend', 'env': 'dev'},
        reattach_on_restart=True,
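        # keep the finished pod around for post-mortem inspection instead of deleting it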
        is_delete_operator_pod=False,
        get_logs=True,
        log_events_on_failure=True,
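        # the Python payload arrives via the SCRIPT environment variable;
        # the container writes it to run.py and executes it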
        cmds=['/bin/bash', '-c', 'echo -e "$SCRIPT" > run.py && python run.py'],
        env_vars={
            'SCRIPT': (
                "import numpy as np\n"
                "import os\n"
                "\n"
                "\n"
                "def hello():\n"
                "    for i in range(10):\n"
                "        print(f'{i}: ok!')\n"
                "    print(os.environ)\n"
                "\n"
                "\n"
                "if __name__ == '__main__':\n"
                "    hello()\n"
            )
        },
    )