from airflow import DAG
from airflow.configuration import conf
from airflow.decorators import task
from datetime import datetime
import requests
import time

# get the current Kubernetes namespace Airflow is running in
namespace = conf.get("kubernetes", "NAMESPACE")

# instantiate the DAG
with DAG(
    dag_id='live_demo_dag',
    start_date=datetime(2022, 6, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    @task
    def submit_session():
        # create a new interactive Livy session and return its id
        r = requests.post("http://192.168.199.109:8998/sessions", json={"name": "livy-demo"})
        return r.json()["id"]

    @task
    def submit_task(**context):
        # crude wait for the session to finish starting up
        # (see the polling sketch below for a more robust approach)
        time.sleep(10)
        sess_id = context['ti'].xcom_pull(task_ids='submit_session', key='return_value')
        # submit a PySpark statement to the session; its result is fetched later
        requests.post(f"http://192.168.199.109:8998/sessions/{sess_id}/statements", json={
            "kind": "pyspark",
            "code": "spark.range(1000 * 1000 * 1000).count()"
        })
        # pass the session id on to the next task
        return sess_id

    @task
    def get_result(**context):
        # crude wait for the statement to complete
        time.sleep(10)
        sess_id = context['ti'].xcom_pull(task_ids='submit_task', key='return_value')
        # fetch all statements (and their results) for the session
        r = requests.get(f"http://192.168.199.109:8998/sessions/{sess_id}/statements")
        print(r.json())
        return r.text

    # set dependencies (tasks defined using decorators need to be called)
    submit_session() >> submit_task() >> get_result()
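# The fixed time.sleep(10) calls above are fragile: a Livy session can take
# longer than ten seconds to start. A more robust alternative is to poll
# Livy's GET /sessions/{id} endpoint until the returned "state" field becomes
# "idle". The helper below is a minimal sketch, not part of the original DAG;
# it assumes the same Livy endpoint as above, reuses the requests/time imports
# from the top of the file, and the name wait_for_session is hypothetical.
def wait_for_session(sess_id, timeout=300, interval=5):
    deadline = time.time() + timeout
    while time.time() < deadline:
        r = requests.get(f"http://192.168.199.109:8998/sessions/{sess_id}")
        state = r.json()["state"]
        if state == "idle":
            return
        if state in ("error", "dead", "killed"):
            raise RuntimeError(f"Livy session {sess_id} failed: state={state}")
        time.sleep(interval)
    raise TimeoutError(f"Livy session {sess_id} not idle after {timeout}s")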