# Standard library
from datetime import datetime
import random
import time

# Third party
import requests

# Airflow
from airflow import DAG
from airflow.configuration import conf
from airflow.decorators import task

# Kubernetes namespace this Airflow deployment runs in (read from the
# Airflow config; presumably consumed by operators elsewhere — verify).
namespace = conf.get("kubernetes", "NAMESPACE")
- # instantiate the DAG
# Base URL of the Apache Livy REST server; defined once so the endpoint
# is not repeated (and can be changed) in a single place.
LIVY_URL = "http://192.168.199.109:8998"

# instantiate the DAG
with DAG(
    dag_id='live_demo_dag',
    start_date=datetime(2022, 6, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    @task
    def submit_session():
        """Create a new Livy session and return its id (pushed to XCom)."""
        r = requests.post(f"{LIVY_URL}/sessions", json={"name": "livy-demo"})
        # requests does NOT raise on HTTP 4xx/5xx by itself; fail the task
        # explicitly here instead of crashing later on a missing "id" key.
        r.raise_for_status()
        return r.json()["id"]

    @task
    def submit_task(**context):
        """Submit a PySpark statement to the Livy session created upstream.

        Returns the session id so the downstream task queries the same session.
        """
        # Crude wait for the Livy session to reach a usable state.
        # TODO(review): poll GET /sessions/{id} for state == "idle" instead.
        time.sleep(10)
        sess_id = context['ti'].xcom_pull(task_ids='submit_session', key='return_value')
        r = requests.post(
            f"{LIVY_URL}/sessions/{sess_id}/statements",
            json={
                "kind": "pyspark",
                "code": "spark.range(1000 * 1000 * 1000).count()",
            },
        )
        # Surface a failed submission as a task failure rather than ignoring it.
        r.raise_for_status()
        return sess_id

    @task
    def get_result(**context):
        """Fetch all statements (and their output) for the session and log them."""
        # Crude wait for the statement to finish; see TODO in submit_task.
        time.sleep(10)
        sess_id = context['ti'].xcom_pull(task_ids='submit_task', key='return_value')
        r = requests.get(f"{LIVY_URL}/sessions/{sess_id}/statements")
        r.raise_for_status()
        print(r.json())
        return r.text

    # set dependencies (tasks defined using decorators need to be called)
    submit_session() >> submit_task() >> get_result()