spark_dag.py

import os
from datetime import datetime, timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.configuration import conf

# [END import_module]

# [START instantiate_dag]
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "spark_pi"

# get the current Kubernetes namespace Airflow is running in
namespace = conf.get("kubernetes", "NAMESPACE")
with DAG(
    DAG_ID,
    # max_active_runs is a DAG-level argument; passing it via default_args has no effect
    max_active_runs=1,
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    # [START SparkKubernetesOperator_DAG]
    t1 = SparkKubernetesOperator(
        task_id='spark_pi_submit',
        namespace=namespace,
        application_file="example_spark_kubernetes_spark_pi.yaml",
        do_xcom_push=True,
        dag=dag,
    )

    t2 = SparkKubernetesSensor(
        task_id='spark_pi_monitor',
        namespace=namespace,
        application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
        dag=dag,
    )
    t1 >> t2
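
# Optional local-debug entry point; a sketch, not part of the original example.
# It assumes Airflow 2.5+, where DAG.test() runs a single DAG run in-process
# without a scheduler, which is handy for checking that the DAG parses and the
# SparkApplication submission task renders its templates before deploying.
if __name__ == "__main__":
    dag.test()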