Browse Source

Do not kill the websocket connection on kernel restart.

This makes sure we don’t miss messages during the restart, and helps the browser not have so many different websocket connections.
Jason Grout 6 years ago
parent
commit
78abc44202
1 changed files with 41 additions and 31 deletions
  1. 41 31
      packages/services/src/kernel/default.ts

+ 41 - 31
packages/services/src/kernel/default.ts

@@ -187,7 +187,7 @@ export class DefaultKernel implements Kernel.IKernel {
    * A promise that is fulfilled when the kernel is ready.
    */
   get ready(): Promise<void> {
-    return this._connectionPromise.promise;
+    return this._readyPromise.promise;
   }
 
   /**
@@ -349,7 +349,6 @@ export class DefaultKernel implements Kernel.IKernel {
   async handleRestart(): Promise<void> {
     await this._clearState();
     this._updateStatus('restarting');
-    this._clearSocket();
   }
 
   /**
@@ -362,7 +361,7 @@ export class DefaultKernel implements Kernel.IKernel {
     this._clearSocket();
     this._updateStatus('reconnecting');
     this._createSocket();
-    return this._connectionPromise.promise;
+    return this._readyPromise.promise;
   }
 
   /**
@@ -376,10 +375,12 @@ export class DefaultKernel implements Kernel.IKernel {
    * On a valid response, closes the websocket and disposes of the kernel
    * object, and fulfills the promise.
    *
-   * The promise will be rejected if the kernel status is `Dead` or if the
+   * The promise will be rejected if the kernel status is `dead` or if the
    * request fails or the response is invalid.
    */
   async shutdown(): Promise<void> {
+    // TODO: we are not keeping the promise in the docs above to throw an error
+    // if status is dead.
     if (this.status === 'dead') {
       this._clearSocket();
       await this._clearState();
@@ -787,10 +788,13 @@ export class DefaultKernel implements Kernel.IKernel {
 
   /**
    * Clear the socket state.
+   *
+   * #### Notes
+   * When calling this, you should also set the status to something that will
+   * reset the kernel ready state.
    */
   private _clearSocket(): void {
     this._wsStopped = true;
-    this._isReady = false;
     if (this._ws !== null) {
       // Clear the websocket event handlers and the socket itself.
       this._ws.onopen = this._noOp;
@@ -811,13 +815,25 @@ export class DefaultKernel implements Kernel.IKernel {
       case 'idle':
       case 'busy':
       case 'connected':
-        this._isReady = true;
-        break;
       case 'restarting':
       case 'autorestarting':
+        if (!this._isReady && this._initialized) {
+          this._isReady = true;
+          this._readyPromise.resolve();
+        }
+        break;
       case 'reconnecting':
+        if (this._isReady) {
+          this._isReady = false;
+          this._readyPromise = new PromiseDelegate();
+        }
+        break;
       case 'dead':
-        this._isReady = false;
+        if (this._isReady) {
+          this._isReady = false;
+          this._readyPromise = new PromiseDelegate();
+        }
+        this._readyPromise.reject('Kernel is dead');
         break;
       default:
         console.error('invalid kernel status:', status);
@@ -854,7 +870,6 @@ export class DefaultKernel implements Kernel.IKernel {
    * Clear the internal state.
    */
   private async _clearState(): Promise<void> {
-    this._isReady = false;
     this._pendingMessages = [];
     const futuresResolved: Promise<KernelMessage.IShellMessage>[] = [];
     this._futures.forEach(future => {
@@ -1007,7 +1022,6 @@ export class DefaultKernel implements Kernel.IKernel {
       url = url + `&token=${encodeURIComponent(token)}`;
     }
 
-    this._connectionPromise = new PromiseDelegate<void>();
     this._wsStopped = false;
     this._ws = new settings.WebSocket(url);
 
@@ -1025,19 +1039,28 @@ export class DefaultKernel implements Kernel.IKernel {
    */
   private _onWSOpen = (evt: Event) => {
     this._reconnectAttempt = 0;
-    // Allow the message to get through.
-    this._isReady = true;
-    // Update our status to connected.
     this._updateStatus('connected');
+
+    // We temporarily set the ready status to true so our kernel info request
+    // below will go through.
+    this._isReady = true;
+
     // Get the kernel info, signaling that the kernel is ready.
-    // TODO: requestKernelInfo shouldn't make a request, but should return cached info?
+
+    // TODO: requestKernelInfo maybe shouldn't make a request, but should return
+    // cached info since there may be other connections to this kernel?
     this.requestKernelInfo()
       .then(() => {
-        this._connectionPromise.resolve(void 0);
+        this._initialized = true;
+        this._readyPromise.resolve(void 0);
       })
       .catch(err => {
-        this._connectionPromise.reject(err);
+        this._initialized = true;
+        this._readyPromise.reject(err);
       });
+
+    // Reset the isReady status after we sent our message so others wait for
+    // the kernel info request to come back.
     this._isReady = false;
   };
 
@@ -1136,10 +1159,6 @@ export class DefaultKernel implements Kernel.IKernel {
               // 'restarting' and 'autorestarting'.
               await this.handleRestart();
               this._updateStatus('autorestarting');
-              // handleRestart above disconnects the websocket, so reconnect,
-              // which ensures that we come back up in a ready state. This
-              // follows the logic in the restartKernel method.
-              await this.reconnect();
             });
           }
           break;
@@ -1184,9 +1203,6 @@ export class DefaultKernel implements Kernel.IKernel {
       this._reconnectAttempt += 1;
     } else {
       this._updateStatus('dead');
-      this._connectionPromise.reject(
-        new Error('Could not establish connection')
-      );
     }
   };
 
@@ -1202,6 +1218,8 @@ export class DefaultKernel implements Kernel.IKernel {
   private _reconnectLimit = 7;
   private _reconnectAttempt = 0;
   private _isReady = false;
+  private _readyPromise: PromiseDelegate<void> = new PromiseDelegate();
+  private _initialized = false;
   private _futures: Map<string, KernelFutureHandler>;
   private _comms: Map<string, Kernel.IComm>;
   private _targetRegistry: {
@@ -1212,7 +1230,6 @@ export class DefaultKernel implements Kernel.IKernel {
   } = Object.create(null);
   private _info: KernelMessage.IInfoReply | null = null;
   private _pendingMessages: KernelMessage.IMessage[] = [];
-  private _connectionPromise: PromiseDelegate<void>;
   private _specPromise: Promise<Kernel.ISpecModel>;
   private _statusChanged = new Signal<this, Kernel.Status>(this);
   private _iopubMessage = new Signal<this, KernelMessage.IIOPubMessage>(this);
@@ -1563,13 +1580,6 @@ namespace Private {
     }
     let data = await response.json();
     validate.validateModel(data);
-    // Reconnect the other kernels asynchronously, but don't wait for them.
-    each(runningKernels, k => {
-      if (k !== kernel && k.id === kernel.id) {
-        void k.reconnect();
-      }
-    });
-    await kernel.reconnect();
   }
 
   /**