kfp_authentication.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from abc import ABC
  17. from abc import abstractmethod
  18. from enum import Enum
  19. from http import HTTPStatus
  20. import os
  21. import re
  22. from typing import Any
  23. from typing import Dict
  24. from typing import List
  25. from typing import Optional
  26. from typing import Tuple
  27. from urllib.parse import urlsplit
  28. from kfp.auth import KF_PIPELINES_SA_TOKEN_ENV
  29. from kfp.auth import KF_PIPELINES_SA_TOKEN_PATH
  30. from kfp.auth import ServiceAccountTokenVolumeCredentials
  31. import requests
  32. def _empty_or_whitespaces_only(a_string: str) -> bool:
  33. """
  34. Utility function: evaluates whether a_string is None or contains
  35. only whitespaces.
  36. :param a_string: string to be evaluated
  37. :type: str
  38. :return: True if a_string is None or contains only whitespaces
  39. :rtype: Boolean
  40. """
  41. if a_string is None or len(a_string.strip()) == 0:
  42. return True
  43. return False
  44. class SupportedAuthProviders(Enum):
  45. """
  46. List of supported authentication providers that is defined
  47. in this module. Each entry in this list must be associated
  48. with an implementation of AbstractAuthenticator.
  49. """
  50. # KF is not secured
  51. # (See NoAuthenticationAuthenticator)
  52. NO_AUTHENTICATION = "No authentication"
  53. # KF is secured using KUBERNETES_SERVICE_ACCOUNT_TOKEN
  54. # (See K8sServiceAccountTokenAuthenticator implementation)
  55. KUBERNETES_SERVICE_ACCOUNT_TOKEN = "Kubernetes service account token"
  56. # KF is secured using DEX with static id/password
  57. # (See StaticPasswordKFPAuthenticator implementation)
  58. DEX_STATIC_PASSWORDS = "DEX (static passwords)"
  59. # Supports DEX with LDAP authentication
  60. # (See DEXLDAPAuthenticator implementation)
  61. DEX_LDAP = "DEX (LDAP)"
  62. # Supports multiple authentication mechanisms
  63. # (See DEXLegacyAuthenticator implementation)
  64. DEX_LEGACY = "DEX (legacy)"
  65. @staticmethod
  66. def get_default_provider() -> "SupportedAuthProviders":
  67. """
  68. Returns the "default" enum member (provider)
  69. :return: default enum member
  70. :rtype: str
  71. """
  72. return SupportedAuthProviders.NO_AUTHENTICATION
  73. @staticmethod
  74. def get_provider_names() -> List[str]:
  75. """
  76. Returns all enum member (provider) names
  77. :return: List of provider names
  78. :rtype: List[str]
  79. """
  80. return list(map(lambda c: c.name, SupportedAuthProviders))
  81. @staticmethod
  82. def get_instance_by_name(name: str) -> "SupportedAuthProviders":
  83. """
  84. Returns an enumeration member of SupportedAuthProviders
  85. corresponding to the given name.
  86. :raises ValueError: name is not a valid enum member name
  87. :return: An enum member of SupportedAuthProviders
  88. :rtype: SupportedAuthProviders
  89. """
  90. try:
  91. return SupportedAuthProviders[name]
  92. except KeyError:
  93. raise ValueError(f"'{name}' is not a valid {SupportedAuthProviders.__name__}")
  94. @staticmethod
  95. def get_instance_by_value(value: str) -> "SupportedAuthProviders":
  96. """
  97. Returns an enumeration member of SupportedAuthProviders
  98. corresponding to the given value.
  99. :raises ValueError: value is not a valid enum member value
  100. :return: An enum member of SupportedAuthProviders
  101. :rtype: SupportedAuthProviders
  102. """
  103. return SupportedAuthProviders(value)
  104. @staticmethod
  105. def to_dict() -> Dict:
  106. """
  107. Convert the enum into a dictionary. Keys are the member
  108. names (internal authentication type id) and values are
  109. the associated user-friendly member values.
  110. :return: dictionary, comprising all members of the enum
  111. :rtype: Dict
  112. """
  113. enum_member_dict = {}
  114. for member in SupportedAuthProviders:
  115. enum_member_dict[member.name] = member.value
  116. return enum_member_dict
  117. class AuthenticationError(Exception):
  118. """
  119. Indicates that an error occurred while an authentication request
  120. was being processed.
  121. """
  122. def __init__(
  123. self,
  124. message: str,
  125. provider: Optional[SupportedAuthProviders] = None,
  126. request_history: Optional[List[Tuple[str, requests.Response]]] = None,
  127. ):
  128. """
  129. Create a new AuthenticationError exception. The throw-er should
  130. populate the request_history to allow for troubleshooting. List entry key is the (HTTP)
  131. request URL, the value the response object.
  132. :param message: a user friendly error message
  133. :type message: str
  134. :param provider: if the error is raised by an implementation of AbstractAuthenticator,
  135. use the value of _type; optional, defaults to None
  136. :type provider: Optional[SupportedAuthProviders], optional
  137. :param request_history: , defaults to None
  138. :type request_history: Optional[List[Dict[str, requests.Response]]], optional
  139. """
  140. self._message = message
  141. self._provider = provider
  142. self._request_history = request_history
  143. def get_request_history(self) -> Optional[List[Tuple[str, requests.Response]]]:
  144. """
  145. Returns the HTTP request history that led to this exception.
  146. :return: A list of tuples, comprising HTTP URL and the response object
  147. :rtype: Optional[List[Tuple[str, requests.Response]]]
  148. """
  149. return self._request_history
  150. def request_history_to_string(self) -> Optional[str]:
  151. """
  152. Dump key HTTP request history into a string for logging purposes
  153. :return: Formatted HTTP request history, which led to the failure.
  154. :rtype: Optional[str]
  155. """
  156. output = None
  157. for request_entry in self._request_history or []:
  158. if output is None:
  159. output = (
  160. f"Request URL: {request_entry[0]} "
  161. f"HTTP status code: {request_entry[1].status_code} "
  162. f"response URL: {request_entry[1].url}"
  163. )
  164. else:
  165. output = (
  166. f"{output}\n"
  167. f"Request URL: {request_entry[0]} "
  168. f"HTTP status code: {request_entry[1].status_code} "
  169. f"response URL: {request_entry[1].url}"
  170. )
  171. return output
  172. class KFPAuthenticator:
  173. """
  174. Use this class to authenticate with Kubeflow Pipelines. The authenticate
  175. method delegates the actual authentication to an implementation of the
  176. AbstractAuthenticator class.
  177. """
  178. def authenticate(
  179. self,
  180. api_endpoint: str,
  181. auth_type_str: str,
  182. runtime_config_name: str,
  183. auth_parm_1: Optional[str] = None,
  184. auth_parm_2: Optional[str] = None,
  185. ) -> Dict[str, Any]:
  186. """
  187. Try to authenticate with Kubeflow using the provided information.
  188. :param api_endpoint: Kubeflow Pipelines endpoint URL, as specified in the runtime configuration
  189. :type api_endpoint: str
  190. :param auth_type_str Identifies the authentication type to be performed. If the provided value
  191. is in the SupportedAuthProviders enum, authentication is performed.
  192. :type auth_type_str: str
  193. :param runtime_config_name: Runtime configuration name where kf_endpoint is specified.
  194. :type runtime_config_name: str
  195. :param auth_parm_1: First authorization parameter from the runtime config, defaults to None
  196. :type auth_parm_1: Optional[str], optional
  197. :param auth_parm_2: Second authorization parameter from the runtime config, defaults to None
  198. :type auth_parm_2: Optional[str], optional
  199. :raises AuthenticationError: Authentication failed due to the provided reason.
  200. :return: A data structure containing information that enables kfp.Client to connect to api_endpoint
  201. :rtype: Dict[str, str]
  202. """
  203. kf_url = urlsplit(api_endpoint)._replace(path="").geturl()
  204. # return data structure for successful requests
  205. auth_info = {
  206. "api_endpoint": kf_url, # KF API endpoint, source: runtime config
  207. "auth_type": None, # Authentication type, source: runtime config
  208. "kf_secured": False, # Indicates whether KF API is secured
  209. "cookies": None, # passed to KFP SDK client as "cookies" param value
  210. "credentials": None, # passed to KFP SDK client as "credentials" param value
  211. "existing_token": None, # passed to KFP SDK client as "existing_token" param value
  212. }
  213. try:
  214. auth_type = SupportedAuthProviders.get_instance_by_name(auth_type_str)
  215. auth_info["auth_type"] = auth_type.value
  216. except ValueError:
  217. # the provided authentication type is not supported
  218. raise AuthenticationError(
  219. f"Authentication type '{auth_type_str}' is not supported. "
  220. f"Update runtime configuration '{runtime_config_name}' and try again."
  221. )
  222. try:
  223. # Process the authentication request using the appropriate authenticator
  224. # implementation. Refer to the class definitions for information how
  225. # the request is processed
  226. if auth_type == SupportedAuthProviders.NO_AUTHENTICATION:
  227. # No authentication is performed. The authenticator returns None
  228. NoAuthenticationAuthenticator().authenticate(kf_url, runtime_config_name)
  229. elif auth_type == SupportedAuthProviders.DEX_STATIC_PASSWORDS:
  230. # static id/password checking; the authenticator returns
  231. # a cookie value
  232. auth_info["cookies"] = DEXStaticPasswordAuthenticator().authenticate(
  233. kf_url, runtime_config_name, username=auth_parm_1, password=auth_parm_2
  234. )
  235. auth_info["kf_secured"] = True
  236. elif auth_type == SupportedAuthProviders.DEX_LEGACY:
  237. # see implementation for details; the authenticator returns
  238. # a cookie value
  239. auth_info["cookies"] = DEXLegacyAuthenticator().authenticate(
  240. kf_url, runtime_config_name, username=auth_parm_1, password=auth_parm_2
  241. )
  242. if auth_info.get("cookies") is not None:
  243. auth_info["kf_secured"] = True
  244. elif auth_type == SupportedAuthProviders.DEX_LDAP:
  245. # DEX/LDAP authentication; the authenticator returns
  246. # a cookie value
  247. auth_info["cookies"] = DEXLDAPAuthenticator().authenticate(
  248. kf_url, runtime_config_name, username=auth_parm_1, password=auth_parm_2
  249. )
  250. if auth_info.get("cookies") is not None:
  251. auth_info["kf_secured"] = True
  252. elif auth_type == SupportedAuthProviders.KUBERNETES_SERVICE_ACCOUNT_TOKEN:
  253. # see implementation for details; the authenticator returns
  254. # a ServiceAccountTokenVolumeCredentials
  255. auth_info["credentials"] = K8sServiceAccountTokenAuthenticator().authenticate(
  256. kf_url, runtime_config_name
  257. )
  258. auth_info["kf_secured"] = True
  259. else:
  260. # SupportedAuthProviders contains a member that is not yet
  261. # associated with an implementation of AbstractAuthenticator
  262. raise AuthenticationError(f"Support for authentication type '{auth_type.name}' is not implemented.")
  263. except AuthenticationError:
  264. raise
  265. except Exception as ex:
  266. raise AuthenticationError(
  267. f"Authentication using authentication type " f'\'{auth_info["auth_type"]}\' failed: {ex}'
  268. )
  269. # sanity check: upon completion auth_info must not contain
  270. # incomplete or conflicting information
  271. if auth_info.get("auth_type") is None or (
  272. auth_info.get("cookies") is not None and auth_info.get("existing_token") is not None
  273. ):
  274. raise AuthenticationError(
  275. "A potential authentication implementation problem was detected. " "Please create an issue."
  276. )
  277. return auth_info
  278. class AbstractAuthenticator(ABC):
  279. """
  280. Abstract base class for authenticator implementations
  281. """
  282. _type = None # unique authenticator id
  283. @abstractmethod
  284. def authenticate(self, kf_endpoint: str, runtime_config_name: str) -> Optional[SupportedAuthProviders]:
  285. """
  286. Attempt to authenticate with the specified Kubeflow endpoint. The caller
  287. expects the implementing method to behave as follows:
  288. - if authentication fails (for any reason), AuthenticationError is raised
  289. - an entity (e.g. a cookie) is returned that kfp.Client can use to access the endpoint
  290. - special case: for authenticators that support unsecured endpoints, None must be returned
  291. :param kf_endpoint: Kubeflow endpoint URL
  292. :type kf_endpoint: str
  293. :param runtime_config_name: Runtime configuration name where kf_endpoint is specified.
  294. :type runtime_config_name: str
  295. :raises NotImplementedError: This method needs to be implemented.
  296. :raises AuthenticationError: Authentication failed. Details are in the exception.
  297. :return: an entity that provides the Kubeflow Pipelines SDK client access to the specified endpoint
  298. :rtype: Optional[str]
  299. """
  300. raise NotImplementedError("Method AbstractAuthenticator.authenticate must be implemented.")
  301. class NoAuthenticationAuthenticator(AbstractAuthenticator):
  302. """
  303. Authenticator for Kubeflow servers that are not secured.
  304. """
  305. _type = SupportedAuthProviders.NO_AUTHENTICATION
  306. def authenticate(self, kf_endpoint: str, runtime_config_name: str) -> Optional[SupportedAuthProviders]:
  307. """
  308. Confirms that the specified kf_endpoint can be accessed
  309. without authentication.
  310. :param kf_endpoint: Kubeflow API endpoint to verify
  311. :type kf_endpoint: str
  312. :param runtime_config_name: Runtime configuration name where kf_endpoint is specified
  313. :type runtime_config_name: str
  314. :raises AuthenticationError: the endpoint is secured or an error occurred during processing.
  315. :return: None if the endpoint is unsecured
  316. :rtype: Optional[str]
  317. """
  318. # verify that the endpoint is unsecured
  319. get_response = requests.get(kf_endpoint, allow_redirects=True)
  320. if len(get_response.history) > 0:
  321. raise AuthenticationError(
  322. f"Authentication is required for Kubeflow at {kf_endpoint}. "
  323. f"Update the authentication type setting in runtime configuration "
  324. f"'{runtime_config_name}' and try again.",
  325. provider=self._type,
  326. )
  327. return None
  328. class DEXStaticPasswordAuthenticator(AbstractAuthenticator):
  329. """
  330. Authenticator for DEX/static passwords
  331. """
  332. _type = SupportedAuthProviders.DEX_STATIC_PASSWORDS
  333. def authenticate(
  334. self, kf_endpoint: str, runtime_config_name: str, username: str = None, password: str = None
  335. ) -> Optional[str]:
  336. """
  337. Authenticate using static password authentication. An AuthenticationError is raised
  338. if (1) kf_endpoint is unsecured (2) kf_endpoint does not
  339. support static password authentication (3) the credentials are invalid
  340. :param kf_endpoint: Kubeflow API endpoint to verify
  341. :type kf_endpoint: str
  342. :param runtime_config_name: Runtime configuration name where kf_endpoint is specified
  343. :type runtime_config_name: str
  344. :param username: Id to be used for authentication
  345. :type username: str
  346. :param password: Password to be used for authentication
  347. :type password: str
  348. :raises AuthenticationError: Authentication failed due to the specified error.
  349. :return: A cookie value
  350. """
  351. # This code can be removed after the kfp runtime schema enforces that the values
  352. # for username and password are valid
  353. if _empty_or_whitespaces_only(username) or _empty_or_whitespaces_only(password):
  354. raise AuthenticationError(
  355. f"Credentials are required to perform this type of authentication. "
  356. f"Update runtime configuration '{runtime_config_name}' and try again.",
  357. provider=self._type,
  358. )
  359. with requests.Session() as s:
  360. request_history = []
  361. ################
  362. # Determine if Endpoint is Secured
  363. ################
  364. resp = s.get(kf_endpoint, allow_redirects=True)
  365. request_history.append((kf_endpoint, resp))
  366. if resp.status_code != HTTPStatus.OK:
  367. raise AuthenticationError(
  368. f"Error detecting whether Kubeflow server at {kf_endpoint} is secured: "
  369. f"HTTP status code {resp.status_code}"
  370. f"Update runtime configuration '{runtime_config_name}' and try again.",
  371. provider=self._type,
  372. request_history=request_history,
  373. )
  374. if len(resp.history) == 0:
  375. # if we were NOT redirected, then the endpoint is UNSECURED
  376. # treat this as an error.
  377. raise AuthenticationError(
  378. f"The Kubeflow server at {kf_endpoint} is not secured "
  379. "using DEX static password. "
  380. f"Update runtime configuration '{runtime_config_name}' and try again.",
  381. provider=self._type,
  382. request_history=request_history,
  383. )
  384. ################
  385. # Get Dex Login URL
  386. ################
  387. redirect_url_obj = urlsplit(resp.url)
  388. # if we are at `/auth?=xxxx` path, we need to select the
  389. # static password auth type
  390. if re.search(r"/auth$", redirect_url_obj.path):
  391. redirect_url_obj = redirect_url_obj._replace(
  392. path=re.sub(r"/auth$", "/auth/local", redirect_url_obj.path)
  393. )
  394. else:
  395. # verify that KF is secured by static passwords
  396. m = re.search(r"/auth/([^/]*)/?", redirect_url_obj.path)
  397. if m and m.group(1) != "local":
  398. raise AuthenticationError(
  399. f"The Kubeflow server at {kf_endpoint} redirected to an unexpected HTTP path "
  400. f"('{redirect_url_obj.path}'). Verify that Kubeflow is secured using '{self._type.name}'"
  401. f" and, if necessary, update the authentication type in runtime configuration "
  402. f"'{runtime_config_name}'.",
  403. provider=self._type,
  404. request_history=request_history,
  405. )
  406. # if we are at `/auth/local/login` path, then no further action is needed
  407. # (we can use it for login POST)
  408. if re.search(r"/auth/local/login$", redirect_url_obj.path):
  409. dex_login_url = redirect_url_obj.geturl()
  410. else:
  411. # else, we need to be redirected to the actual login page
  412. # this GET should redirect us to the `/auth/local/login` path
  413. resp = s.get(redirect_url_obj.geturl(), allow_redirects=True)
  414. request_history.append((redirect_url_obj.geturl(), resp))
  415. if resp.status_code != HTTPStatus.OK:
  416. raise AuthenticationError(
  417. "Error redirecting to the DEX static password login page: "
  418. f"HTTP status code {resp.status_code}.",
  419. provider=self._type,
  420. request_history=request_history,
  421. )
  422. # set the login url
  423. dex_login_url = resp.url
  424. ################
  425. # Attempt Dex Login
  426. ################
  427. resp = s.post(dex_login_url, data={"login": username, "password": password}, allow_redirects=True)
  428. request_history.append((dex_login_url, resp))
  429. if len(resp.history) == 0:
  430. raise AuthenticationError(
  431. "The credentials are probably invalid. "
  432. f"Update runtime configuration '{runtime_config_name}' and try again.",
  433. provider=self._type,
  434. request_history=request_history,
  435. )
  436. # store the session cookies in a "key1=value1; key2=value2" string
  437. return "; ".join([f"{c.name}={c.value}" for c in s.cookies])
  438. class DEXLDAPAuthenticator(AbstractAuthenticator):
  439. """
  440. Authenticator for DEX/LDAP.
  441. """
  442. _type = SupportedAuthProviders.DEX_LDAP
  443. def authenticate(
  444. self, kf_endpoint: str, runtime_config_name: str, username: str = None, password: str = None
  445. ) -> Optional[str]:
  446. """
  447. Authenticate using LDAP. An AuthenticationError is raised
  448. if (1) kf_endpoint is unsecured (2) kf_endpoint does not
  449. support LDAP authentication (3) the credentials are invalid
  450. :param kf_endpoint: Kubeflow API endpoint to verify
  451. :type kf_endpoint: str
  452. :param runtime_config_name: Runtime configuration name where kf_endpoint is specified
  453. :type runtime_config_name: str
  454. :param username: Id to be used for authentication
  455. :type username: str
  456. :param password: Password to be used for authentication
  457. :type password: str
  458. :raises AuthenticationError: Authentication failed due to the specified error.
  459. :return: A cookie value
  460. """
  461. # This code can be removed after the kfp runtime schema enforces that the values
  462. # for username and password are valid
  463. if _empty_or_whitespaces_only(username) or _empty_or_whitespaces_only(password):
  464. raise AuthenticationError(
  465. f"Credentials are required to perform this type of authentication. "
  466. f"Update runtime configuration '{runtime_config_name}' and try again.",
  467. provider=self._type,
  468. )
  469. with requests.Session() as s:
  470. request_history = []
  471. ################
  472. # Determine if Endpoint is Secured
  473. ################
  474. resp = s.get(kf_endpoint, allow_redirects=True)
  475. request_history.append((kf_endpoint, resp))
  476. if resp.status_code != HTTPStatus.OK:
  477. raise AuthenticationError(
  478. f"Error detecting whether Kubeflow server at {kf_endpoint} is secured: "
  479. f"HTTP status code {resp.status_code}"
  480. f"Update runtime configuration '{runtime_config_name}' and try again.",
  481. provider=self._type,
  482. request_history=request_history,
  483. )
  484. if len(resp.history) == 0:
  485. # if we were NOT redirected, then the endpoint is UNSECURED
  486. # treat this as an error.
  487. raise AuthenticationError(
  488. f"The Kubeflow server at {kf_endpoint} is not secured using DEX with LDAP. "
  489. f"Update the authentication type in runtime configuration "
  490. f"'{runtime_config_name}' and try again.",
  491. provider=self._type,
  492. request_history=request_history,
  493. )
  494. ################
  495. # Get Dex Login URL
  496. ################
  497. redirect_url_obj = urlsplit(resp.url)
  498. # if we are at `/auth?=xxxx` path, we need to select
  499. # the LDAP auth type
  500. if re.search(r"/auth$", redirect_url_obj.path):
  501. redirect_url_obj = redirect_url_obj._replace(
  502. path=re.sub(r"/auth$", "/auth/ldap", redirect_url_obj.path)
  503. )
  504. else:
  505. # verify that KF is secured by LDAP
  506. m = re.search(r"/auth/([^/]*)/?", redirect_url_obj.path)
  507. if m and m.group(1) != "ldap":
  508. raise AuthenticationError(
  509. f"The Kubeflow server at {kf_endpoint} redirected to an unexpected HTTP path "
  510. f"('{redirect_url_obj.path}'). Verify that Kubeflow is configured for '{self._type.name}'"
  511. f" and, if necessary, update the authentication type in runtime configuration "
  512. f"'{runtime_config_name}'.",
  513. provider=self._type,
  514. request_history=request_history,
  515. )
  516. # if we are at `/auth/ldap/login` path, then no further action is needed
  517. # (we can use it for login POST)
  518. if re.search(r"/auth/ldap/login$", redirect_url_obj.path):
  519. dex_login_url = redirect_url_obj.geturl()
  520. else:
  521. # else, we need to be redirected to the actual login page
  522. # this GET should redirect us to the `/auth/ldap/login` path
  523. resp = s.get(redirect_url_obj.geturl(), allow_redirects=True)
  524. request_history.append((redirect_url_obj.geturl(), resp))
  525. if resp.status_code != HTTPStatus.OK:
  526. raise AuthenticationError(
  527. "Error redirecting to the DEX LDAP login page: " f"HTTP status code {resp.status_code}.",
  528. provider=self._type,
  529. request_history=request_history,
  530. )
  531. # set the login url
  532. dex_login_url = resp.url
  533. ################
  534. # Attempt Dex Login
  535. ################
  536. resp = s.post(dex_login_url, data={"login": username, "password": password}, allow_redirects=True)
  537. request_history.append((dex_login_url, resp))
  538. if len(resp.history) == 0:
  539. raise AuthenticationError(
  540. "The DEX LDAP credentials are probably invalid. "
  541. f"Update runtime configuration '{runtime_config_name}' and try again.",
  542. provider=self._type,
  543. request_history=request_history,
  544. )
  545. # store the session cookies in a "key1=value1; key2=value2" string
  546. return "; ".join([f"{c.name}={c.value}" for c in s.cookies])
  547. class K8sServiceAccountTokenAuthenticator(AbstractAuthenticator):
  548. """
  549. Authenticator for Service Account Tokens on Kubernetes.
  550. """
  551. _type = SupportedAuthProviders.KUBERNETES_SERVICE_ACCOUNT_TOKEN
  552. def authenticate(self, kf_endpoint: str, runtime_config_name: str) -> ServiceAccountTokenVolumeCredentials:
  553. """
  554. Verify that service account token authentication can be performed.
  555. An AuthenticationError is raised if a problem is encountered that
  556. would likely prevent the KFP client from authenticating successfully.
  557. :param kf_endpoint: Kubeflow API endpoint to verify
  558. :type kf_endpoint: str
  559. :param runtime_config_name: Runtime configuration name where kf_endpoint is specified
  560. :type runtime_config_name: str
  561. :raises AuthenticationError: A potential issue was detected that will
  562. likely cause a KFP client failure.
  563. :return: ServiceAccountTokenVolumeCredentials
  564. """
  565. request_history = []
  566. """
  567. Disable connectivity test to avoid false positives.
  568. # Verify connectivity for the API endpoint
  569. resp = requests.get(kf_endpoint, allow_redirects=True)
  570. request_history.append((kf_endpoint, resp))
  571. if resp.status_code != HTTPStatus.OK:
  572. raise AuthenticationError(f'Error detecting whether Kubeflow server at {kf_endpoint} is secured: '
  573. 'HTTP status code {resp.status_code}'
  574. f'Update runtime configuration \'{runtime_config_name}\' and try again.',
  575. provider=self._type,
  576. request_history=request_history)
  577. # If redirected, KF cannot be accessed using service account token.
  578. # This is a likely mismatch between selected Kubeflow auth type and configured auth type.
  579. if len(resp.history) > 0:
  580. raise AuthenticationError(f'Kubeflow server at {kf_endpoint} redirected to an unexpected '
  581. f'URL \'{resp.url}\'. Service account token access cannot be used '
  582. 'for authentication. '
  583. f'Update runtime configuration \'{runtime_config_name}\' and try again.',
  584. provider=self._type,
  585. request_history=request_history)
  586. """
  587. # Running in a Kubernetes pod, kfp.Client can use a service account token
  588. # for authentication. Verify that a token file exists in the current environment.
  589. service_account_token_path = os.environ.get(KF_PIPELINES_SA_TOKEN_ENV, KF_PIPELINES_SA_TOKEN_PATH)
  590. try:
  591. with open(service_account_token_path, "r") as token_file:
  592. if len(token_file.read()) == 0:
  593. raise AuthenticationError(
  594. f"Kubernetes service account token file " f"{service_account_token_path} is empty.",
  595. provider=self._type,
  596. request_history=request_history,
  597. )
  598. except AuthenticationError:
  599. raise
  600. except Exception as ex:
  601. raise AuthenticationError(
  602. f"Kubernetes service account token could not be read " f"from {service_account_token_path}: {ex}.",
  603. provider=self._type,
  604. request_history=request_history,
  605. )
  606. # return a ServiceAccountTokenVolumeCredentials to be passed as the "credentials"
  607. # argument of a `kfp.Client()` constructor
  608. return ServiceAccountTokenVolumeCredentials(path=service_account_token_path)
  609. class DEXLegacyAuthenticator(AbstractAuthenticator):
  610. """
  611. Authenticator for generic/legacy DEX authentication.
  612. """
  613. _type = SupportedAuthProviders.DEX_LEGACY
  614. def authenticate(
  615. self, kf_endpoint: str, runtime_config_name: str, username: Optional[str] = None, password: Optional[str] = None
  616. ) -> Optional[str]:
  617. """
  618. Authentication using the following flow:
  619. - detect whether Kubeflow endpoint is secured
  620. - if endpoint is secured, try to authenticate if a username and password were provided
  621. :param kf_endpoint: Kubeflow API endpoint to verify
  622. :type kf_endpoint: str
  623. :param runtime_config_name: Runtime configuration name where kf_endpoint is specified
  624. :type runtime_config_name: str
  625. :param username: Id to be used for authentication
  626. :type username: Optional[str]
  627. :param password: Password to be used for authentication
  628. :type password: Optional[str]
  629. :raises AuthenticationError: Authentication failed due to the specified error.
  630. :return: None if kf_endpoint is not secured, a cookie value otherwise
  631. :rtype: Optional[str]
  632. """
  633. # keep history of all HTTP requests and responses for troubleshooting
  634. request_history = []
  635. # Obtain redirect URL
  636. resp = requests.get(kf_endpoint)
  637. request_history.append((kf_endpoint, resp))
  638. if resp.status_code != HTTPStatus.OK:
  639. raise AuthenticationError(
  640. f"Error detecting whether Kubeflow server at {kf_endpoint} is secured: "
  641. "HTTP status code {resp.status_code}"
  642. f"Update runtime configuration '{runtime_config_name}' and try again.",
  643. provider=self._type,
  644. request_history=request_history,
  645. )
  646. # If KF redirected to '/dex/auth/...
  647. # try to authenticate using the provided credentials
  648. if "dex/auth" in resp.url:
  649. if _empty_or_whitespaces_only(username) or _empty_or_whitespaces_only(password):
  650. raise AuthenticationError(
  651. f"Kubeflow server at {kf_endpoint} is secured: "
  652. "username and password are required. "
  653. f"Update runtime configuration '{runtime_config_name}' and try again.",
  654. provider=self._type,
  655. request_history=request_history,
  656. )
  657. # Try to authenticate user by sending a request to the
  658. # redirect URL
  659. session = requests.Session()
  660. auth_url = resp.url
  661. resp = session.post(auth_url, data={"login": username, "password": password})
  662. request_history.append((auth_url, resp))
  663. if resp.status_code != HTTPStatus.OK:
  664. raise AuthenticationError(
  665. f"Authentication {auth_url} failed: "
  666. f"HTTP status code {resp.status_code}"
  667. f"Update runtime configuration '{runtime_config_name}' and try again.",
  668. provider=self._type,
  669. request_history=request_history,
  670. )
  671. # Capture authservice_session cookie, if one was returned
  672. # in the response
  673. cookie_auth_key = "authservice_session"
  674. cookie_auth_value = session.cookies.get(cookie_auth_key)
  675. if cookie_auth_value:
  676. return f"{cookie_auth_key}={cookie_auth_value}"
  677. # The endpoint is not secured.
  678. return None