Skip to content

Commit 38b8235

Browse files
Lukasaweissi
authored andcommitted
Correctly activate child channels. (#27)
Motivation: Child channels should not activate before their parents. Modifications: Delay activation if the parent channel is not currently active. Activate during channelActive, if needed. Result: Easier to start HTTP/2 requests
1 parent c095189 commit 38b8235

File tree

4 files changed

+169
-5
lines changed

4 files changed

+169
-5
lines changed

Sources/NIOHTTP2/HTTP2StreamChannel.swift

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,11 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
106106
self.autoRead = autoRead
107107
return initializer?(self, self.streamID) ?? self.eventLoop.newSucceededFuture(result: ())
108108
}.map {
109-
self.state.activate()
110-
self.pipeline.fireChannelActive()
111-
if self.autoRead {
112-
self.read0()
109+
// This force unwrap is safe as parent is assigned in the initializer, and never unassigned.
110+
// If parent is not active, we expect to receive a channelActive later.
111+
if self.parent!.isActive {
112+
self.performActivation()
113113
}
114-
self.deliverPendingWrites()
115114
}
116115

117116
f.whenFailure { (error: Error) in
@@ -124,6 +123,17 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
124123
return f
125124
}
126125

126+
/// Activates this channel.
127+
internal func performActivation() {
128+
precondition(self.parent?.isActive ?? false, "Parent must be active to activate the child")
129+
self.state.activate()
130+
self.pipeline.fireChannelActive()
131+
if self.autoRead {
132+
self.read0()
133+
}
134+
self.deliverPendingWrites()
135+
}
136+
127137
private var _pipeline: ChannelPipeline!
128138

129139
public let allocator: ByteBufferAllocator

Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,17 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
7575
ctx.write(data, promise: promise)
7676
}
7777

78+
public func channelActive(ctx: ChannelHandlerContext) {
79+
// We just got channelActive. Any previously existing channels may be marked active.
80+
for channel in self.streams.values {
81+
// We double-check the channel activity here, because it's possible action taken during
82+
// the activation of one of the child channels will cause the parent to close!
83+
if ctx.channel.isActive {
84+
channel.performActivation()
85+
}
86+
}
87+
}
88+
7889
public func userInboundEventTriggered(ctx: ChannelHandlerContext, event: Any) {
7990
// The only event we care about right now is StreamClosedEvent, and in particular
8091
// we only care about it if we still have the stream channel for the stream

Tests/NIOHTTP2Tests/HTTP2StreamMultiplexerTests+XCTest.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ extension HTTP2StreamMultiplexerTests {
5454
("testWritesOnCreatedChannelAreDelayed", testWritesOnCreatedChannelAreDelayed),
5555
("testWritesAreCancelledOnFailingInitializer", testWritesAreCancelledOnFailingInitializer),
5656
("testFailingInitializerDoesNotWrite", testFailingInitializerDoesNotWrite),
57+
("testCreatedChildChannelDoesNotActivateEarly", testCreatedChildChannelDoesNotActivateEarly),
58+
("testCreatedChildChannelActivatesIfParentIsActive", testCreatedChildChannelActivatesIfParentIsActive),
59+
("testInitiatedChildChannelActivates", testInitiatedChildChannelActivates),
5760
]
5861
}
5962
}

Tests/NIOHTTP2Tests/HTTP2StreamMultiplexerTests.swift

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,28 @@ final class HandlerRemovedHandler: ChannelInboundHandler {
126126
}
127127

128128

