```python
from datetime import datetime

from airflow import DAG
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# get the Kubernetes namespace Airflow is currently running in
namespace = conf.get("kubernetes", "NAMESPACE")

# name used by the original template this example is based on (not referenced by the tasks below)
name = 'luoyulong'

# pip requirements to package inside the Pod
python_requirements = ['sphinx>=1.8', 'sphinx_rtd_theme', 'recommonmark>=0.6.0', 'markdown>=3.4.1']

# instantiate the DAG
with DAG(
    start_date=datetime(2022, 6, 1),
    catchup=False,
    schedule_interval='@daily',
    dag_id='lyl_package_test',
) as dag:

    package_python_libs1 = KubernetesPodOperator(
        # unique id of the task within the DAG
        task_id='package-task-01',
        # the Docker image to launch
        image='SXKJ:32775/jupyter:1.1',
        image_pull_policy='Always',
        # launch the Pod on the same cluster Airflow is running on
        in_cluster=True,
        # launch the Pod in the same namespace Airflow is running in
        namespace=namespace,
        # Pod configuration: name the Pod (must be a valid DNS-1123 name, so no underscores)
        name='my-pod-01',
        # append a random suffix to the Pod name to ensure uniqueness in the namespace
        random_name_suffix=True,
        # attach labels to the Pod; can be used for grouping
        labels={'app': 'backend', 'env': 'dev'},
        # reattach to the existing Pod instead of creating a new one if the worker restarts
        reattach_on_restart=True,
        # keep the Pod after the task finishes (set to True to delete it)
        is_delete_operator_pod=False,
        # forward the container's stdout as task logs
        get_logs=True,
        # log Kubernetes events in case of Pod failure
        log_events_on_failure=True,
        cmds=["/bin/bash", "-c", 'sh /home/sxkj/bigdata/install.sh'],
        # pass the requirements and the upload path as environment variables
        env_vars={
            "PYTHON_REQUIREMENTS": ' '.join(python_requirements),
            "UPLOAD_PATH": '/tmp/x.zip',
        },
    )

    package_python_libs2 = KubernetesPodOperator(
        # unique id of the task within the DAG
        task_id='package-task-02',
        # the Docker image to launch
        image='SXKJ:32775/jupyter:1.1',
        image_pull_policy='Always',
        # launch the Pod on the same cluster Airflow is running on
        in_cluster=True,
        # launch the Pod in the same namespace Airflow is running in
        namespace=namespace,
        # Pod configuration: name the Pod (must be a valid DNS-1123 name, so no underscores)
        name='my-pod-02',
        # append a random suffix to the Pod name to ensure uniqueness in the namespace
        random_name_suffix=True,
        # attach labels to the Pod; can be used for grouping
        labels={'app': 'backend', 'env': 'dev'},
        # reattach to the existing Pod instead of creating a new one if the worker restarts
        reattach_on_restart=True,
        # keep the Pod after the task finishes (set to True to delete it)
        is_delete_operator_pod=False,
        # forward the container's stdout as task logs
        get_logs=True,
        # log Kubernetes events in case of Pod failure
        log_events_on_failure=True,
        cmds=["/bin/bash", "-c", 'echo 12345'],
        # pass the requirements and the upload path as environment variables
        env_vars={
            "PYTHON_REQUIREMENTS": ' '.join(python_requirements),
            "UPLOAD_PATH": '/tmp/x',
        },
    )

    package_python_libs1 >> package_python_libs2
```
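
If you want to exercise the DAG without waiting for the scheduler, Airflow 2.5+ provides `dag.test()`. The snippet below is a minimal sketch under that assumption; it also assumes the machine running it can reach the Kubernetes cluster, since the `KubernetesPodOperator` tasks still launch real Pods.

```python
# A minimal sketch, assuming Airflow 2.5+ (which adds DAG.test()) and an
# environment with access to the target Kubernetes cluster. Appending this
# to the DAG file lets you run both tasks once, in dependency order, with
# `python lyl_package_test.py` instead of triggering the DAG via the scheduler.
if __name__ == "__main__":
    dag.test()
```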