|
@@ -30,13 +30,18 @@
|
|
|
# By default, a Spark (Scala) session is created.
|
|
|
#
|
|
|
|
|
|
-import httplib
|
|
|
import json
|
|
|
import readline
|
|
|
+import signal
|
|
|
import sys
|
|
|
import time
|
|
|
import urlparse
|
|
|
|
|
|
+
|
|
|
+class ControlCInterrupt(Exception):
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
def check(condition, msg, *args):
|
|
|
if not condition:
|
|
|
if args:
|
|
@@ -48,7 +53,14 @@ def check(condition, msg, *args):
|
|
|
def message(msg, *args):
|
|
|
if args:
|
|
|
msg = msg % args
|
|
|
- print msg
|
|
|
+ print(msg)
|
|
|
+
|
|
|
+
|
|
|
+try:
|
|
|
+ import requests
|
|
|
+except ImportError:
|
|
|
+ message("Unable to import 'requests' module, which is required by livy-shell.")
|
|
|
+ sys.exit(1)
|
|
|
|
|
|
|
|
|
class LiteralDict(dict):
|
|
@@ -56,33 +68,30 @@ class LiteralDict(dict):
|
|
|
return name
|
|
|
|
|
|
|
|
|
-def request(conn, method, uri, body):
|
|
|
- body = json.dumps(body) if body else None
|
|
|
- headers = { 'Content-Type' : 'application/json' }
|
|
|
- conn.request(method, uri, body=body, headers=headers)
|
|
|
-
|
|
|
- resp = conn.getresponse()
|
|
|
- data = resp.read()
|
|
|
- if resp.status < 200 or resp.status >= 400:
|
|
|
- raise httplib.HTTPException, (resp.status, resp.reason, data)
|
|
|
- if resp.status < 300 and resp.status != httplib.NO_CONTENT:
|
|
|
- return json.loads(data)
|
|
|
+def request(method, uri, body):
|
|
|
+ kwargs = { 'headers': { 'Content-Type' : 'application/json', 'X-Requested-By': 'livy' } }
|
|
|
+ if body:
|
|
|
+ kwargs['json'] = body
|
|
|
+ resp = requests.request(method.upper(), urlparse.urljoin(url.geturl(), uri), **kwargs)
|
|
|
+ resp.raise_for_status()
|
|
|
+ if resp.status_code < requests.codes.multiple_choices and resp.status_code != requests.codes.no_content:
|
|
|
+ return resp.json()
|
|
|
return None
|
|
|
|
|
|
|
|
|
-def get(conn, uri):
|
|
|
- return request(conn, 'GET', uri, None)
|
|
|
+def get(uri):
|
|
|
+ return request('GET', uri, None)
|
|
|
|
|
|
|
|
|
-def post(conn, uri, body):
|
|
|
- return request(conn, 'POST', uri, body)
|
|
|
+def post(uri, body):
|
|
|
+ return request('POST', uri, body)
|
|
|
|
|
|
|
|
|
-def delete(conn, uri):
|
|
|
- return request(conn, 'DELETE', uri, None)
|
|
|
+def delete(uri):
|
|
|
+ return request('DELETE', uri, None)
|
|
|
|
|
|
|
|
|
-def create_session(conn):
|
|
|
+def create_session():
|
|
|
request = {
|
|
|
"kind" : "spark"
|
|
|
}
|
|
@@ -91,21 +100,21 @@ def create_session(conn):
|
|
|
key, value = opt.split('=', 1)
|
|
|
request[key] = eval(value, LiteralDict())
|
|
|
|
|
|
- return post(conn, "/sessions", request)
|
|
|
+ return post("/sessions", request)
|
|
|
|
|
|
|
|
|
def wait_for_idle(sid):
|
|
|
- session = get(conn, "/sessions/%d" % (sid, ))
|
|
|
+ session = get("/sessions/%d" % (sid, ))
|
|
|
while session['state'] == 'starting':
|
|
|
message("Session not ready yet (%s)", session['state'])
|
|
|
time.sleep(5)
|
|
|
- session = get(conn, "/sessions/%d" % (sid, ))
|
|
|
+ session = get("/sessions/%d" % (sid, ))
|
|
|
|
|
|
if session['state'] != 'idle':
|
|
|
raise Exception, "Session failed to start."
|
|
|
|
|
|
|
|
|
-def monitor_statement(conn, sid, s):
|
|
|
+def monitor_statement(sid, s):
|
|
|
cnt = 0
|
|
|
while True:
|
|
|
state = s['state']
|
|
@@ -139,23 +148,27 @@ def monitor_statement(conn, sid, s):
|
|
|
else:
|
|
|
cnt += 1
|
|
|
time.sleep(1)
|
|
|
- s = get(conn, "/sessions/%d/statements/%s" % (sid, s['id']))
|
|
|
+ s = get("/sessions/%d/statements/%s" % (sid, s['id']))
|
|
|
+
|
|
|
|
|
|
+def run_shell(sid, session_kind):
|
|
|
+ prompt = "{} ({}) > ".format(session_kind, sid)
|
|
|
+ def ctrl_c_handler(signal, frame):
|
|
|
+ message("\nPlease type quit() to exit the livy shell.")
|
|
|
+ raise ControlCInterrupt()
|
|
|
+ signal.signal(signal.SIGINT, ctrl_c_handler)
|
|
|
|
|
|
-def run_shell(conn, sid):
|
|
|
while True:
|
|
|
- cmd = raw_input('> ')
|
|
|
- if cmd == "quit()":
|
|
|
- break
|
|
|
+ try:
|
|
|
+ cmd = raw_input(prompt)
|
|
|
+ if cmd == "quit()":
|
|
|
+ break
|
|
|
+ except ControlCInterrupt:
|
|
|
+ continue
|
|
|
|
|
|
- statement = post(conn, "/sessions/%d/statements" % (sid, ), { 'code' : cmd })
|
|
|
- monitor_statement(conn, sid, statement)
|
|
|
+ statement = post("/sessions/%d/statements" % (sid, ), { 'code' : cmd })
|
|
|
+ monitor_statement(sid, statement)
|
|
|
|
|
|
-def open_connection(url):
|
|
|
- if url.scheme == "https":
|
|
|
- return httplib.HTTPSConnection(url.netloc)
|
|
|
- else:
|
|
|
- return httplib.HTTPConnection(url.netloc)
|
|
|
|
|
|
#
|
|
|
# main()
|
|
@@ -164,24 +177,18 @@ def open_connection(url):
|
|
|
check(len(sys.argv) > 1, "Missing arguments.")
|
|
|
|
|
|
url = urlparse.urlparse(sys.argv[1])
|
|
|
-conn = open_connection(url)
|
|
|
-
|
|
|
sid = -1
|
|
|
+
|
|
|
try:
|
|
|
message("Creating new session...")
|
|
|
- session = create_session(conn)
|
|
|
+ session = create_session()
|
|
|
sid = int(session['id'])
|
|
|
message("New session (id = %d, kind = %s), waiting for idle state...", sid, session['kind'])
|
|
|
wait_for_idle(sid)
|
|
|
message("Session ready.")
|
|
|
- run_shell(conn,sid)
|
|
|
+ run_shell(sid, session.get('kind', 'spark'))
|
|
|
except EOFError:
|
|
|
pass
|
|
|
finally:
|
|
|
- conn.close()
|
|
|
if sid != -1:
|
|
|
- conn = open_connection(url)
|
|
|
- try:
|
|
|
- delete(conn, "/sessions/%d" % (sid, ))
|
|
|
- finally:
|
|
|
- conn.close()
|
|
|
+ delete("/sessions/%d" % (sid, ))
|