Skip to content

Commit 88bd6d3

Browse files
sergiocampamaLukasa
authored andcommitted
Set up HTTP2Parser in handlerAdded if the channel had been previously activated. (#36)
By setting up the handler in the handlerAdded hook, HTTP2Parser supports being added dynamically in a channelRead event, after the channel has sent the channelActive event.
1 parent 9bd5c86 commit 88bd6d3

File tree

4 files changed

+84
-10
lines changed

4 files changed

+84
-10
lines changed

Sources/NIOHTTP2/HTTP2Parser.swift

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -185,17 +185,19 @@ public final class HTTP2Parser: ChannelInboundHandler, ChannelOutboundHandler {
185185
}
186186

187187
public func channelActive(ctx: ChannelHandlerContext) {
188-
self.session = NGHTTP2Session(mode: self.mode,
189-
allocator: ctx.channel.allocator,
190-
maxCachedStreamIDs: self.maxCachedClosedStreams,
191-
frameReceivedHandler: { ctx.fireChannelRead(self.wrapInboundOut($0)) },
192-
sendFunction: { ctx.write(self.wrapOutboundOut($0), promise: $1) },
193-
userEventFunction: { ctx.fireUserInboundEventTriggered($0) })
194-
195-
self.flushPreamble(ctx: ctx)
188+
self.initializeSession(ctx: ctx)
196189
ctx.fireChannelActive()
197190
}
198191

192+
public func handlerAdded(ctx: ChannelHandlerContext) {
193+
// Check if the channel is already active when this handler is being added,
194+
// in case the handler was added dynamically into the pipeline. If so,
195+
// call channelActive to set up the handler.
196+
if ctx.channel.isActive {
197+
self.initializeSession(ctx: ctx)
198+
}
199+
}
200+
199201
public func handlerRemoved(ctx: ChannelHandlerContext) {
200202
self.session = nil
201203
}
@@ -235,6 +237,19 @@ public final class HTTP2Parser: ChannelInboundHandler, ChannelOutboundHandler {
235237
self.reentrancyManager.markFlushPoint()
236238
self.reentrancyManager.process(ctx: ctx, self.process)
237239
}
240+
241+
private func initializeSession(ctx: ChannelHandlerContext) {
242+
if self.session == nil {
243+
self.session = NGHTTP2Session(mode: self.mode,
244+
allocator: ctx.channel.allocator,
245+
maxCachedStreamIDs: self.maxCachedClosedStreams,
246+
frameReceivedHandler: { ctx.fireChannelRead(self.wrapInboundOut($0)) },
247+
sendFunction: { ctx.write(self.wrapOutboundOut($0), promise: $1) },
248+
userEventFunction: { ctx.fireUserInboundEventTriggered($0) })
249+
250+
self.flushPreamble(ctx: ctx)
251+
}
252+
}
238253

239254
private func flushPreamble(ctx: ChannelHandlerContext) {
240255
let frame = HTTP2Frame(streamID: .rootStream, payload: .settings(self.initialSettings))

Tests/NIOHTTP2Tests/SimpleClientServerTests+XCTest.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ extension SimpleClientServerTests {
2727
static var allTests : [(String, (SimpleClientServerTests) -> () throws -> Void)] {
2828
return [
2929
("testBasicRequestResponse", testBasicRequestResponse),
30+
("testBasicRequestResponseWithDynamicPipeline", testBasicRequestResponseWithDynamicPipeline),
3031
("testManyRequestsAtOnce", testManyRequestsAtOnce),
3132
("testNothingButGoaway", testNothingButGoaway),
3233
("testGoAwayWithStreamsUpQuiescing", testGoAwayWithStreamsUpQuiescing),

Tests/NIOHTTP2Tests/SimpleClientServerTests.swift

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,26 @@ final class ClosedEventVsFrameOrderingHandler: ChannelInboundHandler {
169169
}
170170
}
171171

172+
/// A simple channel handler that adds the HTTP2Parser handler dynamically
173+
/// after a read event has been triggered.
174+
class HTTP2ParserProxyHandler: ChannelInboundHandler {
175+
typealias InboundIn = ByteBuffer
176+
177+
private let maxCachedClosedStreams: Int
178+
179+
init(maxCachedClosedStreams: Int) {
180+
self.maxCachedClosedStreams = maxCachedClosedStreams
181+
}
182+
183+
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
184+
XCTAssertNoThrow(try ctx.pipeline.add(
185+
handler: HTTP2Parser(mode: .server, maxCachedClosedStreams: maxCachedClosedStreams
186+
)).wait())
187+
ctx.fireChannelRead(data)
188+
_ = ctx.pipeline.remove(ctx: ctx)
189+
}
190+
}
191+
172192
class SimpleClientServerTests: XCTestCase {
173193
var clientChannel: EmbeddedChannel!
174194
var serverChannel: EmbeddedChannel!
@@ -190,6 +210,13 @@ class SimpleClientServerTests: XCTestCase {
190210
try self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)
191211
}
192212

213+
/// Establish a basic HTTP/2 connection where the HTTP2Parser handler is added after the channel has been activated.
214+
func basicHTTP2DynamicPipelineConnection(maxCachedClosedStreams: Int = 1024) throws {
215+
XCTAssertNoThrow(try self.clientChannel.pipeline.add(handler: HTTP2Parser(mode: .client, maxCachedClosedStreams: maxCachedClosedStreams)).wait())
216+
XCTAssertNoThrow(try self.serverChannel.pipeline.add(handler: HTTP2ParserProxyHandler(maxCachedClosedStreams: maxCachedClosedStreams)).wait())
217+
try self.assertDoHandshake(client: self.clientChannel, server: self.serverChannel)
218+
}
219+
193220
func testBasicRequestResponse() throws {
194221
// Begin by getting the connection up.
195222
try self.basicHTTP2Connection()
@@ -216,6 +243,32 @@ class SimpleClientServerTests: XCTestCase {
216243
XCTAssertNoThrow(try self.serverChannel.finish())
217244
}
218245

246+
func testBasicRequestResponseWithDynamicPipeline() throws {
247+
// Begin by getting the connection up.
248+
try self.basicHTTP2DynamicPipelineConnection()
249+
250+
// We're now going to try to send a request from the client to the server.
251+
let headers = HTTPHeaders([(":path", "/"), (":method", "POST"), (":scheme", "https"), (":authority", "localhost")])
252+
var requestBody = self.clientChannel.allocator.buffer(capacity: 128)
253+
requestBody.write(staticString: "A simple HTTP/2 request.")
254+
255+
let clientStreamID = HTTP2StreamID()
256+
let reqFrame = HTTP2Frame(streamID: clientStreamID, payload: .headers(headers))
257+
var reqBodyFrame = HTTP2Frame(streamID: clientStreamID, payload: .data(.byteBuffer(requestBody)))
258+
reqBodyFrame.flags.insert(.endStream)
259+
260+
let serverStreamID = try self.assertFramesRoundTrip(frames: [reqFrame, reqBodyFrame], sender: self.clientChannel, receiver: self.serverChannel).first!.streamID
261+
262+
// Let's send a quick response back.
263+
let responseHeaders = HTTPHeaders([(":status", "200"), ("content-length", "0")])
264+
var respFrame = HTTP2Frame(streamID: serverStreamID, payload: .headers(responseHeaders))
265+
respFrame.flags.insert(.endStream)
266+
try self.assertFramesRoundTrip(frames: [respFrame], sender: self.serverChannel, receiver: self.clientChannel)
267+
268+
XCTAssertNoThrow(try self.clientChannel.finish())
269+
XCTAssertNoThrow(try self.serverChannel.finish())
270+
}
271+
219272
func testManyRequestsAtOnce() throws {
220273
// Begin by getting the connection up.
221274
try self.basicHTTP2Connection()

Tests/NIOHTTP2Tests/TestUtilities.swift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,13 @@ extension XCTestCase {
7979
func assertDoHandshake(client: EmbeddedChannel, server: EmbeddedChannel,
8080
clientSettings: [HTTP2Setting] = nioDefaultSettings, serverSettings: [HTTP2Setting] = nioDefaultSettings,
8181
file: StaticString = #file, line: UInt = #line) throws {
82-
client.pipeline.fireChannelActive()
83-
server.pipeline.fireChannelActive()
82+
// This connects are not semantically right, but are required in order to activate the
83+
// channels.
84+
//! FIXME: Replace with registerAlreadyConfigured0 once EmbeddedChannel propagates this
85+
// call to its channelcore.
86+
let socket = try SocketAddress(unixDomainSocketPath: "/fake")
87+
_ = try client.connect(to: socket).wait()
88+
_ = try server.connect(to: socket).wait()
8489

8590
// First the channels need to interact.
8691
self.interactInMemory(client, server, file: file, line: line)

0 commit comments

Comments
 (0)