浏览代码

First pass at making all kernel message handling asynchronous and preserving message order.

Fixes #4188
Jason Grout 7 年之前
父节点
当前提交
6faa2dbba2

+ 6 - 6
packages/services/src/kernel/comm.ts

@@ -56,7 +56,7 @@ class CommHandler extends DisposableDelegate implements Kernel.IComm {
    *
    * **See also:** [[ICommClose]], [[close]]
    */
-  get onClose(): (msg: KernelMessage.ICommCloseMsg) => void {
+  get onClose(): (msg: KernelMessage.ICommCloseMsg) => Promise<void> | void {
     return this._onClose;
   }
 
@@ -69,21 +69,21 @@ class CommHandler extends DisposableDelegate implements Kernel.IComm {
    *
    * **See also:** [[close]]
    */
-  set onClose(cb: (msg: KernelMessage.ICommCloseMsg) => void) {
+  set onClose(cb: (msg: KernelMessage.ICommCloseMsg) => Promise<void> | void) {
     this._onClose = cb;
   }
 
   /**
    * Get the callback for a comm message received event.
    */
-  get onMsg(): (msg: KernelMessage.ICommMsgMsg) => void {
+  get onMsg(): (msg: KernelMessage.ICommMsgMsg) => Promise<void> | void {
     return this._onMsg;
   }
 
   /**
    * Set the callback for a comm message received event.
    */
-  set onMsg(cb: (msg: KernelMessage.ICommMsgMsg) => void) {
+  set onMsg(cb: (msg: KernelMessage.ICommMsgMsg) => Promise<void> | void) {
     this._onMsg = cb;
   }
 
@@ -180,6 +180,6 @@ class CommHandler extends DisposableDelegate implements Kernel.IComm {
   private _target = '';
   private _id = '';
   private _kernel: Kernel.IKernel;
-  private _onClose: (msg: KernelMessage.ICommCloseMsg) => void;
-  private _onMsg: (msg: KernelMessage.ICommMsgMsg) => void;
+  private _onClose: (msg: KernelMessage.ICommCloseMsg) => Promise<void> | void;
+  private _onMsg: (msg: KernelMessage.ICommMsgMsg) => Promise<void> | void;
 }

+ 95 - 101
packages/services/src/kernel/default.ts

@@ -77,7 +77,7 @@ class DefaultKernel implements Kernel.IKernel {
     this._clientId = options.clientId || uuid();
     this._username = options.username || '';
     this._futures = new Map<string, KernelFutureHandler>();
-    this._commPromises = new Map<string, Promise<Kernel.IComm>>();
+    this._comms = new Map<string, Kernel.IComm>();
     this._createSocket();
     Private.runningKernels.push(this);
   }
@@ -103,6 +103,10 @@ class DefaultKernel implements Kernel.IKernel {
 
   /**
    * A signal emitted for iopub kernel messages.
+   *
+   * #### Notes
+   * This signal is emitted after the iopub message is handled, which may
+   * asynchronously after it is received.
    */
   get iopubMessage(): ISignal<this, KernelMessage.IIOPubMessage> {
     return this._iopubMessage;
@@ -235,14 +239,8 @@ class DefaultKernel implements Kernel.IKernel {
     this._terminated.emit(void 0);
     this._status = 'dead';
     this._clearSocket();
-    this._futures.forEach((future, key) => {
-      future.dispose();
-    });
-    this._commPromises.forEach((promise, key) => {
-      promise.then(comm => {
-        comm.dispose();
-      });
-    });
+    this._futures.forEach(future => { future.dispose(); });
+    this._comms.forEach(comm => { comm.dispose(); });
     this._displayIdToParentIds.clear();
     this._msgIdToDisplayIds.clear();
     ArrayExt.removeFirstOf(Private.runningKernels, this);
@@ -627,24 +625,25 @@ class DefaultKernel implements Kernel.IKernel {
   /**
    * Connect to a comm, or create a new one.
    *
+   * TODO: should we change this to a synchronous function? There's nothing
+   * asynchronous about it now.
+   *
    * #### Notes
    * If a client-side comm already exists, it is returned.
    */
-  connectToComm(targetName: string, commId?: string): Promise<Kernel.IComm> {
+  async connectToComm(targetName: string, commId?: string): Promise<Kernel.IComm> {
     let id = commId || uuid();
-    if (this._commPromises.has(id)) {
-      return this._commPromises.get(id);
+    if (this._comms.has(id)) {
+      return this._comms.get(id);
     }
-    let promise = Promise.resolve(void 0).then(() => {
-      return new CommHandler(
-        targetName,
-        id,
-        this,
-        () => { this._unregisterComm(id); }
-      );
-    });
-    this._commPromises.set(id, promise);
-    return promise;
+    let comm = new CommHandler(
+      targetName,
+      id,
+      this,
+      () => { this._unregisterComm(id); }
+    );
+    this._comms.set(id, comm);
+    return comm;
   }
 
   /**
@@ -652,7 +651,7 @@ class DefaultKernel implements Kernel.IKernel {
    *
    * @returns Whether the message was handled.
    */
-  private _handleDisplayId(displayId: string, msg: KernelMessage.IMessage): boolean {
+  private async _handleDisplayId(displayId: string, msg: KernelMessage.IMessage): Promise<boolean> {
     let msgId = (msg.parent_header as KernelMessage.IHeader).msg_id;
     let parentIds = this._displayIdToParentIds.get(displayId);
     if (parentIds) {
@@ -668,12 +667,12 @@ class DefaultKernel implements Kernel.IKernel {
       };
       (updateMsg.header as any).msg_type = 'update_display_data';
 
-      parentIds.map((parentId) => {
+      await Promise.all(parentIds.map(async (parentId) => {
         let future = this._futures && this._futures.get(parentId);
         if (future) {
-          future.handleMsg(updateMsg);
+          await future.handleMsg(updateMsg);
         }
-      });
+      }));
     }
 
     // We're done here if it's update_display.
@@ -697,7 +696,7 @@ class DefaultKernel implements Kernel.IKernel {
     }
     this._msgIdToDisplayIds.set(msgId, displayIds);
 
-    // Let it propagate to the intended recipient.
+    // Let the message propagate to the intended recipient.
     return false;
   }
 
@@ -771,16 +770,10 @@ class DefaultKernel implements Kernel.IKernel {
   private _clearState(): void {
     this._isReady = false;
     this._pendingMessages = [];
-    this._futures.forEach((future, key) => {
-      future.dispose();
-    });
-    this._commPromises.forEach((promise, key) => {
-      promise.then(comm => {
-        comm.dispose();
-      });
-    });
+    this._futures.forEach(future => { future.dispose(); });
+    this._comms.forEach(comm => { comm.dispose(); });
     this._futures = new Map<string, KernelFutureHandler>();
-    this._commPromises = new Map<string, Promise<Kernel.IComm>>();
+    this._comms = new Map<string, Kernel.IComm>();
     this._displayIdToParentIds.clear();
     this._msgIdToDisplayIds.clear();
   }
@@ -788,94 +781,77 @@ class DefaultKernel implements Kernel.IKernel {
   /**
    * Handle a `comm_open` kernel message.
    */
-  private _handleCommOpen(msg: KernelMessage.ICommOpenMsg): void {
+  private async _handleCommOpen(msg: KernelMessage.ICommOpenMsg): Promise<void> {
     let content = msg.content;
     if (this.isDisposed) {
       return;
     }
-    let promise = Private.loadObject(content.target_name, content.target_module,
-      this._targetRegistry).then(target => {
-        let comm = new CommHandler(
-          content.target_name,
-          content.comm_id,
-          this,
-          () => { this._unregisterComm(content.comm_id); }
-        );
-        let response : any;
-        try {
-          response = target(comm, msg);
-        } catch (e) {
-          comm.close();
-          console.error('Exception opening new comm');
-          throw(e);
-        }
-        return Promise.resolve(response).then(() => {
-          if (this.isDisposed) {
-            return;
-          }
-          return comm;
-        });
-    });
-    this._commPromises.set(content.comm_id, promise);
+    let target = await Private.loadObject(content.target_name, content.target_module, this._targetRegistry);
+    let comm = new CommHandler(
+      content.target_name,
+      content.comm_id,
+      this,
+      () => { this._unregisterComm(content.comm_id); }
+    );
+    try {
+      await target(comm, msg);
+    } catch (e) {
+      // TODO: do we need to await this?
+      await comm.close();
+      console.error('Exception opening new comm');
+      throw(e);
+    }
+    // TODO: do we need to check if the comm is disposed?
+    this._comms.set(content.comm_id, comm);
   }
 
   /**
    * Handle 'comm_close' kernel message.
    */
-  private _handleCommClose(msg: KernelMessage.ICommCloseMsg): void {
+  private async _handleCommClose(msg: KernelMessage.ICommCloseMsg): Promise<void> {
     let content = msg.content;
-    let promise = this._commPromises.get(content.comm_id);
-    if (!promise) {
+    let comm = this._comms.get(content.comm_id);
+    if (!comm) {
       console.error('Comm not found for comm id ' + content.comm_id);
       return;
     }
-    promise.then((comm) => {
-      if (!comm) {
-        return;
-      }
-      this._unregisterComm(comm.commId);
-      try {
-        let onClose = comm.onClose;
-        if (onClose) {
-          onClose(msg);
-        }
-        (comm as CommHandler).dispose();
-      } catch (e) {
-        console.error('Exception closing comm: ', e, e.stack, msg);
+    this._unregisterComm(comm.commId);
+    try {
+      let onClose = comm.onClose;
+      if (onClose) {
+        await onClose(msg);
       }
-    });
+      (comm as CommHandler).dispose();
+    } catch (e) {
+      console.error('Exception closing comm: ', e, e.stack, msg);
+    }
   }
 
   /**
    * Handle a 'comm_msg' kernel message.
    */
-  private _handleCommMsg(msg: KernelMessage.ICommMsgMsg): void {
+  private async _handleCommMsg(msg: KernelMessage.ICommMsgMsg): Promise<void> {
     let content = msg.content;
-    let promise = this._commPromises.get(content.comm_id);
-    if (!promise) {
+    let comm = this._comms.get(content.comm_id);
+    if (!comm) {
       // We do have a registered comm for this comm id, ignore.
       return;
     }
-    promise.then((comm) => {
-      if (!comm) {
-        return;
-      }
-      try {
-        let onMsg = comm.onMsg;
-        if (onMsg) {
-          onMsg(msg);
-        }
-      } catch (e) {
-        console.error('Exception handling comm msg: ', e, e.stack, msg);
+    try {
+      let onMsg = comm.onMsg;
+      if (onMsg) {
+        await onMsg(msg);
       }
-    });
+    } catch (e) {
+      console.error('Exception handling comm msg: ', e, e.stack, msg);
+    }
   }
 
   /**
    * Unregister a comm instance.
    */
   private _unregisterComm(commId: string) {
-    this._commPromises.delete(commId);
+    this._comms.delete(commId);
   }
 
   /**
@@ -933,12 +909,16 @@ class DefaultKernel implements Kernel.IKernel {
 
   /**
    * Handle a websocket message, validating and routing appropriately.
+   *
+   * TODO: convert to asynchronous processing.
    */
   private _onWSMessage = (evt: MessageEvent) => {
     if (this._wsStopped) {
       // If the socket is being closed, ignore any messages
       return;
     }
+
+    // Notify immediately if there is an error with the message.
     let msg = serialize.deserialize(evt.data);
     try {
       validate.validateMessage(msg);
@@ -947,8 +927,21 @@ class DefaultKernel implements Kernel.IKernel {
       return;
     }
 
+    // Handle the message asynchronously, in the order received.
+    this._msgChain = this._msgChain.then(() => {
+      return this._handleMessage(msg);
+    });
+
+    // TODO: should this emit happen asynchronously? Should we have two events,
+    // one for a message received here, and then another after it is handled
+    // asynchronously?
+    this._anyMessage.emit({msg, direction: 'recv'});
+  }
+
+  private async _handleMessage(msg: KernelMessage.IMessage): Promise<void> {
     let handled = false;
 
+    // Check to see if we have a display_id we need to reroute.
     if (msg.parent_header && msg.channel === 'iopub') {
       switch (msg.header.msg_type) {
       case 'display_data':
@@ -958,7 +951,7 @@ class DefaultKernel implements Kernel.IKernel {
         let transient = (msg.content.transient || {}) as JSONObject;
         let displayId = transient['display_id'] as string;
         if (displayId) {
-          handled = this._handleDisplayId(displayId, msg);
+          handled = await this._handleDisplayId(displayId, msg);
         }
         break;
       default:
@@ -970,7 +963,7 @@ class DefaultKernel implements Kernel.IKernel {
       let parentHeader = msg.parent_header as KernelMessage.IHeader;
       let future = this._futures && this._futures.get(parentHeader.msg_id);
       if (future) {
-        future.handleMsg(msg);
+        await future.handleMsg(msg);
       } else {
         // If the message was sent by us and was not iopub, it is orphaned.
         let owned = parentHeader.session === this.clientId;
@@ -982,23 +975,23 @@ class DefaultKernel implements Kernel.IKernel {
     if (msg.channel === 'iopub') {
       switch (msg.header.msg_type) {
       case 'status':
+        // Updating the status is synchronous, and we call no user code
         this._updateStatus((msg as KernelMessage.IStatusMsg).content.execution_state);
         break;
       case 'comm_open':
-        this._handleCommOpen(msg as KernelMessage.ICommOpenMsg);
+        await this._handleCommOpen(msg as KernelMessage.ICommOpenMsg);
         break;
       case 'comm_msg':
-        this._handleCommMsg(msg as KernelMessage.ICommMsgMsg);
+        await this._handleCommMsg(msg as KernelMessage.ICommMsgMsg);
         break;
       case 'comm_close':
-        this._handleCommClose(msg as KernelMessage.ICommCloseMsg);
+        await this._handleCommClose(msg as KernelMessage.ICommCloseMsg);
         break;
       default:
         break;
       }
       this._iopubMessage.emit(msg as KernelMessage.IIOPubMessage);
     }
-    this._anyMessage.emit({msg, direction: 'recv'});
   }
 
   /**
@@ -1037,7 +1030,7 @@ class DefaultKernel implements Kernel.IKernel {
   private _reconnectAttempt = 0;
   private _isReady = false;
   private _futures: Map<string, KernelFutureHandler>;
-  private _commPromises: Map<string, Promise<Kernel.IComm | undefined>>;
+  private _comms: Map<string, Kernel.IComm>;
   private _targetRegistry: { [key: string]: (comm: Kernel.IComm, msg: KernelMessage.ICommOpenMsg) => void; } = Object.create(null);
   private _info: KernelMessage.IInfoReply | null = null;
   private _pendingMessages: KernelMessage.IMessage[] = [];
@@ -1050,6 +1043,7 @@ class DefaultKernel implements Kernel.IKernel {
   private _displayIdToParentIds = new Map<string, string[]>();
   private _msgIdToDisplayIds = new Map<string, string[]>();
   private _terminated = new Signal<this, void>(this);
+  private _msgChain = Promise.resolve();
   private _noOp = () => { /* no-op */};
 }
 

+ 56 - 44
packages/services/src/kernel/future.ts

@@ -17,7 +17,6 @@ import {
   KernelMessage
 } from './messages';
 
-
 /**
  * Implementation of a kernel future.
  */
@@ -53,42 +52,42 @@ class KernelFutureHandler extends DisposableDelegate implements Kernel.IFuture {
   /**
    * Get the reply handler.
    */
-  get onReply(): (msg: KernelMessage.IShellMessage) => void {
+  get onReply(): (msg: KernelMessage.IShellMessage) => Promise<void> | void {
     return this._reply;
   }
 
   /**
    * Set the reply handler.
    */
-  set onReply(cb: (msg: KernelMessage.IShellMessage) => void) {
+  set onReply(cb: (msg: KernelMessage.IShellMessage) => Promise<void> | void) {
     this._reply = cb;
   }
 
   /**
    * Get the iopub handler.
    */
-  get onIOPub(): (msg: KernelMessage.IIOPubMessage) => void {
+  get onIOPub(): (msg: KernelMessage.IIOPubMessage) => Promise<void> | void {
     return this._iopub;
   }
 
   /**
    * Set the iopub handler.
    */
-  set onIOPub(cb: (msg: KernelMessage.IIOPubMessage) => void) {
+  set onIOPub(cb: (msg: KernelMessage.IIOPubMessage) => Promise<void> | void) {
     this._iopub = cb;
   }
 
   /**
    * Get the stdin handler.
    */
-  get onStdin(): (msg: KernelMessage.IStdinMessage) => void {
+  get onStdin(): (msg: KernelMessage.IStdinMessage) => Promise<void> | void {
     return this._stdin;
   }
 
   /**
    * Set the stdin handler.
    */
-  set onStdin(cb: (msg: KernelMessage.IStdinMessage) => void) {
+  set onStdin(cb: (msg: KernelMessage.IStdinMessage) => Promise<void> | void) {
     this._stdin = cb;
   }
 
@@ -105,7 +104,7 @@ class KernelFutureHandler extends DisposableDelegate implements Kernel.IFuture {
    * If a hook is registered during the hook processing, it won't run until the next message.
    * If a hook is removed during the hook processing, it will be deactivated immediately.
    */
-  registerMessageHook(hook: (msg: KernelMessage.IIOPubMessage) => boolean): void {
+  registerMessageHook(hook: (msg: KernelMessage.IIOPubMessage) => Promise<boolean> | boolean): void {
     this._hooks.add(hook);
   }
 
@@ -117,7 +116,7 @@ class KernelFutureHandler extends DisposableDelegate implements Kernel.IFuture {
    * #### Notes
    * If a hook is removed during the hook processing, it will be deactivated immediately.
    */
-  removeMessageHook(hook: (msg: KernelMessage.IIOPubMessage) => boolean): void {
+  removeMessageHook(hook: (msg: KernelMessage.IIOPubMessage) => Promise<boolean> | boolean): void {
     if (this.isDisposed) {
       return;
     }
@@ -148,25 +147,25 @@ class KernelFutureHandler extends DisposableDelegate implements Kernel.IFuture {
   /**
    * Handle an incoming kernel message.
    */
-  handleMsg(msg: KernelMessage.IMessage): void {
+  async handleMsg(msg: KernelMessage.IMessage): Promise<void> {
     switch (msg.channel) {
     case 'shell':
-      this._handleReply(msg as KernelMessage.IShellMessage);
+      await this._handleReply(msg as KernelMessage.IShellMessage);
       break;
     case 'stdin':
-      this._handleStdin(msg as KernelMessage.IStdinMessage);
+      await this._handleStdin(msg as KernelMessage.IStdinMessage);
       break;
     case 'iopub':
-      this._handleIOPub(msg as KernelMessage.IIOPubMessage);
+      await this._handleIOPub(msg as KernelMessage.IIOPubMessage);
       break;
     default:
       break;
     }
   }
 
-  private _handleReply(msg: KernelMessage.IShellMessage): void {
+  private async _handleReply(msg: KernelMessage.IShellMessage): Promise<void> {
     let reply = this._reply;
-    if (reply) { reply(msg); }
+    if (reply) { await reply(msg); }
     this._replyMsg = msg;
     this._setFlag(Private.KernelFutureFlag.GotReply);
     if (this._testFlag(Private.KernelFutureFlag.GotIdle)) {
@@ -174,15 +173,15 @@ class KernelFutureHandler extends DisposableDelegate implements Kernel.IFuture {
     }
   }
 
-  private _handleStdin(msg: KernelMessage.IStdinMessage): void {
+  private async _handleStdin(msg: KernelMessage.IStdinMessage): Promise<void> {
     let stdin = this._stdin;
-    if (stdin) { stdin(msg); }
+    if (stdin) { await stdin(msg); }
   }
 
-  private _handleIOPub(msg: KernelMessage.IIOPubMessage): void {
-    let process = this._hooks.process(msg);
+  private async _handleIOPub(msg: KernelMessage.IIOPubMessage): Promise<void> {
+    let process = await this._hooks.process(msg);
     let iopub = this._iopub;
-    if (process && iopub) { iopub(msg); }
+    if (process && iopub) { await iopub(msg); }
     if (KernelMessage.isStatusMsg(msg) &&
         msg.content.execution_state === 'idle') {
       this._setFlag(Private.KernelFutureFlag.GotIdle);
@@ -221,9 +220,9 @@ class KernelFutureHandler extends DisposableDelegate implements Kernel.IFuture {
 
   private _msg: KernelMessage.IShellMessage;
   private _status = 0;
-  private _stdin: (msg: KernelMessage.IStdinMessage) => void = Private.noOp;
-  private _iopub: (msg: KernelMessage.IIOPubMessage) => void = Private.noOp;
-  private _reply: (msg: KernelMessage.IShellMessage) => void = Private.noOp;
+  private _stdin: (msg: KernelMessage.IStdinMessage) => Promise<void> | void = Private.noOp;
+  private _iopub: (msg: KernelMessage.IIOPubMessage) => Promise<void> | void = Private.noOp;
+  private _reply: (msg: KernelMessage.IShellMessage) => Promise<void> | void = Private.noOp;
   private _done = new PromiseDelegate<KernelMessage.IShellMessage>();
   private _replyMsg: KernelMessage.IShellMessage;
   private _hooks = new Private.HookList<KernelMessage.IIOPubMessage>();
@@ -238,11 +237,6 @@ namespace Private {
   export
   const noOp = () => { /* no-op */ };
 
-  /**
-   * A polyfill for a function to run code outside of the current execution context.
-   */
-  let defer = typeof requestAnimationFrame === 'function' ? requestAnimationFrame : setImmediate;
-
   export
   class HookList<T> {
     /**
@@ -250,7 +244,7 @@ namespace Private {
      *
      * @param hook - The callback to register.
      */
-    add(hook: (msg: T) => boolean): void {
+    add(hook: (msg: T) => boolean | Promise<boolean>): void {
       this.remove(hook);
       this._hooks.push(hook);
     }
@@ -260,7 +254,7 @@ namespace Private {
      *
      * @param hook - The callback to remove.
      */
-    remove(hook: (msg: T) => boolean): void {
+    remove(hook: (msg: T) => boolean | Promise<boolean>): void {
       let index = this._hooks.indexOf(hook);
       if (index >= 0) {
         this._hooks[index] = null;
@@ -272,28 +266,42 @@ namespace Private {
      * Process a message through the hooks.
      *
      * #### Notes
-     * The most recently registered hook is run first.
-     * If the hook returns false, any later hooks will not run.
-     * If a hook throws an error, the error is logged to the console and the next hook is run.
-     * If a hook is registered during the hook processing, it won't run until the next message.
-     * If a hook is removed during the hook processing, it will be deactivated immediately.
+     * The hooks can be asynchronous, returning a promise, and hook processing
+     * pauses until the promise resolves. The most recently registered hook is
+     * run first. If the hook returns false, any later hooks will not run. If a
+     * hook throws an error, the error is logged to the console and the next
+     * hook is run. If a hook is registered during the hook processing, it won't
+     * run until the next message. If a hook is removed during the hook
+     * processing, it will be deactivated immediately.
      */
-    process(msg: T): boolean {
+    async process(msg: T): Promise<boolean> {
+      // Wait until we can start a new process run.
+      await this._processing;
+
+      // Reserve a process run for ourselves.
+      let processing = new PromiseDelegate<void>();
+      this._processing = processing.promise;
+
       let continueHandling: boolean;
-      // most recently-added hook is called first
+
+      // Call the end hook (most recently-added) first. Starting at the end also
+      // guarantees that hooks added during the processing will not be run in
+      // this invocation.
       for (let i = this._hooks.length - 1; i >= 0; i--) {
         let hook = this._hooks[i];
         if (hook === null) { continue; }
         try {
-          continueHandling = hook(msg);
+          continueHandling = await hook(msg);
         } catch (err) {
           continueHandling = true;
           console.error(err);
         }
         if (continueHandling === false) {
+          processing.resolve(undefined);
           return false;
         }
       }
+      processing.resolve(undefined);
       return true;
     }
 
@@ -301,10 +309,13 @@ namespace Private {
      * Schedule a cleanup of the list, removing any hooks that have been nulled out.
      */
     private _scheduleCompact(): void {
-      if (!this._cleanupScheduled) {
-        this._cleanupScheduled = true;
-        defer(() => {
-          this._cleanupScheduled = false;
+      if (!this._compactScheduled) {
+        this._compactScheduled = true;
+
+        // Make sure we compact the list between processing phases. We may want
+        // to rate-limit this compaction with a requestAnimationFrame as well.
+        this._processing = this._processing.then(() => {
+          this._compactScheduled = false;
           this._compact();
         });
       }
@@ -326,8 +337,9 @@ namespace Private {
       this._hooks.length -= numNulls;
     }
 
-    private _hooks: (((msg: T) => boolean) | null)[] = [];
-    private _cleanupScheduled: boolean;
+    private _hooks: (((msg: T) => boolean | Promise<boolean>) | null)[] = [];
+    private _compactScheduled: boolean;
+    private _processing: Promise<void>;
   }
 
   /**

+ 18 - 6
packages/services/src/kernel/kernel.ts

@@ -323,7 +323,7 @@ namespace Kernel {
      * callback will be overidden.  A registered comm target handler will take
      * precedence over a comm which specifies a `target_module`.
      */
-    registerCommTarget(targetName: string, callback: (comm: Kernel.IComm, msg: KernelMessage.ICommOpenMsg) => void): IDisposable;
+    registerCommTarget(targetName: string, callback: (comm: Kernel.IComm, msg: KernelMessage.ICommOpenMsg) => Promise<void>): IDisposable;
 
     /**
      * Register an IOPub message hook.
@@ -694,18 +694,30 @@ namespace Kernel {
 
     /**
      * The reply handler for the kernel future.
+     *
+     * #### Notes
+     * If the handler returns a promise, message processing pauses until the
+     * promise is resolved.
      */
-    onReply: (msg: KernelMessage.IShellMessage) => void;
+    onReply: (msg: KernelMessage.IShellMessage) => Promise<void> | void;
 
     /**
      * The stdin handler for the kernel future.
+     *
+     * #### Notes
+     * If the handler returns a promise, message processing pauses until the
+     * promise is resolved.
      */
-    onStdin: (msg: KernelMessage.IStdinMessage) => void;
+    onStdin: (msg: KernelMessage.IStdinMessage) => Promise<void> | void;
 
     /**
      * The iopub handler for the kernel future.
+     *
+     * #### Notes
+     * If the handler returns a promise, message processing pauses until the
+     * promise is resolved.
      */
-    onIOPub: (msg: KernelMessage.IIOPubMessage) => void;
+    onIOPub: (msg: KernelMessage.IIOPubMessage) => Promise<void> | void;
 
     /**
      * Register hook for IOPub messages.
@@ -760,12 +772,12 @@ namespace Kernel {
      * This is called when the comm is closed from either the server or
      * client.
      */
-    onClose: (msg: KernelMessage.ICommCloseMsg) => void;
+    onClose: (msg: KernelMessage.ICommCloseMsg) => Promise<void> | void;
 
     /**
      * Callback for a comm message received event.
      */
-    onMsg: (msg: KernelMessage.ICommMsgMsg) => void;
+    onMsg: (msg: KernelMessage.ICommMsgMsg) => Promise<void> | void;
 
     /**
      * Open a comm with optional data and metadata.