소스 검색

Backport PR #10792: Add a guard to avoid kernel deadlock on multiple input request (#11126)

Co-authored-by: Eric Charles <eric@datalayer.io>
MeeseeksMachine 3 년 전
부모
커밋
fec66b95ad

+ 28 - 0
packages/apputils/src/sessioncontext.tsx

@@ -106,6 +106,11 @@ export interface ISessionContext extends IObservableDisposable {
    */
   readonly connectionStatusChanged: ISignal<this, Kernel.ConnectionStatus>;
 
+  /**
+   * A flag indicating if session is has pending input, proxied from the session connection.
+   */
+  readonly pendingInput: boolean;
+
   /**
    * A signal emitted for a kernel messages, proxied from the session connection.
    */
@@ -395,6 +400,13 @@ export class SessionContext implements ISessionContext {
     return this._statusChanged;
   }
 
+  /**
+   * A flag indicating if the session has ending input, proxied from the kernel.
+   */
+  get pendingInput(): boolean {
+    return this._pendingInput;
+  }
+
   /**
    * A signal emitted when the kernel status changes, proxied from the kernel.
    */
@@ -906,6 +918,7 @@ export class SessionContext implements ISessionContext {
         this._onConnectionStatusChanged,
         this
       );
+      session.pendingInput.connect(this._onPendingInput, this);
       session.iopubMessage.connect(this._onIopubMessage, this);
       session.unhandledMessage.connect(this._onUnhandledMessage, this);
 
@@ -1055,6 +1068,17 @@ export class SessionContext implements ISessionContext {
     this._connectionStatusChanged.emit(status);
   }
 
+  /**
+   * Handle a change to the pending input.
+   */
+  private _onPendingInput(
+    sender: Session.ISessionConnection,
+    value: boolean
+  ): void {
+    // Set the signal value
+    this._pendingInput = value;
+  }
+
   /**
    * Handle an iopub message.
    */
@@ -1062,6 +1086,9 @@ export class SessionContext implements ISessionContext {
     sender: Session.ISessionConnection,
     message: KernelMessage.IIOPubMessage
   ): void {
+    if (message.header.msg_type === 'shutdown_reply') {
+      this.session!.kernel!.removeInputGuard();
+    }
     this._iopubMessage.emit(message);
   }
 
@@ -1108,6 +1135,7 @@ export class SessionContext implements ISessionContext {
   );
   private translator: ITranslator;
   private _trans: TranslationBundle;
+  private _pendingInput = false;
   private _iopubMessage = new Signal<this, KernelMessage.IIOPubMessage>(this);
   private _unhandledMessage = new Signal<this, KernelMessage.IMessage>(this);
   private _propertyChanged = new Signal<this, 'path' | 'name' | 'type'>(this);

+ 10 - 0
packages/notebook/src/actions.tsx

@@ -1979,6 +1979,16 @@ namespace Private {
             });
             break;
           }
