Browse Source

First pass at handling the case for restarting a kernel and canceling processing of old kernel messages in the async queue.

Jason Grout 7 years ago
parent
commit
6d6425b551
1 changed files with 36 additions and 7 deletions
  1. 36 7
      packages/services/src/kernel/default.ts

+ 36 - 7
packages/services/src/kernel/default.ts

@@ -241,6 +241,8 @@ class DefaultKernel implements Kernel.IKernel {
     this._clearSocket();
     this._futures.forEach(future => { future.dispose(); });
     this._comms.forEach(comm => { comm.dispose(); });
+    this._kernelSession = '';
+    this._msgChain = Promise.reject('Kernel is disposed');
     this._displayIdToParentIds.clear();
     this._msgIdToDisplayIds.clear();
     ArrayExt.removeFirstOf(Private.runningKernels, this);
@@ -395,6 +397,10 @@ class DefaultKernel implements Kernel.IKernel {
    *
    * Fulfills with the `kernel_info_response` content when the shell reply is
    * received and validated.
+   *
+   * // TODO: this should be automatically run every time our kernel restarts,
+   * before we say the kernel is ready, and cache the info and the kernel
+   * session id. Further calls to this should returned the cached results.
    */
   requestKernelInfo(): Promise<KernelMessage.IInfoReplyMsg> {
     let options: KernelMessage.IOptions = {
@@ -405,7 +411,11 @@ class DefaultKernel implements Kernel.IKernel {
     };
     let msg = KernelMessage.createShellMessage(options);
     return Private.handleShellMessage(this, msg).then(reply => {
+      if (this.isDisposed) {
+        throw new Error('Disposed kernel');
+      }
       this._info = (reply as KernelMessage.IInfoReplyMsg).content;
+      this._kernelSession = reply.header.session;
       return reply as KernelMessage.IInfoReplyMsg;
     });
   }
@@ -770,8 +780,10 @@ class DefaultKernel implements Kernel.IKernel {
   private _clearState(): void {
     this._isReady = false;
     this._pendingMessages = [];
+    this._msgChain = Promise.resolve();
     this._futures.forEach(future => { future.dispose(); });
     this._comms.forEach(comm => { comm.dispose(); });
+    this._kernelSession = '';
     this._futures = new Map<string, KernelFutureHandler>();
     this._comms = new Map<string, Kernel.IComm>();
     this._displayIdToParentIds.clear();
@@ -784,25 +796,29 @@ class DefaultKernel implements Kernel.IKernel {
   private async _handleCommOpen(msg: KernelMessage.ICommOpenMsg): Promise<void> {
     let content = msg.content;
     if (this.isDisposed) {
-      return;
+      throw new Error('Kernel object is disposed');
     }
-    let target = await Private.loadObject(content.target_name, content.target_module, this._targetRegistry);
+
+    if (msg.header.msg_type !== 'kernel_info_reply' && msg.header.session !== this._kernelSession) {
+      throw new Error(`Message from old kernel session: ${msg.header.msg_type}`);
+    }
+
     let comm = new CommHandler(
       content.target_name,
       content.comm_id,
       this,
       () => { this._unregisterComm(content.comm_id); }
     );
+    this._comms.set(content.comm_id, comm);
+
     try {
+      let target = await Private.loadObject(content.target_name, content.target_module, this._targetRegistry);
       await target(comm, msg);
     } catch (e) {
-      // TODO: do we need to await this?
       await comm.close();
       console.error('Exception opening new comm');
       throw(e);
     }
-    // TODO: do we need to check if the comm is disposed?
-    this._comms.set(content.comm_id, comm);
   }
 
   /**
@@ -899,6 +915,7 @@ class DefaultKernel implements Kernel.IKernel {
     // Update our status to connected.
     this._updateStatus('connected');
     // Get the kernel info, signaling that the kernel is ready.
+    // TODO: requestKernelInfo shouldn't make a request, but should return cached info?
     this.requestKernelInfo().then(() => {
       this._connectionPromise.resolve(void 0);
     }).catch(err => {
@@ -909,8 +926,6 @@ class DefaultKernel implements Kernel.IKernel {
 
   /**
    * Handle a websocket message, validating and routing appropriately.
-   *
-   * TODO: convert to asynchronous processing.
    */
   private _onWSMessage = (evt: MessageEvent) => {
     if (this._wsStopped) {
@@ -929,7 +944,20 @@ class DefaultKernel implements Kernel.IKernel {
 
     // Handle the message asynchronously, in the order received.
     this._msgChain = this._msgChain.then(() => {
+      // If the message is not a kernel info reply message, check to make sure
+      // the kernel session is the same as the current one. Kernel info reply
+      // messages are where we get the kernel session, so we don't check those.
+
+      if (msg.header.msg_type !== 'kernel_info_reply' && msg.header.session !== this._kernelSession) {
+        throw new Error(`Message from old kernel session: ${msg.header.msg_type}`);
+      }
+
       return this._handleMessage(msg);
+    }).catch(error => {
+      // Any errors in handling a message should be logged, and then we move to
+      // the next message. We don't want to prevent handling important messages
+      // like the execution reply.
+      console.error(error);
     });
 
     // TODO: should this emit happen asynchronously? Should we have two events,
@@ -1021,6 +1049,7 @@ class DefaultKernel implements Kernel.IKernel {
   private _id = '';
   private _name = '';
   private _status: Kernel.Status = 'unknown';
+  private _kernelSession = '';
   private _clientId = '';
   private _isDisposed = false;
   private _wsStopped = false;