#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import OrderedDict
from datetime import datetime
import json
import os
import re
import string
import tempfile
import time
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import autopep8
from jinja2 import Environment
from jinja2 import PackageLoader
from traitlets import CUnicode
from traitlets import List as ListTrait

from elyra._version import __version__
from elyra.airflow.operator import BootscriptBuilder
from elyra.metadata.schemaspaces import RuntimeImages
from elyra.metadata.schemaspaces import Runtimes
from elyra.pipeline.component_catalog import ComponentCache
from elyra.pipeline.pipeline import GenericOperation
from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline import Pipeline
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
from elyra.pipeline.processor import PipelineProcessor
from elyra.pipeline.processor import PipelineProcessorResponse
from elyra.pipeline.processor import RuntimePipelineProcessor
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.util.cos import join_paths
from elyra.util.github import GithubClient

try:
    from elyra.util.gitlab import GitLabClient
except ImportError:
    pass  # Gitlab package is not installed, ignore and use only GitHub

from elyra.util.gitutil import SupportedGitTypes  # noqa:I202
from elyra.util.path import get_absolute_path


class AirflowPipelineProcessor(RuntimePipelineProcessor):
    _type = RuntimeProcessorType.APACHE_AIRFLOW
    _name = "airflow"

    # Provide users with the ability to identify a writable directory in the
    # running container where the notebook | script is executed. The location
    # must exist and be known before the container is started.
    # Defaults to `/tmp`
    WCD = os.getenv("ELYRA_WRITABLE_CONTAINER_DIR", "/tmp").strip().rstrip("/")
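
    # A deployment whose runtime images only permit writes elsewhere can point Elyra at a
    # different location by exporting the variable before the server starts, for example
    # (the path below is illustrative):
    #
    #   export ELYRA_WRITABLE_CONTAINER_DIR=/opt/app-root/src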

    # This specifies the default airflow operators included with Elyra. Any Airflow-based
    # custom connectors should create/extend the elyra configuration file to include
    # those fully-qualified operator/class names.
    available_airflow_operators = ListTrait(
        CUnicode(),
        [
            "airflow.operators.slack_operator.SlackAPIPostOperator",
            "airflow.operators.bash_operator.BashOperator",
            "airflow.operators.email_operator.EmailOperator",
            "airflow.operators.http_operator.SimpleHttpOperator",
            "airflow.contrib.operators.spark_sql_operator.SparkSqlOperator",
            "airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator",
        ],
        help="""List of available Apache Airflow operator names.
Operators available for use within Apache Airflow pipelines. These operators must
be fully qualified (i.e., prefixed with their package names).
""",
    ).tag(config=True)
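
    # Because the trait above is tagged with config=True, the list can be extended through
    # the server's traitlets configuration. A minimal sketch, assuming a Jupyter server
    # config file (the file name and operator below are illustrative):
    #
    #   # jupyter_elyra_config.py
    #   c.AirflowPipelineProcessor.available_airflow_operators.append(
    #       "airflow.providers.http.operators.http.SimpleHttpOperator"
    #   )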

    # Contains mappings from class to import statement for each available Airflow operator
    class_import_map = {}

    def __init__(self, root_dir, **kwargs):
        super().__init__(root_dir, **kwargs)
        if not self.class_import_map:  # Only need to load once
            for package in self.available_airflow_operators:
                parts = package.rsplit(".", 1)
                self.class_import_map[parts[1]] = f"from {parts[0]} import {parts[1]}"
        self.log.debug(f"class_package_map = {self.class_import_map}")
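        # Each configured operator contributes one entry, for example:
        #   "BashOperator" -> "from airflow.operators.bash_operator import BashOperator"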

    def process(self, pipeline: Pipeline) -> None:
        """
        Submit the pipeline for execution on Apache Airflow.
        """
        t0_all = time.time()
        timestamp = datetime.now().strftime("%m%d%H%M%S")
        # Create an instance id that will be used to store
        # the pipelines' dependencies, if applicable
        pipeline_instance_id = f"{pipeline.name}-{timestamp}"

        runtime_configuration = self._get_metadata_configuration(
            schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
        )
        api_endpoint = runtime_configuration.metadata.get("api_endpoint")
        cos_endpoint = runtime_configuration.metadata.get("cos_endpoint")
        cos_bucket = runtime_configuration.metadata.get("cos_bucket")

        git_type = SupportedGitTypes.get_instance_by_name(
            runtime_configuration.metadata.get("git_type", SupportedGitTypes.GITHUB.name)
        )
        if git_type == SupportedGitTypes.GITLAB and SupportedGitTypes.is_enabled(SupportedGitTypes.GITLAB) is False:
            raise ValueError(
                "Python package `python-gitlab` is not installed. "
                "Please install using `elyra[gitlab]` to use GitLab as DAG repository."
            )

        github_api_endpoint = runtime_configuration.metadata.get("github_api_endpoint")
        github_repo_token = runtime_configuration.metadata.get("github_repo_token")
        github_repo = runtime_configuration.metadata.get("github_repo")
        github_branch = runtime_configuration.metadata.get("github_branch")

        self.log_pipeline_info(pipeline.name, "Submitting pipeline")
        with tempfile.TemporaryDirectory() as temp_dir:
            pipeline_export_path = os.path.join(temp_dir, f"{pipeline.name}.py")

            self.log.debug(f"Creating temp directory '{temp_dir}'")

            pipeline_filepath = self.create_pipeline_file(
                pipeline=pipeline,
                pipeline_export_format="py",
                pipeline_export_path=pipeline_export_path,
                pipeline_name=pipeline.name,
                pipeline_instance_id=pipeline_instance_id,
            )

            self.log.debug(f"Uploading pipeline file '{pipeline_filepath}'")
            try:
                if git_type == SupportedGitTypes.GITHUB:
                    git_client = GithubClient(
                        server_url=github_api_endpoint, token=github_repo_token, repo=github_repo, branch=github_branch
                    )
                else:
                    git_client = GitLabClient(
                        server_url=github_api_endpoint,
                        token=github_repo_token,
                        project=github_repo,
                        branch=github_branch,
                    )
            except BaseException as be:
                raise RuntimeError(f"Unable to create a connection to {github_api_endpoint}: {str(be)}") from be

            git_client.upload_dag(pipeline_filepath, pipeline_instance_id)

            self.log.info("Waiting for Airflow Scheduler to process and start the pipeline")

            download_url = git_client.get_git_url(
                api_url=github_api_endpoint, repository_name=github_repo, repository_branch=github_branch
            )

            self.log_pipeline_info(
                pipeline.name, f"pipeline pushed to git: {download_url}", duration=(time.time() - t0_all)
            )

            if pipeline.contains_generic_operations():
                object_storage_url = f"{cos_endpoint}"
                os_path = join_paths(pipeline.pipeline_parameters.get(COS_OBJECT_PREFIX), pipeline_instance_id)
                object_storage_path = f"/{cos_bucket}/{os_path}"
            else:
                object_storage_url = None
                object_storage_path = None

            return AirflowPipelineProcessorResponse(
                git_url=f"{download_url}",
                run_url=f"{api_endpoint}",
                object_storage_url=object_storage_url,
                object_storage_path=object_storage_path,
            )

    def export(
        self, pipeline: Pipeline, pipeline_export_format: str, pipeline_export_path: str, overwrite: bool
    ) -> str:
        """
        Export pipeline as Airflow DAG
        """
        # Verify that the AirflowPipelineProcessor supports the given export format
        self._verify_export_format(pipeline_export_format)

        timestamp = datetime.now().strftime("%m%d%H%M%S")
        # Create an instance id that will be used to store
        # the pipelines' dependencies, if applicable
        pipeline_instance_id = f"{pipeline.name}-{timestamp}"

        absolute_pipeline_export_path = get_absolute_path(self.root_dir, pipeline_export_path)

        if os.path.exists(absolute_pipeline_export_path) and not overwrite:
            raise ValueError(f"File '{absolute_pipeline_export_path}' already exists.")

        self.log_pipeline_info(pipeline.name, f"exporting pipeline as a .{pipeline_export_format} file")

        new_pipeline_file_path = self.create_pipeline_file(
            pipeline=pipeline,
            pipeline_export_format="py",
            pipeline_export_path=absolute_pipeline_export_path,
            pipeline_name=pipeline.name,
            pipeline_instance_id=pipeline_instance_id,
        )

        return new_pipeline_file_path

    def _cc_pipeline(self, pipeline: Pipeline, pipeline_name: str, pipeline_instance_id: str) -> OrderedDict:
        """
        Compile the pipeline in preparation for DAG generation
        """
        runtime_configuration = self._get_metadata_configuration(
            schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
        )
        image_namespace = self._get_metadata_configuration(
            schemaspace=RuntimeImages.RUNTIME_IMAGES_SCHEMASPACE_ID, name=None
        )

        cos_endpoint = runtime_configuration.metadata.get("cos_endpoint")
        cos_username = runtime_configuration.metadata.get("cos_username")
        cos_password = runtime_configuration.metadata.get("cos_password")
        cos_secret = runtime_configuration.metadata.get("cos_secret")
        cos_bucket = runtime_configuration.metadata.get("cos_bucket")

        pipeline_instance_id = pipeline_instance_id or pipeline_name

        artifact_object_prefix = join_paths(pipeline.pipeline_parameters.get(COS_OBJECT_PREFIX), pipeline_instance_id)

        self.log_pipeline_info(
            pipeline_name,
            f"processing pipeline dependencies for upload to '{cos_endpoint}' "
            f"bucket '{cos_bucket}' folder '{artifact_object_prefix}'",
        )

        # Create a list of dictionaries, one per operation, describing the tasks
        # to be rendered into the Airflow DAG template
        target_ops = []

        t0_all = time.time()

        # Sort operations based on dependency graph (topological order)
        sorted_operations = PipelineProcessor._sort_operations(pipeline.operations)

        # Determine whether access to cloud storage is required and check connectivity
        for operation in sorted_operations:
            if isinstance(operation, GenericOperation):
                self._verify_cos_connectivity(runtime_configuration)
                break

        # All previous operation outputs should be propagated throughout the pipeline.
        # In order to process this recursively, the current operation's inputs should be combined
        # from its parent's inputs (which, themselves are derived from the outputs of their parent)
        # and its parent's outputs.
        PipelineProcessor._propagate_operation_inputs_outputs(pipeline, sorted_operations)

        # Scrub all node labels of invalid characters
        scrubbed_operations = self._scrub_invalid_characters_from_list(sorted_operations)
        # Generate unique names for all operations
        unique_operations = self._create_unique_node_names(scrubbed_operations)

        for operation in unique_operations:
            if isinstance(operation, GenericOperation):
                operation_artifact_archive = self._get_dependency_archive_name(operation)

                self.log.debug(f"Creating pipeline component:\n {operation} archive : {operation_artifact_archive}")

                # Collect env variables
                pipeline_envs = self._collect_envs(
                    operation, cos_secret=cos_secret, cos_username=cos_username, cos_password=cos_password
                )

                # Generate unique ELYRA_RUN_NAME value and expose it as an
                # environment variable in the container.
                # Notebook | script nodes are implemented using the kubernetes_pod_operator
                # (https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/kubernetes_pod_operator/index.html)
                # Environment variables that are passed to this operator are
                # pre-processed by Airflow at runtime and placeholder values (expressed as '{{ xyz }}'
                # - see https://airflow.apache.org/docs/apache-airflow/1.10.12/macros-ref#default-variables)
                # replaced.
                if pipeline_envs is None:
                    pipeline_envs = {}
                pipeline_envs["ELYRA_RUN_NAME"] = f"{pipeline_name}-{{{{ ts_nodash }}}}"
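                # The literal value written into the DAG is "<pipeline_name>-{{ ts_nodash }}"; Airflow
                # substitutes the ts_nodash macro at run time, producing a value such as
                # "my-pipeline-20220101T000000" (the exact timestamp format is determined by Airflow).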

                image_pull_policy = None
                runtime_image_pull_secret = None
                for image_instance in image_namespace:
                    if image_instance.metadata["image_name"] == operation.runtime_image:
                        if image_instance.metadata.get("pull_policy"):
                            image_pull_policy = image_instance.metadata["pull_policy"]
                        if image_instance.metadata.get("pull_secret"):
                            runtime_image_pull_secret = image_instance.metadata["pull_secret"]
                        break

                bootscript = BootscriptBuilder(
                    filename=operation.filename,
                    pipeline_name=pipeline_name,
                    cos_endpoint=cos_endpoint,
                    cos_bucket=cos_bucket,
                    cos_directory=artifact_object_prefix,
                    cos_dependencies_archive=operation_artifact_archive,
                    inputs=operation.inputs,
                    outputs=operation.outputs,
                )

                target_op = {
                    "notebook": operation.name,
                    "id": operation.id,
                    "argument_list": bootscript.container_cmd,
                    "runtime_image": operation.runtime_image,
                    "pipeline_envs": pipeline_envs,
                    "parent_operation_ids": operation.parent_operation_ids,
                    "image_pull_policy": image_pull_policy,
                    "cpu_request": operation.cpu,
                    "mem_request": operation.memory,
                    "gpu_limit": operation.gpu,
                    "operator_source": operation.component_params["filename"],
                    "is_generic_operator": True,
                    "doc": operation.doc,
                    "volume_mounts": operation.component_params.get(MOUNTED_VOLUMES, []),
                    "kubernetes_secrets": operation.component_params.get(KUBERNETES_SECRETS, []),
                }

                if runtime_image_pull_secret is not None:
                    target_op["runtime_image_pull_secret"] = runtime_image_pull_secret

                target_ops.append(target_op)

                self.log_pipeline_info(
                    pipeline_name,
                    f"processing operation dependencies for id '{operation.id}'",
                    operation_name=operation.name,
                )

                self._upload_dependencies_to_object_store(
                    runtime_configuration, pipeline_name, operation, prefix=artifact_object_prefix
                )

            else:
                # Retrieve component from cache
                component = ComponentCache.instance().get_component(self._type, operation.classifier)

                # Convert the user-entered value of certain properties according to their type
                for component_property in component.properties:
                    # Skip properties for which no value was given
                    if component_property.ref not in operation.component_params.keys():
                        continue

                    # Get corresponding property's value from parsed pipeline
                    property_value_dict = operation.component_params.get(component_property.ref)

                    # The type and value of this property can vary depending on what the user chooses
                    # in the pipeline editor. So we get the current active parameter (e.g. StringControl)
                    # from the activeControl value
                    active_property_name = property_value_dict["activeControl"]

                    # Once we have the name of the active control (e.g. StringControl), we can retrieve
                    # the value assigned to it
                    property_value = property_value_dict.get(active_property_name, None)

                    # If the value is not found, assign it the default value assigned in parser
                    if property_value is None:
                        property_value = component_property.value

                    self.log.debug(f"Active property name : {active_property_name}, value : {property_value}")
                    self.log.debug(
                        f"Processing component parameter '{component_property.name}' "
                        f"of type '{component_property.data_type}'"
                    )

                    if (
                        property_value
                        and str(property_value)[0] == "{"
                        and str(property_value)[-1] == "}"
                        and isinstance(json.loads(json.dumps(property_value)), dict)
                        and set(json.loads(json.dumps(property_value)).keys()) == {"value", "option"}
                    ):
                        parent_node_name = self._get_node_name(
                            target_ops, json.loads(json.dumps(property_value))["value"]
                        )
                        processed_value = "\"{{ ti.xcom_pull(task_ids='" + parent_node_name + "') }}\""
                        operation.component_params[component_property.ref] = processed_value
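                        # The branch above handles a parameter that references the output of an
                        # upstream node: the rendered DAG will contain a value such as
                        #   "{{ ti.xcom_pull(task_ids='<parent-node-name>') }}"
                        # so the operator receives the parent task's XCom result at run time.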
                    elif component_property.data_type == "boolean":
                        operation.component_params[component_property.ref] = property_value
                    elif component_property.data_type == "string":
                        # Add surrounding quotation marks to string value for correct rendering
                        # in jinja DAG template
                        operation.component_params[component_property.ref] = json.dumps(property_value)
                    elif component_property.data_type == "dictionary":
                        processed_value = self._process_dictionary_value(property_value)
                        operation.component_params[component_property.ref] = processed_value
                    elif component_property.data_type == "list":
                        processed_value = self._process_list_value(property_value)
                        operation.component_params[component_property.ref] = processed_value

                # Remove inputs and outputs from params dict until support for data exchange is provided
                operation.component_params_as_dict.pop("inputs")
                operation.component_params_as_dict.pop("outputs")

                # Locate the import statement. If not found raise...
                import_stmts = []
                # Check for import statement on Component object, otherwise get from class_import_map
                import_stmt = component.import_statement or self.class_import_map.get(component.name)
                if import_stmt:
                    import_stmts.append(import_stmt)
                else:
                    # If we didn't find a mapping to the import statement, let's check if the component
                    # name includes a package prefix. If it does, log a warning, but proceed, otherwise
                    # raise an exception.
                    if len(component.name.split(".")) > 1:  # We (presumably) have a package prefix
                        self.log.warning(
                            f"Operator '{component.name}' of node '{operation.name}' is not configured "
                            f"in the list of available Airflow operators but appears to include a "
                            f"package prefix and processing will proceed."
                        )
                    else:
                        raise ValueError(
                            f"Operator '{component.name}' of node '{operation.name}' is not configured "
                            f"in the list of available operators. Please add the fully-qualified "
                            f"package name for '{component.name}' to the "
                            f"AirflowPipelineProcessor.available_airflow_operators configuration."
                        )

                target_op = {
                    "notebook": operation.name,
                    "id": operation.id,
                    "imports": import_stmts,
                    "class_name": component.name,
                    "parent_operation_ids": operation.parent_operation_ids,
                    "component_params": operation.component_params_as_dict,
                    "operator_source": component.component_source,
                    "is_generic_operator": False,
                    "doc": operation.doc,
                }

                target_ops.append(target_op)
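
        # Order the operations so that every task appears after all of its parents
        # (a simple repeated pass over the remaining tasks until all dependencies are satisfied)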
        ordered_target_ops = OrderedDict()

        while target_ops:
            for i in range(len(target_ops)):
                target_op = target_ops.pop(0)
                if not target_op["parent_operation_ids"]:
                    ordered_target_ops[target_op["id"]] = target_op
                    self.log.debug(f"Added root node {ordered_target_ops[target_op['id']]}")
                elif all(deps in ordered_target_ops.keys() for deps in target_op["parent_operation_ids"]):
                    ordered_target_ops[target_op["id"]] = target_op
                    self.log.debug(f"Added dependent node {ordered_target_ops[target_op['id']]}")
                else:
                    target_ops.append(target_op)

        self.log_pipeline_info(pipeline_name, "pipeline dependencies processed", duration=(time.time() - t0_all))

        return ordered_target_ops

    def create_pipeline_file(
        self,
        pipeline: Pipeline,
        pipeline_export_format: str,
        pipeline_export_path: str,
        pipeline_name: str,
        pipeline_instance_id: str,
    ) -> str:
        """
        Convert the pipeline to an Airflow DAG and store it in pipeline_export_path.
        """
        self.log.info(f"Creating pipeline definition as a .{pipeline_export_format} file")
        if pipeline_export_format == "json":
            with open(pipeline_export_path, "w", encoding="utf-8") as file:
                json.dump(pipeline_export_path, file, ensure_ascii=False, indent=4)
        else:
            # Load template from installed elyra package
            loader = PackageLoader("elyra", "templates/airflow")
            template_env = Environment(loader=loader)

            template_env.filters["regex_replace"] = lambda string: self._scrub_invalid_characters(string)
            template = template_env.get_template("airflow_template.jinja2")

            target_ops = self._cc_pipeline(pipeline, pipeline_name, pipeline_instance_id)
            runtime_configuration = self._get_metadata_configuration(
                schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
            )
            user_namespace = runtime_configuration.metadata.get("user_namespace", "default")
            cos_secret = runtime_configuration.metadata.get("cos_secret")

            pipeline_description = pipeline.description
            if pipeline_description is None:
                pipeline_description = f"Created with Elyra {__version__} pipeline editor using `{pipeline.source}`."

            python_output = template.render(
                operations_list=target_ops,
                pipeline_name=pipeline_instance_id,
                user_namespace=user_namespace,
                cos_secret=cos_secret,
                kube_config_path=None,
                is_paused_upon_creation="False",
                in_cluster="True",
                pipeline_description=pipeline_description,
            )

            # Write to python file and fix formatting
            with open(pipeline_export_path, "w") as fh:
                # Defer the import to postpone logger messages: https://github.com/psf/black/issues/2058
                import black

                autopep_output = autopep8.fix_code(python_output)
                output_to_file = black.format_str(autopep_output, mode=black.FileMode())
                fh.write(output_to_file)

        return pipeline_export_path

    def _create_unique_node_names(self, operation_list: List[Operation]) -> List[Operation]:
        """
        Rename duplicate operation names by appending a numeric suffix,
        e.g. two nodes named 'analyze' become 'analyze' and 'analyze_1'.
        """
        unique_names = {}
        for operation in operation_list:
            # Ensure operation name is unique
            new_name = operation.name
            while new_name in unique_names:
                new_name = f"{operation.name}_{unique_names[operation.name]}"
                unique_names[operation.name] += 1
            operation.name = new_name

            unique_names[operation.name] = 1

        return operation_list

    def _scrub_invalid_characters_from_list(self, operation_list: List[Operation]) -> List[Operation]:
        """Scrub invalid characters from the name of every operation in the list."""
        for operation in operation_list:
            operation.name = self._scrub_invalid_characters(operation.name)

        return operation_list

    def _scrub_invalid_characters(self, name: str) -> str:
        """
        Replace punctuation and whitespace characters in the given name with underscores,
        e.g. 'Load Data (v2)' becomes 'Load_Data__v2_'.
        """
        chars = re.escape(string.punctuation)
        clean_name = re.sub(r"[" + chars + "\\s]", "_", name)  # noqa E226
        return clean_name

    def _process_dictionary_value(self, value: str) -> Union[Dict, str]:
        """
        For component parameters of type dictionary, if a string value is returned from the superclass
        method, it must be converted to include surrounding quotation marks for correct rendering
        in jinja DAG template.
        """
        converted_value = super()._process_dictionary_value(value)
        if isinstance(converted_value, str):
            converted_value = json.dumps(converted_value)
        return converted_value

    def _process_list_value(self, value: str) -> Union[List, str]:
        """
        For component parameters of type list, if a string value is returned from the superclass
        method, it must be converted to include surrounding quotation marks for correct rendering
        in jinja DAG template.
        """
        converted_value = super()._process_list_value(value)
        if isinstance(converted_value, str):
            converted_value = json.dumps(converted_value)
        return converted_value

    def _get_node_name(self, operations_list: list, node_id: str) -> Optional[str]:
        """Return the name of the operation with the given node id, or None if it is not found."""
        for operation in operations_list:
            if operation["id"] == node_id:
                return operation["notebook"]
        return None


class AirflowPipelineProcessorResponse(PipelineProcessorResponse):
    _type = RuntimeProcessorType.APACHE_AIRFLOW
    _name = "airflow"

    def __init__(self, git_url, run_url, object_storage_url, object_storage_path):
        super().__init__(run_url, object_storage_url, object_storage_path)
        self.git_url = git_url

    def to_json(self):
        response = super().to_json()
        response["git_url"] = self.git_url
        return response