yprovider.ts 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. /* -----------------------------------------------------------------------------
  2. | Copyright (c) Jupyter Development Team.
  3. | Distributed under the terms of the Modified BSD License.
  4. |----------------------------------------------------------------------------*/
  5. import * as decoding from 'lib0/decoding';
  6. import * as encoding from 'lib0/encoding';
  7. import { WebsocketProvider } from 'y-websocket';
  8. import * as Y from 'yjs';
  9. import { IDocumentProvider, IDocumentProviderFactory } from './tokens';
  10. import { getAnonymousUserName, getRandomColor } from './awareness';
  11. import * as env from 'lib0/environment';
  12. /**
  13. * A class to provide Yjs synchronization over WebSocket.
  14. *
  15. * The user can specify their own user-name and user-color by adding urlparameters:
  16. * ?username=Alice&usercolor=007007
  17. * wher usercolor must be a six-digit hexadicimal encoded RGB value without the hash token.
  18. *
  19. * We specify custom messages that the server can interpret. For reference please look in yjs_ws_server.
  20. *
  21. */
  22. export class WebSocketProviderWithLocks
  23. extends WebsocketProvider
  24. implements IDocumentProvider {
  25. /**
  26. * Construct a new WebSocketProviderWithLocks
  27. *
  28. * @param options The instantiation options for a WebSocketProviderWithLocks
  29. */
  30. constructor(options: WebSocketProviderWithLocks.IOptions) {
  31. super(
  32. options.url,
  33. options.contentType + ':' + options.path,
  34. options.ymodel.ydoc,
  35. {
  36. awareness: options.ymodel.awareness
  37. }
  38. );
  39. this._path = options.path;
  40. this._contentType = options.contentType;
  41. this._serverUrl = options.url;
  42. const color = '#' + env.getParam('--usercolor', getRandomColor().slice(1));
  43. const name = env.getParam('--username', getAnonymousUserName());
  44. const awareness = options.ymodel.awareness;
  45. const currState = awareness.getLocalState();
  46. // only set if this was not already set by another plugin
  47. if (currState && currState.name == null) {
  48. options.ymodel.awareness.setLocalStateField('user', {
  49. name,
  50. color
  51. });
  52. }
  53. // Message handler that confirms when a lock has been acquired
  54. this.messageHandlers[127] = (
  55. encoder,
  56. decoder,
  57. provider,
  58. emitSynced,
  59. messageType
  60. ) => {
  61. // acquired lock
  62. const timestamp = decoding.readUint32(decoder);
  63. const lockRequest = this._currentLockRequest;
  64. this._currentLockRequest = null;
  65. if (lockRequest) {
  66. lockRequest.resolve(timestamp);
  67. }
  68. };
  69. // Message handler that receives the initial content
  70. this.messageHandlers[125] = (
  71. encoder,
  72. decoder,
  73. provider,
  74. emitSynced,
  75. messageType
  76. ) => {
  77. // received initial content
  78. const initialContent = decoding.readTailAsUint8Array(decoder);
  79. // Apply data from server
  80. if (initialContent.byteLength > 0) {
  81. setTimeout(() => {
  82. Y.applyUpdate(this.doc, initialContent);
  83. }, 0);
  84. }
  85. const initialContentRequest = this._initialContentRequest;
  86. this._initialContentRequest = null;
  87. if (initialContentRequest) {
  88. initialContentRequest.resolve(initialContent.byteLength > 0);
  89. }
  90. };
  91. this._isInitialized = false;
  92. this._onConnectionStatus = this._onConnectionStatus.bind(this);
  93. this.on('status', this._onConnectionStatus);
  94. }
  95. setPath(newPath: string): void {
  96. if (newPath !== this._path) {
  97. this._path = newPath;
  98. // The next time the provider connects, we should connect through a different server url
  99. this.bcChannel =
  100. this._serverUrl + '/' + this._contentType + ':' + this._path;
  101. this.url = this.bcChannel;
  102. const encoder = encoding.createEncoder();
  103. encoding.write(encoder, 123);
  104. // writing a utf8 string to the encoder
  105. const escapedPath = unescape(
  106. encodeURIComponent(this._contentType + ':' + newPath)
  107. );
  108. for (let i = 0; i < escapedPath.length; i++) {
  109. encoding.write(
  110. encoder,
  111. /** @type {number} */ escapedPath.codePointAt(i)!
  112. );
  113. }
  114. this._sendMessage(encoding.toUint8Array(encoder));
  115. }
  116. }
  117. /**
  118. * Resolves to true if the initial content has been initialized on the server. false otherwise.
  119. */
  120. requestInitialContent(): Promise<boolean> {
  121. if (this._initialContentRequest) {
  122. return this._initialContentRequest.promise;
  123. }
  124. let resolve: any, reject: any;
  125. const promise: Promise<boolean> = new Promise((_resolve, _reject) => {
  126. resolve = _resolve;
  127. reject = _reject;
  128. });
  129. this._initialContentRequest = { promise, resolve, reject };
  130. this._sendMessage(new Uint8Array([125]));
  131. // Resolve with true if the server doesn't respond for some reason.
  132. // In case of a connection problem, we don't want the user to re-initialize the window.
  133. // Instead wait for y-websocket to connect to the server.
  134. // @todo maybe we should reload instead..
  135. setTimeout(() => resolve(false), 1000);
  136. return promise;
  137. }
  138. /**
  139. * Put the initialized state.
  140. */
  141. putInitializedState(): void {
  142. const encoder = encoding.createEncoder();
  143. encoding.writeVarUint(encoder, 124);
  144. encoding.writeUint8Array(encoder, Y.encodeStateAsUpdate(this.doc));
  145. this._sendMessage(encoding.toUint8Array(encoder));
  146. this._isInitialized = true;
  147. }
  148. /**
  149. * Acquire a lock.
  150. * Returns a Promise that resolves to the lock number.
  151. */
  152. acquireLock(): Promise<number> {
  153. if (this._currentLockRequest) {
  154. return this._currentLockRequest.promise;
  155. }
  156. this._sendMessage(new Uint8Array([127]));
  157. // try to acquire lock in regular interval
  158. const intervalID = setInterval(() => {
  159. if (this.wsconnected) {
  160. // try to acquire lock
  161. this._sendMessage(new Uint8Array([127]));
  162. }
  163. }, 500);
  164. let resolve: any, reject: any;
  165. const promise: Promise<number> = new Promise((_resolve, _reject) => {
  166. resolve = _resolve;
  167. reject = _reject;
  168. });
  169. this._currentLockRequest = { promise, resolve, reject };
  170. const _finally = () => {
  171. clearInterval(intervalID);
  172. };
  173. promise.then(_finally, _finally);
  174. return promise;
  175. }
  176. /**
  177. * Release a lock.
  178. *
  179. * @param lock The lock to release.
  180. */
  181. releaseLock(lock: number): void {
  182. const encoder = encoding.createEncoder();
  183. // reply with release lock
  184. encoding.writeVarUint(encoder, 126);
  185. encoding.writeUint32(encoder, lock);
  186. // releasing lock
  187. this._sendMessage(encoding.toUint8Array(encoder));
  188. }
  189. /**
  190. * Send a new message to WebSocket server.
  191. *
  192. * @param message The message to send
  193. */
  194. private _sendMessage(message: Uint8Array): void {
  195. // send once connected
  196. const send = () => {
  197. setTimeout(() => {
  198. if (this.wsconnected) {
  199. this.ws!.send(message);
  200. } else {
  201. this.once('status', send);
  202. }
  203. }, 0);
  204. };
  205. send();
  206. }
  207. /**
  208. * Handle a change to the connection status.
  209. *
  210. * @param status The connection status.
  211. */
  212. private async _onConnectionStatus(status: {
  213. status: 'connected' | 'disconnected';
  214. }): Promise<void> {
  215. if (this._isInitialized && status.status === 'connected') {
  216. const lock = await this.acquireLock();
  217. const contentIsInitialized = await this.requestInitialContent();
  218. if (!contentIsInitialized) {
  219. this.putInitializedState();
  220. }
  221. this.releaseLock(lock);
  222. }
  223. }
  224. private _path: string;
  225. private _contentType: string;
  226. private _serverUrl: string;
  227. private _isInitialized: boolean;
  228. private _currentLockRequest: {
  229. promise: Promise<number>;
  230. resolve: (lock: number) => void;
  231. reject: () => void;
  232. } | null = null;
  233. private _initialContentRequest: {
  234. promise: Promise<boolean>;
  235. resolve: (initialized: boolean) => void;
  236. reject: () => void;
  237. } | null = null;
  238. }
  239. /**
  240. * A namespace for WebSocketProviderWithLocks statics.
  241. */
  242. export namespace WebSocketProviderWithLocks {
  243. /**
  244. * The instantiation options for a WebSocketProviderWithLocks.
  245. */
  246. export interface IOptions extends IDocumentProviderFactory.IOptions {
  247. /**
  248. * The server URL
  249. */
  250. url: string;
  251. }
  252. }