livy-shell 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. #!/usr/bin/env python
  2. #
  3. # Licensed to the Apache Software Foundation (ASF) under one or more
  4. # contributor license agreements. See the NOTICE file distributed with
  5. # this work for additional information regarding copyright ownership.
  6. # The ASF licenses this file to You under the Apache License, Version 2.0
  7. # (the "License"); you may not use this file except in compliance with
  8. # the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. #
  18. # Very bare bones shell for driving a Livy session. Usage:
  19. #
  20. # livy-shell url [option=value ...]
  21. #
  22. # Options are set directly in the session creation request, so they must match the names of fields
  23. # in the CreateInteractiveRequest structure. Option values should be python-like objects (should be
  24. # parseable by python's "eval" function; naked strings are allowed). For example:
  25. #
  26. # kind=spark
  27. # jars=[ '/foo.jar', '/bar.jar' ]
  28. # conf={ foo : bar, 'spark.option' : opt_value }
  29. #
  30. # By default, a Spark (Scala) session is created.
  31. #
  32. import json
  33. import readline
  34. import signal
  35. import sys
  36. import time
  37. import urlparse
  38. class ControlCInterrupt(Exception):
  39. pass
  40. def check(condition, msg, *args):
  41. if not condition:
  42. if args:
  43. msg = msg % args
  44. print >> sys.stderr, msg
  45. sys.exit(1)
  46. def message(msg, *args):
  47. if args:
  48. msg = msg % args
  49. print(msg)
  50. try:
  51. import requests
  52. except ImportError:
  53. message("Unable to import 'requests' module, which is required by livy-shell.")
  54. sys.exit(1)
  55. class LiteralDict(dict):
  56. def __missing__(self, name):
  57. return name
  58. def request(method, uri, body):
  59. kwargs = { 'headers': { 'Content-Type' : 'application/json', 'X-Requested-By': 'livy' } }
  60. if body:
  61. kwargs['json'] = body
  62. resp = requests.request(method.upper(), urlparse.urljoin(url.geturl(), uri), **kwargs)
  63. resp.raise_for_status()
  64. if resp.status_code < requests.codes.multiple_choices and resp.status_code != requests.codes.no_content:
  65. return resp.json()
  66. return None
  67. def get(uri):
  68. return request('GET', uri, None)
  69. def post(uri, body):
  70. return request('POST', uri, body)
  71. def delete(uri):
  72. return request('DELETE', uri, None)
  73. def create_session():
  74. request = {
  75. "kind" : "spark"
  76. }
  77. for opt in sys.argv[2:]:
  78. check(opt.find('=') > 0, "Invalid option: %s.", opt)
  79. key, value = opt.split('=', 1)
  80. request[key] = eval(value, LiteralDict())
  81. return post("/sessions", request)
  82. def wait_for_idle(sid):
  83. session = get("/sessions/%d" % (sid, ))
  84. while session['state'] == 'starting':
  85. message("Session not ready yet (%s)", session['state'])
  86. time.sleep(5)
  87. session = get("/sessions/%d" % (sid, ))
  88. if session['state'] != 'idle':
  89. raise Exception, "Session failed to start."
  90. def monitor_statement(sid, s):
  91. cnt = 0
  92. while True:
  93. state = s['state']
  94. if state == 'available':
  95. output = s['output']
  96. status = output['status']
  97. if status == 'ok':
  98. result = output['data']
  99. text = result.get('text/plain', None)
  100. if text is None:
  101. message("Success (non-text result).")
  102. elif text.rstrip():
  103. message("%s", text)
  104. elif status == 'error':
  105. ename = output['ename']
  106. evalue = output['evalue']
  107. traceback = "\n".join(output.get('traceback', []))
  108. message("%s: %s", ename, evalue)
  109. if traceback:
  110. message("%s", traceback)
  111. else:
  112. message("Statement finished with unknown status '%s'.", status)
  113. break
  114. elif state == 'error':
  115. message("%s", s['error'])
  116. break
  117. else:
  118. if cnt == 10:
  119. message("(waiting for result...)")
  120. cnt = 0
  121. else:
  122. cnt += 1
  123. time.sleep(1)
  124. s = get("/sessions/%d/statements/%s" % (sid, s['id']))
  125. def run_shell(sid, session_kind):
  126. prompt = "{} ({}) > ".format(session_kind, sid)
  127. def ctrl_c_handler(signal, frame):
  128. message("\nPlease type quit() to exit the livy shell.")
  129. raise ControlCInterrupt()
  130. signal.signal(signal.SIGINT, ctrl_c_handler)
  131. while True:
  132. try:
  133. cmd = raw_input(prompt)
  134. if cmd == "quit()":
  135. break
  136. except ControlCInterrupt:
  137. continue
  138. statement = post("/sessions/%d/statements" % (sid, ), { 'code' : cmd })
  139. monitor_statement(sid, statement)
  140. #
  141. # main()
  142. #
  143. check(len(sys.argv) > 1, "Missing arguments.")
  144. url = urlparse.urlparse(sys.argv[1])
  145. sid = -1
  146. try:
  147. message("Creating new session...")
  148. session = create_session()
  149. sid = int(session['id'])
  150. message("New session (id = %d, kind = %s), waiting for idle state...", sid, session['kind'])
  151. wait_for_idle(sid)
  152. message("Session ready.")
  153. run_shell(sid, session.get('kind', 'spark'))
  154. except EOFError:
  155. pass
  156. finally:
  157. if sid != -1:
  158. delete("/sessions/%d" % (sid, ))