1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- from airflow import DAG
- from airflow.utils.dates import days_ago
- {# Default arguments handed to the DAG below; project_id is set to the pipeline name. #}
- args = {
- 'project_id' : '{{ pipeline_name }}',
- }
- {# Build the DAG object for this pipeline. The pipeline name is used as the DAG id and the schedule is fixed to '@once'. #}
- dag = DAG(
- '{{ pipeline_name }}',
- default_args=args,
- schedule_interval='@once',
- start_date=days_ago(1),
- {# Triple double quotes inside the description are backslash-escaped so they cannot close the generated string literal early. #}
- description="""
- {{ pipeline_description|replace("\"\"\"", "\\\"\\\"\\\"") }}
- """,
- is_paused_upon_creation={{ is_paused_upon_creation }},
- )
- {# Emit whatever render_secrets_for_cos produces for cos_secret. NOTE(review): the helper is defined outside this view; presumably it declares the env_var_secret_id and env_var_secret_key names that the generic operator branch references - confirm. #}
- {{ render_secrets_for_cos(cos_secret) }}
- {# Emit one operator definition per entry of operations_list. Only the value (operation) is used in this loop body; key is not referenced. #}
- {% for key, operation in operations_list.items() %}
- {# Preamble: custom operators contribute their own import statements; generic operators get the secret and volume helper output plus the KubernetesPodOperator import. NOTE(review): that import is emitted once per generic operation, so it repeats in the generated file. #}
- {% if not operation.is_generic_operator %}
- {% for import_statement in operation.imports %}
- {{import_statement}}
- {% endfor %}
- {% else %}
- {{ render_secrets_for_generic_op(operation) }}
- {{ render_volumes_for_generic_op(operation) }}
- from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
- {% endif %}
- {# When an operator source is set, record it as a Python comment in the generated file. #}
- {% if operation.operator_source %}# Operator source: {{ operation.operator_source }}
- {% endif %}
- {# Custom operator branch: instantiate operation.class_name. The variable name and task_id are passed through the regex_replace filter (defined outside this view). #}
- {% if not operation.is_generic_operator %}
- op_{{ operation.id|regex_replace }} = {{ operation.class_name }}(
- task_id='{{ operation.notebook|regex_replace }}',
- {# Component parameter values are rendered as-is, without added quoting, so each value must already be a valid Python expression. #}
- {% for param, value in operation.component_params.items() %}
- {{ param }}={{ value }},
- {% endfor %}
- {% if operation.volumes or operation.kubernetes_tolerations or operation.kubernetes_pod_annotations %}
- executor_config={{ render_executor_config_for_custom_op(operation) }},
- {% endif %}
- {# Generic operator branch: run the operation in a pod through KubernetesPodOperator, invoking the argument list via sh -c. #}
- {% else %}
- op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.notebook|regex_replace }}',
- namespace='{{ user_namespace }}',
- image='{{ operation.runtime_image }}',
- {% if operation.runtime_image_pull_secret %}
- image_pull_secrets='{{ operation.runtime_image_pull_secret }}',
- {% endif %}
- cmds=['sh', '-c'],
- arguments=["{{ operation.argument_list }}"],
- task_id='{{ operation.notebook|regex_replace }}',
- env_vars={{ operation.pipeline_envs }},
- {# The resources dict is emitted only when at least one of cpu_request, mem_request or gpu_limit is set, and contains only the keys that are set. #}
- {% if operation.cpu_request or operation.mem_request or operation.gpu_limit %}
- resources = {
- {% if operation.cpu_request %}
- 'request_cpu': '{{ operation.cpu_request }}',
- {% endif %}
- {% if operation.mem_request %}
- 'request_memory': '{{ operation.mem_request }}',
- {% endif %}
- {% if operation.gpu_limit %}
- 'limit_gpu': '{{ operation.gpu_limit }}',
- {% endif %}
- },
- {% endif %}
- {# Secrets list: per-operation secret variables first, then the two COS secret variables when cos_secret is set. NOTE(review): the guard tests operation.secrets but the loop iterates operation.secret_vars - presumably both are populated together; confirm. #}
- {% if operation.secrets or cos_secret %}
- secrets=[{% if operation.secrets %}{% for secret_var in operation.secret_vars %}{{ secret_var }},{% endfor %}{% endif %}{% if cos_secret %}env_var_secret_id, env_var_secret_key{% endif %}],
- {% endif %}
- {# Volumes and their mounts are listed from operation.volume_vars and operation.volume_mount_vars; presumably these names are declared by the volume helper output above - confirm. #}
- {% if operation.volumes %}
- volumes=[{% for volume_var in operation.volume_vars %}{{ volume_var }},{% endfor %}],
- volume_mounts=[{% for mount_var in operation.volume_mount_vars %}{{ mount_var }},{% endfor %}],
- {% endif %}
- {% if operation.kubernetes_tolerations or operation.kubernetes_pod_annotations %}
- executor_config={{ render_executor_config_for_generic_op(operation) }},
- {% endif %}
- in_cluster={{ in_cluster }},
- config_file="{{ kube_config_path }}",
- {% endif %}
- {# Shared tail: this line closes the constructor call opened in either branch above. #}
- dag=dag)
- {# The image pull policy is assigned as an attribute after construction rather than passed to the constructor. #}
- {% if operation.image_pull_policy %}
- op_{{ operation.id|regex_replace }}.image_pull_policy = '{{ operation.image_pull_policy }}'
- {% endif %}
- {# Attach the operation documentation, escaping triple double quotes the same way as the DAG description. #}
- {% if operation.doc %}
- op_{{ operation.id|regex_replace }}.doc = """
- {{ operation.doc|replace("\"\"\"", "\\\"\\\"\\\"") }}
- """
- {% endif %}
- {# Dependency wiring: emit one bitshift statement per parent id. With Airflow's left-shift operator the right-hand operator becomes upstream of the left-hand one. NOTE(review): this assumes each parent variable was already defined earlier in the generated file, i.e. operations_list is ordered parents-first - confirm. #}
- {% if operation.parent_operation_ids %}
- {% for parent_operation_id in operation.parent_operation_ids %}
- op_{{ operation.id|regex_replace }} << op_{{ parent_operation_id|regex_replace }}
- {% endfor %}
- {% endif %}
- {% endfor %}
|