Node updated. Some todos.

This commit is contained in:
Norm Rasmussen
2024-09-23 20:52:09 -04:00
parent 8bfaca8375
commit f25622067f
2041 changed files with 124145 additions and 110445 deletions

View File

@ -1,9 +1,15 @@
'use strict';
const BINARY_TYPES = ['nodebuffer', 'arraybuffer', 'fragments'];
const hasBlob = typeof Blob !== 'undefined';
if (hasBlob) BINARY_TYPES.push('blob');
module.exports = {
BINARY_TYPES: ['nodebuffer', 'arraybuffer', 'fragments'],
BINARY_TYPES,
EMPTY_BUFFER: Buffer.alloc(0),
GUID: '258EAFA5-E914-47DA-95CA-C5AB0DC85B11',
hasBlob,
kForOnEventAttribute: Symbol('kIsForOnEventAttribute'),
kListener: Symbol('kListener'),
kStatusCode: Symbol('status-code'),

View File

@ -13,13 +13,6 @@ const { concat, toArrayBuffer, unmask } = require('./buffer-util');
const { isValidStatusCode, isValidUTF8 } = require('./validation');
const FastBuffer = Buffer[Symbol.species];
const promise = Promise.resolve();
//
// `queueMicrotask()` is not available in Node.js < 11.
//
const queueTask =
typeof queueMicrotask === 'function' ? queueMicrotask : queueMicrotaskShim;
const GET_INFO = 0;
const GET_PAYLOAD_LENGTH_16 = 1;
@ -39,7 +32,7 @@ class Receiver extends Writable {
* Creates a Receiver instance.
*
* @param {Object} [options] Options object
* @param {Boolean} [options.allowSynchronousEvents=false] Specifies whether
* @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether
* any of the `'message'`, `'ping'`, and `'pong'` events can be emitted
* multiple times in the same tick
* @param {String} [options.binaryType=nodebuffer] The type for binary data
@ -54,7 +47,10 @@ class Receiver extends Writable {
constructor(options = {}) {
super();
this._allowSynchronousEvents = !!options.allowSynchronousEvents;
this._allowSynchronousEvents =
options.allowSynchronousEvents !== undefined
? options.allowSynchronousEvents
: true;
this._binaryType = options.binaryType || BINARY_TYPES[0];
this._extensions = options.extensions || {};
this._isServer = !!options.isServer;
@ -563,21 +559,18 @@ class Receiver extends Writable {
data = concat(fragments, messageLength);
} else if (this._binaryType === 'arraybuffer') {
data = toArrayBuffer(concat(fragments, messageLength));
} else if (this._binaryType === 'blob') {
data = new Blob(fragments);
} else {
data = fragments;
}
//
// If the state is `INFLATING`, it means that the frame data was
// decompressed asynchronously, so there is no need to defer the event
// as it will be emitted asynchronously anyway.
//
if (this._state === INFLATING || this._allowSynchronousEvents) {
if (this._allowSynchronousEvents) {
this.emit('message', data, true);
this._state = GET_INFO;
} else {
this._state = DEFER_EVENT;
queueTask(() => {
setImmediate(() => {
this.emit('message', data, true);
this._state = GET_INFO;
this.startLoop(cb);
@ -604,7 +597,7 @@ class Receiver extends Writable {
this._state = GET_INFO;
} else {
this._state = DEFER_EVENT;
queueTask(() => {
setImmediate(() => {
this.emit('message', buf, false);
this._state = GET_INFO;
this.startLoop(cb);
@ -675,7 +668,7 @@ class Receiver extends Writable {
this._state = GET_INFO;
} else {
this._state = DEFER_EVENT;
queueTask(() => {
setImmediate(() => {
this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
this._state = GET_INFO;
this.startLoop(cb);
@ -711,32 +704,3 @@ class Receiver extends Writable {
}
module.exports = Receiver;
/**
* A shim for `queueMicrotask()`.
*
* @param {Function} cb Callback
*/
function queueMicrotaskShim(cb) {
promise.then(cb).catch(throwErrorNextTick);
}
/**
* Throws an error.
*
* @param {Error} err The error to throw
* @private
*/
function throwError(err) {
throw err;
}
/**
* Throws an error in the next tick.
*
* @param {Error} err The error to throw
* @private
*/
function throwErrorNextTick(err) {
process.nextTick(throwError, err);
}

217
Scripts/node_modules/ws/lib/sender.js generated vendored
View File

@ -6,12 +6,19 @@ const { Duplex } = require('stream');
const { randomFillSync } = require('crypto');
const PerMessageDeflate = require('./permessage-deflate');
const { EMPTY_BUFFER } = require('./constants');
const { isValidStatusCode } = require('./validation');
const { EMPTY_BUFFER, kWebSocket, NOOP } = require('./constants');
const { isBlob, isValidStatusCode } = require('./validation');
const { mask: applyMask, toBuffer } = require('./buffer-util');
const kByteLength = Symbol('kByteLength');
const maskBuffer = Buffer.alloc(4);
const RANDOM_POOL_SIZE = 8 * 1024;
let randomPool;
let randomPoolPointer = RANDOM_POOL_SIZE;
const DEFAULT = 0;
const DEFLATING = 1;
const GET_BLOB_DATA = 2;
/**
* HyBi Sender implementation.
@ -39,8 +46,10 @@ class Sender {
this._compress = false;
this._bufferedBytes = 0;
this._deflating = false;
this._queue = [];
this._state = DEFAULT;
this.onerror = NOOP;
this[kWebSocket] = undefined;
}
/**
@ -76,7 +85,24 @@ class Sender {
if (options.generateMask) {
options.generateMask(mask);
} else {
randomFillSync(mask, 0, 4);
if (randomPoolPointer === RANDOM_POOL_SIZE) {
/* istanbul ignore else */
if (randomPool === undefined) {
//
// This is lazily initialized because server-sent frames must not
// be masked so it may never be used.
//
randomPool = Buffer.alloc(RANDOM_POOL_SIZE);
}
randomFillSync(randomPool, 0, RANDOM_POOL_SIZE);
randomPoolPointer = 0;
}
mask[0] = randomPool[randomPoolPointer++];
mask[1] = randomPool[randomPoolPointer++];
mask[2] = randomPool[randomPoolPointer++];
mask[3] = randomPool[randomPoolPointer++];
}
skipMasking = (mask[0] | mask[1] | mask[2] | mask[3]) === 0;
@ -190,7 +216,7 @@ class Sender {
rsv1: false
};
if (this._deflating) {
if (this._state !== DEFAULT) {
this.enqueue([this.dispatch, buf, false, options, cb]);
} else {
this.sendFrame(Sender.frame(buf, options), cb);
@ -212,6 +238,9 @@ class Sender {
if (typeof data === 'string') {
byteLength = Buffer.byteLength(data);
readOnly = false;
} else if (isBlob(data)) {
byteLength = data.size;
readOnly = false;
} else {
data = toBuffer(data);
byteLength = data.length;
@ -233,7 +262,13 @@ class Sender {
rsv1: false
};
if (this._deflating) {
if (isBlob(data)) {
if (this._state !== DEFAULT) {
this.enqueue([this.getBlobData, data, false, options, cb]);
} else {
this.getBlobData(data, false, options, cb);
}
} else if (this._state !== DEFAULT) {
this.enqueue([this.dispatch, data, false, options, cb]);
} else {
this.sendFrame(Sender.frame(data, options), cb);
@ -255,6 +290,9 @@ class Sender {
if (typeof data === 'string') {
byteLength = Buffer.byteLength(data);
readOnly = false;
} else if (isBlob(data)) {
byteLength = data.size;
readOnly = false;
} else {
data = toBuffer(data);
byteLength = data.length;
@ -276,7 +314,13 @@ class Sender {
rsv1: false
};
if (this._deflating) {
if (isBlob(data)) {
if (this._state !== DEFAULT) {
this.enqueue([this.getBlobData, data, false, options, cb]);
} else {
this.getBlobData(data, false, options, cb);
}
} else if (this._state !== DEFAULT) {
this.enqueue([this.dispatch, data, false, options, cb]);
} else {
this.sendFrame(Sender.frame(data, options), cb);
@ -310,6 +354,9 @@ class Sender {
if (typeof data === 'string') {
byteLength = Buffer.byteLength(data);
readOnly = false;
} else if (isBlob(data)) {
byteLength = data.size;
readOnly = false;
} else {
data = toBuffer(data);
byteLength = data.length;
@ -337,40 +384,94 @@ class Sender {
if (options.fin) this._firstFragment = true;
if (perMessageDeflate) {
const opts = {
[kByteLength]: byteLength,
fin: options.fin,
generateMask: this._generateMask,
mask: options.mask,
maskBuffer: this._maskBuffer,
opcode,
readOnly,
rsv1
};
const opts = {
[kByteLength]: byteLength,
fin: options.fin,
generateMask: this._generateMask,
mask: options.mask,
maskBuffer: this._maskBuffer,
opcode,
readOnly,
rsv1
};
if (this._deflating) {
this.enqueue([this.dispatch, data, this._compress, opts, cb]);
if (isBlob(data)) {
if (this._state !== DEFAULT) {
this.enqueue([this.getBlobData, data, this._compress, opts, cb]);
} else {
this.dispatch(data, this._compress, opts, cb);
this.getBlobData(data, this._compress, opts, cb);
}
} else if (this._state !== DEFAULT) {
this.enqueue([this.dispatch, data, this._compress, opts, cb]);
} else {
this.sendFrame(
Sender.frame(data, {
[kByteLength]: byteLength,
fin: options.fin,
generateMask: this._generateMask,
mask: options.mask,
maskBuffer: this._maskBuffer,
opcode,
readOnly,
rsv1: false
}),
cb
);
this.dispatch(data, this._compress, opts, cb);
}
}
/**
* Gets the contents of a blob as binary data.
*
* @param {Blob} blob The blob
* @param {Boolean} [compress=false] Specifies whether or not to compress
* the data
* @param {Object} options Options object
* @param {Boolean} [options.fin=false] Specifies whether or not to set the
* FIN bit
* @param {Function} [options.generateMask] The function used to generate the
* masking key
* @param {Boolean} [options.mask=false] Specifies whether or not to mask
* `data`
* @param {Buffer} [options.maskBuffer] The buffer used to store the masking
* key
* @param {Number} options.opcode The opcode
* @param {Boolean} [options.readOnly=false] Specifies whether `data` can be
* modified
* @param {Boolean} [options.rsv1=false] Specifies whether or not to set the
* RSV1 bit
* @param {Function} [cb] Callback
* @private
*/
getBlobData(blob, compress, options, cb) {
this._bufferedBytes += options[kByteLength];
this._state = GET_BLOB_DATA;
blob
.arrayBuffer()
.then((arrayBuffer) => {
if (this._socket.destroyed) {
const err = new Error(
'The socket was closed while the blob was being read'
);
//
// `callCallbacks` is called in the next tick to ensure that errors
// that might be thrown in the callbacks behave like errors thrown
// outside the promise chain.
//
process.nextTick(callCallbacks, this, err, cb);
return;
}
this._bufferedBytes -= options[kByteLength];
const data = toBuffer(arrayBuffer);
if (!compress) {
this._state = DEFAULT;
this.sendFrame(Sender.frame(data, options), cb);
this.dequeue();
} else {
this.dispatch(data, compress, options, cb);
}
})
.catch((err) => {
//
// `onError` is called in the next tick for the same reason that
// `callCallbacks` above is.
//
process.nextTick(onError, this, err, cb);
});
}
/**
* Dispatches a message.
*
@ -403,27 +504,19 @@ class Sender {
const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
this._bufferedBytes += options[kByteLength];
this._deflating = true;
this._state = DEFLATING;
perMessageDeflate.compress(data, options.fin, (_, buf) => {
if (this._socket.destroyed) {
const err = new Error(
'The socket was closed while data was being compressed'
);
if (typeof cb === 'function') cb(err);
for (let i = 0; i < this._queue.length; i++) {
const params = this._queue[i];
const callback = params[params.length - 1];
if (typeof callback === 'function') callback(err);
}
callCallbacks(this, err, cb);
return;
}
this._bufferedBytes -= options[kByteLength];
this._deflating = false;
this._state = DEFAULT;
options.readOnly = false;
this.sendFrame(Sender.frame(buf, options), cb);
this.dequeue();
@ -436,7 +529,7 @@ class Sender {
* @private
*/
dequeue() {
while (!this._deflating && this._queue.length) {
while (this._state === DEFAULT && this._queue.length) {
const params = this._queue.shift();
this._bufferedBytes -= params[3][kByteLength];
@ -475,3 +568,35 @@ class Sender {
}
module.exports = Sender;
/**
* Calls queued callbacks with an error.
*
* @param {Sender} sender The `Sender` instance
* @param {Error} err The error to call the callbacks with
* @param {Function} [cb] The first callback
* @private
*/
function callCallbacks(sender, err, cb) {
if (typeof cb === 'function') cb(err);
for (let i = 0; i < sender._queue.length; i++) {
const params = sender._queue[i];
const callback = params[params.length - 1];
if (typeof callback === 'function') callback(err);
}
}
/**
* Handles a `Sender` error.
*
* @param {Sender} sender The `Sender` instance
* @param {Error} err The error
* @param {Function} [cb] The first pending callback
* @private
*/
function onError(sender, err, cb) {
callCallbacks(sender, err, cb);
sender.onerror(err);
}

View File

@ -2,6 +2,8 @@
const { isUtf8 } = require('buffer');
const { hasBlob } = require('./constants');
//
// Allowed token characters:
//
@ -107,7 +109,27 @@ function _isValidUTF8(buf) {
return true;
}
/**
* Determines whether a value is a `Blob`.
*
* @param {*} value The value to be tested
* @return {Boolean} `true` if `value` is a `Blob`, else `false`
* @private
*/
function isBlob(value) {
return (
hasBlob &&
typeof value === 'object' &&
typeof value.arrayBuffer === 'function' &&
typeof value.type === 'string' &&
typeof value.stream === 'function' &&
(value[Symbol.toStringTag] === 'Blob' ||
value[Symbol.toStringTag] === 'File')
);
}
module.exports = {
isBlob,
isValidStatusCode,
isValidUTF8: _isValidUTF8,
tokenChars

View File

@ -1,4 +1,4 @@
/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex$" }] */
/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex$", "caughtErrors": "none" }] */
'use strict';
@ -29,7 +29,7 @@ class WebSocketServer extends EventEmitter {
* Create a `WebSocketServer` instance.
*
* @param {Object} options Configuration options
* @param {Boolean} [options.allowSynchronousEvents=false] Specifies whether
* @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether
* any of the `'message'`, `'ping'`, and `'pong'` events can be emitted
* multiple times in the same tick
* @param {Boolean} [options.autoPong=true] Specifies whether or not to
@ -60,7 +60,7 @@ class WebSocketServer extends EventEmitter {
super();
options = {
allowSynchronousEvents: false,
allowSynchronousEvents: true,
autoPong: true,
maxPayload: 100 * 1024 * 1024,
skipUTF8Validation: false,
@ -235,6 +235,7 @@ class WebSocketServer extends EventEmitter {
socket.on('error', socketOnError);
const key = req.headers['sec-websocket-key'];
const upgrade = req.headers.upgrade;
const version = +req.headers['sec-websocket-version'];
if (req.method !== 'GET') {
@ -243,13 +244,13 @@ class WebSocketServer extends EventEmitter {
return;
}
if (req.headers.upgrade.toLowerCase() !== 'websocket') {
if (upgrade === undefined || upgrade.toLowerCase() !== 'websocket') {
const message = 'Invalid Upgrade header';
abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
return;
}
if (!key || !keyRegex.test(key)) {
if (key === undefined || !keyRegex.test(key)) {
const message = 'Missing or invalid Sec-WebSocket-Key header';
abortHandshakeOrEmitwsClientError(this, req, socket, 400, message);
return;

View File

@ -1,4 +1,4 @@
/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex|Readable$" }] */
/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Duplex|Readable$", "caughtErrors": "none" }] */
'use strict';
@ -14,6 +14,8 @@ const { URL } = require('url');
const PerMessageDeflate = require('./permessage-deflate');
const Receiver = require('./receiver');
const Sender = require('./sender');
const { isBlob } = require('./validation');
const {
BINARY_TYPES,
EMPTY_BUFFER,
@ -58,6 +60,7 @@ class WebSocket extends EventEmitter {
this._closeFrameSent = false;
this._closeMessage = EMPTY_BUFFER;
this._closeTimer = null;
this._errorEmitted = false;
this._extensions = {};
this._paused = false;
this._protocol = '';
@ -90,9 +93,8 @@ class WebSocket extends EventEmitter {
}
/**
* This deviates from the WHATWG interface since ws doesn't support the
* required default "blob" type (instead we define a custom "nodebuffer"
* type).
* For historical reasons, the custom "nodebuffer" type is used by the default
* instead of "blob".
*
* @type {String}
*/
@ -213,11 +215,14 @@ class WebSocket extends EventEmitter {
skipUTF8Validation: options.skipUTF8Validation
});
this._sender = new Sender(socket, this._extensions, options.generateMask);
const sender = new Sender(socket, this._extensions, options.generateMask);
this._receiver = receiver;
this._sender = sender;
this._socket = socket;
receiver[kWebSocket] = this;
sender[kWebSocket] = this;
socket[kWebSocket] = this;
receiver.on('conclude', receiverOnConclude);
@ -227,6 +232,8 @@ class WebSocket extends EventEmitter {
receiver.on('ping', receiverOnPing);
receiver.on('pong', receiverOnPong);
sender.onerror = senderOnError;
//
// These methods may not be available if `socket` is just a `Duplex`.
//
@ -322,13 +329,7 @@ class WebSocket extends EventEmitter {
}
});
//
// Specify a timeout for the closing handshake to complete.
//
this._closeTimer = setTimeout(
this._socket.destroy.bind(this._socket),
closeTimeout
);
setCloseTimer(this);
}
/**
@ -623,7 +624,7 @@ module.exports = WebSocket;
* @param {(String|URL)} address The URL to which to connect
* @param {Array} protocols The subprotocols
* @param {Object} [options] Connection options
* @param {Boolean} [options.allowSynchronousEvents=false] Specifies whether any
* @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether any
* of the `'message'`, `'ping'`, and `'pong'` events can be emitted multiple
* times in the same tick
* @param {Boolean} [options.autoPong=true] Specifies whether or not to
@ -652,7 +653,7 @@ module.exports = WebSocket;
*/
function initAsClient(websocket, address, protocols, options) {
const opts = {
allowSynchronousEvents: false,
allowSynchronousEvents: true,
autoPong: true,
protocolVersion: protocolVersions[1],
maxPayload: 100 * 1024 * 1024,
@ -661,7 +662,6 @@ function initAsClient(websocket, address, protocols, options) {
followRedirects: false,
maxRedirects: 10,
...options,
createConnection: undefined,
socketPath: undefined,
hostname: undefined,
protocol: undefined,
@ -732,7 +732,8 @@ function initAsClient(websocket, address, protocols, options) {
const protocolSet = new Set();
let perMessageDeflate;
opts.createConnection = isSecure ? tlsConnect : netConnect;
opts.createConnection =
opts.createConnection || (isSecure ? tlsConnect : netConnect);
opts.defaultPort = opts.defaultPort || defaultPort;
opts.port = parsedUrl.port || defaultPort;
opts.host = parsedUrl.hostname.startsWith('[')
@ -928,7 +929,9 @@ function initAsClient(websocket, address, protocols, options) {
req = websocket._req = null;
if (res.headers.upgrade.toLowerCase() !== 'websocket') {
const upgrade = res.headers.upgrade;
if (upgrade === undefined || upgrade.toLowerCase() !== 'websocket') {
abortHandshake(websocket, socket, 'Invalid Upgrade header');
return;
}
@ -1030,6 +1033,11 @@ function initAsClient(websocket, address, protocols, options) {
*/
function emitErrorAndClose(websocket, err) {
websocket._readyState = WebSocket.CLOSING;
//
// The following assignment is practically useless and is done only for
// consistency.
//
websocket._errorEmitted = true;
websocket.emit('error', err);
websocket.emitClose();
}
@ -1110,7 +1118,7 @@ function abortHandshake(websocket, stream, message) {
*/
function sendAfterClose(websocket, data, cb) {
if (data) {
const length = toBuffer(data).length;
const length = isBlob(data) ? data.size : toBuffer(data).length;
//
// The `_bufferedAmount` property is used only when the peer is a client and
@ -1186,7 +1194,10 @@ function receiverOnError(err) {
websocket.close(err[kStatusCode]);
}
websocket.emit('error', err);
if (!websocket._errorEmitted) {
websocket._errorEmitted = true;
websocket.emit('error', err);
}
}
/**
@ -1242,6 +1253,47 @@ function resume(stream) {
stream.resume();
}
/**
* The `Sender` error event handler.
*
* @param {Error} The error
* @private
*/
function senderOnError(err) {
const websocket = this[kWebSocket];
if (websocket.readyState === WebSocket.CLOSED) return;
if (websocket.readyState === WebSocket.OPEN) {
websocket._readyState = WebSocket.CLOSING;
setCloseTimer(websocket);
}
//
// `socket.end()` is used instead of `socket.destroy()` to allow the other
// peer to finish sending queued data. There is no need to set a timer here
// because `CLOSING` means that it is already set or not needed.
//
this._socket.end();
if (!websocket._errorEmitted) {
websocket._errorEmitted = true;
websocket.emit('error', err);
}
}
/**
* Set a timer to destroy the underlying raw socket of a WebSocket.
*
* @param {WebSocket} websocket The WebSocket instance
* @private
*/
function setCloseTimer(websocket) {
websocket._closeTimer = setTimeout(
websocket._socket.destroy.bind(websocket._socket),
closeTimeout
);
}
/**
* The listener of the socket `'close'` event.
*