129+
/// A channel handler that succeeds a promise when its channel becomes active.
130+
final class ActiveHandler: ChannelInboundHandler {
131+
typealias InboundIn = Any
132+
133+
let activatedPromise: EventLoopPromise<Void>
134+
135+
init(activatedPromise: EventLoopPromise<Void>) {
136+
self.activatedPromise = activatedPromise
137+
}
138+
139+
func handlerAdded(ctx: ChannelHandlerContext) {
140+
if ctx.channel.isActive {
141+
self.activatedPromise.succeed(result: ())
142+
}
143+
}
144+
145+
func channelActive(ctx: ChannelHandlerContext) {
146+
self.activatedPromise.succeed(result: ())
147+
}
148+
}
149+
150+
129151
final class HTTP2StreamMultiplexerTests: XCTestCase {
130152
var channel: EmbeddedChannel!
131153

@@ -199,6 +221,8 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
199221
func testChannelsCloseAfterResetStreamFrameFirstThenEvent() throws {
200222
var closeError: Error? = nil
201223

224+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
225+
202226
// First, set up the frames we want to send/receive.
203227
let streamID = HTTP2StreamID(knownID: Int32(1))
204228
var frame = HTTP2Frame(streamID: streamID, payload: .headers(HTTPHeaders()))
@@ -233,6 +257,8 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
233257
func testChannelsCloseAfterGoawayFrameFirstThenEvent() throws {
234258
var closeError: Error? = nil
235259

260+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
261+
236262
// First, set up the frames we want to send/receive.
237263
let streamID = HTTP2StreamID(knownID: Int32(1))
238264
var frame = HTTP2Frame(streamID: streamID, payload: .headers(HTTPHeaders()))
@@ -345,6 +371,9 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
345371
XCTAssertNoThrow(try self.channel.pipeline.add(handler: frameReceiver).wait())
346372
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
347373

374+
375+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
376+
348377
// Let's send a headers frame to open the stream.
349378
let streamID = HTTP2StreamID(knownID: 1)
350379
let frame = HTTP2Frame(streamID: streamID, payload: .headers(HTTPHeaders()))
@@ -372,6 +401,9 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
372401
XCTAssertNoThrow(try self.channel.pipeline.add(handler: frameReceiver).wait())
373402
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
374403

404+
405+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
406+
375407
// Let's send a headers frame to open the stream.
376408
let streamID = HTTP2StreamID(knownID: 1)
377409
let frame = HTTP2Frame(streamID: streamID, payload: .headers(HTTPHeaders()))
@@ -406,6 +438,9 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
406438
XCTAssertNoThrow(try self.channel.pipeline.add(handler: frameReceiver).wait())
407439
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
408440

441+
442+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
443+
409444
// Let's send a headers frame to open the stream.
410445
let streamID = HTTP2StreamID(knownID: 1)
411446
let frame = HTTP2Frame(streamID: streamID, payload: .headers(HTTPHeaders()))
@@ -459,6 +494,8 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
459494
XCTAssertNoThrow(try self.channel.pipeline.add(handler: frameReceiver).wait())
460495
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
461496

497+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
498+
462499
// Let's send a headers frame to open the stream.
463500
let streamID = HTTP2StreamID(knownID: 1)
464501
let frame = HTTP2Frame(streamID: streamID, payload: .headers(HTTPHeaders()))
@@ -494,6 +531,9 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
494531
}
495532
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
496533

534+
535+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
536+
497537
// Let's send a headers frame to open the stream.
498538
let streamID = HTTP2StreamID(knownID: 1)
499539
let frame = HTTP2Frame(streamID: streamID, payload: .headers(HTTPHeaders()))
@@ -608,6 +648,8 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
608648
XCTAssertNoThrow(try self.channel.pipeline.add(handler: writeTracker).wait())
609649
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
610650

651+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
652+
611653
// Let's open two streams.
612654
let firstStreamID = HTTP2StreamID(knownID: 1)
613655
let secondStreamID = HTTP2StreamID(knownID: 3)
@@ -751,6 +793,9 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
751793
}
752794
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
753795

796+
797+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
798+
754799
// Let's open a stream.
755800
let streamID = HTTP2StreamID(knownID: 1)
756801
let frame = HTTP2Frame(streamID: streamID, payload: .headers(HTTPHeaders()))
@@ -797,6 +842,8 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
797842
}
798843
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
799844

845+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
846+
800847
// Let's open two streams.
801848
for streamID in [firstStreamID, secondStreamID] {
802849
let frame = HTTP2Frame(streamID: streamID, payload: .headers(HTTPHeaders()))
@@ -836,6 +883,8 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
836883
}
837884
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
838885

886+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
887+
839888
// Let's open a stream.
840889
let streamID = HTTP2StreamID(knownID: 1)
841890
let frame = HTTP2Frame(streamID: streamID, payload: .headers(HTTPHeaders()))
@@ -884,6 +933,8 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
884933
XCTAssertNoThrow(try self.channel.pipeline.add(handler: readCounter).wait())
885934
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
886935

936+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
937+
887938
// Let's open a stream.
888939
let streamID = HTTP2StreamID(knownID: 1)
889940
let frame = HTTP2Frame(streamID: streamID, payload: .headers(HTTPHeaders()))
@@ -1039,6 +1090,8 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
10391090
var childChannel: Channel? = nil
10401091
var childStreamID: HTTP2StreamID? = nil
10411092

