component_parser_kfp.py

#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from types import SimpleNamespace
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from jsonschema import validate
from jsonschema import ValidationError
import yaml

from elyra.pipeline.catalog_connector import CatalogEntry
from elyra.pipeline.component import Component
from elyra.pipeline.component import ComponentParameter
from elyra.pipeline.component import ComponentParser
from elyra.pipeline.component import ControllerMap
from elyra.pipeline.kfp.kfp_component_utils import component_yaml_schema
from elyra.pipeline.runtime_type import RuntimeProcessorType


class KfpComponentParser(ComponentParser):
    _file_types: List[str] = [".yaml"]

    component_platform: RuntimeProcessorType = RuntimeProcessorType.KUBEFLOW_PIPELINES

    def parse(self, catalog_entry: CatalogEntry) -> Optional[List[Component]]:
        # Get YAML object from component definition
        component_yaml = self._read_component_yaml(catalog_entry)
        if not component_yaml:
            return None

        # Assign component_id and description
        description = ""
        if component_yaml.get("description"):
            # Remove whitespace characters and replace with spaces
            description = " ".join(component_yaml.get("description").split())

        component_properties = self._parse_properties(component_yaml)

        component = catalog_entry.get_component(
            id=catalog_entry.id,
            name=component_yaml.get("name"),
            description=description,
            properties=component_properties,
            file_extension=self._file_types[0],
        )

        return [component]
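
    # Illustrative usage sketch (not part of the original module): assuming a catalog
    # connector has already produced a `CatalogEntry` whose `entry_data.definition`
    # holds the raw component YAML text, parsing it might look roughly like:
    #
    #     parser = KfpComponentParser()
    #     components = parser.parse(catalog_entry)  # -> [Component], or None if the YAML is invalid
    #
    # How `catalog_entry` is obtained depends on the catalog connector in use.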
    def _parse_properties(self, component_yaml: Dict[str, Any]) -> List[ComponentParameter]:
        properties: List[ComponentParameter] = []

        # NOTE: Currently no runtime-specific properties are needed
        # properties.extend(self.get_runtime_specific_properties())

        # Then loop through and create custom properties
        # Get parameter sub-dictionaries from YAML object
        input_params = component_yaml.get("inputs", [])
        output_params = component_yaml.get("outputs", [])

        all_params = {"inputs": input_params, "outputs": output_params}

        # Loop through inputs and outputs and create custom properties
        for param_type, params in all_params.items():
            for param in params:
                # KFP components default to being required unless otherwise stated.
                # Reference: https://www.kubeflow.org/docs/components/pipelines/reference/component-spec/#interface
                required = True
                if param.get("optional") is True:
                    required = False

                # Assign parsed data type (default to string)
                data_type_parsed = param.get("type", "string")

                # Define adjusted type as either inputPath or outputPath
                data_type_adjusted = data_type_parsed
                if self._is_path_based_parameter(param.get("name"), component_yaml):
                    data_type_adjusted = f"{param_type[:-1]}Path"

                data_type_info = self.determine_type_information(data_type_adjusted)

                if data_type_info.undetermined:
                    self.log.debug(
                        f"Data type from parsed data ('{data_type_parsed}') could not be determined. "
                        f"Proceeding as if 'string' was detected."
                    )

                if not data_type_info.required:
                    required = data_type_info.required

                # Get value if provided
                raw_value = param.get("default", "")

                # Adjust any double quoted default values to use single quotes to avoid json parsing errors
                value = raw_value.replace('"', "'")

                # Set parameter ref (id) and display name
                ref_name = param.get("name").lower().replace(" ", "_")
                display_name = param.get("name")

                description = param.get("description", "")

                if data_type_info.data_type != "inputpath":
                    # Add parsed data type hint to description in parentheses
                    description = self._format_description(description=description, data_type=data_type_parsed)

                if data_type_info.data_type == "outputpath":
                    ref_name = f"output_{ref_name}"

                one_of_control_types = data_type_info.one_of_control_types
                default_control_type = data_type_info.control_id
                if data_type_info.data_type == "inputvalue":
                    data_type_info.control_id = "OneOfControl"
                    one_of_control_types = [
                        (
                            default_control_type,
                            data_type_info.default_data_type,
                            ControllerMap[default_control_type].value,
                        ),
                        ("NestedEnumControl", "inputpath", ControllerMap["NestedEnumControl"].value),
                    ]

                component_params = ComponentParameter(
                    id=ref_name,
                    name=display_name,
                    data_type=data_type_info.data_type,
                    default_data_type=data_type_info.default_data_type,
                    value=(value or data_type_info.default_value),
                    description=description,
                    control=data_type_info.control,
                    control_id=data_type_info.control_id,
                    one_of_control_types=one_of_control_types,
                    default_control_type=default_control_type,
                    required=required,
                )
                properties.append(component_params)

        return properties
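
    # Illustrative sketch (not part of the original module): given a component input such as
    #
    #     inputs:
    #       - {name: Learning Rate, type: Float, default: '0.1', optional: true, description: Step size}
    #
    # _parse_properties() would (roughly) produce a ComponentParameter with id "learning_rate",
    # name "Learning Rate", data_type "inputvalue", required False, and a "OneOfControl"
    # control whose options allow the value to be typed in directly or wired to an upstream
    # node's output via the "NestedEnumControl"/"inputpath" alternative.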

    def get_runtime_specific_properties(self) -> List[ComponentParameter]:
        """
        Define properties that are common to the KFP runtime.
        """
        return [
            ComponentParameter(
                id="runtime_image",
                name="Runtime Image",
                data_type="string",
                value="",
                description="Container image used as execution environment.",
                control="readonly",
                required=True,
            )
        ]

    def _read_component_yaml(self, catalog_entry: CatalogEntry) -> Optional[Dict[str, Any]]:
        """
        Convert component_definition string to YAML object
        """
        try:
            results = yaml.safe_load(catalog_entry.entry_data.definition)
        except Exception as e:
            self.log.warning(
                f"Could not load YAML definition for component with identifying information: "
                f"'{catalog_entry.entry_reference}' -> {str(e)}"
            )
            return None

        try:
            # Validate against component YAML schema
            validate(instance=results, schema=component_yaml_schema)
            # If the component definition does not define a container command, log a warning.
            # See https://www.kubeflow.org/docs/components/pipelines/installation/choose-executor/#emissary-executor
            if results.get("implementation", {}).get("container", {}).get("command") is None:
                self.log.warning(
                    f"Component '{results['name']}' does not define a container command. "
                    "It might fail execution on Kubeflow Pipelines installations that are "
                    "configured to use Argo as workflow engine and emissary "
                    "executor as workflow executor."
                )
        except ValidationError as ve:
            self.log.warning(
                f"Invalid format of YAML definition for component with identifying information: "
                f"'{catalog_entry.entry_reference}' -> {str(ve)}"
            )
            return None

        return results
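
    # Illustrative sketch (not part of the original module): a minimal component definition
    # along these lines would typically load and validate (the exact required fields are
    # governed by `component_yaml_schema`):
    #
    #     name: Filter text
    #     description: Keep lines that match the given pattern
    #     inputs:
    #       - {name: Text, type: String}
    #       - {name: Pattern, type: String, default: '.*'}
    #     outputs:
    #       - {name: Filtered text, type: String}
    #     implementation:
    #       container:
    #         image: alpine:3.16
    #         command: [sh, -c, 'grep "$1" < "$0" > "$2"',
    #                   {inputPath: Text}, {inputValue: Pattern}, {outputPath: Filtered text}]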

    def _is_path_based_parameter(self, parameter_name: str, component_body: Dict[str, Any]) -> bool:
        """
        Check whether parameter is a KFP path parameter (as opposed to a value parameter)

        :param parameter_name: the name of the parameter that will be checked
        :param component_body: the component YAML contents
        """
        # Get component_body['implementation']['container'] sub-dictionary if it exists
        component_impl = component_body.get("implementation", {}).get("container", {})

        # Get list of component commands/arguments
        commands_and_args = component_impl.get("command", []) + component_impl.get("args", [])

        # Loop through dictionary-types only; parameter-based fields will
        # always be of the format {'inputPath': 'parameter name'}
        for param_dict in [c for c in commands_and_args if isinstance(c, dict)]:
            # Check the (single-element) list of values for a
            # match on the full parameter name given
            if parameter_name in list(param_dict.values()):
                # Check whether the first (and only) key contains the
                # phrase "Path", e.g. inputPath or outputPath
                if "Path" in list(param_dict.keys())[0]:
                    return True

                # Otherwise, assume inputValue for this parameter name
                # and do not proceed to check other parameter dicts
                break

        return False
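
    # Illustrative sketch (not part of the original module): given a container section with
    #
    #     command: [sh, -c, '...', {inputPath: Text}, {inputValue: Pattern}, {outputPath: Filtered text}]
    #
    # _is_path_based_parameter("Text", body) and _is_path_based_parameter("Filtered text", body)
    # would return True (their placeholder keys contain "Path"), while
    # _is_path_based_parameter("Pattern", body) would return False ({inputValue: ...}).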

    def determine_type_information(self, parsed_type: str) -> SimpleNamespace:
        """
        Takes the type information of a component parameter as parsed from the component
        specification and returns a new type that is one of several standard options.
        """
        data_type_info = super().determine_type_information(parsed_type)

        # By default, the original data type (determined by the parent) is stored as the
        # `default_data_type` and then overridden with Kubeflow Pipelines' meta-type; in this
        # case, all values are considered `inputValue`s unless the parent method is unable to
        # determine the type, e.g. KFP path-based types
        data_type_info.default_data_type = data_type_info.data_type
        data_type_info.data_type = "inputvalue"

        if data_type_info.undetermined:
            if "inputpath" in data_type_info.parsed_data:
                data_type_info.data_type = "inputpath"
                data_type_info.control_id = "NestedEnumControl"
                data_type_info.undetermined = False
                data_type_info.default_value = None
            elif "outputpath" in data_type_info.parsed_data:
                data_type_info.data_type = "outputpath"
                data_type_info.required = False
                data_type_info.control = "readonly"
                data_type_info.undetermined = False

        return data_type_info
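
    # Illustrative sketch (not part of the original module): assuming the parent
    # implementation resolves "String" to a string type, determine_type_information("String")
    # would come back with default_data_type "string" and data_type "inputvalue", whereas
    # determine_type_information("inputPath") (the adjusted type produced in _parse_properties
    # for path-based parameters) would come back with data_type "inputpath" and
    # control_id "NestedEnumControl".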