validation.py 56 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from enum import IntEnum
  17. from glob import glob
  18. import json
  19. import os
  20. import re
  21. from typing import Dict
  22. from typing import List
  23. from typing import Optional
  24. import networkx as nx
  25. from traitlets.config import SingletonConfigurable
  26. from elyra.metadata.manager import MetadataManager
  27. from elyra.metadata.schema import SchemaManager
  28. from elyra.metadata.schemaspaces import Runtimes
  29. from elyra.pipeline.component import Component
  30. from elyra.pipeline.component_catalog import ComponentCache
  31. from elyra.pipeline.pipeline import DataClassJSONEncoder
  32. from elyra.pipeline.pipeline import KeyValueList
  33. from elyra.pipeline.pipeline import KubernetesAnnotation
  34. from elyra.pipeline.pipeline import KubernetesSecret
  35. from elyra.pipeline.pipeline import KubernetesToleration
  36. from elyra.pipeline.pipeline import Operation
  37. from elyra.pipeline.pipeline import PIPELINE_CURRENT_SCHEMA
  38. from elyra.pipeline.pipeline import PIPELINE_CURRENT_VERSION
  39. from elyra.pipeline.pipeline import VolumeMount
  40. from elyra.pipeline.pipeline_constants import ENV_VARIABLES
  41. from elyra.pipeline.pipeline_constants import KUBERNETES_POD_ANNOTATIONS
  42. from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
  43. from elyra.pipeline.pipeline_constants import KUBERNETES_TOLERATIONS
  44. from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
  45. from elyra.pipeline.pipeline_constants import RUNTIME_IMAGE
  46. from elyra.pipeline.pipeline_definition import Node
  47. from elyra.pipeline.pipeline_definition import PipelineDefinition
  48. from elyra.pipeline.processor import PipelineProcessorManager
  49. from elyra.pipeline.runtime_type import RuntimeProcessorType
  50. from elyra.util.kubernetes import is_valid_annotation_key
  51. from elyra.util.kubernetes import is_valid_kubernetes_key
  52. from elyra.util.kubernetes import is_valid_kubernetes_resource_name
  53. from elyra.util.path import get_expanded_path
  class ValidationSeverity(IntEnum):
      """
      Severity levels that can be attached to a validation issue.

      The integer value is what gets written into each issue's "severity" field; only
      Error marks a ValidationResponse as fatal.
      """

      Error = 1
      Warning = 2
      Information = 3
      Hint = 4


  class ValidationResponse(object):
      """
      Accumulates the diagnostic issues found while validating a pipeline and tracks
      whether any of them has Error severity.
      """

      def __init__(self):
          self._response = {
              "title": "Elyra Pipeline Diagnostics",
              "description": "Issues discovered when parsing the pipeline",
              "issues": [],
          }
          self._has_fatal = False

      @property
      def response(self) -> Dict:
          """
          :return: The dict of validation errors and warnings found in the pipeline
          """
          return self._response

      @property
      def has_fatal(self) -> bool:
          """
          :return: True once at least one issue with Error severity has been added
          """
          return self._has_fatal

      def add_message(
          self,
          message: str,
          message_type: Optional[str] = "",
          data: Optional[Dict] = "",
          severity: ValidationSeverity = ValidationSeverity.Warning,
      ):
          """
          Helper function to add a diagnostic message to the response to be sent back

          :param message: A simple message describing the issue
          :param message_type: The type of message to send back e.g. invalidNodeType, invalidPipeline
          :param data: a Dict with granular details regarding the error e.g. the nodeID, pipelineID, linkID etc.
              The default is an empty string, which is what ends up in the issue when omitted.
          :param severity: the severity level of the issue, either a ValidationSeverity member or
              its integer value; values that are not a known severity level are silently ignored
          :return:
          """
          # IntEnum members compare equal to plain ints, so an int such as 1 would pass a
          # membership test yet lack `.value`; normalize to the enum member instead.
          try:
              severity = ValidationSeverity(severity)
          except (ValueError, TypeError):
              return

          diagnostic = {
              "severity": severity.value,
              "source": "Elyra Pipeline Validation Service",
              "type": message_type,
              "message": message,
              "data": data,
          }
          self._response["issues"].append(diagnostic)
          if severity == ValidationSeverity.Error:
              self._has_fatal = True

      def to_json(self):
          """
          :return: the same dict exposed by the `response` property
          """
          return self._response
  110. class PipelineValidationManager(SingletonConfigurable):
  def __init__(self, **kwargs):
      """
      Create the validation manager.

      :param kwargs: configuration keyword arguments for the parent initializer; an optional
          ``root_dir`` entry is removed first and stored on ``self.root_dir`` after being
          passed through get_expanded_path()
      """
      # Pull root_dir out before delegating so it is not forwarded to the parent initializer.
      requested_root_dir: Optional[str] = kwargs.pop("root_dir", None)
      super().__init__(**kwargs)
      self.root_dir = get_expanded_path(requested_root_dir)
  async def validate(self, pipeline: Dict) -> ValidationResponse:
      """
      Validates the pipeline JSON payload

      Checks are applied in this order: JSON/schema issues reported by
      PipelineDefinition.validate(), runtime support, pipeline structure (schema and
      pipeline versions), runtime compatibility of the nodes, and the pipeline graph.
      Node properties are only validated when none of those checks produced an
      Error-severity issue.

      :param pipeline: the pipeline definition to be validated
      :return: ValidationResponse containing any and all issues discovered during the validation
      """
      response = ValidationResponse()
      pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
      for issue in pipeline_definition.validate():
          response.add_message(severity=ValidationSeverity.Error, message_type="invalidJSON", message=issue)

      try:
          primary_pipeline = pipeline_definition.primary_pipeline
      except ValueError:
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidJSON",
              message="Invalid JSON detected, unable to continue.",
          )
          return response

      # Validation can be driven from runtime_config since both runtime and pipeline_type can
      # be derived from that and we should not use the 'runtime' and 'runtime_type' fields in
      # the pipeline.
      # Note: validation updates the pipeline definition with the correct values
      # of 'runtime' and 'runtime_type' obtained from 'runtime_config'. We may want to move this
      # into PipelineDefinition, but then parsing tests have issues because parsing (tests) assume
      # no validation has been applied to the pipeline.
      runtime_config = primary_pipeline.runtime_config
      if runtime_config is None:
          runtime_config = "local"

      pipeline_runtime = PipelineValidationManager._determine_runtime(runtime_config)
      # Pass root_dir, as _validate_compatibility does: SingletonConfigurable.instance() only
      # uses its kwargs when it creates the singleton, so every call site should agree.
      processor_manager = PipelineProcessorManager.instance(root_dir=self.root_dir)
      if processor_manager.is_supported_runtime(pipeline_runtime):
          # Set the runtime since its derived from runtime_config and valid
          primary_pipeline.set("runtime", pipeline_runtime)
      else:
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidRuntime",
              message="Unsupported pipeline runtime",
              data={"pipelineRuntime": pipeline_runtime},
          )

      self._validate_pipeline_structure(pipeline_definition=pipeline_definition, response=response)

      pipeline_type = PipelineValidationManager._determine_runtime_type(runtime_config)
      await self._validate_compatibility(
          pipeline_definition=pipeline_definition,
          pipeline_type=pipeline_type,
          pipeline_runtime=pipeline_runtime,
          response=response,
      )

      self._validate_pipeline_graph(pipeline=pipeline, response=response)

      # Node-level checks assume a structurally sound pipeline; stop on any Error.
      if response.has_fatal:
          return response

      # Set runtime_type since its derived from runtime_config, in case its needed
      primary_pipeline.set("runtime_type", pipeline_type)

      await self._validate_node_properties(
          pipeline_definition=pipeline_definition,
          pipeline_type=pipeline_type,
          pipeline_runtime=pipeline_runtime,
          response=response,
      )

      return response
  @staticmethod
  def _determine_runtime(runtime_config: str) -> str:
      """
      Derives the runtime (processor) name from the runtime_config.

      :param runtime_config: name of a runtime configuration instance, 'local' (any case), or empty
      :return: 'local' for an empty or local configuration, otherwise the schema name of the
          referenced runtime configuration instance
      """
      local_runtime = RuntimeProcessorType.LOCAL.name
      if runtime_config and runtime_config.upper() != local_runtime:
          runtime_instance = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID).get(runtime_config)
          return runtime_instance.schema_name
      # Empty or 'local' configurations are special-cased: there is no metadata instance to consult.
      return local_runtime.lower()
  @staticmethod
  def _determine_runtime_type(runtime_config: str) -> str:
      """
      Derives the runtime type (platform) from the runtime_config.

      An empty/None runtime_config or 'local' (any case) maps to the LOCAL runtime type,
      matching how _determine_runtime() treats the same values. Any other value is looked
      up as a runtime configuration instance and its 'runtime_type' metadata is resolved.

      :param runtime_config: name of a runtime configuration instance, 'local', or empty
      :return: the name of the corresponding RuntimeProcessorType member
      :raises ValueError: if the configuration's runtime_type is not a known RuntimeProcessorType
      """
      # Need to special case 'local' runtime_config instances (and empty ones, which
      # _determine_runtime also treats as local rather than as a metadata instance name).
      if not runtime_config or runtime_config.lower() == "local":
          return RuntimeProcessorType.LOCAL.name

      runtime_metadata = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID).get(runtime_config)
      runtime_type_name = runtime_metadata.metadata.get("runtime_type")
      try:
          runtime_type = RuntimeProcessorType.get_instance_by_name(runtime_type_name)
      except (KeyError, TypeError) as err:
          raise ValueError(
              f"Unsupported pipeline runtime: '{runtime_type_name}' found in config '{runtime_config}'!"
          ) from err
      return runtime_type.name
  def _validate_pipeline_structure(
      self, pipeline_definition: PipelineDefinition, response: ValidationResponse
  ) -> None:
      """
      Validates the pipeline structure based on version of schema

      Reports (rather than raises on) a missing or non-numeric schema version or
      pipeline version.

      :param pipeline_definition: the pipeline definition to be validated
      :param response: ValidationResponse containing the issue list to be updated
      """
      # Validate pipeline schema version
      raw_schema_version = pipeline_definition.schema_version
      try:
          schema_version = float(raw_schema_version)
      except (TypeError, ValueError):
          # Missing (None) or non-numeric: cannot match the supported version.
          schema_version = None

      if schema_version != PIPELINE_CURRENT_SCHEMA:
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidPipeline",
              message="Incompatible pipeline schema version detected.",
              data={
                  "supported_schema_version": PIPELINE_CURRENT_SCHEMA,
                  "detected_schema_version": schema_version if schema_version is not None else raw_schema_version,
              },
          )

      # validate pipeline version compatibility
      try:
          pipeline_version = pipeline_definition.primary_pipeline.version
          if pipeline_version < PIPELINE_CURRENT_VERSION:
              # Pipeline needs to be migrated
              response.add_message(
                  severity=ValidationSeverity.Error,
                  message_type="invalidPipeline",
                  message=f"Pipeline version {pipeline_version} is "
                  "out of date and needs to be migrated "
                  f"using the Elyra pipeline editor.",
              )
          elif pipeline_version > PIPELINE_CURRENT_VERSION:
              # New version of Elyra is needed
              response.add_message(
                  severity=ValidationSeverity.Error,
                  message_type="invalidPipeline",
                  message="Pipeline was last edited in a newer version of Elyra. "
                  "Update Elyra to use this pipeline.",
                  data={"supported_version": PIPELINE_CURRENT_VERSION, "detected_version": pipeline_version},
              )
      except (ValueError, TypeError):
          # ValueError: version could not be converted; TypeError: e.g. a missing (None)
          # version cannot be ordered against PIPELINE_CURRENT_VERSION.
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidPipeline",
              message="Pipeline version is not a numeric value.",
          )
  @staticmethod
  def _is_compatible_pipeline(runtime_name: str, runtime_type: str):
      """
      Tells whether a pipeline of the given runtime type can run on the named runtime.

      :param runtime_name: runtime (processor) name, e.g. 'local' or a runtime schema name
      :param runtime_type: the pipeline's runtime type; 'generic' (any case) is always accepted
      :return: True when compatible, False otherwise
      """
      if runtime_type.lower() == "generic":
          return True  # TODO: this won't always be true as some runtime impls won't support generics

      if runtime_name.lower() != "local":
          # Look up the runtime schema for runtime_name and compare its declared runtime_type.
          runtime_schema = SchemaManager.instance().get_schema(Runtimes.RUNTIMES_SCHEMASPACE_ID, runtime_name)
          return runtime_schema.get("runtime_type") == runtime_type

      # 'local' has no runtime schema yet; runtime types are upper-case, so compare the
      # up-cased runtime name instead.
      return runtime_name.upper() == runtime_type
  async def _validate_compatibility(
      self,
      pipeline_definition: PipelineDefinition,
      pipeline_type: str,
      pipeline_runtime: str,
      response: ValidationResponse,
  ) -> None:
      """
      Checks that the pipeline payload is compatible with this version of elyra (ISSUE #938)
      as well as verifying all nodes in the pipeline are supported by the runtime

      Nothing is checked when pipeline_runtime is empty.

      :param pipeline_definition: the pipeline definition to be validated
      :param pipeline_type: type of the pipeline runtime being used e.g. KUBEFLOW_PIPELINES, APACHE_AIRFLOW, generic
      :param pipeline_runtime: name of the pipeline runtime for execution e.g. kfp, airflow, local
      :param response: ValidationResponse containing the issue list to be updated
      """
      primary_pipeline_id = pipeline_definition.primary_pipeline.id
      if not pipeline_runtime:
          return

      if not PipelineValidationManager._is_compatible_pipeline(pipeline_runtime, pipeline_type):
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidRuntime",
              message="Pipeline runtime platform is not compatible with selected runtime configuration.",
              data={
                  "pipelineID": primary_pipeline_id,
                  "pipelineType": pipeline_type,
                  "pipelineRuntime": pipeline_runtime,
              },
          )
          return

      processor_manager = PipelineProcessorManager.instance(root_dir=self.root_dir)
      if not processor_manager.is_supported_runtime(pipeline_runtime):
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidRuntime",
              message="Unsupported pipeline runtime",
              data={
                  "pipelineRuntime": pipeline_runtime,
                  "pipelineType": pipeline_type,
                  "pipelineId": primary_pipeline_id,
              },
          )
          return

      component_list = await processor_manager.get_components(pipeline_runtime)
      supported_ops = [component.op for component in component_list]

      # Loop-invariant values, computed once rather than for every node.
      is_local_runtime = pipeline_runtime == "local"
      generic_ops = ComponentCache.get_generic_component_ops()

      # Checks pipeline node types are compatible with the runtime selected
      for sub_pipeline in pipeline_definition.pipelines:
          for node in sub_pipeline.nodes:
              if is_local_runtime and node.op not in generic_ops:
                  response.add_message(
                      severity=ValidationSeverity.Error,
                      message_type="invalidNodeType",
                      message="This pipeline contains at least one runtime-specific "
                      "component, but pipeline runtime is 'local'. Specify a "
                      "runtime config or remove runtime-specific components "
                      "from the pipeline",
                      data={"nodeID": node.id, "nodeOpName": node.op, "pipelineId": sub_pipeline.id},
                  )
                  # Report this once per sub-pipeline and move on to the next one.
                  break
              if node.type == "execution_node" and node.op not in supported_ops:
                  response.add_message(
                      severity=ValidationSeverity.Error,
                      message_type="invalidNodeType",
                      message="This component was not found in the catalog. Please add it "
                      "to your component catalog or remove this node from the "
                      "pipeline",
                      data={
                          "nodeID": node.id,
                          "nodeOpName": node.op,
                          "nodeName": node.label,
                          "pipelineId": sub_pipeline.id,
                      },
                  )
  async def _validate_node_properties(
      self,
      pipeline_definition: PipelineDefinition,
      pipeline_type: str,
      pipeline_runtime: str,
      response: ValidationResponse,
  ) -> None:
      """
      Validates the required fields/properties, and their values, of every execution node.

      Generic operations are checked by _validate_generic_node_properties; every other
      execution node is checked by _validate_custom_component_node_properties. Nothing
      is checked when a runtime is given that is not compatible with pipeline_type.

      :param pipeline_definition: the pipeline definition to be validated
      :param pipeline_type: type of the pipeline runtime being used e.g. KUBEFLOW_PIPELINES, APACHE_AIRFLOW, generic
      :param pipeline_runtime: name of the pipeline runtime for execution e.g. kfp, airflow, local
      :param response: ValidationResponse containing the issue list to be updated
      """
      # An incompatible type/runtime pairing is reported by _validate_compatibility;
      # skip node-level checks in that case.
      if pipeline_runtime and not PipelineValidationManager._is_compatible_pipeline(pipeline_runtime, pipeline_type):
          return

      for sub_pipeline in pipeline_definition.pipelines:
          execution_nodes = (candidate for candidate in sub_pipeline.nodes if candidate.type == "execution_node")
          for node in execution_nodes:
              if not Operation.is_generic_operation(node.op):
                  # Runtime components are validated against their component definition
                  await self._validate_custom_component_node_properties(
                      node=node,
                      response=response,
                      pipeline_runtime=pipeline_runtime,
                      pipeline_definition=pipeline_definition,
                  )
                  continue
              self._validate_generic_node_properties(node=node, response=response, pipeline_runtime=pipeline_runtime)
  def _validate_generic_node_properties(self, node: Node, response: ValidationResponse, pipeline_runtime: str):
      """
      Validate properties of a generic node

      The filename and label are always checked. Image, resources, volumes, secrets,
      tolerations and annotations are only checked for non-local runtimes. Dependencies
      are resolved relative to the directory of the node's filename, so they are only
      checked when a filename is set. Environment variables are always checked.

      :param node: the generic node to check
      :param response: the validation response object to attach any error messages
      :param pipeline_runtime: the pipeline runtime selected
      :return:
      """
      node_label = node.label
      image_name = node.get_component_parameter(RUNTIME_IMAGE)
      filename = node.get_component_parameter("filename")
      dependencies = node.get_component_parameter("dependencies")
      env_vars = node.get_component_parameter(ENV_VARIABLES)
      volumes = node.get_component_parameter(MOUNTED_VOLUMES)
      secrets = node.get_component_parameter(KUBERNETES_SECRETS)
      tolerations = node.get_component_parameter(KUBERNETES_TOLERATIONS)
      annotations = node.get_component_parameter(KUBERNETES_POD_ANNOTATIONS)

      self._validate_filepath(
          node_id=node.id, node_label=node_label, property_name="filename", filename=filename, response=response
      )

      # If not running locally, we check resource and image name
      if pipeline_runtime != "local":
          self._validate_container_image_name(node.id, node_label, image_name, response=response)
          for resource_name in ["cpu", "gpu", "memory"]:
              resource_value = node.get_component_parameter(resource_name)
              if resource_value:
                  self._validate_resource_value(
                      node.id,
                      node_label,
                      resource_name=resource_name,
                      resource_value=resource_value,
                      response=response,
                  )
          if volumes:
              self._validate_mounted_volumes(node.id, node_label, volumes, response=response)
          if secrets:
              self._validate_kubernetes_secrets(node.id, node_label, secrets, response=response)
          if tolerations:
              self._validate_kubernetes_tolerations(node.id, node_label, tolerations, response=response)
          if annotations:
              self._validate_kubernetes_pod_annotations(node.id, node_label, annotations, response=response)

      self._validate_label(node_id=node.id, node_label=node_label, response=response)

      # Dependencies live relative to the notebook/script directory. Without a filename
      # (already passed to _validate_filepath above) there is no directory to resolve
      # against, and os.path.dirname(None) would raise TypeError.
      if dependencies and filename:
          notebook_root_relative_path = os.path.dirname(filename)
          for dependency in dependencies:
              self._validate_filepath(
                  node_id=node.id,
                  node_label=node_label,
                  file_dir=os.path.join(self.root_dir, notebook_root_relative_path),
                  property_name="dependencies",
                  filename=dependency,
                  response=response,
              )

      if env_vars:
          for env_var in env_vars:
              self._validate_environmental_variables(node.id, node_label, env_var=env_var, response=response)
  async def _validate_custom_component_node_properties(
      self, node: Node, response: ValidationResponse, pipeline_definition: PipelineDefinition, pipeline_runtime: str
  ):
      """
      Validates the properties of the custom component node

      Mounted volumes, tolerations and pod annotations are checked unless listed in
      node.elyra_properties_to_skip. For each required component parameter the node must
      supply a value. InputPath parameters must be {"value", "option"} dicts whose "value"
      refers to a parent node. NestedEnumControl parameters must reference an existing
      upstream node, and that node must not have xcom_push switched off.

      :param node: the node to be validated
      :param response: the validation response object to attach any error messages
      :param pipeline_definition: the pipeline definition containing the node
      :param pipeline_runtime: the pipeline runtime selected
      :return:
      """
      # root_dir is passed for consistency with _validate_compatibility; the kwargs only take
      # effect if this call is the one that creates the processor-manager singleton.
      processor_manager = PipelineProcessorManager.instance(root_dir=self.root_dir)
      component_list = await processor_manager.get_components(pipeline_runtime)
      components = ComponentCache.to_canvas_palette(component_list)

      # Full dict of properties for the operation e.g. current params, optionals etc
      component_property_dict = await self._get_component_properties(pipeline_runtime, components, node.op)

      # List of just the current parameters for the component (with "elyra_" removed from the names)
      current_parameter_defaults_list = [
          str(x).replace("elyra_", "") for x in component_property_dict["current_parameters"].keys()
      ]

      # Remove the non component_parameter jinja templated values we do not check against
      current_parameter_defaults_list.remove("component_source")
      current_parameter_defaults_list.remove("label")

      volumes = node.get_component_parameter(MOUNTED_VOLUMES)
      if volumes and MOUNTED_VOLUMES not in node.elyra_properties_to_skip:
          self._validate_mounted_volumes(node.id, node.label, volumes, response=response)

      tolerations = node.get_component_parameter(KUBERNETES_TOLERATIONS)
      if tolerations and KUBERNETES_TOLERATIONS not in node.elyra_properties_to_skip:
          self._validate_kubernetes_tolerations(node.id, node.label, tolerations, response=response)

      annotations = node.get_component_parameter(KUBERNETES_POD_ANNOTATIONS)
      if annotations and KUBERNETES_POD_ANNOTATIONS not in node.elyra_properties_to_skip:
          self._validate_kubernetes_pod_annotations(node.id, node.label, annotations, response=response)

      for default_parameter in current_parameter_defaults_list:
          node_param = node.get_component_parameter(default_parameter)
          if self._is_required_property(component_property_dict, default_parameter):
              if not node_param:
                  response.add_message(
                      severity=ValidationSeverity.Error,
                      message_type="invalidNodeProperty",
                      message="Node is missing required property.",
                      data={"nodeID": node.id, "nodeName": node.label, "propertyName": default_parameter},
                  )
              elif self._get_component_type(component_property_dict, default_parameter) == "inputpath":
                  # Any component property with type `InputPath` will be a dictionary of two keys
                  # "value": the node ID of the parent node containing the output
                  # "option": the name of the key (which is an output) of the above referenced node
                  if (
                      not isinstance(node_param, dict)
                      or len(node_param) != 2
                      or set(node_param.keys()) != {"value", "option"}
                  ):
                      response.add_message(
                          severity=ValidationSeverity.Error,
                          message_type="invalidNodeProperty",
                          message="Node has malformed `InputPath` parameter structure",
                          data={"nodeID": node.id, "nodeName": node.label},
                      )
                  # The parent-reference check needs dict access; a non-dict value has just been
                  # reported as malformed, and calling .get() on it would raise AttributeError.
                  if isinstance(node_param, dict):
                      node_ids = [x.get("node_id_ref", None) for x in node.component_links]
                      parent_list = self._get_parent_id_list(pipeline_definition, node_ids, [])
                      node_param_value = node_param.get("value")
                      if node_param_value not in parent_list:
                          response.add_message(
                              severity=ValidationSeverity.Error,
                              message_type="invalidNodeProperty",
                              message="Node contains an invalid inputpath reference. Please "
                              "check your node-to-node connections",
                              data={"nodeID": node.id, "nodeName": node.label},
                          )
              elif isinstance(node_param, dict) and node_param.get("activeControl") == "NestedEnumControl":
                  if not node_param.get("NestedEnumControl"):
                      response.add_message(
                          severity=ValidationSeverity.Error,
                          message_type="invalidNodeProperty",
                          message="Node contains an invalid reference to an node output. Please "
                          "check the node properties are configured properly",
                          data={"nodeID": node.id, "nodeName": node.label},
                      )
                  else:
                      # TODO: Update this hardcoded check for xcom_push. This parameter is specific to a runtime
                      # (Airflow). i.e. abstraction for byo validation?
                      node_param_value = node_param["NestedEnumControl"].get("value")
                      upstream_node = pipeline_definition.get_node(node_param_value)
                      if upstream_node is None:
                          # NOTE(review): assumes get_node() returns None for an unknown node id.
                          # A dangling reference is reported instead of raising AttributeError below.
                          response.add_message(
                              severity=ValidationSeverity.Error,
                              message_type="invalidNodeProperty",
                              message="Node contains an invalid reference to an node output. Please "
                              "check the node properties are configured properly",
                              data={"nodeID": node.id, "nodeName": node.label},
                          )
                      else:
                          xcom_param = upstream_node.get_component_parameter("xcom_push")
                          if xcom_param:
                              xcom_value = xcom_param.get("BooleanControl")
                              if not xcom_value:
                                  response.add_message(
                                      severity=ValidationSeverity.Error,
                                      message_type="invalidNodeProperty",
                                      message="Node contains an invalid input reference. The parent "
                                      "node does not have the xcom_push property enabled",
                                      data={
                                          "nodeID": node.id,
                                          "nodeName": node.label,
                                          "parentNodeID": upstream_node.label,
                                      },
                                  )
  def _validate_container_image_name(
      self, node_id: str, node_label: str, image_name: str, response: ValidationResponse
  ) -> None:
      """
      Validates the image name exists and is proper in syntax

      The whole name must consist of at least two non-empty, '/'-separated segments that
      contain no whitespace (i.e. [registry/]owner/image[:tag]).

      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param image_name: container image name to be evaluated
      :param response: ValidationResponse containing the issue list to be updated
      """
      if not image_name:
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidNodeProperty",
              message="Required property value is missing.",
              data={"nodeID": node_id, "nodeName": node_label, "propertyName": "runtime_image"},
          )
          return

      # fullmatch the entire name: an unanchored search for "x/y$" also accepted names such
      # as "my org/image" (embedded whitespace) or ones with a trailing newline.
      if re.fullmatch(r"[^/\s]+(?:/[^/\s]+)+", image_name) is None:
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidNodeProperty",
              message="Node contains an invalid runtime image. Runtime image "
              "must conform to the format [registry/]owner/image:tag",
              data={
                  "nodeID": node_id,
                  "nodeName": node_label,
                  "propertyName": "runtime_image",
                  "imageName": image_name,
              },
          )
  def _validate_resource_value(
      self, node_id: str, node_label: str, resource_name: str, resource_value: str, response: ValidationResponse
  ) -> None:
      """
      Checks that a requested hardware resource amount converts to a positive integer.

      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param resource_name: the name of the resource e.g. cpu, gpu, memory
      :param resource_value: the requested amount of the resource
      :param response: ValidationResponse containing the issue list to be updated
      """
      try:
          requested_amount = int(resource_value)
      except (ValueError, TypeError):
          problem = "Property has a non-numeric value."
      else:
          if requested_amount > 0:
              return
          problem = "Property must be greater than zero."

      response.add_message(
          severity=ValidationSeverity.Error,
          message_type="invalidNodeProperty",
          message=problem,
          data={
              "nodeID": node_id,
              "nodeName": node_label,
              "propertyName": resource_name,
              "value": resource_value,
          },
      )
  def _validate_mounted_volumes(
      self, node_id: str, node_label: str, volumes: List[VolumeMount], response: ValidationResponse
  ) -> None:
      """
      Reports every mounted volume whose PVC name is not a syntactically valid Kubernetes
      resource name (volumes are written as e.g. foo/path=pvc_name).

      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param volumes: the VolumeMount entries to check
      :param response: ValidationResponse containing the issue list to be updated
      """
      bad_mounts = (mount for mount in volumes if not is_valid_kubernetes_resource_name(mount.pvc_name))
      for mount in bad_mounts:
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidVolumeMount",
              message=f"PVC name '{mount.pvc_name}' is not a valid Kubernetes resource name.",
              data={
                  "nodeID": node_id,
                  "nodeName": node_label,
                  "propertyName": MOUNTED_VOLUMES,
                  "value": KeyValueList.to_str(mount.path, mount.pvc_name),
              },
          )
  def _validate_kubernetes_secrets(
      self, node_id: str, node_label: str, secrets: List[KubernetesSecret], response: ValidationResponse
  ) -> None:
      """
      Reports Kubernetes secret references that are incomplete or syntactically invalid.
      Each entry maps an environment variable to a secret key, e.g. FOO=SECRET_NAME:KEY
      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param secrets: the KubernetesSecret entries (env_var, name, key) to check
      :param response: ValidationResponse containing the issue list to be updated
      """

      def report(secret, message: str) -> None:
          # Missing parts are rendered as empty strings so the reported value stays readable
          reference = f"{secret.name or ''}:{secret.key or ''}"
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidKubernetesSecret",
              message=message,
              data={
                  "nodeID": node_id,
                  "nodeName": node_label,
                  "propertyName": KUBERNETES_SECRETS,
                  "value": KeyValueList.to_str(secret.env_var, reference),
              },
          )

      for secret in secrets:
          if not (secret.name and secret.key):
              # Without both parts there is nothing further to validate for this entry
              report(
                  secret,
                  f"Environment variable '{secret.env_var}' has an improperly formatted representation of "
                  f"secret name and key.",
              )
              continue
          if not is_valid_kubernetes_resource_name(secret.name):
              report(secret, f"Secret name '{secret.name}' is not a valid Kubernetes resource name.")
          if not is_valid_kubernetes_key(secret.key):
              report(secret, f"Key '{secret.key}' is not a valid Kubernetes secret key.")
  def _validate_kubernetes_tolerations(
      self, node_id: str, node_label: str, tolerations: List[KubernetesToleration], response: ValidationResponse
  ) -> None:
      """
      Reports tolerations whose operator, key, value or effect violate the Kubernetes Toleration constraints.
      Each toleration is shown in the issue as key:operator:value:effect
      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param tolerations: the KubernetesToleration entries (key, operator, value, effect) to check
      :param response: ValidationResponse containing the issue list to be updated
      """
      valid_operators = ("Exists", "Equal")
      valid_effects = ("NoExecute", "NoSchedule", "PreferNoSchedule")

      def report(message: str, rendered: str) -> None:
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidKubernetesToleration",
              message=message,
              data={
                  "nodeID": node_id,
                  "nodeName": node_label,
                  "propertyName": KUBERNETES_TOLERATIONS,
                  "value": rendered,
              },
          )

      # Constraints follow
      # https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.23/#toleration-v1-core
      for toleration in tolerations:
          rendered = f"{toleration.key}:{toleration.operator}:{toleration.value}:{toleration.effect}"
          if toleration.operator not in valid_operators:
              report(
                  f"'{toleration.operator}' is not a valid operator. "
                  "The value must be one of 'Exists' or 'Equal'.",
                  rendered,
              )
          # An empty key only makes sense together with the 'Exists' operator
          if not toleration.key.strip() and toleration.operator == "Equal":
              report(
                  f"'{toleration.operator}' is not a valid operator. "
                  "Operator must be 'Exists' if no key is specified.",
                  rendered,
              )
          # An empty effect is allowed; a non-empty one must be a known effect
          if toleration.effect.strip() and toleration.effect not in valid_effects:
              report(
                  f"'{toleration.effect}' is not a valid effect. Effect must be one of "
                  "'NoExecute', 'NoSchedule', or 'PreferNoSchedule'.",
                  rendered,
              )
          if toleration.operator == "Exists" and toleration.value.strip():
              report(
                  f"'{toleration.value}' is not a valid value. It should be empty if operator is 'Exists'.",
                  rendered,
              )
  def _validate_kubernetes_pod_annotations(
      self, node_id: str, node_label: str, annotations: List[KubernetesAnnotation], response: ValidationResponse
  ) -> None:
      """
      Reports user-provided pod annotations whose key is not a valid Kubernetes annotation key.
      Annotations are written as annotation_key=annotation_value
      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param annotations: the KubernetesAnnotation entries (key, value) to check
      :param response: ValidationResponse containing the issue list to be updated
      """
      for annotation in annotations:
          if is_valid_annotation_key(annotation.key):
              continue
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidKubernetesAnnotation",
              message=f"'{annotation.key}' is not a valid Kubernetes annotation key.",
              data={
                  "nodeID": node_id,
                  "nodeName": node_label,
                  "propertyName": KUBERNETES_POD_ANNOTATIONS,
                  "value": KeyValueList.to_str(annotation.key, annotation.value),
              },
          )
  def _validate_filepath(
      self,
      node_id: str,
      node_label: str,
      property_name: str,
      filename: str,
      response: ValidationResponse,
      file_dir: Optional[str] = "",
  ) -> None:
      """
      Checks the file structure, paths and existence of pipeline dependencies.
      Note that this does not cross reference with file path references within the notebook or script itself.
      Relative filenames are resolved against file_dir (or the workspace root when file_dir is empty).
      At most one error is reported: the path lies outside the workspace root, a wildcard path
      matches nothing, or a non-wildcard path does not exist.
      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param property_name: name of the node property being validated
      :param filename: the name of the file or directory to verify
      :param response: ValidationResponse containing the issue list to be updated
      :param file_dir: the dir path of the where the pipeline file resides in the elyra workspace
      """
      file_dir = file_dir or self.root_dir
      if filename == os.path.abspath(filename) or filename.startswith(file_dir):
          normalized_path = os.path.normpath(filename)
      else:
          normalized_path = os.path.normpath(os.path.join(file_dir, filename))

      # Normalize the root too, so e.g. a trailing separator does not make every path look external
      root_dir = os.path.normpath(self.root_dir)
      try:
          inside_root = os.path.commonpath([normalized_path, root_dir]) == root_dir
      except ValueError:
          # commonpath raises when the paths share no common root (absolute vs. relative paths,
          # or different drives on Windows); such a path cannot be inside the workspace
          inside_root = False

      if not inside_root:
          message = "Property has an invalid reference to a file/dir outside the root workspace."
      elif "*" in normalized_path:
          if glob(normalized_path):
              return
          message = "Property(wildcard) has an invalid path to a file/dir or the file/dir does not exist."
      elif not os.path.exists(normalized_path):
          message = "Property has an invalid path to a file/dir or the file/dir does not exist."
      else:
          return

      response.add_message(
          severity=ValidationSeverity.Error,
          message_type="invalidFilePath",
          message=message,
          data={
              "nodeID": node_id,
              "nodeName": node_label,
              "propertyName": property_name,
              "value": normalized_path,
          },
      )
  def _validate_environmental_variables(
      self, node_id: str, node_label: str, env_var: str, response: ValidationResponse
  ) -> None:
      """
      Checks that the env var is written as a key value pair, i.e. that it contains an '=' separator,
      e.g. FOO = 'BAR'
      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param env_var: the env_var key value pair to check
      :param response: ValidationResponse containing the issue list to be updated
      """
      _, separator, _ = env_var.partition("=")
      if separator:
          return
      response.add_message(
          severity=ValidationSeverity.Error,
          message_type="invalidEnvPair",
          message="Property has an improperly formatted env variable key value pair.",
          data={"nodeID": node_id, "nodeName": node_label, "propertyName": ENV_VARIABLES, "value": env_var},
      )
  def _validate_label(self, node_id: str, node_label: str, response: ValidationResponse) -> None:
      """
      KFP specific check for the label name when constructing the node operation using dsl.
      Adds a warning when the label is longer than 63 characters, and another when the whole label
      is not lower case alphanumeric (inner characters may also be '-', '_' or '.').
      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param response: ValidationResponse containing the issue list to be updated
      """
      label_name_max_length = 63
      label_regex = re.compile("^[a-z0-9]([-_.a-z0-9]{0,62}[a-z0-9])?")
      if len(node_label) > label_name_max_length:
          response.add_message(
              severity=ValidationSeverity.Warning,
              message_type="invalidNodeLabel",
              # f-string so the actual limit is shown instead of the literal placeholder
              message=f"Property value exceeds the max length allowed "
              f"({label_name_max_length}). This value may be truncated "
              "by the runtime service.",
              data={"nodeID": node_id, "nodeName": node_label, "propertyName": "label", "value": node_label},
          )
      # The entire label must match the pattern, not just a prefix of it
      if not label_regex.fullmatch(node_label):
          response.add_message(
              severity=ValidationSeverity.Warning,
              message_type="invalidNodeLabel",
              message="The node label contains characters that may be replaced "
              "by the runtime service. Node labels should "
              "start with lower case alphanumeric and contain "
              "only lower case alphanumeric, underscores, dots, and dashes.",
              data={"nodeID": node_id, "nodeName": node_label, "propertyName": "label", "value": node_label},
          )
  def _validate_pipeline_graph(self, pipeline: dict, response: ValidationResponse) -> None:
      """
      Validates that the pipeline is an acyclic graph, meaning no circular references.
      Converts the pipeline definition into a series of tuple node edges (arrows) that represent the
      connections from one node to another via a networkX DiGraph.
      Example:
                    NodeC
                      ^
                      |
            NodeA -> NodeB -> NodeD
                      ^        |
                      |        |   (Invalid Circular Reference)
                      <---------
      The resulting list of edges (arrows) would then be:
          [(NodeA, NodeB), (NodeB, NodeC), (NodeB, NodeD), (NodeD, NodeB)]
      the list of nodes added would be:
          [NodeA, NodeB, NodeC, NodeD]
      When the graph has more than one node, a warning is added for every node that is not connected
      to any other node. If the graph contains at least one cycle, a single error message (with empty
      data) is added; individual cycles and their LinkIDs are not reported.
      :param pipeline: A dictionary describing the pipeline
      :param response: ValidationResponse containing the issue list to be updated
      """
      out_port_suffix = "_outPort"
      in_port_suffix = "_inPort"

      pipeline_json = json.loads(json.dumps(pipeline, cls=DataClassJSONEncoder))
      graph = nx.DiGraph()
      for single_pipeline in pipeline_json["pipelines"]:
          for node in single_pipeline["nodes"]:
              inputs = node.get("inputs") or []
              first_input = inputs[0] if inputs else {}
              # Unconnected input ports carry no "links" entry
              links = first_input.get("links") or []
              if node["type"] == "execution_node":
                  graph.add_node(node["id"])
                  for link in links:
                      port_ref = link["port_id_ref"]
                      if port_ref.endswith(out_port_suffix):
                          # Port of another node: drop the suffix to get that node's id.
                          # (str.strip would remove a *character set*, not the suffix.)
                          graph.add_edge(port_ref[: -len(out_port_suffix)], node["id"])
                      elif port_ref == "outPort":  # do not link to bindings
                          graph.add_edge(link["node_id_ref"], node["id"])
              elif node["type"] == "super_node" and links:
                  port_id = first_input["id"]
                  if port_id.endswith(in_port_suffix):
                      child_node_id = port_id[: -len(in_port_suffix)]
                  else:
                      child_node_id = port_id
                  for link in links:
                      graph.add_edge(link["node_id_ref"], child_node_id)

      if graph.number_of_nodes() > 1:
          for isolate in nx.isolates(graph):
              response.add_message(
                  severity=ValidationSeverity.Warning,
                  message_type="singletonReference",
                  message="Node is not connected to any other node.",
                  data={
                      "nodeID": isolate,
                      "nodeName": self._get_node_names(pipeline=pipeline, node_id_list=[isolate])[0],
                      "pipelineID": self._get_pipeline_id(pipeline, node_id=isolate),
                  },
              )

      # Linear-time acyclicity test; enumerating all simple cycles can be exponential
      if not nx.is_directed_acyclic_graph(graph):
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="circularReference",
              message="The pipeline contains a circular dependency between nodes.",
              data={},
          )
  def _get_pipeline_id(self, pipeline: dict, node_id: str) -> Optional[str]:
      """
      Find the ID of the (sub)pipeline that contains the given node
      :param pipeline: pipeline definition where the node is located
      :param node_id: the node ID of the node
      :return: the ID of the first pipeline containing the node, or None when no pipeline does
      """
      pipeline_json = json.loads(json.dumps(pipeline, cls=DataClassJSONEncoder))
      owning_ids = (
          single_pipeline["id"]
          for single_pipeline in pipeline_json["pipelines"]
          for node in single_pipeline["nodes"]
          if node["id"] == node_id
      )
      return next(owning_ids, None)
  948. async def _get_component_properties(self, pipeline_runtime: str, components: dict, node_op: str) -> Dict:
  949. """
  950. Retrieve the full dict of properties associated with the node_op
  951. :param components: list of components associated with the pipeline runtime being used e.g. kfp, airflow
  952. :param node_op: the node operation e.g. execute-notebook-node
  953. :return: a list of property names associated with the node op
  954. """
  955. if node_op == "execute-notebook-node":
  956. node_op = "notebooks"
  957. elif node_op == "execute-r-node":
  958. node_op = "r-script"
  959. elif node_op == "execute-python-node":
  960. node_op = "python-script"
  961. for category in components["categories"]:
  962. for node_type in category["node_types"]:
  963. if node_op == node_type["op"]:
  964. component: Component = await PipelineProcessorManager.instance().get_component(
  965. pipeline_runtime, node_op
  966. )
  967. component_properties = ComponentCache.to_canvas_properties(component)
  968. return component_properties
  969. return {}
  def _get_node_names(self, pipeline: dict, node_id_list: list) -> List:
      """
      Look up the name (label) of each node referenced in node_id_list
      :param pipeline: pipeline definition where the nodes are located
      :param node_id_list: a list of node UUIDs defined in the pipeline file
      :return: a list of node labels in the order of node_id_list; IDs that match no node are skipped,
               and when an ID occurs in several pipelines the first matching node is used
      """
      pipeline_json = json.loads(json.dumps(pipeline, cls=DataClassJSONEncoder))
      names = []
      for wanted_id in node_id_list:
          first_match = next(
              (
                  node
                  for single_pipeline in pipeline_json["pipelines"]
                  for node in single_pipeline["nodes"]
                  if node["id"] == wanted_id
              ),
              None,
          )
          if first_match is not None:
              names.append(self._get_node_label(first_match))
      return names
  def _get_node_labels(self, pipeline: dict, link_ids: List[str]) -> Optional[List[str]]:
      """
      Returns the names (labels) of the execution nodes that receive the specified links on one of
      their input ports, i.e. the downstream end of each link.
      :param pipeline: the pipeline dict
      :param link_ids: list of link ids from pipeline
      :return: a list with one label per matching link, in link_ids order; None if link_ids is None
      """
      if link_ids is None:
          return None
      pipeline_json = json.loads(json.dumps(pipeline, cls=DataClassJSONEncoder))

      def incoming_links():
          # Yields (link id, receiving node) for every input link of every execution node
          for single_pipeline in pipeline_json["pipelines"]:
              for node in single_pipeline["nodes"]:
                  if node["type"] != "execution_node":
                      continue
                  for port in node.get("inputs", []):
                      for link in port.get("links", []):
                          yield link["id"], node

      labels = []
      for wanted_id in link_ids:
          labels.extend(self._get_node_label(node) for found_id, node in incoming_links() if found_id == wanted_id)
      return labels
  1011. def _get_node_label(self, node: dict) -> Optional[str]:
  1012. """
  1013. Returns the label for the provided node or None if the information
  1014. cannot be derived from the inpuit dictionary.
  1015. :param node: a dict representing a pipeline node
  1016. :return: the label of the node
  1017. """
  1018. if node is None or node.get("app_data") is None:
  1019. return None
  1020. node_label = node["app_data"].get("label")
  1021. if node["type"] == "execution_node" and node["app_data"].get("ui_data"):
  1022. node_label = node["app_data"]["ui_data"].get("label")
  1023. return node_label
  1024. def _is_legacy_pipeline(self, pipeline: dict) -> bool:
  1025. """
  1026. Checks the pipeline to determine if the pipeline is an older legacy schema
  1027. :param pipeline: the pipeline dict
  1028. :return:
  1029. """
  1030. return pipeline["pipelines"][0]["app_data"].get("properties") is None
  1031. def _is_required_property(self, property_dict: dict, node_property: str) -> bool:
  1032. """
  1033. Determine whether or not a component parameter is required to function correctly
  1034. :param property_dict: the dictionary for the component
  1035. :param node_property: the component property to check
  1036. :return:
  1037. """
  1038. node_op_parameter_list = property_dict["uihints"]["parameter_info"]
  1039. for parameter in node_op_parameter_list:
  1040. if parameter["parameter_ref"] == f"elyra_{node_property}":
  1041. return parameter["data"]["required"]
  1042. return False
  1043. def _get_component_type(self, property_dict: dict, node_property: str, control_id: str = "") -> Optional[str]:
  1044. """
  1045. Helper function to determine the type of a node property
  1046. :param property_dict: a dictionary containing the full list of property parameters and descriptions
  1047. :param node_property: the property to look for
  1048. :param control_id: when using OneOfControl, include the control_id to retrieve the correct format
  1049. :return: the data type associated with node_property, defaults to 'string'
  1050. """
  1051. for prop in property_dict["uihints"]["parameter_info"]:
  1052. if prop["parameter_ref"] == f"elyra_{node_property}":
  1053. if control_id:
  1054. return prop["data"]["controls"][control_id].get("format", "string")
  1055. else:
  1056. return prop["data"].get("format", "string")
  1057. return None
  def _get_parent_id_list(
      self, pipeline_definition: PipelineDefinition, node_id_list: list, parent_list: list
  ) -> List:
      """
      Helper function that recursively collects node ids starting at node_id_list and following
      upstream links (component_links) into parent_list.
      Ids resolving to an execution node or supernode are recorded and followed further; ids that
      resolve to no node, or to a node of any other type (e.g. a binding node), are not recorded.
      An upstream link to a binding node is replaced by the upstream links of every supernode whose
      subflow_pipeline_id equals that binding node's id.
      :param pipeline_definition: the complete pipeline definition
      :param node_id_list: list of node ids to record and walk upstream from
      :param parent_list: the list to add additional found parent node ids to (mutated in place)
      :return: parent_list
      """
      for node_id in node_id_list:
          node = pipeline_definition.get_node(node_id)
          if not node or node.type not in ["execution_node", "super_node"]:
              # unknown ids and binding nodes are neither recorded nor followed
              continue
          parent_list.append(node_id)
          upstream_ids = []
          supernode_upstream_ids = []
          # Build new lists instead of removing from the list being iterated (which skipped elements)
          for nid in (link.get("node_id_ref", None) for link in node.component_links):
              upstream_node = pipeline_definition.get_node(nid)
              if not upstream_node or upstream_node.type != "binding":
                  upstream_ids.append(nid)
                  continue
              # look-ahead: a binding node is not a real parent; follow the matching supernode's links
              for super_node in pipeline_definition.get_supernodes():
                  if super_node.subflow_pipeline_id == nid:
                      # extend (not append) so each upstream id is looked up individually
                      supernode_upstream_ids.extend(
                          link.get("node_id_ref", None) for link in super_node.component_links
                      )
          self._get_parent_id_list(pipeline_definition, upstream_ids + supernode_upstream_ids, parent_list)
      return parent_list