123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- from airflow import DAG
- from airflow.utils.dates import days_ago
- {% if cos_secret %}
- from airflow.kubernetes.secret import Secret
- {% endif %}
- {# Dictionary handed to the DAG as `default_args`; it only carries the pipeline name as `project_id`. #}
- args = {
- 'project_id': '{{ pipeline_name }}',
- }
- {#
-   DAG object for the pipeline: scheduled '@once' with a start date of one day ago.
-   The description is rendered inside a Python triple-quoted literal, so it is escaped in two steps:
-   1. backslashes are doubled, because a raw backslash sequence in user text (for example a Windows path
-      containing "\U" or "\x") would otherwise be read as a Python escape and can make the generated
-      file fail to compile;
-   2. embedded triple quotes are escaped so they cannot terminate the literal early.
-   The order matters: doubling backslashes after step 2 would undo the quote escaping.
- #}
- dag = DAG(
- '{{ pipeline_name }}',
- default_args=args,
- schedule_interval='@once',
- start_date=days_ago(1),
- description="""
- {{ pipeline_description|replace("\\", "\\\\")|replace("\"\"\"", "\\\"\\\"\\\"") }}
- """,
- is_paused_upon_creation={{ is_paused_upon_creation }},
- )
- {#
-   Rendered only when `cos_secret` is set. Defines two module-level Secret objects with deploy_type 'env':
-   `env_var_secret_id` maps key AWS_ACCESS_KEY_ID and `env_var_secret_key` maps key AWS_SECRET_ACCESS_KEY
-   of the Kubernetes secret named by `cos_secret` to deploy targets of the same names.
-   Both variable names are referenced again further down when the per-operation `secrets=[...]` list is built.
- #}
- {% if cos_secret %}
- ## Ensure that the secret named '{{ cos_secret }}' is defined in the Kubernetes namespace where this pipeline will be run
- env_var_secret_id = Secret(deploy_type='env',
- deploy_target='AWS_ACCESS_KEY_ID',
- secret='{{ cos_secret }}',
- key='AWS_ACCESS_KEY_ID',
- )
- env_var_secret_key = Secret(deploy_type='env',
- deploy_target='AWS_SECRET_ACCESS_KEY',
- secret='{{ cos_secret }}',
- key='AWS_SECRET_ACCESS_KEY',
- )
- {% endif %}
- {# `ns` is a Jinja namespace object: attributes set on it inside nested loops stay visible outside those loops, unlike plain `set` variables. #}
- {% set ns = namespace() %}
- {# One iteration per entry of `operations_list`; the loop body emits the code for a single operator and is closed by the final `endfor` of this template. #}
- {% for key, operation in operations_list.items() %}
- {# Start every operation with an empty, comma-separated string of Secret variable names. #}
- {% set ns.operation_kubernetes_secrets = "" %}
- {% if operation.is_generic_operator %}{# generic operations are instantiated as KubernetesPodOperator further down #}
- from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
- {% else %}{# custom operators supply their own import statements, emitted verbatim one per line #}
- {% for stmt in operation.imports %}
- {{ stmt }}
- {% endfor %}
- {% endif %}
- {#
-   For operations that declare `kubernetes_secrets`, emit one Secret object (deploy_type 'env') per entry,
-   named secret_<operation id>_<1-based index>, and remember each generated variable name in
-   `ns.operation_kubernetes_secrets`.
-   NOTE(review): `regex_replace` is a custom filter that is not defined in this template; presumably it turns
-   the id into a valid Python identifier - confirm against the filter registration.
- #}
- {% if operation.kubernetes_secrets %}
- from airflow.kubernetes.secret import Secret
- # Kubernetes secrets for operation '{{ operation.id|regex_replace }}'
- {% for secret in operation.kubernetes_secrets %}
- secret_{{ operation.id|regex_replace }}_{{ loop.index }} = Secret(deploy_type='env',
- deploy_target='{{ secret.env_var }}',
- secret='{{ secret.name }}',
- key='{{ secret.key }}',
- )
- {# Append the variable name plus ', ' so the accumulated string can be placed inside a Python list literal as-is. #}
- {% set ns.operation_kubernetes_secrets = ns.operation_kubernetes_secrets ~ 'secret_' ~ operation.id|regex_replace ~ '_' ~ loop.index ~ ', ' %}
- {% endfor %}
- {% endif %}
- {# When `cos_secret` is set, also append the two module-level variables defined in the `cos_secret` section near the top of this template. #}
- {% if cos_secret %}{% set ns.operation_kubernetes_secrets = ns.operation_kubernetes_secrets ~ "env_var_secret_id, env_var_secret_key" %}{% endif %}
- {#
-   Operator instantiation. Emits an optional "# Operator source" Python comment and then assigns
-   op_<operation id>: the operation's own `class_name` for non-generic operations, KubernetesPodOperator
-   otherwise. Both branches leave the constructor call open; the shared `dag=dag)` line after the final
-   `endif` closes it.
- #}
- {% if operation.operator_source %}# Operator source: {{ operation.operator_source }}{% endif %}{% if not operation.is_generic_operator %}
- op_{{ operation.id|regex_replace }} = {{ operation.class_name }}(
- task_id='{{ operation.notebook|regex_replace }}',
- {# Each component parameter becomes a keyword argument; the value is emitted verbatim (no quoting is added here). #}
- {% for param, value in operation.component_params.items() %}
- {{ param }}={{ value }},
- {% endfor %}
- {% else %}
- {# Generic branch. When volume mounts are requested, build per-operation `volumes_<id>` and `volume_mounts_<id>` lists first. #}
- {% if operation.volume_mounts %}
- from airflow.contrib.kubernetes.volume import Volume
- from airflow.contrib.kubernetes.volume_mount import VolumeMount
- volumes_{{ operation.id|regex_replace }} = []
- volume_mounts_{{ operation.id|regex_replace }} = []
- {# The volume_mount_N / volume_config_N / volume_N names depend only on the loop index, not on the operation id; each object is appended to the per-operation lists right after it is created. #}
- {% for volume_mount in operation.volume_mounts %}
- volume_mount_{{loop.index}} = VolumeMount(name='{{ volume_mount.pvc_name }}',
- mount_path='{{ volume_mount.path }}',
- sub_path=None,
- read_only=False)
- volume_config_{{ loop.index }}= {
- 'persistentVolumeClaim':
- {
- 'claimName': '{{ volume_mount.pvc_name }}'
- }
- }
- volume_{{ loop.index }} = Volume(name='{{ volume_mount.pvc_name }}', configs=volume_config_{{ loop.index }})
- volumes_{{ operation.id|regex_replace }}.append(volume_{{ loop.index }})
- volume_mounts_{{ operation.id|regex_replace }}.append(volume_mount_{{ loop.index }})
- {% endfor %}
- {% endif %}
- {# The pod runs `sh -c` with `operation.argument_list` as its single argument string. #}
- op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.notebook|regex_replace }}',
- namespace='{{ user_namespace }}',
- image='{{ operation.runtime_image }}',
- {% if operation.runtime_image_pull_secret %}
- image_pull_secrets='{{ operation.runtime_image_pull_secret }}',
- {% endif %}
- cmds=['sh', '-c'],
- arguments=["{{ operation.argument_list }}"],
- task_id='{{ operation.notebook|regex_replace }}',
- env_vars={{ operation.pipeline_envs }},
- {% if operation.volume_mounts %}
- volumes=volumes_{{ operation.id|regex_replace }},
- volume_mounts=volume_mounts_{{ operation.id|regex_replace }},
- {% endif %}
- {# The `resources` argument is emitted only when at least one of cpu_request, mem_request or gpu_limit is set, and contains only the keys that are set. #}
- {% if operation.cpu_request or operation.mem_request or operation.gpu_limit %}
- resources = {
- {% if operation.cpu_request %}
- 'request_cpu': '{{ operation.cpu_request }}',
- {% endif %}
- {% if operation.mem_request %}
- 'request_memory': '{{ operation.mem_request }}',
- {% endif %}
- {% if operation.gpu_limit %}
- 'limit_gpu': '{{ operation.gpu_limit }}',
- {% endif %}
- },
- {% endif %}
- {# `ns.operation_kubernetes_secrets` is the comma-separated string of Secret variable names collected earlier in this loop iteration. #}
- {% if ns.operation_kubernetes_secrets %}
- secrets=[{{ ns.operation_kubernetes_secrets }}],
- {% endif %}
- in_cluster={{ in_cluster }},
- config_file="{{ kube_config_path }}",
- {% endif %}
- dag=dag)
- {# Optional attributes assigned on the operator object after it has been constructed. #}
- {% if operation.image_pull_policy %}
- op_{{ operation.id|regex_replace }}.image_pull_policy = '{{ operation.image_pull_policy }}'
- {% endif %}
- {#
-   The operation's documentation text is rendered inside a Python triple-quoted literal, so it is escaped in two steps:
-   backslashes are doubled first (a raw backslash sequence such as "\U" or "\x" in user text would otherwise be
-   read as a Python escape and can make the generated file fail to compile), then embedded triple quotes are
-   escaped so they cannot end the literal early. Doubling backslashes second would undo the quote escaping.
- #}
- {% if operation.doc %}
- op_{{ operation.id|regex_replace }}.doc = """
- {{ operation.doc|replace("\\", "\\\\")|replace("\"\"\"", "\\\"\\\"\\\"") }}
- """
- {% endif %}
- {#
-   Dependency wiring: emits one `op_<this id> << op_<parent id>` statement per parent operation id.
-   NOTE(review): the generated statement references the parent's op_ variable, so this assumes
-   `operations_list` yields parents before their children - confirm against the caller.
- #}
- {% if operation.parent_operation_ids %}
- {% for parent_operation_id in operation.parent_operation_ids %}
- op_{{ operation.id|regex_replace }} << op_{{ parent_operation_id|regex_replace }}
- {% endfor %}
- {% endif %}
- {# Closes the per-operation loop over `operations_list`. #}
- {% endfor %}
|