Skip to content

Commit 6693a60

Browse files
authored
Propagate ChannelShouldQuiesceEvent to child channels (#464)
Motivation: NIO has a 'ChannelShouldQuiesceEvent' which channels can listen for in order to know when they should quiesce. This is typically used to initiate a graceful shutdown of an HTTP/2 server. However, child channels aren't notified of this event so HTTP/2 servers must keep track of streams separately in order to notify them when the server is quiescing. Modifications: - Propagate the `ChannelShouldQuiesceEvent` to child channels Result: Child channels can watch for `ChannelShouldQuiesceEvent`s
1 parent 8e667f8 commit 6693a60

8 files changed

+149
-1
lines changed

Sources/NIOHTTP2/HTTP2ChannelHandler+InboundStreamMultiplexer.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ extension NIOHTTP2Handler.InboundStreamMultiplexer {
119119
}
120120
}
121121

122+
func userInboundEventReceived(_ event: Any) {
123+
switch self {
124+
case .inline(let multiplexer):
125+
multiplexer.receivedUserInboundEvent(event)
126+
case .legacy:
127+
() // No-op: already sent down the pipeline by the `NIOHTTP2Handler`.
128+
}
129+
}
130+
122131
func channelWritabilityChangedReceived() {
123132
switch self {
124133
case .inline(let inlineStreamMultiplexer):

Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ extension InlineStreamMultiplexer: HTTP2InboundStreamMultiplexer {
4646
self._commonStreamMultiplexer.receivedFrame(frame, context: self.context, multiplexer: .inline(self))
4747
}
4848

49+
func receivedUserInboundEvent(_ event: Any) {
50+
self._commonStreamMultiplexer.selectivelyPropagateUserInboundEvent(context: self.context, event: event)
51+
}
52+
4953
func streamError(streamID: HTTP2StreamID, error: Error) {
5054
let streamError = NIOHTTP2Errors.streamError(streamID: streamID, baseError: error)
5155
self._commonStreamMultiplexer.streamError(context: self.context, streamError)

Sources/NIOHTTP2/HTTP2ChannelHandler.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,11 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
445445
}
446446
}
447447

448+
public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
449+
self.inboundStreamMultiplexer?.userInboundEventReceived(event)
450+
context.fireUserInboundEventTriggered(event)
451+
}
452+
448453
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
449454
let data = self.unwrapInboundIn(data)
450455
self.frameDecoder.append(bytes: data)

Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,24 @@ extension HTTP2CommonInboundStreamMultiplexer {
209209
self.streamChannelContinuation?.finish()
210210
}
211211

212+
internal func selectivelyPropagateUserInboundEvent(context: ChannelHandlerContext, event: Any) {
213+
func propagateEvent(_ event: Any) {
214+
for channel in self.streams.values {
215+
channel.baseChannel.pipeline.fireUserInboundEventTriggered(event)
216+
}
217+
for channel in self._pendingStreams.values {
218+
channel.baseChannel.pipeline.fireUserInboundEventTriggered(event)
219+
}
220+
}
221+
222+
switch event {
223+
case is ChannelShouldQuiesceEvent:
224+
propagateEvent(event)
225+
default:
226+
()
227+
}
228+
}
229+
212230
internal func propagateChannelWritabilityChanged(context: ChannelHandlerContext) {
213231
for channel in self.streams.values {
214232
channel.parentChannelWritabilityChanged(newValue: context.channel.isWritable)

Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
102102
case let evt as NIOHTTP2StreamCreatedEvent:
103103
_ = self.commonStreamMultiplexer.streamCreated(event: evt)
104104
default:
105-
break
105+
self.commonStreamMultiplexer.selectivelyPropagateUserInboundEvent(context: context, event: event)
106106
}
107107

108108
context.fireUserInboundEventTriggered(event)

Tests/NIOHTTP2Tests/HTTP2FramePayloadStreamMultiplexerTests.swift

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2322,3 +2322,22 @@ private final class ReadAndFrameConsumer: ChannelInboundHandler, ChannelOutbound
23222322
}
23232323
}
23242324
}
2325+
2326+
final class UserInboundEventRecorder: ChannelInboundHandler {
2327+
typealias InboundIn = Any
2328+
2329+
private let receivedEvents: NIOLockedValueBox<[Any]>
2330+
2331+
var events: [Any] {
2332+
self.receivedEvents.withLockedValue { $0 }
2333+
}
2334+
2335+
init() {
2336+
self.receivedEvents = NIOLockedValueBox([])
2337+
}
2338+
2339+
func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
2340+
self.receivedEvents.withLockedValue { $0.append(event) }
2341+
context.fireUserInboundEventTriggered(event)
2342+
}
2343+
}

