process.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. # coding: utf-8
  2. """JupyterLab command handler"""
  3. # Copyright (c) Jupyter Development Team.
  4. # Distributed under the terms of the Modified BSD License.
  5. from __future__ import print_function
  6. import atexit
  7. import logging
  8. import os
  9. import re
  10. import signal
  11. import sys
  12. import threading
  13. import time
  14. import weakref
  15. from tornado import gen
  16. from .jlpmapp import which, subprocess
  17. try:
  18. import pty
  19. except ImportError:
  20. pty = False
  21. if sys.platform == 'win32':
  22. list2cmdline = subprocess.list2cmdline
  23. else:
  24. def list2cmdline(cmd_list):
  25. import pipes
  26. return ' '.join(map(pipes.quote, cmd_list))
# Configure root logging to emit bare messages (no level or logger name
# prefix) at INFO level and above.
logging.basicConfig(format='%(message)s', level=logging.INFO)
  28. class Process(object):
  29. """A wrapper for a child process.
  30. """
  31. _procs = weakref.WeakSet()
  32. _pool = None
  33. def __init__(self, cmd, logger=None, cwd=None, kill_event=None,
  34. env=None, quiet=False):
  35. """Start a subprocess that can be run asynchronously.
  36. Parameters
  37. ----------
  38. cmd: list
  39. The command to run.
  40. logger: :class:`~logger.Logger`, optional
  41. The logger instance.
  42. cwd: string, optional
  43. The cwd of the process.
  44. env: dict, optional
  45. The environment for the process.
  46. kill_event: :class:`~threading.Event`, optional
  47. An event used to kill the process operation.
  48. """
  49. if not isinstance(cmd, (list, tuple)):
  50. raise ValueError('Command must be given as a list')
  51. if kill_event and kill_event.is_set():
  52. raise ValueError('Process aborted')
  53. self.logger = logger = logger or logging.getLogger('jupyterlab')
  54. self._last_line = ''
  55. if not quiet:
  56. self.logger.info('> ' + list2cmdline(cmd))
  57. self.cmd = cmd
  58. self.proc = self._create_process(cwd=cwd, env=env)
  59. self._kill_event = kill_event or threading.Event()
  60. Process._procs.add(self)
  61. def terminate(self):
  62. """Terminate the process and return the exit code.
  63. """
  64. proc = self.proc
  65. # Kill the process.
  66. if proc.poll() is None:
  67. os.kill(proc.pid, signal.SIGTERM)
  68. # Wait for the process to close.
  69. try:
  70. proc.wait()
  71. finally:
  72. Process._procs.remove(self)
  73. return proc.returncode
  74. def wait(self):
  75. """Wait for the process to finish.
  76. Returns
  77. -------
  78. The process exit code.
  79. """
  80. proc = self.proc
  81. kill_event = self._kill_event
  82. while proc.poll() is None:
  83. if kill_event.is_set():
  84. self.terminate()
  85. raise ValueError('Process was aborted')
  86. time.sleep(1.)
  87. return self.terminate()
  88. @gen.coroutine
  89. def wait_async(self):
  90. """Asynchronously wait for the process to finish.
  91. """
  92. proc = self.proc
  93. kill_event = self._kill_event
  94. while proc.poll() is None:
  95. if kill_event.is_set():
  96. self.terminate()
  97. raise ValueError('Process was aborted')
  98. yield gen.sleep(1.)
  99. raise gen.Return(self.terminate())
  100. def _create_process(self, **kwargs):
  101. """Create the process.
  102. """
  103. cmd = self.cmd
  104. kwargs.setdefault('stderr', subprocess.STDOUT)
  105. cmd[0] = which(cmd[0], kwargs.get('env'))
  106. if os.name == 'nt':
  107. kwargs['shell'] = True
  108. proc = subprocess.Popen(cmd, **kwargs)
  109. return proc
  110. @classmethod
  111. def _cleanup(cls):
  112. """Clean up the started subprocesses at exit.
  113. """
  114. for proc in list(cls._procs):
  115. proc.terminate()
  116. class WatchHelper(Process):
  117. """A process helper for a watch process.
  118. """
  119. def __init__(self, cmd, startup_regex, logger=None, cwd=None,
  120. kill_event=None, env=None):
  121. """Initialize the process helper.
  122. Parameters
  123. ----------
  124. cmd: list
  125. The command to run.
  126. startup_regex: string
  127. The regex to wait for at startup.
  128. logger: :class:`~logger.Logger`, optional
  129. The logger instance.
  130. cwd: string, optional
  131. The cwd of the process.
  132. env: dict, optional
  133. The environment for the process.
  134. kill_event: callable, optional
  135. A function to call to check if we should abort.
  136. """
  137. super(WatchHelper, self).__init__(cmd, logger=logger,
  138. cwd=cwd, kill_event=kill_event, env=env)
  139. if not pty:
  140. self._stdout = self.proc.stdout
  141. while 1:
  142. line = self._stdout.readline().decode('utf-8')
  143. if not line:
  144. raise RuntimeError('Process ended improperly')
  145. print(line.rstrip())
  146. if re.match(startup_regex, line):
  147. break
  148. self._read_thread = threading.Thread(target=self._read_incoming)
  149. self._read_thread.setDaemon(True)
  150. self._read_thread.start()
  151. def terminate(self):
  152. """Terminate the process.
  153. """
  154. proc = self.proc
  155. if proc.poll() is None:
  156. if os.name != 'nt':
  157. # Kill the process group if we started a new session.
  158. os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
  159. else:
  160. os.kill(proc.pid, signal.SIGTERM)
  161. # Close stdout.
  162. try:
  163. self._stdout.close()
  164. except Exception as e:
  165. pass
  166. # Wait for the process to close.
  167. try:
  168. proc.wait()
  169. finally:
  170. Process._procs.remove(self)
  171. return proc.returncode
  172. def _read_incoming(self):
  173. """Run in a thread to read stdout and print"""
  174. fileno = self._stdout.fileno()
  175. while 1:
  176. try:
  177. buf = os.read(fileno, 1024)
  178. except OSError as e:
  179. self.logger.debug('Read incoming error %s', e)
  180. return
  181. if not buf:
  182. return
  183. print(buf.decode('utf-8'), end='')
  184. def _create_process(self, **kwargs):
  185. """Create the watcher helper process.
  186. """
  187. kwargs['bufsize'] = 0
  188. if pty:
  189. master, slave = pty.openpty()
  190. kwargs['stderr'] = kwargs['stdout'] = slave
  191. kwargs['start_new_session'] = True
  192. self._stdout = os.fdopen(master, 'rb')
  193. else:
  194. kwargs['stdout'] = subprocess.PIPE
  195. if os.name == 'nt':
  196. startupinfo = subprocess.STARTUPINFO()
  197. startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
  198. kwargs['startupinfo'] = startupinfo
  199. kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
  200. kwargs['shell'] = True
  201. return super(WatchHelper, self)._create_process(**kwargs)
# Register the cleanup handler so that every process still tracked in
# ``Process._procs`` is terminated when the interpreter exits.
atexit.register(Process._cleanup)