airflow_template.jinja2 4.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. from airflow import DAG
  2. from airflow.utils.dates import days_ago
  3. args = {
  4. 'project_id' : '{{ pipeline_name }}',
  5. }
  6. dag = DAG(
  7. '{{ pipeline_name }}',
  8. default_args=args,
  9. schedule_interval='@once',
  10. start_date=days_ago(1),
  11. description="""
  12. {{ pipeline_description|replace("\"\"\"", "\\\"\\\"\\\"") }}
  13. """,
  14. is_paused_upon_creation={{ is_paused_upon_creation }},
  15. )
  16. {{ render_secrets_for_cos(cos_secret) }}
  17. {% for key, operation in operations_list.items() %}
  18. {% if not operation.is_generic_operator %}
  19. {% for import_statement in operation.imports %}
  20. {{import_statement}}
  21. {% endfor %}
  22. {% else %}
  23. {{ render_secrets_for_generic_op(operation) }}
  24. {{ render_volumes_for_generic_op(operation) }}
  25. from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
  26. {% endif %}
  27. {% if operation.operator_source %}# Operator source: {{ operation.operator_source }}
  28. {% endif %}
  29. {% if not operation.is_generic_operator %}
  30. op_{{ operation.id|regex_replace }} = {{ operation.class_name }}(
  31. task_id='{{ operation.notebook|regex_replace }}',
  32. {% for param, value in operation.component_params.items() %}
  33. {{ param }}={{ value }},
  34. {% endfor %}
  35. {% if operation.volumes or operation.kubernetes_tolerations or operation.kubernetes_pod_annotations %}
  36. executor_config={{ render_executor_config_for_custom_op(operation) }},
  37. {% endif %}
  38. {% else %}
  39. op_{{ operation.id|regex_replace }} = KubernetesPodOperator(name='{{ operation.notebook|regex_replace }}',
  40. namespace='{{ user_namespace }}',
  41. image='{{ operation.runtime_image }}',
  42. {% if operation.runtime_image_pull_secret %}
  43. image_pull_secrets='{{ operation.runtime_image_pull_secret }}',
  44. {% endif %}
  45. cmds=['sh', '-c'],
  46. arguments=["{{ operation.argument_list }}"],
  47. task_id='{{ operation.notebook|regex_replace }}',
  48. env_vars={{ operation.pipeline_envs }},
  49. {% if operation.cpu_request or operation.mem_request or operation.gpu_limit %}
  50. resources = {
  51. {% if operation.cpu_request %}
  52. 'request_cpu': '{{ operation.cpu_request }}',
  53. {% endif %}
  54. {% if operation.mem_request %}
  55. 'request_memory': '{{ operation.mem_request }}',
  56. {% endif %}
  57. {% if operation.gpu_limit %}
  58. 'limit_gpu': '{{ operation.gpu_limit }}',
  59. {% endif %}
  60. },
  61. {% endif %}
  62. {% if operation.secrets or cos_secret %}
  63. secrets=[{% if operation.secrets %}{% for secret_var in operation.secret_vars %}{{ secret_var }},{% endfor %}{% endif %}{% if cos_secret %}env_var_secret_id, env_var_secret_key{% endif %}],
  64. {% endif %}
  65. {% if operation.volumes %}
  66. volumes=[{% for volume_var in operation.volume_vars %}{{ volume_var }},{% endfor %}],
  67. volume_mounts=[{% for mount_var in operation.volume_mount_vars %}{{ mount_var }},{% endfor %}],
  68. {% endif %}
  69. {% if operation.kubernetes_tolerations or operation.kubernetes_pod_annotations %}
  70. executor_config={{ render_executor_config_for_generic_op(operation) }},
  71. {% endif %}
  72. in_cluster={{ in_cluster }},
  73. config_file="{{ kube_config_path }}",
  74. {% endif %}
  75. dag=dag)
  76. {% if operation.image_pull_policy %}
  77. op_{{ operation.id|regex_replace }}.image_pull_policy = '{{ operation.image_pull_policy }}'
  78. {% endif %}
  79. {% if operation.doc %}
  80. op_{{ operation.id|regex_replace }}.doc = """
  81. {{ operation.doc|replace("\"\"\"", "\\\"\\\"\\\"") }}
  82. """
  83. {% endif %}
  84. {% if operation.parent_operation_ids %}
  85. {% for parent_operation_id in operation.parent_operation_ids %}
  86. op_{{ operation.id|regex_replace }} << op_{{ parent_operation_id|regex_replace }}
  87. {% endfor %}
  88. {% endif %}
  89. {% endfor %}