123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- #!/usr/bin/env python
- #
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # Very bare bones shell for driving a Livy session. Usage:
- #
- # livy-shell url [option=value ...]
- #
- # Options are set directly in the session creation request, so they must match the names of fields
- # in the CreateInteractiveRequest structure. Option values should be python-like objects (should be
- # parseable by python's "eval" function; naked strings are allowed). For example:
- #
- # kind=spark
- # jars=[ '/foo.jar', '/bar.jar' ]
- # conf={ foo : bar, 'spark.option' : opt_value }
- #
- # By default, a Spark (Scala) session is created.
- #
- import json
- import readline
- import signal
- import sys
- import time
- import urlparse
- class ControlCInterrupt(Exception):
- pass
- def check(condition, msg, *args):
- if not condition:
- if args:
- msg = msg % args
- print >> sys.stderr, msg
- sys.exit(1)
- def message(msg, *args):
- if args:
- msg = msg % args
- print(msg)
- try:
- import requests
- except ImportError:
- message("Unable to import 'requests' module, which is required by livy-shell.")
- sys.exit(1)
- class LiteralDict(dict):
- def __missing__(self, name):
- return name
- def request(method, uri, body):
- kwargs = { 'headers': { 'Content-Type' : 'application/json', 'X-Requested-By': 'livy' } }
- if body:
- kwargs['json'] = body
- resp = requests.request(method.upper(), urlparse.urljoin(url.geturl(), uri), **kwargs)
- resp.raise_for_status()
- if resp.status_code < requests.codes.multiple_choices and resp.status_code != requests.codes.no_content:
- return resp.json()
- return None
- def get(uri):
- return request('GET', uri, None)
- def post(uri, body):
- return request('POST', uri, body)
- def delete(uri):
- return request('DELETE', uri, None)
- def create_session():
- request = {
- "kind" : "spark"
- }
- for opt in sys.argv[2:]:
- check(opt.find('=') > 0, "Invalid option: %s.", opt)
- key, value = opt.split('=', 1)
- request[key] = eval(value, LiteralDict())
- return post("/sessions", request)
- def wait_for_idle(sid):
- session = get("/sessions/%d" % (sid, ))
- while session['state'] == 'starting':
- message("Session not ready yet (%s)", session['state'])
- time.sleep(5)
- session = get("/sessions/%d" % (sid, ))
- if session['state'] != 'idle':
- raise Exception, "Session failed to start."
- def monitor_statement(sid, s):
- cnt = 0
- while True:
- state = s['state']
- if state == 'available':
- output = s['output']
- status = output['status']
- if status == 'ok':
- result = output['data']
- text = result.get('text/plain', None)
- if text is None:
- message("Success (non-text result).")
- elif text.rstrip():
- message("%s", text)
- elif status == 'error':
- ename = output['ename']
- evalue = output['evalue']
- traceback = "\n".join(output.get('traceback', []))
- message("%s: %s", ename, evalue)
- if traceback:
- message("%s", traceback)
- else:
- message("Statement finished with unknown status '%s'.", status)
- break
- elif state == 'error':
- message("%s", s['error'])
- break
- else:
- if cnt == 10:
- message("(waiting for result...)")
- cnt = 0
- else:
- cnt += 1
- time.sleep(1)
- s = get("/sessions/%d/statements/%s" % (sid, s['id']))
- def run_shell(sid, session_kind):
- prompt = "{} ({}) > ".format(session_kind, sid)
- def ctrl_c_handler(signal, frame):
- message("\nPlease type quit() to exit the livy shell.")
- raise ControlCInterrupt()
- signal.signal(signal.SIGINT, ctrl_c_handler)
- while True:
- try:
- cmd = raw_input(prompt)
- if cmd == "quit()":
- break
- except ControlCInterrupt:
- continue
- statement = post("/sessions/%d/statements" % (sid, ), { 'code' : cmd })
- monitor_statement(sid, statement)
- #
- # main()
- #
- check(len(sys.argv) > 1, "Missing arguments.")
- url = urlparse.urlparse(sys.argv[1])
- sid = -1
- try:
- message("Creating new session...")
- session = create_session()
- sid = int(session['id'])
- message("New session (id = %d, kind = %s), waiting for idle state...", sid, session['kind'])
- wait_for_idle(sid)
- message("Session ready.")
- run_shell(sid, session.get('kind', 'spark'))
- except EOFError:
- pass
- finally:
- if sid != -1:
- delete("/sessions/%d" % (sid, ))
|