{# airflow_template.jinja2 (6.5 KB): Jinja2 template that renders a Python
   module defining an Apache Airflow DAG. #}
  1. from airflow import DAG
  2. from airflow.utils.dates import days_ago
  3. {% if cos_secret %}
  4. from airflow.kubernetes.secret import Secret
  5. {% endif %}
  6. args = {
  7. 'project_id' : '{{ pipeline_name }}',
  8. }
  9. dag = DAG(
  10. '{{ pipeline_name }}',
  11. default_args=args,
  12. schedule_interval='@once',
  13. start_date=days_ago(1),
  14. description="""
  15. {{ pipeline_description|replace("\"\"\"", "\\\"\\\"\\\"") }}
  16. """,
  17. is_paused_upon_creation={{ is_paused_upon_creation }},
  18. )
  19. {% if cos_secret %}
  20. ## Ensure that the secret named '{{ cos_secret }}' is defined in the Kubernetes namespace where this pipeline will be run
  21. env_var_secret_id = Secret(deploy_type='env',
  22. deploy_target='AWS_ACCESS_KEY_ID',
  23. secret='{{ cos_secret }}',
  24. key='AWS_ACCESS_KEY_ID',
  25. )
  26. env_var_secret_key = Secret(deploy_type='env',
  27. deploy_target='AWS_SECRET_ACCESS_KEY',
  28. secret='{{ cos_secret }}',
  29. key='AWS_SECRET_ACCESS_KEY',
  30. )
  31. {% endif %}
  32. {% set ns = namespace() %}
  33. {% for key, operation in operations_list.items() %}
  34. {% set ns.operation_kubernetes_secrets = "" %}
  35. {% if not operation.is_generic_operator %}
  36. {% for import_statement in operation.imports %}
  37. {{import_statement}}
  38. {% endfor %}
  39. {% else %}
  40. from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
  41. {% endif %}
  42. {% if operation.kubernetes_secrets %}
  43. from airflow.kubernetes.secret import Secret
  44. # Kubernetes secrets for operation '{{ operation.id|regex_replace }}'
  45. {% for secret in operation.kubernetes_secrets %}
  46. secret_{{ operation.id|regex_replace }}_{{ loop.index }} = Secret(deploy_type='env',
  47. deploy_target='{{ secret.env_var }}',
  48. secret='{{ secret.name }}',
  49. key='{{ secret.key }}',
  50. )
  51. {% set ns.operation_kubernetes_secrets = ns.operation_kubernetes_secrets ~ 'secret_' ~ operation.id|regex_replace ~ '_' ~ loop.index ~ ', ' %}
  52. {% endfor %}
  53. {% endif %}
  54. {% if cos_secret %}{% set ns.operation_kubernetes_secrets = ns.operation_kubernetes_secrets ~ "env_var_secret_id, env_var_secret_key" %}{% endif %}
  55. {% if operation.operator_source %}# Operator source: {{ operation.operator_source }}{% endif %}{% if not operation.is_generic_operator %}
  56. op_{{ operation.id|regex_replace }} = {{ operation.class_name }}(
  57. task_id='{{ operation.notebook|regex_replace }}',
  58. {% for param, value in operation.component_params.items() %}
  59. {{ param }}={{ value }},
  60. {% endfor %}
  61. {% else %}
  62. {% if operation.volume_mounts %}
  63. from airflow.contrib.kubernetes.volume import Volume
  64. from airflow.contrib.kubernetes.volume_mount import VolumeMount
  65. volumes_{{ operation.id|regex_replace }} = []
  66. volume_mounts_{{ operation.id|regex_replace }} = []
  67. {% for volume_mount in operation.volume_mounts %}
  68. volume_mount_{{loop.index}} = VolumeMount(name='{{ volume_mount.pvc_name }}',
  69. mount_path='{{ volume_mount.path }}',
  70. sub_path=None,
  71. read_only=False)
  72. volume_config_{{ loop.index }}= {
  73. 'persistentVolumeClaim':
  74. {
  75. 'claimName': '{{ volume_mount.pvc_name }}'
  76. }
  77. }
  78. volume_{{ loop.index }} = Volume(name='{{ volume_mount.pvc_name }}', configs=volume_config_{{ loop.index }})
  79. volumes_{{ operation.id|regex_replace }}.append(volume_{{ loop.index }})
  80. volume_mounts_{{ operation.id|regex_replace }}.append(volume_mount_{{ loop.index }})
  81. {% endfor %}
  82. {% endif %}
  83. op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.notebook|regex_replace }}',
  84. namespace='{{ user_namespace }}',
  85. image='{{ operation.runtime_image }}',
  86. {% if operation.runtime_image_pull_secret %}
  87. image_pull_secrets='{{ operation.runtime_image_pull_secret }}',
  88. {% endif %}
  89. cmds=['sh', '-c'],
  90. arguments=["{{ operation.argument_list }}"],
  91. task_id='{{ operation.notebook|regex_replace }}',
  92. env_vars={{ operation.pipeline_envs }},
  93. {% if operation.volume_mounts %}
  94. volumes=volumes_{{ operation.id|regex_replace }},
  95. volume_mounts=volume_mounts_{{ operation.id|regex_replace }},
  96. {% endif %}
  97. {% if operation.cpu_request or operation.mem_request or operation.gpu_limit %}
  98. resources = {
  99. {% if operation.cpu_request %}
  100. 'request_cpu': '{{ operation.cpu_request }}',
  101. {% endif %}
  102. {% if operation.mem_request %}
  103. 'request_memory': '{{ operation.mem_request }}',
  104. {% endif %}
  105. {% if operation.gpu_limit %}
  106. 'limit_gpu': '{{ operation.gpu_limit }}',
  107. {% endif %}
  108. },
  109. {% endif %}
  110. {% if ns.operation_kubernetes_secrets %}
  111. secrets=[{{ ns.operation_kubernetes_secrets }}],
  112. {% endif %}
  113. in_cluster={{ in_cluster }},
  114. config_file="{{ kube_config_path }}",
  115. {% endif %}
  116. dag=dag)
  117. {% if operation.image_pull_policy %}
  118. op_{{ operation.id|regex_replace }}.image_pull_policy = '{{ operation.image_pull_policy }}'
  119. {% endif %}
  120. {% if operation.doc %}
  121. op_{{ operation.id|regex_replace }}.doc = """
  122. {{ operation.doc|replace("\"\"\"", "\\\"\\\"\\\"") }}
  123. """
  124. {% endif %}
  125. {% if operation.parent_operation_ids %}
  126. {% for parent_operation_id in operation.parent_operation_ids %}
  127. op_{{ operation.id|regex_replace }} << op_{{ parent_operation_id|regex_replace }}
  128. {% endfor %}
  129. {% endif %}
  130. {% endfor %}