浏览代码

WIP more async kernel message processing work.

Jason Grout 6 年之前
父节点
当前提交
4b545d908c
共有 1 个文件被更改,包括 84 次插入52 次删除
  1. 84 52
      packages/services/src/kernel/default.ts

+ 84 - 52
packages/services/src/kernel/default.ts

@@ -63,7 +63,12 @@ declare var requirejs: any;
 
 
 /**
- * Implementation of the Kernel object
+ * Implementation of the Kernel object.
+ *
+ * #### Notes
+ * Messages from the server are handled in the order they were received and
+ * asynchronously. Any message handler can return a promise, and message
+ * handling will pause until the promise is fulfilled.
  */
 export
 class DefaultKernel implements Kernel.IKernel {
@@ -105,8 +110,7 @@ class DefaultKernel implements Kernel.IKernel {
    * A signal emitted for iopub kernel messages.
    *
    * #### Notes
-   * This signal is emitted after the iopub message is handled, which may
-   * asynchronously after it is received.
+   * This signal is emitted after the iopub message is handled asynchronously.
    */
   get iopubMessage(): ISignal<this, KernelMessage.IIOPubMessage> {
     return this._iopubMessage;
@@ -114,6 +118,9 @@ class DefaultKernel implements Kernel.IKernel {
 
   /**
    * A signal emitted for unhandled kernel message.
+   *
+   * #### Notes
+   * This signal is emitted for a message that was not handled.
    */
   get unhandledMessage(): ISignal<this, KernelMessage.IMessage> {
     return this._unhandledMessage;
@@ -399,7 +406,7 @@ class DefaultKernel implements Kernel.IKernel {
    * Fulfills with the `kernel_info_response` content when the shell reply is
    * received and validated.
    *
-   * // TODO: this should be automatically run every time our kernel restarts,
+   * TODO: this should be automatically run every time our kernel restarts,
    * before we say the kernel is ready, and cache the info and the kernel
    * session id. Further calls to this should returned the cached results.
    */
@@ -588,14 +595,18 @@ class DefaultKernel implements Kernel.IKernel {
    * @returns A disposable used to unregister the message hook.
    *
    * #### Notes
-   * The IOPub hook system allows you to preempt the handlers for IOPub messages with a
-   * given parent_header message id. The most recently registered hook is run first.
-   * If the hook returns false, any later hooks and the future's onIOPub handler will not run.
-   * If a hook throws an error, the error is logged to the console and the next hook is run.
-   * If a hook is registered during the hook processing, it won't run until the next message.
-   * If a hook is disposed during the hook processing, it will be deactivated immediately.
+   * The IOPub hook system allows you to preempt the handlers for IOPub messages
+   * with a given parent_header message id. The most recently registered hook is
+   * run first. If the hook returns false, any later hooks and the future's
+   * onIOPub handler will not run. If a hook throws an error, the error is
+   * logged to the console and the next hook is run. If a hook is registered
+   * during the hook processing, it won't run until the next message. If a hook
+   * is disposed during the hook processing, it will be deactivated immediately.
    *
    * See also [[IFuture.registerMessageHook]].
+   *
+   * TODO: should this hook be an async function, and we wait to process the
+   * hooks until the promise is resolved?
    */
   registerMessageHook(msgId: string, hook: (msg: KernelMessage.IIOPubMessage) => boolean): IDisposable {
     let future = this._futures && this._futures.get(msgId);
@@ -620,9 +631,13 @@ class DefaultKernel implements Kernel.IKernel {
    * @returns A disposable used to unregister the comm target.
    *
    * #### Notes
-   * Only one comm target can be registered at a time, an existing
-   * callback will be overidden.  A registered comm target handler will take
-   * precedence over a comm which specifies a `target_module`.
+   * Only one comm target can be registered to a target name at a time, an
+   * existing callback for the same target name will be overidden.  A registered
+   * comm target handler will take precedence over a comm which specifies a
+   * `target_module`.
+   *
+   * If the callback returns a promise, kernel message processing will pause
+   * until the returned promise is fulfilled.
    */
   registerCommTarget(targetName: string, callback: (comm: Kernel.IComm, msg: KernelMessage.ICommOpenMsg) => Promise<void> | void): IDisposable {
     this._targetRegistry[targetName] = callback;
@@ -777,9 +792,9 @@ class DefaultKernel implements Kernel.IKernel {
   private _clearState(): void {
     this._isReady = false;
     this._pendingMessages = [];
-    this._msgChain = Promise.resolve();
     this._futures.forEach(future => { future.dispose(); });
     this._comms.forEach(comm => { comm.dispose(); });
+    this._msgChain = Promise.resolve();
     this._kernelSession = '';
     this._futures = new Map<string, KernelFutureHandler>();
     this._comms = new Map<string, Kernel.IComm>();
@@ -791,15 +806,8 @@ class DefaultKernel implements Kernel.IKernel {
    * Handle a `comm_open` kernel message.
    */
   private async _handleCommOpen(msg: KernelMessage.ICommOpenMsg): Promise<void> {
+    this._checkCurrentMessage(msg);
     let content = msg.content;
-    if (this.isDisposed) {
-      throw new Error('Kernel object is disposed');
-    }
-
-    if (msg.header.msg_type !== 'kernel_info_reply' && msg.header.session !== this._kernelSession) {
-      throw new Error(`Message from old kernel session: ${msg.header.msg_type}`);
-    }
-
     let comm = new CommHandler(
       content.target_name,
       content.comm_id,
@@ -812,16 +820,39 @@ class DefaultKernel implements Kernel.IKernel {
       let target = await Private.loadObject(content.target_name, content.target_module, this._targetRegistry);
       await target(comm, msg);
     } catch (e) {
-      await comm.close();
+      // TODO: should we wait for the comm to close? other places we are just
+      // waiting for client-side processing, but here we are waiting for a
+      // message round trip. It would probably be more consistent to just wait
+      // for the comm's onClose client-side function to return.
+      await comm.close().done;
       console.error('Exception opening new comm');
       throw(e);
     }
   }
 
+  /**
+   * Check to make sure it is okay to proceed to handle a message.
+   *
+   * #### Notes
+   * Because we handle messages asynchronously, before a message is handled the
+   * kernel might be disposed or restarted (and have a different session id).
+   * This function throws an error in each of these cases. This is meant to be
+   * called at the start of an asynchronous message handler.
+   */
+  private _checkCurrentMessage(msg: KernelMessage.IMessage) {
+    if (this.isDisposed) {
+      throw new Error('Kernel object is disposed');
+    }
+    if (msg.header.session !== this._kernelSession) {
+      throw new Error(`Message from old kernel session: ${msg.header.msg_type}`);
+    }
+  }
+
   /**
    * Handle 'comm_close' kernel message.
    */
   private async _handleCommClose(msg: KernelMessage.ICommCloseMsg): Promise<void> {
+    this._checkCurrentMessage(msg);
     let content = msg.content;
     let comm = this._comms.get(content.comm_id);
     if (!comm) {
@@ -829,34 +860,28 @@ class DefaultKernel implements Kernel.IKernel {
       return;
     }
     this._unregisterComm(comm.commId);
-    try {
-      let onClose = comm.onClose;
-      if (onClose) {
-        await onClose(msg);
-      }
-      (comm as CommHandler).dispose();
-    } catch (e) {
-      console.error('Exception closing comm: ', e, e.stack, msg);
+    let onClose = comm.onClose;
+    if (onClose) {
+      await onClose(msg);
     }
+    (comm as CommHandler).dispose();
   }
 
   /**
    * Handle a 'comm_msg' kernel message.
    */
   private async _handleCommMsg(msg: KernelMessage.ICommMsgMsg): Promise<void> {
+    this._checkCurrentMessage(msg);
     let content = msg.content;
     let comm = this._comms.get(content.comm_id);
     if (!comm) {
-      // We do have a registered comm for this comm id, ignore.
+      // We do not have a registered comm for this comm id, ignore.
+      // TODO: should we print an error like we do for _handleCommClose?
       return;
     }
-    try {
-      let onMsg = comm.onMsg;
-      if (onMsg) {
-        await onMsg(msg);
-      }
-    } catch (e) {
-      console.error('Exception handling comm msg: ', e, e.stack, msg);
+    let onMsg = comm.onMsg;
+    if (onMsg) {
+      await onMsg(msg);
     }
   }
 
@@ -931,34 +956,37 @@ class DefaultKernel implements Kernel.IKernel {
     }
 
     // Notify immediately if there is an error with the message.
-    let msg = serialize.deserialize(evt.data);
+    let msg: KernelMessage.IMessage;
     try {
+      msg = serialize.deserialize(evt.data);
       validate.validateMessage(msg);
     } catch (error) {
-      console.error(`Invalid message: ${error.message}`);
-      return;
+      error.message = `Kernel message validation error: ${error.message}`;
+      // We throw the error so that it bubbles up to the top, and displays the right stack.
+      throw error;
     }
 
     // Handle the message asynchronously, in the order received.
     this._msgChain = this._msgChain.then(() => {
-      // If the message is not a kernel info reply message, check to make sure
-      // the kernel session is the same as the current one. Kernel info reply
-      // messages are where we get the kernel session, so we don't check those.
 
-      if (msg.header.msg_type !== 'kernel_info_reply' && msg.header.session !== this._kernelSession) {
-        throw new Error(`Message from old kernel session: ${msg.header.msg_type}`);
+      // If the message isn't a kernel_info_reply, check to make sure it
+      // corresponds to the current kernel. kernel_info_reply messages can
+      // change the kernel session, so we allow those to pass.
+      if (msg.header.msg_type !== 'kernel_info_reply') {
+        this._checkCurrentMessage(msg);
       }
 
+      // Return so that any promises from handling a message are fulfilled
+      // before proceeding to the next message.
       return this._handleMessage(msg);
     }).catch(error => {
-      // Any errors in handling a message should be logged, and then we move to
-      // the next message. We don't want to prevent handling important messages
-      // like the execution reply.
+      // Log any errors in handling the message, thus resetting the _msgChain
+      // promise so we can process more messages.
+      // TODO: this messes up the error stack in the error, which is sad (the error stack will be printed, but the nice clickable error stack in the console won't be there, instead it will point to here.).
       console.error(error);
     });
 
-    // TODO: should this emit happen asynchronously? Should we have two events,
-    // one for a message received here, and then another after it is handled
+    // TODO: Should we have another signal emitted when a message is handled
     // asynchronously?
     this._anyMessage.emit({msg, direction: 'recv'});
   }
@@ -977,6 +1005,8 @@ class DefaultKernel implements Kernel.IKernel {
         let displayId = transient['display_id'] as string;
         if (displayId) {
           handled = await this._handleDisplayId(displayId, msg);
+          // The await above may make this message out of date, so check again.
+          this._checkCurrentMessage(msg);
         }
         break;
       default:
@@ -989,6 +1019,7 @@ class DefaultKernel implements Kernel.IKernel {
       let future = this._futures && this._futures.get(parentHeader.msg_id);
       if (future) {
         await future.handleMsg(msg);
+        this._checkCurrentMessage(msg);
       } else {
         // If the message was sent by us and was not iopub, it is orphaned.
         let owned = parentHeader.session === this.clientId;
@@ -1015,6 +1046,7 @@ class DefaultKernel implements Kernel.IKernel {
       default:
         break;
       }
+      this._checkCurrentMessage(msg);
       this._iopubMessage.emit(msg as KernelMessage.IIOPubMessage);
     }
   }