validation.py 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from enum import IntEnum
  17. from glob import glob
  18. import json
  19. import os
  20. import re
  21. from typing import Dict
  22. from typing import List
  23. from typing import Optional
  24. import networkx as nx
  25. from traitlets.config import SingletonConfigurable
  26. from elyra.metadata.manager import MetadataManager
  27. from elyra.metadata.schema import SchemaManager
  28. from elyra.metadata.schemaspaces import Runtimes
  29. from elyra.pipeline.component import Component
  30. from elyra.pipeline.component_catalog import ComponentCache
  31. from elyra.pipeline.pipeline import DataClassJSONEncoder
  32. from elyra.pipeline.pipeline import KeyValueList
  33. from elyra.pipeline.pipeline import KubernetesSecret
  34. from elyra.pipeline.pipeline import Operation
  35. from elyra.pipeline.pipeline import PIPELINE_CURRENT_SCHEMA
  36. from elyra.pipeline.pipeline import PIPELINE_CURRENT_VERSION
  37. from elyra.pipeline.pipeline import VolumeMount
  38. from elyra.pipeline.pipeline_constants import ENV_VARIABLES
  39. from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
  40. from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
  41. from elyra.pipeline.pipeline_constants import RUNTIME_IMAGE
  42. from elyra.pipeline.pipeline_definition import Node
  43. from elyra.pipeline.pipeline_definition import PipelineDefinition
  44. from elyra.pipeline.processor import PipelineProcessorManager
  45. from elyra.pipeline.runtime_type import RuntimeProcessorType
  46. from elyra.util.kubernetes import is_valid_kubernetes_key
  47. from elyra.util.kubernetes import is_valid_kubernetes_resource_name
  48. from elyra.util.path import get_expanded_path
  class ValidationSeverity(IntEnum):
      """
      Severity levels that can be attached to a pipeline validation diagnostic.

      The members are integers, so they compare equal to their numeric value;
      the most severe level has the lowest number.
      """

      # The pipeline is invalid
      Error = 1
      # Something looks questionable
      Warning = 2
      # Purely informational message
      Information = 3
      # Suggestion for the pipeline author
      Hint = 4
  class ValidationResponse(object):
      """
      Collects the diagnostics produced while a pipeline is being validated and
      remembers whether any of them was an error.
      """

      def __init__(self):
          # Flipped to True by add_message() once an Error-severity message is recorded
          self._has_fatal = False
          self._response = {
              "title": "Elyra Pipeline Diagnostics",
              "description": "Issues discovered when parsing the pipeline",
              "issues": [],
          }

      @property
      def response(self) -> Dict:
          """
          :return: The dict of validation errors and warnings found in the pipeline
          """
          return self._response

      @property
      def has_fatal(self):
          """
          :return: True if at least one message with Error severity has been added
          """
          return self._has_fatal

      def add_message(
          self,
          message: str,
          message_type: Optional[str] = "",
          data: Optional[Dict] = "",
          severity: ValidationSeverity = ValidationSeverity.Warning,
      ):
          """
          Records one diagnostic message in the response that is sent back
          :param message: A simple message describing the issue
          :param message_type: The type of message to send back e.g. invalidNodeType, invalidPipeline
          :param data: a Dict with granular details regarding the error e.g. the nodeID, pipelineID, linkID etc.
          :param severity: the severity level of the issue; values that are not a known level are ignored
          :return:
          """
          # Anything that is not one of the four known levels is dropped without a trace
          if severity not in tuple(ValidationSeverity):
              return

          self._response["issues"].append(
              {
                  "severity": severity.value,
                  "source": "Elyra Pipeline Validation Service",
                  "type": message_type,
                  "message": message,
                  "data": data,
              }
          )
          if severity is ValidationSeverity.Error:
              self._has_fatal = True

      def to_json(self):
          """
          :return: the same diagnostics dict that the `response` property exposes
          """
          return self._response
  105. class PipelineValidationManager(SingletonConfigurable):
  def __init__(self, **kwargs):
      """
      Initializes the manager and stores the workspace root directory
      :param kwargs: configuration options forwarded to the parent class; the optional
                     'root_dir' entry is passed through get_expanded_path and kept as self.root_dir
      """
      super().__init__(**kwargs)
      workspace_root = kwargs.get("root_dir")
      self.root_dir = get_expanded_path(workspace_root)
  async def validate(self, pipeline: Dict) -> ValidationResponse:
      """
      Runs every validation step against the pipeline JSON payload
      :param pipeline: the pipeline definition to be validated
      :return: ValidationResponse containing any and all issues discovered during the validation
      """
      response = ValidationResponse()

      pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
      for schema_issue in pipeline_definition.validate():
          response.add_message(severity=ValidationSeverity.Error, message_type="invalidJSON", message=schema_issue)

      try:
          primary_pipeline = pipeline_definition.primary_pipeline
      except ValueError:
          # Nothing else can be checked without a primary pipeline
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidJSON",
              message="Invalid JSON detected, unable to continue.",
          )
          return response

      # Validation can be driven from runtime_config since both runtime and pipeline_type can
      # be derived from that and we should not use the 'runtime' and 'runtime_type' fields in
      # the pipeline.
      # Note: validation updates the pipeline definition with the correct values
      # of 'runtime' and 'runtime_type' obtained from 'runtime_config'. We may want to move this
      # into PipelineDefinition, but then parsing tests have issues because parsing (tests) assume
      # no validation has been applied to the pipeline.
      runtime_config = primary_pipeline.runtime_config
      if runtime_config is None:
          runtime_config = "local"

      pipeline_runtime = PipelineValidationManager._determine_runtime(runtime_config)
      runtime_is_supported = PipelineProcessorManager.instance().is_supported_runtime(pipeline_runtime)
      if not runtime_is_supported:
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidRuntime",
              message="Unsupported pipeline runtime",
              data={"pipelineRuntime": pipeline_runtime},
          )
      else:
          # Set the runtime since its derived from runtime_config and valid
          primary_pipeline.set("runtime", pipeline_runtime)

      self._validate_pipeline_structure(pipeline_definition=pipeline_definition, response=response)

      pipeline_type = PipelineValidationManager._determine_runtime_type(runtime_config)
      await self._validate_compatibility(
          pipeline_definition=pipeline_definition,
          pipeline_type=pipeline_type,
          pipeline_runtime=pipeline_runtime,
          response=response,
      )

      self._validate_pipeline_graph(pipeline=pipeline, response=response)

      # Node properties are only inspected when no error has been recorded so far
      if not response.has_fatal:
          # Set runtime_type since its derived from runtime_config, in case its needed
          primary_pipeline.set("runtime_type", pipeline_type)

          await self._validate_node_properties(
              pipeline_definition=pipeline_definition,
              pipeline_type=pipeline_type,
              pipeline_runtime=pipeline_runtime,
              response=response,
          )

      return response
  @staticmethod
  def _determine_runtime(runtime_config: str) -> str:
      """
      Derives the runtime (processor) from the runtime_config.

      :param runtime_config: name of the runtime configuration, possibly empty or None
      :return: the lower-cased local runtime name, or the schema name of the stored configuration
      """
      # A missing config, or one named 'local' in any letter case, is the special local runtime
      if not runtime_config or runtime_config.upper() == RuntimeProcessorType.LOCAL.name:
          return RuntimeProcessorType.LOCAL.name.lower()

      metadata_manager = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID)
      return metadata_manager.get(runtime_config).schema_name
  @staticmethod
  def _determine_runtime_type(runtime_config: str) -> str:
      """
      Derives the runtime type (platform) from the runtime_config.

      :param runtime_config: name of the runtime configuration; a missing/empty value or
                             'local' (in any letter case) is treated as the local runtime
      :return: the name of the matching RuntimeProcessorType member
      :raises ValueError: if RuntimeProcessorType.get_instance_by_name rejects the
                          'runtime_type' value stored in the configuration's metadata
      """
      # Special-case 'local' the same way _determine_runtime does: an empty or missing
      # runtime_config must neither fail on `.lower()` nor trigger a metadata lookup.
      if not runtime_config or runtime_config.lower() == "local":
          return RuntimeProcessorType.LOCAL.name

      # Pull the runtime_type (platform) from the stored runtime configuration
      runtime_metadata = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID).get(runtime_config)
      runtime_type_name = runtime_metadata.metadata.get("runtime_type")
      try:
          runtime_type = RuntimeProcessorType.get_instance_by_name(runtime_type_name)
      except (KeyError, TypeError) as err:
          # Keep the original lookup failure attached as the cause
          raise ValueError(
              f"Unsupported pipeline runtime: '{runtime_type_name}' " f"found in config '{runtime_config}'!"
          ) from err
      return runtime_type.name
  def _validate_pipeline_structure(
      self, pipeline_definition: PipelineDefinition, response: ValidationResponse
  ) -> None:
      """
      Checks the schema version and the pipeline version of the definition
      :param pipeline_definition: the pipeline definition to be validated
      :param response: ValidationResponse containing the issue list to be updated
      """
      # Validate pipeline schema version
      schema_matches = float(pipeline_definition.schema_version) == PIPELINE_CURRENT_SCHEMA
      if not schema_matches:
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidPipeline",
              message="Incompatible pipeline schema version detected.",
              data={
                  "supported_schema_version": PIPELINE_CURRENT_SCHEMA,
                  "detected_schema_version": float(pipeline_definition.schema_version),
              },
          )

      # validate pipeline version compatibility
      try:
          detected_version = pipeline_definition.primary_pipeline.version
          if detected_version < PIPELINE_CURRENT_VERSION:
              # Pipeline needs to be migrated
              migrate_message = (
                  f"Pipeline version {detected_version} is out of date and needs to be "
                  "migrated using the Elyra pipeline editor."
              )
              response.add_message(
                  severity=ValidationSeverity.Error,
                  message_type="invalidPipeline",
                  message=migrate_message,
              )
          elif detected_version > PIPELINE_CURRENT_VERSION:
              # New version of Elyra is needed
              upgrade_message = (
                  "Pipeline was last edited in a newer version of Elyra. Update Elyra to use this pipeline."
              )
              response.add_message(
                  severity=ValidationSeverity.Error,
                  message_type="invalidPipeline",
                  message=upgrade_message,
                  data={"supported_version": PIPELINE_CURRENT_VERSION, "detected_version": detected_version},
              )
      except ValueError:
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidPipeline",
              message="Pipeline version is not a numeric value.",
          )
  @staticmethod
  def _is_compatible_pipeline(runtime_name: str, runtime_type: str):
      """
      Returns true if the pipeline's runtime name is compatible to its type.

      :param runtime_name: name of the runtime e.g. kfp, airflow, local
      :param runtime_type: the runtime type (platform) the pipeline was built for
      """
      if runtime_type.lower() == "generic":
          return True  # TODO: this won't always be true as some runtime impls won't support generics

      # We need to make the "local" runtimes a real runtime someday! Until then, we have this...
      if runtime_name.lower() == "local":
          # use the up-cased value since runtime_types are up-cased
          return runtime_name.upper() == runtime_type

      # fetch the schema corresponding to runtime_name and compare its runtime_type
      runtime_schema = SchemaManager.instance().get_schema(Runtimes.RUNTIMES_SCHEMASPACE_ID, runtime_name)
      return runtime_schema.get("runtime_type") == runtime_type
  async def _validate_compatibility(
      self,
      pipeline_definition: PipelineDefinition,
      pipeline_type: str,
      pipeline_runtime: str,
      response: ValidationResponse,
  ) -> None:
      """
      Checks that the pipeline payload is compatible with this version of elyra (ISSUE #938)
      as well as verifying all nodes in the pipeline are supported by the runtime
      :param pipeline_definition: the pipeline definition to be validated
      :param pipeline_type: type of the pipeline runtime being used e.g. KUBEFLOW_PIPELINES, APACHE_AIRFLOW, generic
      :param pipeline_runtime: name of the pipeline runtime for execution e.g. kfp, airflow, local
      :param response: ValidationResponse containing the issue list to be updated
      """
      primary_pipeline_id = pipeline_definition.primary_pipeline.id

      # Nothing is checked when no runtime is given
      if not pipeline_runtime:
          return

      if not PipelineValidationManager._is_compatible_pipeline(pipeline_runtime, pipeline_type):
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidRuntime",
              message="Pipeline runtime platform is not compatible " "with selected runtime configuration.",
              data={
                  "pipelineID": primary_pipeline_id,
                  "pipelineType": pipeline_type,
                  "pipelineRuntime": pipeline_runtime,
              },
          )
          return

      if not PipelineProcessorManager.instance().is_supported_runtime(pipeline_runtime):
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidRuntime",
              message="Unsupported pipeline runtime",
              data={
                  "pipelineRuntime": pipeline_runtime,
                  "pipelineType": pipeline_type,
                  "pipelineId": primary_pipeline_id,
              },
          )
          return

      # Ops of every component the runtime's processor reports
      available_components = await PipelineProcessorManager.instance().get_components(pipeline_runtime)
      supported_ops = [component.op for component in available_components]

      # Checks pipeline node types are compatible with the runtime selected
      for sub_pipeline in pipeline_definition.pipelines:
          for node in sub_pipeline.nodes:
              if node.op not in ComponentCache.get_generic_component_ops() and pipeline_runtime == "local":
                  response.add_message(
                      severity=ValidationSeverity.Error,
                      message_type="invalidNodeType",
                      message="This pipeline contains at least one runtime-specific "
                      "component, but pipeline runtime is 'local'. Specify a "
                      "runtime config or remove runtime-specific components "
                      "from the pipeline",
                      data={"nodeID": node.id, "nodeOpName": node.op, "pipelineId": sub_pipeline.id},
                  )
                  # One report per (sub-)pipeline is enough; skip its remaining nodes
                  break

              op_is_unknown = node.type == "execution_node" and node.op not in supported_ops
              if op_is_unknown:
                  response.add_message(
                      severity=ValidationSeverity.Error,
                      message_type="invalidNodeType",
                      message="This component was not found in the catalog. Please add it "
                      "to your component catalog or remove this node from the "
                      "pipeline",
                      data={
                          "nodeID": node.id,
                          "nodeOpName": node.op,
                          "nodeName": node.label,
                          "pipelineId": sub_pipeline.id,
                      },
                  )
  async def _validate_node_properties(
      self,
      pipeline_definition: PipelineDefinition,
      pipeline_type: str,
      pipeline_runtime: str,
      response: ValidationResponse,
  ) -> None:
      """
      Validates each of the node's structure for required fields/properties as well as
      their values
      :param pipeline_definition: the pipeline definition to be validated
      :param pipeline_type: type of the pipeline runtime being used e.g. KUBEFLOW_PIPELINES, APACHE_AIRFLOW, generic
      :param pipeline_runtime: name of the pipeline runtime for execution e.g. kfp, airflow, local
      :param response: ValidationResponse containing the issue list to be updated
      """
      # don't check if incompatible pipeline type and runtime
      if pipeline_runtime and not PipelineValidationManager._is_compatible_pipeline(
          pipeline_runtime, pipeline_type
      ):
          return

      for pipeline in pipeline_definition.pipelines:
          for node in pipeline.nodes:
              # Only execution nodes carry properties that are validated here
              if node.type != "execution_node":
                  continue

              if Operation.is_generic_operation(node.op):
                  self._validate_generic_node_properties(
                      node=node, response=response, pipeline_runtime=pipeline_runtime
                  )
                  continue

              # Validate runtime components against specific node properties in component registry
              await self._validate_custom_component_node_properties(
                  node=node,
                  response=response,
                  pipeline_runtime=pipeline_runtime,
                  pipeline_definition=pipeline_definition,
              )
  def _validate_generic_node_properties(self, node: Node, response: ValidationResponse, pipeline_runtime: str):
      """
      Validate properties of a generic node
      :param node: the generic node to check
      :param response: the validation response object to attach any error messages
      :param pipeline_runtime: the pipeline runtime selected
      :return:
      """
      node_id = node.id
      node_label = node.label

      # Read every component parameter that is inspected below
      image_name = node.get_component_parameter(RUNTIME_IMAGE)
      filename = node.get_component_parameter("filename")
      dependencies = node.get_component_parameter("dependencies")
      env_vars = node.get_component_parameter(ENV_VARIABLES)
      volumes = node.get_component_parameter(MOUNTED_VOLUMES)
      secrets = node.get_component_parameter(KUBERNETES_SECRETS)

      self._validate_filepath(
          node_id=node_id, node_label=node_label, property_name="filename", filename=filename, response=response
      )

      # If not running locally, we check resource and image name
      runs_locally = pipeline_runtime == "local"
      if not runs_locally:
          self._validate_container_image_name(node_id, node_label, image_name, response=response)

          for resource_name in ("cpu", "gpu", "memory"):
              resource_value = node.get_component_parameter(resource_name)
              if not resource_value:
                  continue
              self._validate_resource_value(
                  node_id,
                  node_label,
                  resource_name=resource_name,
                  resource_value=resource_value,
                  response=response,
              )

          if volumes:
              self._validate_mounted_volumes(node_id, node_label, volumes, response=response)
          if secrets:
              self._validate_kubernetes_secrets(node_id, node_label, secrets, response=response)

      self._validate_label(node_id=node_id, node_label=node_label, response=response)

      if dependencies:
          # Dependencies are resolved against the directory that holds the node's file
          dependency_dir = os.path.join(self.root_dir, os.path.dirname(filename))
          for dependency in dependencies:
              self._validate_filepath(
                  node_id=node_id,
                  node_label=node_label,
                  file_dir=dependency_dir,
                  property_name="dependencies",
                  filename=dependency,
                  response=response,
              )

      if env_vars:
          for env_var in env_vars:
              self._validate_environmental_variables(node_id, node_label, env_var=env_var, response=response)
  async def _validate_custom_component_node_properties(
      self, node: Node, response: ValidationResponse, pipeline_definition: PipelineDefinition, pipeline_runtime: str
  ):
      """
      Validates the properties of the custom component node

      Only properties that the component marks as required are checked: they must have a
      value, `inputpath` properties must be a {"value", "option"} dict that references a
      parent node, and NestedEnumControl properties must reference an upstream node whose
      xcom_push property (when present) is enabled.
      :param node: the node to be validated
      :param response: the validation response object to attach any error messages
      :param pipeline_definition: the pipeline definition containing the node
      :param pipeline_runtime: the pipeline runtime selected
      :return:
      """
      component_list = await PipelineProcessorManager.instance().get_components(pipeline_runtime)
      components = ComponentCache.to_canvas_palette(component_list)

      # Full dict of properties for the operation e.g. current params, optionals etc
      component_property_dict = await self._get_component_properties(pipeline_runtime, components, node.op)

      # List of just the current parameters for the component, with every 'elyra_' occurrence stripped
      current_parameter_defaults_list = [
          str(key).replace("elyra_", "") for key in component_property_dict["current_parameters"].keys()
      ]

      # Remove the non component_parameter jinja templated values we do not check against
      current_parameter_defaults_list.remove("component_source")
      current_parameter_defaults_list.remove("label")

      for default_parameter in current_parameter_defaults_list:
          node_param = node.get_component_parameter(default_parameter)
          if not self._is_required_property(component_property_dict, default_parameter):
              continue

          if not node_param:
              response.add_message(
                  severity=ValidationSeverity.Error,
                  message_type="invalidNodeProperty",
                  message="Node is missing required property.",
                  data={"nodeID": node.id, "nodeName": node.label, "propertyName": default_parameter},
              )
          elif self._get_component_type(component_property_dict, default_parameter) == "inputpath":
              # Any component property with type `InputPath` will be a dictionary of two keys
              # "value": the node ID of the parent node containing the output
              # "option": the name of the key (which is an output) of the above referenced node
              if (
                  not isinstance(node_param, dict)
                  or len(node_param) != 2
                  or set(node_param.keys()) != {"value", "option"}
              ):
                  response.add_message(
                      severity=ValidationSeverity.Error,
                      message_type="invalidNodeProperty",
                      message="Node has malformed `InputPath` parameter structure",
                      data={"nodeID": node.id, "nodeName": node.label},
                  )
                  if not isinstance(node_param, dict):
                      # A non-dict value has no "value" entry to look up; the malformed
                      # structure has been reported, so skip the reference check instead
                      # of failing on `.get`.
                      continue

              node_ids = [link.get("node_id_ref", None) for link in node.component_links]
              parent_list = self._get_parent_id_list(pipeline_definition, node_ids, [])
              node_param_value = node_param.get("value")
              if node_param_value not in parent_list:
                  response.add_message(
                      severity=ValidationSeverity.Error,
                      message_type="invalidNodeProperty",
                      message="Node contains an invalid inputpath reference. Please "
                      "check your node-to-node connections",
                      data={"nodeID": node.id, "nodeName": node.label},
                  )
          elif isinstance(node_param, dict) and node_param.get("activeControl") == "NestedEnumControl":
              if not node_param.get("NestedEnumControl"):
                  response.add_message(
                      severity=ValidationSeverity.Error,
                      message_type="invalidNodeProperty",
                      message="Node contains an invalid reference to an node output. Please "
                      "check the node properties are configured properly",
                      data={"nodeID": node.id, "nodeName": node.label},
                  )
              else:
                  # TODO: Update this hardcoded check for xcom_push. This parameter is specific to a runtime
                  # (Airflow). i.e. abstraction for byo validation?
                  node_param_value = node_param["NestedEnumControl"].get("value")
                  # NOTE(review): assumes get_node returns a node for this id - confirm it cannot be None
                  upstream_node = pipeline_definition.get_node(node_param_value)
                  xcom_param = upstream_node.get_component_parameter("xcom_push")
                  if xcom_param:
                      xcom_value = xcom_param.get("BooleanControl")
                      if not xcom_value:
                          response.add_message(
                              severity=ValidationSeverity.Error,
                              message_type="invalidNodeProperty",
                              message="Node contains an invalid input reference. The parent "
                              "node does not have the xcom_push property enabled",
                              data={
                                  "nodeID": node.id,
                                  "nodeName": node.label,
                                  "parentNodeID": upstream_node.label,
                              },
                          )
  def _validate_container_image_name(
      self, node_id: str, node_label: str, image_name: str, response: ValidationResponse
  ) -> None:
      """
      Validates that an image name was given and that it is proper in syntax
      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param image_name: container image name to be evaluated
      :param response: ValidationResponse containing the issue list to be updated
      """
      if not image_name:
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidNodeProperty",
              message="Required property value is missing.",
              data={"nodeID": node_id, "nodeName": node_label, "propertyName": "runtime_image"},
          )
          return

      # The name must end in two '/'-separated parts, neither containing '/' or a space
      if re.search(r"[^/ ]+/[^/ ]+$", image_name):
          return

      response.add_message(
          severity=ValidationSeverity.Error,
          message_type="invalidNodeProperty",
          message="Node contains an invalid runtime image. Runtime image "
          "must conform to the format [registry/]owner/image:tag",
          data={
              "nodeID": node_id,
              "nodeName": node_label,
              "propertyName": "runtime_image",
              "imageName": image_name,
          },
      )
  def _validate_resource_value(
      self, node_id: str, node_label: str, resource_name: str, resource_value: str, response: ValidationResponse
  ) -> None:
      """
      Validates the value for hardware resources requested
      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param resource_name: the name of the resource e.g. cpu, gpu. memory
      :param resource_value: the value of the resource
      :param response: ValidationResponse containing the issue list to be updated
      """
      try:
          requested_amount = int(resource_value)
      except (ValueError, TypeError):
          problem = "Property has a non-numeric value."
      else:
          if requested_amount > 0:
              # A positive integer amount is fine
              return
          problem = "Property must be greater than zero."

      response.add_message(
          severity=ValidationSeverity.Error,
          message_type="invalidNodeProperty",
          message=problem,
          data={
              "nodeID": node_id,
              "nodeName": node_label,
              "propertyName": resource_name,
              "value": resource_value,
          },
      )
  def _validate_mounted_volumes(
      self, node_id: str, node_label: str, volumes: List[VolumeMount], response: ValidationResponse
  ) -> None:
      """
      Checks that the PVC name of every mounted volume is a valid Kubernetes resource name
      e.g. foo/path=pvc_name
      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param volumes: a KeyValueList of volumes to check
      :param response: ValidationResponse containing the issue list to be updated
      """
      for volume in volumes:
          # Ensure the PVC name is syntactically a valid Kubernetes resource name
          if is_valid_kubernetes_resource_name(volume.pvc_name):
              continue

          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="invalidVolumeMount",
              message=f"PVC name '{volume.pvc_name}' is not a valid Kubernetes resource name.",
              data={
                  "nodeID": node_id,
                  "nodeName": node_label,
                  "propertyName": MOUNTED_VOLUMES,
                  "value": KeyValueList.to_str(volume.path, volume.pvc_name),
              },
          )
  def _validate_kubernetes_secrets(
      self, node_id: str, node_label: str, secrets: List[KubernetesSecret], response: ValidationResponse
  ) -> None:
      """
      Checks the format of Kubernetes secrets to ensure they're in the correct form
      e.g. FOO=SECRET_NAME:KEY
      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param secrets: a KeyValueList of secrets to check
      :param response: ValidationResponse containing the issue list to be updated
      """
      for secret in secrets:
          has_name_and_key = bool(secret.name) and bool(secret.key)
          if not has_name_and_key:
              # Name and key are both needed before their syntax can be checked
              response.add_message(
                  severity=ValidationSeverity.Error,
                  message_type="invalidKubernetesSecret",
                  message=f"Environment variable '{secret.env_var}' has an improperly formatted representation of "
                  f"secret name and key.",
                  data={
                      "nodeID": node_id,
                      "nodeName": node_label,
                      "propertyName": KUBERNETES_SECRETS,
                      "value": KeyValueList.to_str(secret.env_var, f"{(secret.name or '')}:{(secret.key or '')}"),
                  },
              )
              continue

          # Ensure the secret name is syntactically a valid Kubernetes resource name
          name_is_valid = is_valid_kubernetes_resource_name(secret.name)
          if not name_is_valid:
              response.add_message(
                  severity=ValidationSeverity.Error,
                  message_type="invalidKubernetesSecret",
                  message=f"Secret name '{secret.name}' is not a valid Kubernetes resource name.",
                  data={
                      "nodeID": node_id,
                      "nodeName": node_label,
                      "propertyName": KUBERNETES_SECRETS,
                      "value": KeyValueList.to_str(secret.env_var, f"{secret.name}:{secret.key}"),
                  },
              )

          # Ensure the secret key is a syntactically valid Kubernetes key
          key_is_valid = is_valid_kubernetes_key(secret.key)
          if not key_is_valid:
              response.add_message(
                  severity=ValidationSeverity.Error,
                  message_type="invalidKubernetesSecret",
                  message=f"Key '{secret.key}' is not a valid Kubernetes secret key.",
                  data={
                      "nodeID": node_id,
                      "nodeName": node_label,
                      "propertyName": KUBERNETES_SECRETS,
                      "value": KeyValueList.to_str(secret.env_var, f"{secret.name}:{secret.key}"),
                  },
              )
  def _validate_filepath(
      self,
      node_id: str,
      node_label: str,
      property_name: str,
      filename: str,
      response: ValidationResponse,
      file_dir: Optional[str] = "",
  ) -> None:
      """
      Verifies that a file/dir reference of a node resolves to something that exists inside
      the root workspace. References made from inside the notebook or script itself are
      not inspected.

      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param property_name: name of the node property being validated
      :param filename: the name of the file or directory to verify
      :param response: ValidationResponse containing the issue list to be updated
      :param file_dir: the dir path of the where the pipeline file resides in the elyra workspace;
                       falls back to the root directory when empty
      """
      base_dir = file_dir or self.root_dir

      # Absolute paths and paths that already start with the base dir are used as given;
      # anything else is interpreted relative to the base dir.
      used_as_given = filename == os.path.abspath(filename) or filename.startswith(base_dir)
      candidate = filename if used_as_given else os.path.join(base_dir, filename)
      normalized_path = os.path.normpath(candidate)

      problem = None
      if os.path.commonpath([normalized_path, self.root_dir]) != self.root_dir:
          problem = "Property has an invalid reference to a file/dir outside the root workspace."
      elif "*" in normalized_path:
          # Wildcard references are valid as soon as the pattern matches something
          if not glob(normalized_path):
              problem = (
                  "Property(wildcard) has an invalid path to a file/dir"
                  " or the file/dir does not exist."
              )
      elif not os.path.exists(normalized_path):
          problem = "Property has an invalid path to a file/dir or the file/dir does not exist."

      if problem is None:
          return

      response.add_message(
          severity=ValidationSeverity.Error,
          message_type="invalidFilePath",
          message=problem,
          data={
              "nodeID": node_id,
              "nodeName": node_label,
              "propertyName": property_name,
              "value": normalized_path,
          },
      )
  def _validate_environmental_variables(
      self, node_id: str, node_label: str, env_var: str, response: ValidationResponse
  ) -> None:
      """
      Verifies that an env var entry is a key value pair joined by '='
      e.g. FOO = 'BAR'

      Only the presence of the '=' separator is checked; the key and the value
      themselves are not inspected.

      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param env_var: the env_var key value pair to check
      :param response: ValidationResponse containing the issue list to be updated
      """
      _, separator, _ = env_var.partition("=")
      if separator:
          return

      response.add_message(
          severity=ValidationSeverity.Error,
          message_type="invalidEnvPair",
          message="Property has an improperly formatted env variable key value pair.",
          data={"nodeID": node_id, "nodeName": node_label, "propertyName": ENV_VARIABLES, "value": env_var},
      )
  def _validate_label(self, node_id: str, node_label: str, response: ValidationResponse) -> None:
      """
      KFP specific check for the label name when constructing the node operation using dsl

      Two independent warnings can be raised for the same label: one when it is longer than
      the maximum length, one when it does not match the expected character pattern.

      :param node_id: the unique ID of the node
      :param node_label: the given node name or user customized name/label of the node
      :param response: ValidationResponse containing the issue list to be updated
      """
      label_name_max_length = 63
      label_pattern = "^[a-z0-9]([-_.a-z0-9]{0,62}[a-z0-9])?"

      if len(node_label) > label_name_max_length:
          response.add_message(
              severity=ValidationSeverity.Warning,
              message_type="invalidNodeLabel",
              # f-string so the actual limit is shown instead of the literal placeholder text
              message="Property value exceeds the max length allowed "
              f"({label_name_max_length}). This value may be truncated "
              "by the runtime service.",
              data={"nodeID": node_id, "nodeName": node_label, "propertyName": "label", "value": node_label},
          )

      # The whole label has to match the pattern, not just a leading portion of it
      if not re.fullmatch(label_pattern, node_label):
          response.add_message(
              severity=ValidationSeverity.Warning,
              message_type="invalidNodeLabel",
              message="The node label contains characters that may be replaced "
              "by the runtime service. Node labels should "
              "start with lower case alphanumeric and contain "
              "only lower case alphanumeric, underscores, dots, and dashes.",
              data={"nodeID": node_id, "nodeName": node_label, "propertyName": "label", "value": node_label},
          )
  def _validate_pipeline_graph(self, pipeline: dict, response: ValidationResponse) -> None:
      """
      Validates that the pipeline is an acyclic graph, meaning no circular references
      Converts the pipeline definition into a series of tuple node edges(arrows) that represent the connections
      from one node to another via a networkX DiGraph.
      Example:
                   NodeC
                     ^
                     |
          NodeA -> NodeB -> NodeD
                     ^        |
                     |        |  (Invalid Circular Reference)
                     <---------

      The resulting list of edges (arrows) would then be:
          [(NodeA, NodeB), (NodeB, NodeC), (NodeB, NodeD), (NodeD, NodeB)]

      the list of nodes added would be:
          [NodeA, NodeB, NodeC, NodeD]

      A single error message (with an empty data payload) is added when the graph contains at
      least one cycle. In addition, when the graph has more than one node, a warning is added
      for every node that has no connection to any other node.

      :param response: ValidationResponse containing the issue list to be updated
      :param pipeline: A dictionary describing the pipeline
      """

      def without_suffix(value: str, suffix: str) -> str:
          # str.strip() removes a *set of characters* from both ends, which would also eat
          # leading/trailing id characters such as 'o', 't' or 'r'; cut the exact suffix instead.
          return value[: -len(suffix)] if value.endswith(suffix) else value

      pipeline_json = json.loads(json.dumps(pipeline, cls=DataClassJSONEncoder))

      graph = nx.DiGraph()

      for single_pipeline in pipeline_json["pipelines"]:
          for node in single_pipeline["nodes"]:
              # Only the first input port is inspected; nodes without inputs contribute no edges
              inputs = node.get("inputs") or []
              links = inputs[0].get("links", []) if inputs else []

              if node["type"] == "execution_node":
                  graph.add_node(node["id"])
                  for link in links:
                      port_ref = link["port_id_ref"]
                      if "_outPort" in port_ref:  # is ref to node, doesnt add links to supernodes
                          graph.add_edge(without_suffix(port_ref, "_outPort"), node["id"])
                      elif port_ref == "outPort":  # do not link to bindings
                          graph.add_edge(link["node_id_ref"], node["id"])
              elif node["type"] == "super_node":
                  for link in links:
                      child_node_id = without_suffix(inputs[0]["id"], "_inPort")
                      graph.add_edge(link["node_id_ref"], child_node_id)

      # A lone node is not reported as disconnected
      if graph.number_of_nodes() > 1:
          for isolate in nx.isolates(graph):
              response.add_message(
                  severity=ValidationSeverity.Warning,
                  message_type="singletonReference",
                  message="Node is not connected to any other node.",
                  data={
                      "nodeID": isolate,
                      "nodeName": self._get_node_names(pipeline=pipeline, node_id_list=[isolate])[0],
                      "pipelineID": self._get_pipeline_id(pipeline, node_id=isolate),
                  },
              )

      # Finding the first cycle is enough; enumerating every simple cycle can be very expensive
      first_cycle = next(iter(nx.simple_cycles(graph)), None)
      if first_cycle is not None:
          response.add_message(
              severity=ValidationSeverity.Error,
              message_type="circularReference",
              message="The pipeline contains a circular dependency between nodes.",
              data={},
          )
  def _get_pipeline_id(self, pipeline: dict, node_id: str) -> Optional[str]:
      """
      Looks up the pipeline that contains the node with the given ID

      :param pipeline: pipeline definition where the node is located
      :param node_id: the node ID of the node
      :return: the ID of the first pipeline that contains the node, None if no pipeline does
      """
      pipeline_json = json.loads(json.dumps(pipeline, cls=DataClassJSONEncoder))

      owning_pipeline_ids = (
          single_pipeline["id"]
          for single_pipeline in pipeline_json["pipelines"]
          for node in single_pipeline["nodes"]
          if node["id"] == node_id
      )
      return next(owning_pipeline_ids, None)
  828. async def _get_component_properties(self, pipeline_runtime: str, components: dict, node_op: str) -> Dict:
  829. """
  830. Retrieve the full dict of properties associated with the node_op
  831. :param components: list of components associated with the pipeline runtime being used e.g. kfp, airflow
  832. :param node_op: the node operation e.g. execute-notebook-node
  833. :return: a list of property names associated with the node op
  834. """
  835. if node_op == "execute-notebook-node":
  836. node_op = "notebooks"
  837. elif node_op == "execute-r-node":
  838. node_op = "r-script"
  839. elif node_op == "execute-python-node":
  840. node_op = "python-script"
  841. for category in components["categories"]:
  842. for node_type in category["node_types"]:
  843. if node_op == node_type["op"]:
  844. component: Component = await PipelineProcessorManager.instance().get_component(
  845. pipeline_runtime, node_op
  846. )
  847. component_properties = ComponentCache.to_canvas_properties(component)
  848. return component_properties
  849. return {}
  def _get_node_names(self, pipeline: dict, node_id_list: list) -> List:
      """
      Resolves node IDs to node names (labels), keeping the order of node_id_list

      IDs that cannot be found in any pipeline are skipped, so the result can be
      shorter than node_id_list.

      :param pipeline: pipeline definition where the node is located
      :param node_id_list: a list of UUIDs defined in the pipeline file
      :return: a list with the name of every node that was found
      """
      pipeline_json = json.loads(json.dumps(pipeline, cls=DataClassJSONEncoder))

      node_name_list = []
      for node_id in node_id_list:
          # Lazily scan the pipelines and stop at the first node carrying this ID
          matching_nodes = (
              node
              for single_pipeline in pipeline_json["pipelines"]
              for node in single_pipeline["nodes"]
              if node["id"] == node_id
          )
          first_match = next(matching_nodes, None)
          if first_match is not None:
              node_name_list.append(self._get_node_label(first_match))

      return node_name_list
  870. def _get_node_labels(self, pipeline: dict, link_ids: List[str]) -> Optional[List[str]]:
  871. """
  872. Returns the names (labels) of the nodes that are connected by
  873. the specified link_ids.
  874. :param pipeline: the pipeline dict
  875. :param link_ids: list of link ids from pipeline
  876. :return a tuple containing two node labels that are connected
  877. """
  878. if link_ids is None:
  879. return None
  880. pipeline_json = json.loads(json.dumps(pipeline, cls=DataClassJSONEncoder))
  881. node_labels = []
  882. for link_id in link_ids:
  883. for single_pipeline in pipeline_json["pipelines"]:
  884. for node in single_pipeline["nodes"]:
  885. if node["type"] == "execution_node":
  886. for input in node.get("inputs", []):
  887. for link in input.get("links", []):
  888. if link["id"] == link_id:
  889. node_labels.append(self._get_node_label(node))
  890. return node_labels
  891. def _get_node_label(self, node: dict) -> Optional[str]:
  892. """
  893. Returns the label for the provided node or None if the information
  894. cannot be derived from the inpuit dictionary.
  895. :param node: a dict representing a pipeline node
  896. :return: the label of the node
  897. """
  898. if node is None or node.get("app_data") is None:
  899. return None
  900. node_label = node["app_data"].get("label")
  901. if node["type"] == "execution_node" and node["app_data"].get("ui_data"):
  902. node_label = node["app_data"]["ui_data"].get("label")
  903. return node_label
  904. def _is_legacy_pipeline(self, pipeline: dict) -> bool:
  905. """
  906. Checks the pipeline to determine if the pipeline is an older legacy schema
  907. :param pipeline: the pipeline dict
  908. :return:
  909. """
  910. return pipeline["pipelines"][0]["app_data"].get("properties") is None
  911. def _is_required_property(self, property_dict: dict, node_property: str) -> bool:
  912. """
  913. Determine whether or not a component parameter is required to function correctly
  914. :param property_dict: the dictionary for the component
  915. :param node_property: the component property to check
  916. :return:
  917. """
  918. node_op_parameter_list = property_dict["uihints"]["parameter_info"]
  919. for parameter in node_op_parameter_list:
  920. if parameter["parameter_ref"] == f"elyra_{node_property}":
  921. return parameter["data"]["required"]
  922. return False
  923. def _get_component_type(self, property_dict: dict, node_property: str, control_id: str = "") -> Optional[str]:
  924. """
  925. Helper function to determine the type of a node property
  926. :param property_dict: a dictionary containing the full list of property parameters and descriptions
  927. :param node_property: the property to look for
  928. :param control_id: when using OneOfControl, include the control_id to retrieve the correct format
  929. :return: the data type associated with node_property, defaults to 'string'
  930. """
  931. for prop in property_dict["uihints"]["parameter_info"]:
  932. if prop["parameter_ref"] == f"elyra_{node_property}":
  933. if control_id:
  934. return prop["data"]["controls"][control_id].get("format", "string")
  935. else:
  936. return prop["data"].get("format", "string")
  937. return None
  def _get_parent_id_list(
      self, pipeline_definition: PipelineDefinition, node_id_list: list, parent_list: list
  ) -> List:
      """
      Helper function to return a complete list of parent node_ids

      Every execution node or supernode in node_id_list is added to parent_list, then the
      nodes referenced by its component links are processed recursively the same way.
      IDs that do not resolve to a node, and binding nodes, are not added.

      :param pipeline_definition: the complete pipeline definition
      :param node_id_list: list of parent node ids
      :param parent_list: the list to add additional found parent node ids; it is updated in place
      :return: parent_list, including the node ids that were added
      """
      for node_id in node_id_list:
          node = pipeline_definition.get_node(node_id)
          if not node or node.type not in ["execution_node", "super_node"]:
              continue  # unknown id or binding node

          parent_list.append(node_id)

          # The upstream ids are collected in new lists instead of editing a list while it is
          # being iterated, which silently skips the element following every removal.
          upstream_ids = []
          supernode_link_ids = []
          for nid in (x.get("node_id_ref", None) for x in node.component_links):
              upstream_node = pipeline_definition.get_node(nid)
              # look-ahead to determine if node is a binding node
              if upstream_node is None or upstream_node.type != "binding":
                  upstream_ids.append(nid)
                  continue

              # The binding itself is dropped; the links of a supernode whose subflow id equals
              # the binding id are followed instead (added one by one, not as a nested list).
              for super_node in pipeline_definition.get_supernodes():
                  if super_node.subflow_pipeline_id == nid:
                      supernode_link_ids.extend(
                          x.get("node_id_ref", None) for x in super_node.component_links
                      )

          self._get_parent_id_list(pipeline_definition, upstream_ids + supernode_link_ids, parent_list)

      return parent_list