+          if (sessionContext.pendingInput) {
+            void showDialog({
+              title: trans.__('Cell not executed due to pending input'),
+              body: trans.__(
+                'The cell has not been executed to avoid kernel deadlock as there is another pending input! Submit your pending input and try again.'
+              ),
+              buttons: [Dialog.okButton({ label: trans.__('Ok') })]
+            });
+            return Promise.resolve(false);
+          }
           const deletedCells = notebook.model?.deletedCells ?? [];
           executionScheduled.emit({ notebook, cell });
           return CodeCell.execute(cell as CodeCell, sessionContext, {

+ 28 - 0
packages/services/src/kernel/default.ts

@@ -142,6 +142,13 @@ export class KernelConnection implements Kernel.IKernelConnection {
     return this._anyMessage;
   }
 
+  /**
+   * A signal emitted when a kernel has pending inputs from the user.
+   */
+  get pendingInput(): ISignal<this, boolean> {
+    return this._pendingInput;
+  }
+
   /**
    * The id of the server-side kernel.
    */
@@ -437,6 +444,7 @@ export class KernelConnection implements Kernel.IKernelConnection {
    * request fails or the response is invalid.
    */
   async interrupt(): Promise<void> {
+    this.hasPendingInput = false;
     if (this.status === 'dead') {
       throw new Error('Kernel is dead');
     }
@@ -472,6 +480,7 @@ export class KernelConnection implements Kernel.IKernelConnection {
     // Reconnect to the kernel to address cases where kernel ports
     // have changed during the restart.
     await this.reconnect();
+    this.hasPendingInput = false;
   }
 
   /**
@@ -815,6 +824,8 @@ export class KernelConnection implements Kernel.IKernelConnection {
 
     this._sendMessage(msg);
     this._anyMessage.emit({ msg, direction: 'send' });
+
+    this.hasPendingInput = false;
   }
 
   /**
@@ -955,6 +966,13 @@ export class KernelConnection implements Kernel.IKernelConnection {
     }
   }
 
+  /**
+   * Remove the input guard, if any.
+   */
+  removeInputGuard() {
+    this.hasPendingInput = false;
+  }
+
   /**
    * Handle a message with a display id.
    *
@@ -1501,6 +1519,14 @@ export class KernelConnection implements Kernel.IKernelConnection {
     }
   };
 
+  get hasPendingInput(): boolean {
+    return this._hasPendingInput;
+  }
+  set hasPendingInput(value: boolean) {
+    this._hasPendingInput = value;
+    this._pendingInput.emit(value);
+  }
+
   private _id = '';
   private _name = '';
   private _status: KernelMessage.Status = 'unknown';
@@ -1541,10 +1567,12 @@ export class KernelConnection implements Kernel.IKernelConnection {
   private _disposed = new Signal<this, void>(this);
   private _iopubMessage = new Signal<this, KernelMessage.IIOPubMessage>(this);
   private _anyMessage = new Signal<this, Kernel.IAnyMessageArgs>(this);
+  private _pendingInput = new Signal<this, boolean>(this);
   private _unhandledMessage = new Signal<this, KernelMessage.IMessage>(this);
   private _displayIdToParentIds = new Map<string, string[]>();
   private _msgIdToDisplayIds = new Map<string, string[]>();
   private _msgChain: Promise<void> = Promise.resolve();
+  private _hasPendingInput = false;
   private _noOp = () => {
     /* no-op */
   };

+ 1 - 0
packages/services/src/kernel/future.ts

@@ -238,6 +238,7 @@ export abstract class KernelFutureHandler<
   }
 
   private async _handleStdin(msg: KernelMessage.IStdinMessage): Promise<void> {
+    this._kernel.hasPendingInput = true;
     const stdin = this._stdin;
     if (stdin) {
       // tslint:disable-next-line:await-promise

+ 19 - 0
packages/services/src/kernel/kernel.ts

@@ -110,6 +110,15 @@ export interface IKernelConnection extends IObservableDisposable {
    */
   handleComms: boolean;
 
+  /**
+   * Whether the kernel connection has pending input.
+   *
+   * #### Notes
+   * This is a guard to avoid deadlock is the user asks input
+   * as second time before submitting his first input
+   */
+  hasPendingInput: boolean;
+
   /**
    * Send a shell message to the kernel.
    *
@@ -456,6 +465,11 @@ export interface IKernelConnection extends IObservableDisposable {
     hook: (msg: KernelMessage.IIOPubMessage) => boolean | PromiseLike<boolean>
   ): void;
 
+  /**
+   * Remove the input guard, if any.
+   */
+  removeInputGuard(): void;
+
   /**
    * A signal emitted when the kernel status changes.
    */
@@ -486,6 +500,11 @@ export interface IKernelConnection extends IObservableDisposable {
    */
   anyMessage: ISignal<this, IAnyMessageArgs>;
 
+  /**
+   * A signal emitted when a kernel has pending inputs from the user.
+   */
+  pendingInput: ISignal<this, boolean>;
+
   /**
    * The server settings for the kernel.
    */

+ 16 - 0
packages/services/src/session/default.ts

@@ -70,6 +70,13 @@ export class SessionConnection implements Session.ISessionConnection {
     return this._connectionStatusChanged;
   }
 
+  /**
+   * A signal proxied from the kernel pending input.
+   */
+  get pendingInput(): ISignal<this, boolean> {
+    return this._pendingInput;
+  }
+
   /**
    * A signal proxied from the kernel about iopub kernel messages.
    */
@@ -315,6 +322,7 @@ export class SessionConnection implements Session.ISessionConnection {
     this._kernel = kc;
     kc.statusChanged.connect(this.onKernelStatus, this);
     kc.connectionStatusChanged.connect(this.onKernelConnectionStatus, this);
+    kc.pendingInput.connect(this.onPendingInput, this);
     kc.unhandledMessage.connect(this.onUnhandledMessage, this);
     kc.iopubMessage.connect(this.onIOPubMessage, this);
     kc.anyMessage.connect(this.onAnyMessage, this);
@@ -340,6 +348,13 @@ export class SessionConnection implements Session.ISessionConnection {
     this._connectionStatusChanged.emit(state);
   }
 
+  /**
+   * Handle a change in the pendingInput.
+   */
+  protected onPendingInput(sender: Kernel.IKernelConnection, state: boolean) {
+    this._pendingInput.emit(state);
+  }
+
   /**
    * Handle iopub kernel messages.
    */
@@ -416,6 +431,7 @@ export class SessionConnection implements Session.ISessionConnection {
   private _connectionStatusChanged = new Signal<this, Kernel.ConnectionStatus>(
     this
   );
+  private _pendingInput = new Signal<this, boolean>(this);
   private _iopubMessage = new Signal<this, KernelMessage.IIOPubMessage>(this);
   private _unhandledMessage = new Signal<this, KernelMessage.IMessage>(this);
   private _anyMessage = new Signal<this, Kernel.IAnyMessageArgs>(this);

+ 6 - 0
packages/services/src/session/session.ts

@@ -55,6 +55,12 @@ export interface ISessionConnection extends IObservableDisposable {
    */
   connectionStatusChanged: ISignal<this, Kernel.ConnectionStatus>;
 
+  /**
+   * The kernel pendingInput signal, proxied from the current
+   * kernel.
+   */
+  pendingInput: ISignal<this, boolean>;
+
   /**
    * The kernel iopubMessage signal, proxied from the current kernel.
    */

+ 21 - 0
packages/services/test/kernel/ikernel.spec.ts

@@ -92,6 +92,27 @@ describe('Kernel.IKernel', () => {
     });
   });
 
+  describe('#pendingInput', () => {
+    it('should be a signal following input request', async () => {
+      let called = false;
+      defaultKernel.pendingInput.connect((sender, args) => {
+        if (!called) {
+          called = true;
+          defaultKernel.sendInputReply({ status: 'ok', value: 'foo' });
+        }
+      });
+      const code = `input("Input something")`;
+      await defaultKernel.requestExecute(
+        {
+          code: code,
+          allow_stdin: true
+        },
+        true
+      ).done;
+      expect(called).toBe(true);
+    });
+  });
+
   describe('#iopubMessage', () => {
     it('should be emitted for an iopub message', async () => {
       let called = false;

+ 18 - 0
testutils/src/mock.ts

@@ -245,8 +245,12 @@ export const KernelMock = jest.fn<
     Kernel.IKernelConnection,
     Kernel.Status
   >(thisObject);
+  const pendingInputSignal = new Signal<Kernel.IKernelConnection, boolean>(
+    thisObject
+  );
   (thisObject as any).statusChanged = statusChangedSignal;
   (thisObject as any).iopubMessage = iopubMessageSignal;
+  (thisObject as any).pendingInput = pendingInputSignal;
   (thisObject as any).hasPendingInput = false;
   return thisObject;
 });
@@ -330,6 +334,10 @@ export const SessionConnectionMock = jest.fn<
     KernelMessage.IMessage
   >(thisObject);
 
+  const pendingInputSignal = new Signal<Session.ISessionConnection, boolean>(
+    thisObject
+  );
+
   kernel!.iopubMessage.connect((_, args) => {
     iopubMessageSignal.emit(args);
   }, thisObject);
@@ -338,6 +346,10 @@ export const SessionConnectionMock = jest.fn<
     statusChangedSignal.emit(args);
   }, thisObject);
 
+  kernel!.pendingInput.connect((_, args) => {
+    pendingInputSignal.emit(args);
+  }, thisObject);
+
   (thisObject as any).disposed = disposedSignal;
   (thisObject as any).connectionStatusChanged = connectionStatusChangedSignal;
   (thisObject as any).propertyChanged = propertyChangedSignal;
@@ -345,6 +357,7 @@ export const SessionConnectionMock = jest.fn<
   (thisObject as any).kernelChanged = kernelChangedSignal;
   (thisObject as any).iopubMessage = iopubMessageSignal;
   (thisObject as any).unhandledMessage = unhandledMessageSignal;
+  (thisObject as any).pendingInput = pendingInputSignal;
   return thisObject;
 });
 
@@ -421,12 +434,17 @@ export const SessionContextMock = jest.fn<
     kernelChangedSignal.emit(args);
   });
 
+  session!.pendingInput.connect((_, args) => {
+    (thisObject as any).pendingInput = args;
+  });
+
   (thisObject as any).statusChanged = statusChangedSignal;
   (thisObject as any).kernelChanged = kernelChangedSignal;
   (thisObject as any).iopubMessage = iopubMessageSignal;
   (thisObject as any).propertyChanged = propertyChangedSignal;
   (thisObject as any).disposed = disposedSignal;
   (thisObject as any).session = session;
+  (thisObject as any).pendingInput = false;
 
   return thisObject;
 });