#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import OrderedDict
from datetime import datetime
import json
import os
import re
import string
import tempfile
import time
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import autopep8
from jinja2 import Environment
from jinja2 import PackageLoader
from traitlets import CUnicode
from traitlets import List as ListTrait

from elyra._version import __version__
from elyra.airflow.operator import BootscriptBuilder
from elyra.metadata.schemaspaces import RuntimeImages
from elyra.metadata.schemaspaces import Runtimes
from elyra.pipeline.component_catalog import ComponentCache
from elyra.pipeline.pipeline import GenericOperation
from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline import Pipeline
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
from elyra.pipeline.processor import PipelineProcessor
from elyra.pipeline.processor import PipelineProcessorResponse
from elyra.pipeline.processor import RuntimePipelineProcessor
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.util.cos import join_paths
from elyra.util.github import GithubClient

try:
    from elyra.util.gitlab import GitLabClient
except ImportError:
    pass  # Gitlab package is not installed, ignore and use only GitHub

from elyra.util.gitutil import SupportedGitTypes  # noqa:I202
from elyra.util.path import get_absolute_path


class AirflowPipelineProcessor(RuntimePipelineProcessor):
    _type = RuntimeProcessorType.APACHE_AIRFLOW
    _name = "airflow"

    # Provide users with the ability to identify a writable directory in the
    # running container where the notebook | script is executed. The location
    # must exist and be known before the container is started.
    # Defaults to `/tmp`
    WCD = os.getenv("ELYRA_WRITABLE_CONTAINER_DIR", "/tmp").strip().rstrip("/")

    # This specifies the default airflow operators included with Elyra. Any Airflow-based
    # custom connectors should create/extend the elyra configuration file to include
    # those fully-qualified operator/class names.
    available_airflow_operators = ListTrait(
        CUnicode(),
        [
            "airflow.operators.slack_operator.SlackAPIPostOperator",
            "airflow.operators.bash_operator.BashOperator",
            "airflow.operators.email_operator.EmailOperator",
            "airflow.operators.http_operator.SimpleHttpOperator",
            "airflow.contrib.operators.spark_sql_operator.SparkSqlOperator",
            "airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator",
        ],
        help="""List of available Apache Airflow operator names.
        Operators available for use within Apache Airflow pipelines. These operators must
        be fully qualified (i.e., prefixed with their package names).
        """,
    ).tag(config=True)
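
    # Additional operators can be registered through the standard traitlets configuration
    # mechanism. Illustrative sketch only -- the config file location and the operator
    # shown below are hypothetical:
    #
    #     # e.g. in a Jupyter/Elyra server config file
    #     c.AirflowPipelineProcessor.available_airflow_operators.append(
    #         "airflow.providers.acme.operators.acme.AcmeOperator"
    #     )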

    # Contains mappings from class to import statement for each available Airflow operator
    class_import_map = {}

    def __init__(self, root_dir, **kwargs):
        super().__init__(root_dir, **kwargs)
        if not self.class_import_map:  # Only need to load once
            for package in self.available_airflow_operators:
                parts = package.rsplit(".", 1)
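                # e.g. "airflow.operators.bash_operator.BashOperator" yields the entry
                # {"BashOperator": "from airflow.operators.bash_operator import BashOperator"}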
                self.class_import_map[parts[1]] = f"from {parts[0]} import {parts[1]}"
        self.log.debug(f"class_import_map = {self.class_import_map}")

    def process(self, pipeline: Pipeline) -> "AirflowPipelineProcessorResponse":
        """
        Submit the pipeline for execution on Apache Airflow.
        """
        t0_all = time.time()
        timestamp = datetime.now().strftime("%m%d%H%M%S")

        # Create an instance id that will be used to store
        # the pipeline's dependencies, if applicable
        pipeline_instance_id = f"{pipeline.name}-{timestamp}"

        runtime_configuration = self._get_metadata_configuration(
            schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
        )
        api_endpoint = runtime_configuration.metadata.get("api_endpoint")
        cos_endpoint = runtime_configuration.metadata.get("cos_endpoint")
        cos_bucket = runtime_configuration.metadata.get("cos_bucket")

        git_type = SupportedGitTypes.get_instance_by_name(
            runtime_configuration.metadata.get("git_type", SupportedGitTypes.GITHUB.name)
        )
        if git_type == SupportedGitTypes.GITLAB and SupportedGitTypes.is_enabled(SupportedGitTypes.GITLAB) is False:
            raise ValueError(
                "Python package `python-gitlab` is not installed. "
                "Please install using `elyra[gitlab]` to use GitLab as DAG repository."
            )

        github_api_endpoint = runtime_configuration.metadata.get("github_api_endpoint")
        github_repo_token = runtime_configuration.metadata.get("github_repo_token")
        github_repo = runtime_configuration.metadata.get("github_repo")
        github_branch = runtime_configuration.metadata.get("github_branch")

        self.log_pipeline_info(pipeline.name, "Submitting pipeline")

        with tempfile.TemporaryDirectory() as temp_dir:
            pipeline_export_path = os.path.join(temp_dir, f"{pipeline.name}.py")

            self.log.debug(f"Creating temp directory '{temp_dir}'")

            pipeline_filepath = self.create_pipeline_file(
                pipeline=pipeline,
                pipeline_export_format="py",
                pipeline_export_path=pipeline_export_path,
                pipeline_name=pipeline.name,
                pipeline_instance_id=pipeline_instance_id,
            )

            self.log.debug(f"Uploading pipeline file '{pipeline_filepath}'")

            try:
                if git_type == SupportedGitTypes.GITHUB:
                    git_client = GithubClient(
                        server_url=github_api_endpoint, token=github_repo_token, repo=github_repo, branch=github_branch
                    )
                else:
                    git_client = GitLabClient(
                        server_url=github_api_endpoint,
                        token=github_repo_token,
                        project=github_repo,
                        branch=github_branch,
                    )
            except BaseException as be:
                raise RuntimeError(f"Unable to create a connection to {github_api_endpoint}: {str(be)}") from be

            git_client.upload_dag(pipeline_filepath, pipeline_instance_id)

            self.log.info("Waiting for Airflow Scheduler to process and start the pipeline")

            download_url = git_client.get_git_url(
                api_url=github_api_endpoint, repository_name=github_repo, repository_branch=github_branch
            )

            self.log_pipeline_info(
                pipeline.name, f"pipeline pushed to git: {download_url}", duration=(time.time() - t0_all)
            )

            if pipeline.contains_generic_operations():
                object_storage_url = f"{cos_endpoint}"
                os_path = join_paths(pipeline.pipeline_parameters.get(COS_OBJECT_PREFIX), pipeline_instance_id)
                object_storage_path = f"/{cos_bucket}/{os_path}"
            else:
                object_storage_url = None
                object_storage_path = None

            return AirflowPipelineProcessorResponse(
                git_url=f"{download_url}",
                run_url=f"{api_endpoint}",
                object_storage_url=object_storage_url,
                object_storage_path=object_storage_path,
            )

    def export(
        self, pipeline: Pipeline, pipeline_export_format: str, pipeline_export_path: str, overwrite: bool
    ) -> str:
        """
        Export pipeline as Airflow DAG
        """
        # Verify that the AirflowPipelineProcessor supports the given export format
        self._verify_export_format(pipeline_export_format)

        timestamp = datetime.now().strftime("%m%d%H%M%S")
        # Create an instance id that will be used to store
        # the pipeline's dependencies, if applicable
        pipeline_instance_id = f"{pipeline.name}-{timestamp}"

        absolute_pipeline_export_path = get_absolute_path(self.root_dir, pipeline_export_path)

        if os.path.exists(absolute_pipeline_export_path) and not overwrite:
            raise ValueError(f"File '{absolute_pipeline_export_path}' already exists.")

        self.log_pipeline_info(pipeline.name, f"exporting pipeline as a .{pipeline_export_format} file")

        new_pipeline_file_path = self.create_pipeline_file(
            pipeline=pipeline,
            pipeline_export_format="py",
            pipeline_export_path=absolute_pipeline_export_path,
            pipeline_name=pipeline.name,
            pipeline_instance_id=pipeline_instance_id,
        )

        return new_pipeline_file_path

    def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance_id: str) -> OrderedDict:
        """
        Compile the pipeline in preparation for DAG generation
        """
        runtime_configuration = self._get_metadata_configuration(
            schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
        )
        image_namespace = self._get_metadata_configuration(
            schemaspace=RuntimeImages.RUNTIME_IMAGES_SCHEMASPACE_ID, name=None
        )

        cos_endpoint = runtime_configuration.metadata.get("cos_endpoint")
        cos_username = runtime_configuration.metadata.get("cos_username")
        cos_password = runtime_configuration.metadata.get("cos_password")
        cos_secret = runtime_configuration.metadata.get("cos_secret")
        cos_bucket = runtime_configuration.metadata.get("cos_bucket")

        pipeline_instance_id = pipeline_instance_id or pipeline_name

        artifact_object_prefix = join_paths(pipeline.pipeline_parameters.get(COS_OBJECT_PREFIX), pipeline_instance_id)

        self.log_pipeline_info(
            pipeline_name,
            f"processing pipeline dependencies for upload to '{cos_endpoint}' "
            f"bucket '{cos_bucket}' folder '{artifact_object_prefix}'",
        )

        # Collect the operation dictionaries that will be rendered as tasks in the DAG template
        target_ops = []

        t0_all = time.time()

        # Sort operations based on dependency graph (topological order)
        sorted_operations = PipelineProcessor._sort_operations(pipeline.operations)

        # Determine whether access to cloud storage is required and check connectivity
        for operation in sorted_operations:
            if isinstance(operation, GenericOperation):
                self._verify_cos_connectivity(runtime_configuration)
                break

        # All previous operation outputs should be propagated throughout the pipeline.
        # In order to process this recursively, the current operation's inputs should be combined
        # from its parent's inputs (which themselves are derived from the outputs of their parents)
        # and its parent's outputs.
        PipelineProcessor._propagate_operation_inputs_outputs(pipeline, sorted_operations)

        # Scrub all node labels of invalid characters
        scrubbed_operations = self._scrub_invalid_characters_from_list(sorted_operations)
        # Generate unique names for all operations
        unique_operations = self._create_unique_node_names(scrubbed_operations)

        for operation in unique_operations:
            if isinstance(operation, GenericOperation):
                operation_artifact_archive = self._get_dependency_archive_name(operation)

                self.log.debug(f"Creating pipeline component:\n {operation} archive : {operation_artifact_archive}")

                # Collect env variables
                pipeline_envs = self._collect_envs(
                    operation, cos_secret=cos_secret, cos_username=cos_username, cos_password=cos_password
                )

                # Generate a unique ELYRA_RUN_NAME value and expose it as an
                # environment variable in the container.
                # Notebook | script nodes are implemented using the kubernetes_pod_operator
                # (https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/kubernetes_pod_operator/index.html).
                # Environment variables that are passed to this operator are
                # pre-processed by Airflow at runtime, and placeholder values (expressed as '{{ xyz }}'
                # - see https://airflow.apache.org/docs/apache-airflow/1.10.12/macros-ref#default-variables)
                # are replaced.
                if pipeline_envs is None:
                    pipeline_envs = {}
                pipeline_envs["ELYRA_RUN_NAME"] = f"{pipeline_name}-{{{{ ts_nodash }}}}"

                image_pull_policy = None
                runtime_image_pull_secret = None
                for image_instance in image_namespace:
                    if image_instance.metadata["image_name"] == operation.runtime_image:
                        if image_instance.metadata.get("pull_policy"):
                            image_pull_policy = image_instance.metadata["pull_policy"]
                        if image_instance.metadata.get("pull_secret"):
                            runtime_image_pull_secret = image_instance.metadata["pull_secret"]
                        break

                bootscript = BootscriptBuilder(
                    filename=operation.filename,
                    pipeline_name=pipeline_name,
                    cos_endpoint=cos_endpoint,
                    cos_bucket=cos_bucket,
                    cos_directory=artifact_object_prefix,
                    cos_dependencies_archive=operation_artifact_archive,
                    inputs=operation.inputs,
                    outputs=operation.outputs,
                )

                target_op = {
                    "notebook": operation.name,
                    "id": operation.id,
                    "argument_list": bootscript.container_cmd,
                    "runtime_image": operation.runtime_image,
                    "pipeline_envs": pipeline_envs,
                    "parent_operation_ids": operation.parent_operation_ids,
                    "image_pull_policy": image_pull_policy,
                    "cpu_request": operation.cpu,
                    "mem_request": operation.memory,
                    "gpu_limit": operation.gpu,
                    "operator_source": operation.component_params["filename"],
                    "is_generic_operator": True,
                    "doc": operation.doc,
                    "volume_mounts": operation.component_params.get(MOUNTED_VOLUMES, []),
                    "kubernetes_secrets": operation.component_params.get(KUBERNETES_SECRETS, []),
                }

                if runtime_image_pull_secret is not None:
                    target_op["runtime_image_pull_secret"] = runtime_image_pull_secret

                target_ops.append(target_op)

                self.log_pipeline_info(
                    pipeline_name,
                    f"processing operation dependencies for id '{operation.id}'",
                    operation_name=operation.name,
                )

                self._upload_dependencies_to_object_store(
                    runtime_configuration, pipeline_name, operation, prefix=artifact_object_prefix
                )

            else:
                # Retrieve component from cache
                component = ComponentCache.instance().get_component(self._type, operation.classifier)

                # Convert the user-entered value of certain properties according to their type
                for component_property in component.properties:
                    # Skip properties for which no value was given
                    if component_property.ref not in operation.component_params.keys():
                        continue

                    # Get corresponding property's value from parsed pipeline
                    property_value_dict = operation.component_params.get(component_property.ref)

                    # The type and value of this property can vary depending on what the user chooses
                    # in the pipeline editor. So we get the currently active control type
                    # (e.g. StringControl) from the activeControl value.
                    active_property_name = property_value_dict["activeControl"]

                    # Once we know the active control type, we can retrieve the value assigned to it
                    property_value = property_value_dict.get(active_property_name, None)

                    # If no value is found, fall back to the default value assigned in the parser
                    if property_value is None:
                        property_value = component_property.value

                    self.log.debug(f"Active property name : {active_property_name}, value : {property_value}")
                    self.log.debug(
                        f"Processing component parameter '{component_property.name}' "
                        f"of type '{component_property.data_type}'"
                    )

                    if (
                        property_value
                        and str(property_value)[0] == "{"
                        and str(property_value)[-1] == "}"
                        and isinstance(json.loads(json.dumps(property_value)), dict)
                        and set(json.loads(json.dumps(property_value)).keys()) == {"value", "option"}
                    ):
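                        # The value refers to the output of an upstream node: look up that node's
                        # task name and render an XCom pull so the value is resolved at run time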
                        parent_node_name = self._get_node_name(
                            target_ops, json.loads(json.dumps(property_value))["value"]
                        )
                        processed_value = "\"{{ ti.xcom_pull(task_ids='" + parent_node_name + "') }}\""
                        operation.component_params[component_property.ref] = processed_value
                    elif component_property.data_type == "boolean":
                        operation.component_params[component_property.ref] = property_value
                    elif component_property.data_type == "string":
                        # Add surrounding quotation marks to string value for correct rendering
                        # in jinja DAG template
                        operation.component_params[component_property.ref] = json.dumps(property_value)
                    elif component_property.data_type == "dictionary":
                        processed_value = self._process_dictionary_value(property_value)
                        operation.component_params[component_property.ref] = processed_value
                    elif component_property.data_type == "list":
                        processed_value = self._process_list_value(property_value)
                        operation.component_params[component_property.ref] = processed_value

                # Remove inputs and outputs from params dict until support for data exchange is provided
                operation.component_params_as_dict.pop("inputs")
                operation.component_params_as_dict.pop("outputs")

                # Locate the import statement; if one cannot be found, raise an error below
                import_stmts = []
                # Check for import statement on Component object, otherwise get from class_import_map
                import_stmt = component.import_statement or self.class_import_map.get(component.name)
                if import_stmt:
                    import_stmts.append(import_stmt)
                else:
                    # If we didn't find a mapping to the import statement, check whether the component
                    # name includes a package prefix. If it does, log a warning but proceed; otherwise
                    # raise an exception.
                    if len(component.name.split(".")) > 1:  # We (presumably) have a package prefix
                        self.log.warning(
                            f"Operator '{component.name}' of node '{operation.name}' is not configured "
                            f"in the list of available Airflow operators but appears to include a "
                            f"package prefix and processing will proceed."
                        )
                    else:
                        raise ValueError(
                            f"Operator '{component.name}' of node '{operation.name}' is not configured "
                            f"in the list of available operators. Please add the fully-qualified "
                            f"package name for '{component.name}' to the "
                            f"AirflowPipelineProcessor.available_airflow_operators configuration."
                        )

                target_op = {
                    "notebook": operation.name,
                    "id": operation.id,
                    "imports": import_stmts,
                    "class_name": component.name,
                    "parent_operation_ids": operation.parent_operation_ids,
                    "component_params": operation.component_params_as_dict,
                    "operator_source": component.component_source,
                    "is_generic_operator": False,
                    "doc": operation.doc,
                }
                target_ops.append(target_op)
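
        # Order the operations so that each node appears after all of its parents
        # (repeated passes over the list until every node's dependencies have been placed)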
        ordered_target_ops = OrderedDict()

        while target_ops:
            for i in range(len(target_ops)):
                target_op = target_ops.pop(0)
                if not target_op["parent_operation_ids"]:
                    ordered_target_ops[target_op["id"]] = target_op
                    self.log.debug(f"Added root node {ordered_target_ops[target_op['id']]}")
                elif all(deps in ordered_target_ops.keys() for deps in target_op["parent_operation_ids"]):
                    ordered_target_ops[target_op["id"]] = target_op
                    self.log.debug(f"Added dependent node {ordered_target_ops[target_op['id']]}")
                else:
                    target_ops.append(target_op)

        self.log_pipeline_info(pipeline_name, "pipeline dependencies processed", duration=(time.time() - t0_all))

        return ordered_target_ops

    def create_pipeline_file(
        self,
        pipeline: Pipeline,
        pipeline_export_format: str,
        pipeline_export_path: str,
        pipeline_name: str,
        pipeline_instance_id: str,
    ) -> str:
        """
        Convert the pipeline to an Airflow DAG and store it in pipeline_export_path.
        """
        self.log.info(f"Creating pipeline definition as a .{pipeline_export_format} file")
        if pipeline_export_format == "json":
            with open(pipeline_export_path, "w", encoding="utf-8") as file:
                json.dump(pipeline_export_path, file, ensure_ascii=False, indent=4)
        else:
            # Load template from installed elyra package
            loader = PackageLoader("elyra", "templates/airflow")
            template_env = Environment(loader=loader)
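
            # Expose the node-name scrubber to the template as a `regex_replace` filter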
            template_env.filters["regex_replace"] = lambda string: self._scrub_invalid_characters(string)

            template = template_env.get_template("airflow_template.jinja2")

            target_ops = self._cc_pipeline(pipeline, pipeline_name, pipeline_instance_id)
            runtime_configuration = self._get_metadata_configuration(
                schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
            )
            user_namespace = runtime_configuration.metadata.get("user_namespace", "default")
            cos_secret = runtime_configuration.metadata.get("cos_secret")

            pipeline_description = pipeline.description
            if pipeline_description is None:
                pipeline_description = f"Created with Elyra {__version__} pipeline editor using `{pipeline.source}`."

            python_output = template.render(
                operations_list=target_ops,
                pipeline_name=pipeline_instance_id,
                user_namespace=user_namespace,
                cos_secret=cos_secret,
                kube_config_path=None,
                is_paused_upon_creation="False",
                in_cluster="True",
                pipeline_description=pipeline_description,
            )

            # Write to python file and fix formatting
            with open(pipeline_export_path, "w") as fh:
                # Defer the import to postpone logger messages: https://github.com/psf/black/issues/2058
                import black

                autopep_output = autopep8.fix_code(python_output)
                output_to_file = black.format_str(autopep_output, mode=black.FileMode())
                fh.write(output_to_file)

        return pipeline_export_path

    def _create_unique_node_names(self, operation_list: List[Operation]) -> List[Operation]:
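        """
        Ensure operation names are unique by appending a numeric suffix to duplicates,
        e.g. two nodes named "load_data" become "load_data" and "load_data_1".
        """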
        unique_names = {}
        for operation in operation_list:
            # Ensure operation name is unique
            new_name = operation.name
            while new_name in unique_names:
                new_name = f"{operation.name}_{unique_names[operation.name]}"
                unique_names[operation.name] += 1
            operation.name = new_name

            unique_names[operation.name] = 1

        return operation_list

    def _scrub_invalid_characters_from_list(self, operation_list: List[Operation]) -> List[Operation]:
        for operation in operation_list:
            operation.name = self._scrub_invalid_characters(operation.name)

        return operation_list

    def _scrub_invalid_characters(self, name: str) -> str:
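        """
        Replace punctuation and whitespace in `name` with underscores,
        e.g. "my notebook.ipynb" becomes "my_notebook_ipynb".
        """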
        chars = re.escape(string.punctuation)
        clean_name = re.sub(r"[" + chars + "\\s]", "_", name)  # noqa E226

        return clean_name

    def _process_dictionary_value(self, value: str) -> Union[Dict, str]:
        """
        For component parameters of type dictionary, if a string value is returned from the superclass
        method, it must be converted to include surrounding quotation marks for correct rendering
        in jinja DAG template.
        """
        converted_value = super()._process_dictionary_value(value)
        if isinstance(converted_value, str):
            converted_value = json.dumps(converted_value)

        return converted_value

    def _process_list_value(self, value: str) -> Union[List, str]:
        """
        For component parameters of type list, if a string value is returned from the superclass
        method, it must be converted to include surrounding quotation marks for correct rendering
        in jinja DAG template.
        """
        converted_value = super()._process_list_value(value)
        if isinstance(converted_value, str):
            converted_value = json.dumps(converted_value)

        return converted_value

    def _get_node_name(self, operations_list: list, node_id: str) -> Optional[str]:
        for operation in operations_list:
            if operation["id"] == node_id:
                return operation["notebook"]

        return None


class AirflowPipelineProcessorResponse(PipelineProcessorResponse):
    _type = RuntimeProcessorType.APACHE_AIRFLOW
    _name = "airflow"

    def __init__(self, git_url, run_url, object_storage_url, object_storage_path):
        super().__init__(run_url, object_storage_url, object_storage_path)
        self.git_url = git_url

    def to_json(self):
        response = super().to_json()
        response["git_url"] = self.git_url
        return response