1093+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
1094+
10421095
let multiplexer = HTTP2StreamMultiplexer { (_, _) in
10431096
XCTFail("Must not be called")
10441097
return self.channel.eventLoop.newFailedFuture(error: MyError())
@@ -1112,4 +1165,91 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
11121165

11131166
XCTAssertNoThrow(try self.channel.finish())
11141167
}
1168+
1169+
func testCreatedChildChannelDoesNotActivateEarly() throws {
1170+
var activated = false
1171+
1172+
let activePromise: EventLoopPromise<Void> = self.channel.eventLoop.newPromise()
1173+
let activeRecorder = ActiveHandler(activatedPromise: activePromise)
1174+
activePromise.futureResult.map {
1175+
activated = true
1176+
}.whenFailure { (_: Error) in
1177+
XCTFail("Activation promise must not fail")
1178+
}
1179+
1180+
let multiplexer = HTTP2StreamMultiplexer { (_, _) in
1181+
XCTFail("Must not be called")
1182+
return self.channel.eventLoop.newFailedFuture(error: MyError())
1183+
}
1184+
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
1185+
multiplexer.createStreamChannel(promise: nil) { (channel, streamID) in
1186+
return channel.pipeline.add(handler: activeRecorder)
1187+
}
1188+
(self.channel.eventLoop as! EmbeddedEventLoop).run()
1189+
XCTAssertFalse(activated)
1190+
1191+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
1192+
1193+
XCTAssertTrue(activated)
1194+
1195+
XCTAssertNoThrow(try self.channel.finish())
1196+
}
1197+
1198+
func testCreatedChildChannelActivatesIfParentIsActive() throws {
1199+
var activated = false
1200+
1201+
let activePromise: EventLoopPromise<Void> = self.channel.eventLoop.newPromise()
1202+
let activeRecorder = ActiveHandler(activatedPromise: activePromise)
1203+
activePromise.futureResult.map {
1204+
activated = true
1205+
}.whenFailure { (_: Error) in
1206+
XCTFail("Activation promise must not fail")
1207+
}
1208+
1209+
let multiplexer = HTTP2StreamMultiplexer { (_, _) in
1210+
XCTFail("Must not be called")
1211+
return self.channel.eventLoop.newFailedFuture(error: MyError())
1212+
}
1213+
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
1214+
1215+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 8765)).wait())
1216+
XCTAssertFalse(activated)
1217+
1218+
multiplexer.createStreamChannel(promise: nil) { (channel, streamID) in
1219+
return channel.pipeline.add(handler: activeRecorder)
1220+
}
1221+
(self.channel.eventLoop as! EmbeddedEventLoop).run()
1222+
XCTAssertTrue(activated)
1223+
1224+
XCTAssertNoThrow(try self.channel.finish())
1225+
}
1226+
1227+
func testInitiatedChildChannelActivates() throws {
1228+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(unixDomainSocketPath: "/whatever"), promise: nil))
1229+
1230+
var activated = false
1231+
1232+
let activePromise: EventLoopPromise<Void> = self.channel.eventLoop.newPromise()
1233+
let activeRecorder = ActiveHandler(activatedPromise: activePromise)
1234+
activePromise.futureResult.map {
1235+
activated = true
1236+
}.whenFailure { (_: Error) in
1237+
XCTFail("Activation promise must not fail")
1238+
}
1239+
1240+
let multiplexer = HTTP2StreamMultiplexer { (channel, _) in
1241+
return channel.pipeline.add(handler: activeRecorder)
1242+
}
1243+
XCTAssertNoThrow(try self.channel.pipeline.add(handler: multiplexer).wait())
1244+
self.channel.pipeline.fireChannelActive()
1245+
1246+
// Open a new stream.
1247+
XCTAssertFalse(activated)
1248+
let streamID = HTTP2StreamID(knownID: 1)
1249+
let frame = HTTP2Frame(streamID: streamID, payload: .headers(HTTPHeaders()))
1250+
XCTAssertNoThrow(try self.channel.writeInbound(frame))
1251+
XCTAssertTrue(activated)
1252+
1253+
XCTAssertNoThrow(try self.channel.finish())
1254+
}
11151255
}

0 commit comments

Comments
 (0)