http2: read from connection stream before session create
Version: v9.2.0 Platform: Linux develop 4.9.0-4-amd64 SMP Debian 4.9.51-1 (2017-09-28) x86_64 GNU/Linux Subsystem: http2
Sorry for my english.
I try to do some action (read then unshift data back) with stream on "connection" event but before http2 connectionListener do its job (make session etc) ... and i`m not happy: after read, stream is no longer handled properly by http2 system and i must pack it into ugly Duplex stream.
My question: how to read data from stream, push it back and keep stream usable for http2 module?
Or maybe its a bug - https and http module handle properly such streams.
In this example i do pseudo alpn negotiation but i really need this to support PROXY protocol.
"use strict";
const http2 = require("http2");
const { Duplex } = require("stream");
// ...
const Http2Server = http2.createServer().constructor;
// HTTP2 preface from node-spdy
const PREFACE_BUFFER = Buffer. from ("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
const PREFACE_BUFFER_LENGTH = PREFACE_BUFFER.length;
// Ugly Duplex Proxy
const kWait = Symbol("wait");
const kNread = Symbol("nread");
const kSocket = Symbol("socket");
const kBuffer = Symbol("buffer");
class SocketProxy extends Duplex {
constructor(socket, buffer) {
super({
allowHalfOpen: true,
decodeStrings: false
});
this[kWait] = true;
this[kNread] = -1;
this[kSocket] = socket;
this[kBuffer] = buffer;
socket.on("error", err => this.emit("error", err));
socket.on("data", chunk => this.addChunk(chunk));
socket.once("end", () => {
this[kWait] = false;
this.tryRead();
this.emit("end");
this.destroy();
});
}
_write(data, encoding, cb) {
try {
this[kSocket].write(data, encoding);
cb();
} catch (err) {
cb(err);
}
}
_writev(chunks, cb) {
try {
this[kSocket].writev(chunks);
cb();
} catch (err) {
cb(err);
}
}
_destroy(e, cb) {
try {
this[kSocket].destroy(e);
cb();
} catch (err) {
cb(err);
}
delete this[kSocket];
delete this[kBuffer];
}
_final(cb) {
try {
this[kSocket].final();
cb();
} catch (err) {
cb(err);
}
}
_read(nread) {
if (this[kBuffer].length > 0) {
const data = this[kBuffer].slice(0, nread);
this[kBuffer] = this[kBuffer].slice(nread);
return this.push(data);
} else if (this[kWait]) {
this[kNread] = nread;
} else {
return this.push(null);
}
}
addChunk(chunk) {
this[kBuffer] = Buffer.concat([this[kBuffer], chunk]);
this.tryRead();
}
tryRead() {
const nread = this[kNread];
if (nread !== -1) {
this[kNread] = -1;
this._read(nread);
}
}
get remoteAddress() {
return this[kSocket].remoteAddress;
}
get remotePort() {
return this[kSocket].remotePort;
}
}
// new connection listener - read, unshift or create ugly proxy
function connectionListener(socket) {
const onReadable = () => {
// at this point socket is somehow "broken" for http2
socket.removeListener("readable", onReadable);
let buffer;
let chunk = socket.read();
while (null !== chunk) {
buffer = buffer ? Buffer.concat([buffer, chunk]) : chunk;
let isH2 = true;
const bufferLength = buffer.length;
if (bufferLength >= PREFACE_BUFFER_LENGTH) {
isH2 = PREFACE_BUFFER.equals(buffer.slice(0, PREFACE_BUFFER_LENGTH));
} else {
isH2 = buffer.equals(PREFACE_BUFFER.slice(0, bufferLength));
}
if (!isH2 || bufferLength >= PREFACE_BUFFER_LENGTH) {
if (!isH2) {
// ... pseudo alpn negotiation
Object.defineProperty(socket, "alpnProtocol", {
value: false
});
this.emit("postAlpnConnection", socket);
// httpConnectionListener support readed socekt
socket.unshift(buffer);
} else {
// socket is broken so make proxy...
const proxy = new SocketProxy(socket, buffer);
this.emit("postAlpnConnection", proxy);
}
return;
}
chunk = socket.read();
}
this.emit("postAlpnConnection", socket);
socket.destroy("No data");
};
socket.on("readable", onReadable);
}
// handle session shutdown...
let shutdownWrapper;
function getShutdownWrapper(session) {
if (!shutdownWrapper) {
const origShutdown = session.shutdown;
shutdownWrapper = function wrapper(options, callback) {
// callback === stream.destroy, ignore err object not recognised by JSStreamWrap(?)
origShutdown.call(this, options, callback ? (/*err*/) => callback() : undefined);
};
}
return shutdownWrapper;
}
class Server extends Http2Server {
constructor(options, handler) {
super(options, handler);
this.listeners("connection").forEach(listener => this.on("postAlpnConnection", listener));
this.removeAllListeners("connection");
this.addListener("connection", connectionListener);
this.addListener("session", session => session.shutdown = getShutdownWrapper(session));
}
}
const createServer = (options, handler) => {
if (typeof options === "function") {
handler = options;
options = Object.create(null);
}
return new Server(options, handler);
};
module.exports = {
Server,
createServer
};
PS. This exemple works but probably make memory leaks.