kfp_metadata.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from typing import Any
  17. from elyra.metadata.manager import MetadataManager
  18. from elyra.pipeline.kfp.kfp_authentication import SupportedAuthProviders
  19. from elyra.pipeline.runtimes_metadata import RuntimesMetadata
  20. class KfpMetadata(RuntimesMetadata):
  21. """
  22. Applies changes specific to the kfp schema
  23. """
  24. def on_load(self, **kwargs: Any) -> None:
  25. super().on_load(**kwargs)
  26. if self.metadata.get("auth_type") is None:
  27. # Inject auth_type property for metadata persisted using Elyra < 3.3:
  28. # - api_username and api_password present -> use DEX Legacy
  29. # - otherwise -> use no authentication type
  30. if (
  31. len(self.metadata.get("api_username", "").strip()) == 0
  32. or len(self.metadata.get("api_password", "").strip()) == 0
  33. ):
  34. self.metadata["auth_type"] = SupportedAuthProviders.NO_AUTHENTICATION.name
  35. else:
  36. self.metadata["auth_type"] = SupportedAuthProviders.DEX_LEGACY.name
  37. if self.metadata.get("cos_auth_type") is None:
  38. # Inject cos_auth_type property for metadata persisted using Elyra < 3.4:
  39. # - cos_username and cos_password must be present
  40. # - cos_secret may be present (above statement also applies in this case)
  41. if self.metadata.get("cos_username") and self.metadata.get("cos_password"):
  42. if len(self.metadata.get("cos_secret", "").strip()) == 0:
  43. self.metadata["cos_auth_type"] = "USER_CREDENTIALS"
  44. else:
  45. self.metadata["cos_auth_type"] = "KUBERNETES_SECRET"
  46. # save changes
  47. MetadataManager(schemaspace="runtimes").update(self.name, self, for_migration=True)
  48. return None
  49. def pre_save(self, **kwargs: Any) -> None:
  50. """
  51. This method enforces conditional constraints related to
  52. KFP and COS authentication type properties.
  53. TODO: Remove after https://github.com/elyra-ai/elyra/issues/2338
  54. was resolved.
  55. """
  56. super().pre_save(**kwargs)
  57. # validation of Kubeflow authentication constraints
  58. if self.metadata.get("auth_type") is not None:
  59. try:
  60. kfp_auth_provider = SupportedAuthProviders.get_instance_by_name(self.metadata["auth_type"])
  61. except Exception:
  62. kfp_auth_provider = None
  63. if kfp_auth_provider == SupportedAuthProviders.NO_AUTHENTICATION:
  64. if (
  65. len(self.metadata.get("api_username", "").strip()) > 0
  66. or len(self.metadata.get("api_password", "").strip()) > 0
  67. ):
  68. raise ValueError(
  69. "Username and password are not supported " "for the selected Kubeflow authentication type."
  70. )
  71. elif kfp_auth_provider == SupportedAuthProviders.DEX_STATIC_PASSWORDS:
  72. if (
  73. len(self.metadata.get("api_username", "").strip()) == 0
  74. or len(self.metadata.get("api_password", "").strip()) == 0
  75. ):
  76. raise ValueError(
  77. "A username and password are required " "for the selected Kubeflow authentication type."
  78. )
  79. elif kfp_auth_provider == SupportedAuthProviders.DEX_LDAP:
  80. if (
  81. len(self.metadata.get("api_username", "").strip()) == 0
  82. or len(self.metadata.get("api_password", "").strip()) == 0
  83. ):
  84. raise ValueError(
  85. "A username and password are required " "for the selected Kubeflow authentication type."
  86. )
  87. elif kfp_auth_provider == SupportedAuthProviders.DEX_LEGACY:
  88. is_empty_api_username = len(self.metadata.get("api_username", "").strip()) == 0
  89. is_empty_api_password = len(self.metadata.get("api_password", "").strip()) == 0
  90. if is_empty_api_username is False and is_empty_api_password:
  91. raise ValueError("A username requires a password " "for the selected Kubeflow authentication type.")
  92. if is_empty_api_password is False and is_empty_api_username:
  93. raise ValueError("A password requires a username " "for the selected Kubeflow authentication type.")
  94. elif kfp_auth_provider == SupportedAuthProviders.KUBERNETES_SERVICE_ACCOUNT_TOKEN:
  95. if (
  96. len(self.metadata.get("api_username", "").strip()) > 0
  97. or len(self.metadata.get("api_password", "").strip()) > 0
  98. ):
  99. raise ValueError(
  100. "Username and password are not supported " "for the selected Kubeflow authentication type."
  101. )
  102. # validation of Object Storage authentication constraints
  103. if self.metadata.get("cos_auth_type") is None:
  104. # nothing to do
  105. return
  106. if self.metadata["cos_auth_type"] == "USER_CREDENTIALS":
  107. if (
  108. len(self.metadata.get("cos_username", "").strip()) == 0
  109. or len(self.metadata.get("cos_password", "").strip()) == 0
  110. ):
  111. raise ValueError(
  112. "A username and password are required " "for the selected Object Storage authentication type."
  113. )
  114. if len(self.metadata.get("cos_secret", "").strip()) > 0:
  115. raise ValueError(
  116. "Kubernetes secrets are not supported " "for the selected Object Storage authentication type."
  117. )
  118. elif self.metadata["cos_auth_type"] == "KUBERNETES_SECRET":
  119. if (
  120. len(self.metadata.get("cos_username", "").strip()) == 0
  121. or len(self.metadata.get("cos_password", "").strip()) == 0
  122. or len(self.metadata.get("cos_secret", "").strip()) == 0
  123. ):
  124. raise ValueError(
  125. "Username, password, and Kubernetes secret are required "
  126. "for the selected Object Storage authentication type."
  127. )
  128. elif self.metadata["cos_auth_type"] == "AWS_IAM_ROLES_FOR_SERVICE_ACCOUNTS":
  129. if (
  130. len(self.metadata.get("cos_username", "").strip()) > 0
  131. or len(self.metadata.get("cos_password", "").strip()) > 0
  132. or len(self.metadata.get("cos_secret", "").strip()) > 0
  133. ):
  134. raise ValueError(
  135. "Username, password, and Kubernetes secret are not supported "
  136. "for the selected Object Storage authentication type."
  137. )