Tests/NIOHTTP2Tests/SimpleClientServerFramePayloadStreamTests.swift

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2185,4 +2185,62 @@ class SimpleClientServerFramePayloadStreamTests: XCTestCase {
21852185
XCTAssertNoThrow(try self.clientChannel.finish())
21862186
XCTAssertNoThrow(try self.serverChannel.finish())
21872187
}
2188+
2189+
func testChannelShouldQuiesceIsPropagated() throws {
2190+
// Setup the connection.
2191+
let receivedShouldQuiesceEvent = self.clientChannel.eventLoop.makePromise(of: Void.self)
2192+
try self.basicHTTP2Connection { stream in
2193+
stream.pipeline.addHandler(ShouldQuiesceEventWaiter(promise: receivedShouldQuiesceEvent))
2194+
}
2195+
2196+
let connectionReceivedShouldQuiesceEvent = self.clientChannel.eventLoop.makePromise(of: Void.self)
2197+
try self.serverChannel.pipeline.addHandler(ShouldQuiesceEventWaiter(promise: connectionReceivedShouldQuiesceEvent)).wait()
2198+
2199+
// Create the stream channel.
2200+
let multiplexer = try self.clientChannel.pipeline.handler(type: HTTP2StreamMultiplexer.self).wait()
2201+
let streamPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self)
2202+
multiplexer.createStreamChannel(promise: streamPromise) {
2203+
$0.eventLoop.makeSucceededVoidFuture()
2204+
}
2205+
self.clientChannel.embeddedEventLoop.run()
2206+
let stream = try streamPromise.futureResult.wait()
2207+
2208+
// Initiate request to open the stream on the server.
2209+
let headers = HPACKHeaders([(":path", "/"), (":method", "POST"), (":scheme", "http")])
2210+
let frame: HTTP2Frame.FramePayload = .headers(.init(headers: headers))
2211+
stream.writeAndFlush(frame, promise: nil)
2212+
self.interactInMemory(self.clientChannel, self.serverChannel)
2213+
2214+
// Fire the event on the server pipeline, this should propagate to the stream channel and
2215+
// the connection channel.
2216+
self.serverChannel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
2217+
XCTAssertNoThrow(try receivedShouldQuiesceEvent.futureResult.wait())
2218+
XCTAssertNoThrow(try connectionReceivedShouldQuiesceEvent.futureResult.wait())
2219+
2220+
XCTAssertNoThrow(try self.clientChannel.finish())
2221+
XCTAssertNoThrow(try self.serverChannel.finish())
2222+
}
2223+
}
2224+
2225+
2226+
final class ShouldQuiesceEventWaiter: ChannelInboundHandler {
2227+
typealias InboundIn = Never
2228+
2229+
private let promise: EventLoopPromise<Void>
2230+
2231+
init(promise: EventLoopPromise<Void>) {
2232+
self.promise = promise
2233+
}
2234+
2235+
func channelInactive(context: ChannelHandlerContext) {
2236+
self.promise.fail(ChannelError.eof)
2237+
context.fireChannelInactive()
2238+
}
2239+
2240+
func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
2241+
if event is ChannelShouldQuiesceEvent {
2242+
self.promise.succeed(())
2243+
}
2244+
context.fireUserInboundEventTriggered(event)
2245+
}
21882246
}

Tests/NIOHTTP2Tests/SimpleClientServerInlineStreamMultiplexerTests.swift

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,4 +408,39 @@ class SimpleClientServerInlineStreamMultiplexerTests: XCTestCase {
408408
XCTAssertNoThrow(try self.clientChannel.finish())
409409
XCTAssertNoThrow(try self.serverChannel.finish())
410410
}
411+
412+
func testChannelShouldQuiesceIsPropagated() throws {
413+
// Setup the connection.
414+
let receivedShouldQuiesceEvent = self.clientChannel.eventLoop.makePromise(of: Void.self)
415+
try self.basicHTTP2Connection { stream in
416+
stream.pipeline.addHandler(ShouldQuiesceEventWaiter(promise: receivedShouldQuiesceEvent))
417+
}
418+
419+
let connectionReceivedShouldQuiesceEvent = self.clientChannel.eventLoop.makePromise(of: Void.self)
420+
try self.serverChannel.pipeline.addHandler(ShouldQuiesceEventWaiter(promise: connectionReceivedShouldQuiesceEvent)).wait()
421+
422+
// Create the stream channel.
423+
let multiplexer = try self.clientChannel.pipeline.handler(type: NIOHTTP2Handler.self).flatMap { $0.multiplexer }.wait()
424+
let streamPromise = self.clientChannel.eventLoop.makePromise(of: Channel.self)
425+
multiplexer.createStreamChannel(promise: streamPromise) {
426+
$0.eventLoop.makeSucceededVoidFuture()
427+
}
428+
self.clientChannel.embeddedEventLoop.run()
429+
let stream = try streamPromise.futureResult.wait()
430+
431+
// Initiate request to open the stream on the server.
432+
let headers = HPACKHeaders([(":path", "/"), (":method", "POST"), (":scheme", "http")])
433+
let frame: HTTP2Frame.FramePayload = .headers(.init(headers: headers))
434+
stream.writeAndFlush(frame, promise: nil)
435+
self.interactInMemory(self.clientChannel, self.serverChannel)
436+
437+
// Fire the event on the server pipeline, this should propagate to the stream channel and
438+
// the connection channel.
439+
self.serverChannel.pipeline.fireUserInboundEventTriggered(ChannelShouldQuiesceEvent())
440+
XCTAssertNoThrow(try receivedShouldQuiesceEvent.futureResult.wait())
441+
XCTAssertNoThrow(try connectionReceivedShouldQuiesceEvent.futureResult.wait())
442+
443+
XCTAssertNoThrow(try self.clientChannel.finish())
444+
XCTAssertNoThrow(try self.serverChannel.finish())
445+
}
411446
}

0 commit comments

Comments
 (0)