Forráskód Böngészése

Message can now be sent on control channel

Johan Mabille 5 éve
szülő
commit
ea00a7b2e8

+ 7 - 7
packages/outputarea/src/widget.ts

@@ -144,7 +144,7 @@ export class OutputArea extends Widget {
   /**
    * The kernel future associated with the output area.
    */
-  get future(): Kernel.IFuture<
+  get future(): Kernel.IShellFuture<
     KernelMessage.IExecuteRequestMsg,
     KernelMessage.IExecuteReplyMsg
   > | null {
@@ -152,7 +152,7 @@ export class OutputArea extends Widget {
   }
 
   set future(
-    value: Kernel.IFuture<
+    value: Kernel.IShellFuture<
       KernelMessage.IExecuteRequestMsg,
       KernelMessage.IExecuteReplyMsg
     > | null
@@ -284,7 +284,7 @@ export class OutputArea extends Widget {
    */
   protected onInputRequest(
     msg: KernelMessage.IInputRequestMsg,
-    future: Kernel.IFuture
+    future: Kernel.IShellFuture
   ): void {
     // Add an output widget to the end.
     let factory = this.contentFactory;
@@ -493,7 +493,7 @@ export class OutputArea extends Widget {
   };
 
   private _minHeightTimeout: number = null;
-  private _future: Kernel.IFuture<
+  private _future: Kernel.IShellFuture<
     KernelMessage.IExecuteRequestMsg,
     KernelMessage.IExecuteReplyMsg
   > | null = null;
@@ -506,7 +506,7 @@ export class SimplifiedOutputArea extends OutputArea {
    */
   protected onInputRequest(
     msg: KernelMessage.IInputRequestMsg,
-    future: Kernel.IFuture
+    future: Kernel.IShellFuture
   ): void {
     return;
   }
@@ -755,7 +755,7 @@ export class Stdin extends Widget implements IStdin {
     this._input.removeEventListener('keydown', this);
   }
 
-  private _future: Kernel.IFuture = null;
+  private _future: Kernel.IShellFuture = null;
   private _input: HTMLInputElement = null;
   private _value: string;
   private _promise = new PromiseDelegate<void>();
@@ -779,7 +779,7 @@ export namespace Stdin {
     /**
      * The kernel future associated with the request.
      */
-    future: Kernel.IFuture;
+    future: Kernel.IShellFuture;
   }
 }
 

+ 3 - 3
packages/services/src/kernel/comm.ts

@@ -103,7 +103,7 @@ export class CommHandler extends DisposableDelegate implements Kernel.IComm {
     data?: JSONObject,
     metadata?: JSONObject,
     buffers: (ArrayBuffer | ArrayBufferView)[] = []
-  ): Kernel.IFuture {
+  ): Kernel.IShellFuture {
     if (this.isDisposed || this._kernel.isDisposed) {
       throw new Error('Cannot open');
     }
@@ -136,7 +136,7 @@ export class CommHandler extends DisposableDelegate implements Kernel.IComm {
     metadata?: JSONObject,
     buffers: (ArrayBuffer | ArrayBufferView)[] = [],
     disposeOnDone: boolean = true
-  ): Kernel.IFuture {
+  ): Kernel.IShellFuture {
     if (this.isDisposed || this._kernel.isDisposed) {
       throw new Error('Cannot send');
     }
@@ -170,7 +170,7 @@ export class CommHandler extends DisposableDelegate implements Kernel.IComm {
     data?: JSONObject,
     metadata?: JSONObject,
     buffers: (ArrayBuffer | ArrayBufferView)[] = []
-  ): Kernel.IFuture {
+  ): Kernel.IShellFuture {
     if (this.isDisposed || this._kernel.isDisposed) {
       throw new Error('Cannot close');
     }

+ 88 - 5
packages/services/src/kernel/default.ts

@@ -19,7 +19,11 @@ import { Kernel } from './kernel';
 
 import { KernelMessage } from './messages';
 
-import { KernelFutureHandler } from './future';
+import {
+  KernelFutureHandler,
+  KernelShellFutureHandler,
+  KernelControlFutureHandler
+} from './future';
 
 import * as serialize from './serialize';
 
@@ -263,7 +267,58 @@ export class DefaultKernel implements Kernel.IKernel {
     msg: KernelMessage.IShellMessage<T>,
     expectReply = false,
     disposeOnDone = true
-  ): Kernel.IFuture<KernelMessage.IShellMessage<T>> {
+  ): Kernel.IShellFuture<KernelMessage.IShellMessage<T>> {
+    return this._sendKernelShellControl(
+      KernelShellFutureHandler,
+      msg,
+      expectReply,
+      disposeOnDone
+    ) as Kernel.IShellFuture<KernelMessage.IShellMessage<T>>;
+  }
+
+  /**
+   * Send a control message to the kernel.
+   *
+   * #### Notes
+   * Send a message to the kernel's control channel, yielding a future object
+   * for accepting replies.
+   *
+   * If `expectReply` is given and `true`, the future is disposed when both a
+   * control reply and an idle status message are received. If `expectReply`
+   * is not given or is `false`, the future is resolved when an idle status
+   * message is received.
+   * If `disposeOnDone` is not given or is `true`, the Future is disposed at this point.
+   * If `disposeOnDone` is given and `false`, it is up to the caller to dispose of the Future.
+   *
+   * All replies are validated as valid kernel messages.
+   *
+   * If the kernel status is `dead`, this will throw an error.
+   */
+  sendControlMessage<T extends KernelMessage.ControlMessageType>(
+    msg: KernelMessage.IControlMessage<T>,
+    expectReply = false,
+    disposeOnDone = true
+  ): Kernel.IControlFuture<KernelMessage.IControlMessage<T>> {
+    return this._sendKernelShellControl(
+      KernelControlFutureHandler,
+      msg,
+      expectReply,
+      disposeOnDone
+    ) as Kernel.IControlFuture<KernelMessage.IControlMessage<T>>;
+  }
+
+  private _sendKernelShellControl<
+    KFH extends new (...params: any[]) => KernelFutureHandler,
+    T extends KernelMessage.IMessage
+  >(
+    ctor: KFH,
+    msg: T,
+    expectReply = false,
+    disposeOnDone = true
+  ): Kernel.IFuture<
+    KernelMessage.IShellControlMessage,
+    KernelMessage.IShellControlMessage
+  > {
     if (this.status === 'dead') {
       throw new Error('Kernel is dead');
     }
@@ -273,7 +328,7 @@ export class DefaultKernel implements Kernel.IKernel {
       this._ws.send(serialize.serialize(msg));
     }
     this._anyMessage.emit({ msg, direction: 'send' });
-    let future = new KernelFutureHandler(
+    let future = new ctor(
       () => {
         let msgId = msg.header.msg_id;
         this._futures.delete(msgId);
@@ -513,7 +568,7 @@ export class DefaultKernel implements Kernel.IKernel {
     content: KernelMessage.IExecuteRequestMsg['content'],
     disposeOnDone: boolean = true,
     metadata?: JSONObject
-  ): Kernel.IFuture<
+  ): Kernel.IShellFuture<
     KernelMessage.IExecuteRequestMsg,
     KernelMessage.IExecuteReplyMsg
   > {
@@ -531,12 +586,40 @@ export class DefaultKernel implements Kernel.IKernel {
       session: this._clientId,
       content: { ...defaults, ...content }
     });
-    return this.sendShellMessage(msg, true, disposeOnDone) as Kernel.IFuture<
+    return this.sendShellMessage(
+      msg,
+      true,
+      disposeOnDone
+    ) as Kernel.IShellFuture<
       KernelMessage.IExecuteRequestMsg,
       KernelMessage.IExecuteReplyMsg
     >;
   }
 
+  requestDebug(
+    content: KernelMessage.IDebugRequest,
+    disposeOnDone: boolean = true
+  ): Kernel.IControlFuture<
+    KernelMessage.IDebugRequestMsg,
+    KernelMessage.IDebugReplyMsg
+  > {
+    let msg = KernelMessage.createMessage({
+      msgType: 'debug_request',
+      channel: 'control',
+      username: this._username,
+      session: this._clientId,
+      content
+    });
+    return this.sendControlMessage(
+      msg,
+      true,
+      disposeOnDone
+    ) as Kernel.IControlFuture<
+      KernelMessage.IDebugRequestMsg,
+      KernelMessage.IDebugReplyMsg
+    >;
+  }
+
   /**
    * Send an `is_complete_request` message.
    *

+ 98 - 62
packages/services/src/kernel/future.ts

@@ -19,10 +19,11 @@ declare var setImmediate: any;
  * is considered done when the `idle` status is received.
  *
  */
-export class KernelFutureHandler<
-  REQUEST extends KernelMessage.IShellMessage = KernelMessage.IShellMessage,
-  REPLY extends KernelMessage.IShellMessage = KernelMessage.IShellMessage
+export abstract class KernelFutureHandler<
+  REQUEST extends KernelMessage.IShellControlMessage = KernelMessage.IShellControlMessage,
+  REPLY extends KernelMessage.IShellControlMessage = KernelMessage.IShellControlMessage
 > extends DisposableDelegate implements Kernel.IFuture<REQUEST, REPLY> {
+  abstract async handleMsg(msg: KernelMessage.IMessage): Promise<void>;
   /**
    * Construct a new KernelFutureHandler.
    */
@@ -88,24 +89,6 @@ export class KernelFutureHandler<
     this._iopub = cb;
   }
 
-  /**
-   * Get the stdin handler.
-   */
-  get onStdin(): (
-    msg: KernelMessage.IStdinMessage
-  ) => void | PromiseLike<void> {
-    return this._stdin;
-  }
-
-  /**
-   * Set the stdin handler.
-   */
-  set onStdin(
-    cb: (msg: KernelMessage.IStdinMessage) => void | PromiseLike<void>
-  ) {
-    this._stdin = cb;
-  }
-
   /**
    * Register hook for IOPub messages.
    *
@@ -151,18 +134,10 @@ export class KernelFutureHandler<
     this._hooks.remove(hook);
   }
 
-  /**
-   * Send an `input_reply` message.
-   */
-  sendInputReply(content: KernelMessage.IInputReplyMsg['content']): void {
-    this._kernel.sendInputReply(content);
-  }
-
   /**
    * Dispose and unregister the future.
    */
   dispose(): void {
-    this._stdin = Private.noOp;
     this._iopub = Private.noOp;
     this._reply = Private.noOp;
     this._hooks = null;
@@ -189,26 +164,7 @@ export class KernelFutureHandler<
     super.dispose();
   }
 
-  /**
-   * Handle an incoming kernel message.
-   */
-  async handleMsg(msg: KernelMessage.IMessage): Promise<void> {
-    switch (msg.channel) {
-      case 'shell':
-        await this._handleReply(msg as REPLY);
-        break;
-      case 'stdin':
-        await this._handleStdin(msg as KernelMessage.IStdinMessage);
-        break;
-      case 'iopub':
-        await this._handleIOPub(msg as KernelMessage.IIOPubMessage);
-        break;
-      default:
-        break;
-    }
-  }
-
-  private async _handleReply(msg: REPLY): Promise<void> {
+  protected async _handleReply(msg: REPLY): Promise<void> {
     let reply = this._reply;
     if (reply) {
       // tslint:disable-next-line:await-promise
@@ -221,15 +177,9 @@ export class KernelFutureHandler<
     }
   }
 
-  private async _handleStdin(msg: KernelMessage.IStdinMessage): Promise<void> {
-    let stdin = this._stdin;
-    if (stdin) {
-      // tslint:disable-next-line:await-promise
-      await stdin(msg);
-    }
-  }
-
-  private async _handleIOPub(msg: KernelMessage.IIOPubMessage): Promise<void> {
+  protected async _handleIOPub(
+    msg: KernelMessage.IIOPubMessage
+  ): Promise<void> {
     let process = await this._hooks.process(msg);
     let iopub = this._iopub;
     if (process && iopub) {
@@ -276,9 +226,6 @@ export class KernelFutureHandler<
 
   private _msg: REQUEST;
   private _status = 0;
-  private _stdin: (
-    msg: KernelMessage.IStdinMessage
-  ) => void | PromiseLike<void> = Private.noOp;
   private _iopub: (
     msg: KernelMessage.IIOPubMessage
   ) => void | PromiseLike<void> = Private.noOp;
@@ -287,7 +234,96 @@ export class KernelFutureHandler<
   private _replyMsg: REPLY;
   private _hooks = new Private.HookList<KernelMessage.IIOPubMessage>();
   private _disposeOnDone = true;
-  private _kernel: Kernel.IKernel;
+  protected _kernel: Kernel.IKernel;
+}
+
+export class KernelControlFutureHandler<
+  REQUEST extends KernelMessage.IControlMessage = KernelMessage.IControlMessage,
+  REPLY extends KernelMessage.IControlMessage = KernelMessage.IControlMessage
+> extends KernelFutureHandler<REQUEST, REPLY>
+  implements Kernel.IControlFuture<REQUEST, REPLY> {
+  async handleMsg(msg: KernelMessage.IMessage): Promise<void> {
+    switch (msg.channel) {
+      case 'control':
+        await this._handleReply(msg as REPLY);
+        break;
+      case 'iopub':
+        await this._handleIOPub(msg as KernelMessage.IIOPubMessage);
+        break;
+      default:
+        break;
+    }
+  }
+}
+
+export class KernelShellFutureHandler<
+  REQUEST extends KernelMessage.IShellMessage = KernelMessage.IShellMessage,
+  REPLY extends KernelMessage.IShellMessage = KernelMessage.IShellMessage
+> extends KernelFutureHandler<REQUEST, REPLY>
+  implements Kernel.IShellFuture<REQUEST, REPLY> {
+  /**
+   * Get the stdin handler.
+   */
+  get onStdin(): (
+    msg: KernelMessage.IStdinMessage
+  ) => void | PromiseLike<void> {
+    return this._stdin;
+  }
+
+  /**
+   * Set the stdin handler.
+   */
+  set onStdin(
+    cb: (msg: KernelMessage.IStdinMessage) => void | PromiseLike<void>
+  ) {
+    this._stdin = cb;
+  }
+
+  /**
+   * Send an `input_reply` message.
+   */
+  sendInputReply(content: KernelMessage.IInputReplyMsg['content']): void {
+    this._kernel.sendInputReply(content);
+  }
+
+  /**
+   * Handle an incoming kernel message.
+   */
+  async handleMsg(msg: KernelMessage.IMessage): Promise<void> {
+    switch (msg.channel) {
+      case 'shell':
+        await this._handleReply(msg as REPLY);
+        break;
+      case 'stdin':
+        await this._handleStdin(msg as KernelMessage.IStdinMessage);
+        break;
+      case 'iopub':
+        await this._handleIOPub(msg as KernelMessage.IIOPubMessage);
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Dispose and unregister the future.
+   */
+  dispose(): void {
+    this._stdin = Private.noOp;
+    super.dispose();
+  }
+
+  private async _handleStdin(msg: KernelMessage.IStdinMessage): Promise<void> {
+    let stdin = this._stdin;
+    if (stdin) {
+      // tslint:disable-next-line:await-promise
+      await stdin(msg);
+    }
+  }
+
+  private _stdin: (
+    msg: KernelMessage.IStdinMessage
+  ) => void | PromiseLike<void> = Private.noOp;
 }
 
 namespace Private {

+ 39 - 16
packages/services/src/kernel/kernel.ts

@@ -130,8 +130,13 @@ export namespace Kernel {
       msg: KernelMessage.IShellMessage<T>,
       expectReply?: boolean,
       disposeOnDone?: boolean
-    ): Kernel.IFuture<KernelMessage.IShellMessage<T>>;
+    ): Kernel.IShellFuture<KernelMessage.IShellMessage<T>>;
 
+    sendControlMessage<T extends KernelMessage.ControlMessageType>(
+      msg: KernelMessage.IControlMessage<T>,
+      expectReply?: boolean,
+      disposeOnDone?: boolean
+    ): Kernel.IControlFuture<KernelMessage.IControlMessage<T>>;
     /**
      * Reconnect to a disconnected kernel.
      *
@@ -268,11 +273,19 @@ export namespace Kernel {
       content: KernelMessage.IExecuteRequestMsg['content'],
       disposeOnDone?: boolean,
       metadata?: JSONObject
-    ): Kernel.IFuture<
+    ): Kernel.IShellFuture<
       KernelMessage.IExecuteRequestMsg,
       KernelMessage.IExecuteReplyMsg
     >;
 
+    requestDebug(
+      content: KernelMessage.IDebugRequest,
+      disposeOnDone?: boolean
+    ): Kernel.IControlFuture<
+      KernelMessage.IDebugRequestMsg,
+      KernelMessage.IDebugReplyMsg
+    >;
+
     /**
      * Send an `is_complete_request` message.
      *
@@ -743,8 +756,8 @@ export namespace Kernel {
    * responses that may come from the kernel.
    */
   export interface IFuture<
-    REQUEST extends KernelMessage.IShellMessage = KernelMessage.IShellMessage,
-    REPLY extends KernelMessage.IShellMessage = KernelMessage.IShellMessage
+    REQUEST extends KernelMessage.IShellMessage | KernelMessage.IControlMessage,
+    REPLY extends KernelMessage.IShellMessage | KernelMessage.IControlMessage
   > extends IDisposable {
     /**
      * The original outgoing message.
@@ -774,15 +787,6 @@ export namespace Kernel {
      */
     onReply: (msg: REPLY) => void | PromiseLike<void>;
 
-    /**
-     * The stdin handler for the kernel future.
-     *
-     * #### Notes
-     * If the handler returns a promise, all kernel message processing pauses
-     * until the promise is resolved.
-     */
-    onStdin: (msg: KernelMessage.IStdinMessage) => void | PromiseLike<void>;
-
     /**
      * The iopub handler for the kernel future.
      *
@@ -826,6 +830,20 @@ export namespace Kernel {
     removeMessageHook(
       hook: (msg: KernelMessage.IIOPubMessage) => boolean | PromiseLike<boolean>
     ): void;
+  }
+
+  export interface IShellFuture<
+    REQUEST extends KernelMessage.IShellMessage = KernelMessage.IShellMessage,
+    REPLY extends KernelMessage.IShellMessage = KernelMessage.IShellMessage
+  > extends IFuture<REQUEST, REPLY> {
+    /**
+     * The stdin handler for the kernel future.
+     *
+     * #### Notes
+     * If the handler returns a promise, all kernel message processing pauses
+     * until the promise is resolved.
+     */
+    onStdin: (msg: KernelMessage.IStdinMessage) => void | PromiseLike<void>;
 
     /**
      * Send an `input_reply` message.
@@ -833,6 +851,11 @@ export namespace Kernel {
     sendInputReply(content: KernelMessage.IInputReplyMsg['content']): void;
   }
 
+  export interface IControlFuture<
+    REQUEST extends KernelMessage.IControlMessage = KernelMessage.IControlMessage,
+    REPLY extends KernelMessage.IControlMessage = KernelMessage.IControlMessage
+  > extends IFuture<REQUEST, REPLY> {}
+
   /**
    * A client side Comm interface.
    */
@@ -883,7 +906,7 @@ export namespace Kernel {
       data?: JSONValue,
       metadata?: JSONObject,
       buffers?: (ArrayBuffer | ArrayBufferView)[]
-    ): IFuture;
+    ): IShellFuture;
 
     /**
      * Send a `comm_msg` message to the kernel.
@@ -906,7 +929,7 @@ export namespace Kernel {
       metadata?: JSONObject,
       buffers?: (ArrayBuffer | ArrayBufferView)[],
       disposeOnDone?: boolean
-    ): IFuture;
+    ): IShellFuture;
 
     /**
      * Close the comm.
@@ -927,7 +950,7 @@ export namespace Kernel {
       data?: JSONValue,
       metadata?: JSONObject,
       buffers?: (ArrayBuffer | ArrayBufferView)[]
-    ): IFuture;
+    ): IShellFuture;
   }
 
   /**

+ 9 - 11
packages/services/src/kernel/messages.ts

@@ -163,9 +163,7 @@ export namespace KernelMessage {
   /**
    * Control message types.
    */
-  export type ControlMessageType = 
-    | 'debug_request'
-    | 'debug_reply';
+  export type ControlMessageType = 'debug_request' | 'debug_reply';
 
   /**
    * IOPub message types.
@@ -289,11 +287,13 @@ export namespace KernelMessage {
   /**
    * A kernel message on the `'control'` channel.
    */
-  export interface IControlMessage<T extends ControlMessageType = ControlMessageType>
-    extends IMessage<T> {
+  export interface IControlMessage<
+    T extends ControlMessageType = ControlMessageType
+  > extends IMessage<T> {
     channel: 'control';
   }
 
+  export type IShellControlMessage = IShellMessage | IControlMessage;
   /**
    * A kernel message on the `'iopub'` channel.
    */
@@ -985,16 +985,14 @@ export namespace KernelMessage {
   /**
    * Debug message types.
    */
-  export type DebugMessageType =
-    | 'request'
-    | 'response'
-    | 'event';
+  export type DebugMessageType = 'request' | 'response' | 'event';
 
   /**
    * Base interface for debug protocol message.
    */
-  export interface IDebugProtocolMsg<T extends DebugMessageType = DebugMessageType>
-  {
+  export interface IDebugProtocolMsg<
+    T extends DebugMessageType = DebugMessageType
+  > {
     seq: number;
     type: T;
   }