kpo_demo.py

from datetime import datetime

from airflow import DAG
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# get the current Kubernetes namespace Airflow is running in
namespace = conf.get("kubernetes", "NAMESPACE")

# set the name that will be printed
name = 'zhangli'

# instantiate the DAG
with DAG(
    dag_id='KPO_different_language_example_dag',
    start_date=datetime(2022, 6, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    say_hello_name_in_haskell = KubernetesPodOperator(
        # unique id of the task within the DAG
        task_id='say_hello_name_in_haskell',
        # the Docker image to launch
        image='registry.cn-hangzhou.aliyuncs.com/sxtest/haskell-ex:1.0.0',
        # launch the Pod on the same cluster Airflow is running on
        in_cluster=True,
        # launch the Pod in the same namespace Airflow is running in
        namespace=namespace,
        # Pod configuration
        # name the Pod (must be a valid DNS-1123 name: hyphens, not underscores)
        name='my-haskell-pod',
        # give the Pod name a random suffix to ensure uniqueness in the namespace
        random_name_suffix=True,
        # attach labels to the Pod, which can be used for grouping
        labels={'app': 'backend', 'env': 'dev'},
        # reattach to the running Pod instead of creating a new one if the worker fails
        reattach_on_restart=True,
        # delete the Pod after the task finishes
        is_delete_operator_pod=True,
        # stream the container's stdout as task logs
        get_logs=True,
        # log Pod events in case of failure
        log_events_on_failure=True,
        # pass your name to the container as an environment variable
        env_vars={'NAME_TO_GREET': name},
    )