livy_demo.py

from airflow import DAG
from datetime import datetime
from airflow.configuration import conf
from airflow.decorators import task
import requests
import time

# get the current Kubernetes namespace Airflow is running in
namespace = conf.get("kubernetes", "NAMESPACE")

# instantiate the DAG
with DAG(
    start_date=datetime(2022, 6, 1),
    catchup=False,
    schedule_interval='@daily',
    dag_id='live_demo_dag'
) as dag:

    @task
    def submit_session():
        # create a new interactive Livy session via the REST API
        r = requests.post("http://192.168.199.109:8998/sessions", json={"name": "livy-demo"})
        return r.json()["id"]

    @task
    def submit_task(**context):
        # give the session time to start before submitting code
        time.sleep(10)
        sess_id = context['ti'].xcom_pull(task_ids='submit_session', key='return_value')
        # submit a PySpark statement to the session
        r = requests.post(f"http://192.168.199.109:8998/sessions/{sess_id}/statements", json={
            "kind": "pyspark",
            "code": "spark.range(1000 * 1000 * 1000).count()"
        })
        return sess_id

    @task
    def get_result(**context):
        # give the statement time to finish before fetching its output
        time.sleep(10)
        sess_id = context['ti'].xcom_pull(task_ids='submit_task', key='return_value')
        r = requests.get(f"http://192.168.199.109:8998/sessions/{sess_id}/statements")
        print(r.json())
        return r.text

    # set dependencies (tasks defined using decorators need to be called)
    submit_session() >> submit_task() >> get_result()
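
# The fixed time.sleep(10) calls above assume the Livy session is idle and
# the statement has finished within ten seconds; on a busy cluster either
# task can race ahead of Spark. A more robust pattern is to poll Livy's
# status endpoints until the session reports "idle" and the statement output
# is "available". The sketch below is one way to do that against the same
# Livy host; the helper names and timeout values are illustrative
# assumptions, but /sessions/{id}/state and /sessions/{id}/statements/{id}
# are standard Livy REST API endpoints.

LIVY_URL = "http://192.168.199.109:8998"  # assumed: same host as the DAG above

def wait_for_session(sess_id, timeout=120, interval=5):
    """Poll a Livy session until it is idle (ready to accept statements)."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        state = requests.get(f"{LIVY_URL}/sessions/{sess_id}/state").json()["state"]
        if state == "idle":
            return
        if state in ("error", "dead", "killed"):
            raise RuntimeError(f"Livy session {sess_id} failed: {state}")
        time.sleep(interval)
    raise TimeoutError(f"Livy session {sess_id} not idle after {timeout}s")

def wait_for_statement(sess_id, stmt_id, timeout=600, interval=5):
    """Poll a Livy statement until its output is available, then return it."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        stmt = requests.get(
            f"{LIVY_URL}/sessions/{sess_id}/statements/{stmt_id}"
        ).json()
        if stmt["state"] == "available":
            return stmt["output"]
        time.sleep(interval)
    raise TimeoutError(f"Statement {stmt_id} not finished after {timeout}s")

# Calling wait_for_session(sess_id) in submit_task and
# wait_for_statement(sess_id, stmt_id) in get_result (with stmt_id taken from
# the statement POST response) would replace both sleeps, so task duration
# tracks the actual Spark job rather than a worst-case guess.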