|
@@ -28,6 +28,7 @@ from configparser import ConfigParser
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
from future.moves.urllib.parse import ParseResult, urlparse
|
|
|
from io import open, StringIO
|
|
|
+from requests_kerberos import HTTPKerberosAuth, REQUIRED
|
|
|
from livy.job_handle import JobHandle
|
|
|
|
|
|
|
|
@@ -76,13 +77,13 @@ class HttpClient(object):
|
|
|
path=match.group(1), params=uri.params, query=uri.query,
|
|
|
fragment=uri.fragment)
|
|
|
self._set_uri(base)
|
|
|
- self._conn = _LivyConnection(base)
|
|
|
+ self._conn = _LivyConnection(base, self._config)
|
|
|
self._session_id = int(match.group(2))
|
|
|
self._reconnect_to_existing_session()
|
|
|
else:
|
|
|
self._set_uri(uri)
|
|
|
session_conf_dict = dict(self._config.items(self._CONFIG_SECTION))
|
|
|
- self._conn = _LivyConnection(uri)
|
|
|
+ self._conn = _LivyConnection(uri, self._config)
|
|
|
self._session_id = self._create_new_session(
|
|
|
session_conf_dict).json()['id']
|
|
|
self._executor = ThreadPoolExecutor(max_workers=1)
|
|
@@ -434,10 +435,22 @@ class _LivyConnection(object):
|
|
|
'Accept': 'application/json',
|
|
|
}
|
|
|
|
|
|
- def __init__(self, uri):
|
|
|
+ _SPNEGO_ENABLED_CONF = 'livy.client.http.spnego.enable'
|
|
|
+
|
|
|
+ def __init__(self, uri, config):
|
|
|
self._server_url_prefix = uri.geturl() + self._SESSIONS_URI
|
|
|
self._requests = requests
|
|
|
self.lock = threading.Lock()
|
|
|
+ self._spnego_enabled = \
|
|
|
+ config.getboolean('env', self._SPNEGO_ENABLED_CONF) \
|
|
|
+ if config.has_option('env', self._SPNEGO_ENABLED_CONF) else False
|
|
|
+
|
|
|
+ def _spnego_auth(self):
|
|
|
+ if self._spnego_enabled:
|
|
|
+ return HTTPKerberosAuth(mutual_authentication=REQUIRED,
|
|
|
+ sanitize_mutual_error_response=False)
|
|
|
+ else:
|
|
|
+ return None
|
|
|
|
|
|
def send_request(
|
|
|
self,
|
|
@@ -483,7 +496,7 @@ class _LivyConnection(object):
|
|
|
request_url = self._server_url_prefix + suffix_url
|
|
|
return self._requests.request(method, request_url,
|
|
|
timeout=self._TIMEOUT, headers=local_headers, files=files,
|
|
|
- json=data)
|
|
|
+ json=data, auth=self._spnego_auth())
|
|
|
finally:
|
|
|
if files is not None:
|
|
|
files.clear()
|