Skip to content

Commit 2d5ab32

Browse files
authored
Remove indirection between channel and multiplexer (#126)
Motivation: Previously the HTTP2StreamChannel forwarded all frames to the parent Channel by way of the abstract Channel existential API. While this was straightforward, it added unnecessary overhead due to the extra pipeline steps and the extra indirection of the types providing an inlining barrier. Given that in #116 we gave the HTTP2StreamChannel a reference to its parent multiplexer, we may as well take advantage of this and allow direct calls from the stream channel into the multiplexer. Modifications: Avoided calls to parent.write and parent.flush. Replaced them with a new interface. Added new allocation test for this path. Result: Slightly better performance.
1 parent 8fb84b2 commit 2d5ab32

File tree

4 files changed

+153
-8
lines changed

4 files changed

+153
-8
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2019 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
import NIOHPACK
17+
import NIOHTTP1
18+
import NIOHTTP2
19+
20+
/// Have two `EmbeddedChannel` objects send and receive data from each other until
21+
/// they make no forward progress.
22+
func interactInMemory(_ first: EmbeddedChannel, _ second: EmbeddedChannel) throws {
23+
var operated: Bool
24+
25+
func readBytesFromChannel(_ channel: EmbeddedChannel) throws -> ByteBuffer? {
26+
return try channel.readOutbound(as: ByteBuffer.self)
27+
}
28+
29+
repeat {
30+
operated = false
31+
first.embeddedEventLoop.run()
32+
33+
if let data = try readBytesFromChannel(first) {
34+
operated = true
35+
try second.writeInbound(data)
36+
}
37+
if let data = try readBytesFromChannel(second) {
38+
operated = true
39+
try first.writeInbound(data)
40+
}
41+
} while operated
42+
}
43+
44+
final class ServerHandler: ChannelInboundHandler {
45+
typealias InboundIn = HTTP2Frame
46+
typealias OutboundOut = HTTP2Frame
47+
48+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
49+
let data = self.unwrapInboundIn(data)
50+
switch data.payload {
51+
case .headers(let headers) where headers.endStream:
52+
break
53+
case .data(let data) where data.endStream:
54+
break
55+
default:
56+
// Ignore this frame
57+
return
58+
}
59+
60+
// We got END_STREAM. Let's send a response.
61+
let headers = HPACKHeaders([(":status", "200")])
62+
let responseFrame = HTTP2Frame(streamID: data.streamID, payload: .headers(.init(headers: headers, endStream: true)))
63+
context.writeAndFlush(self.wrapOutboundOut(responseFrame), promise: nil)
64+
}
65+
}
66+
67+
68+
final class ClientHandler: ChannelInboundHandler {
69+
typealias InboundIn = HTTP2Frame
70+
typealias OutboundOut = HTTP2Frame
71+
72+
let streamID: HTTP2StreamID
73+
74+
init(streamID: HTTP2StreamID) {
75+
self.streamID = streamID
76+
}
77+
78+
func channelActive(context: ChannelHandlerContext) {
79+
// Send a request.
80+
let headers = HPACKHeaders([(":path", "/"),
81+
(":authority", "localhost"),
82+
(":method", "GET"),
83+
(":scheme", "https")])
84+
let requestFrame = HTTP2Frame(streamID: self.streamID, payload: .headers(.init(headers: headers, endStream: true)))
85+
context.writeAndFlush(self.wrapOutboundOut(requestFrame), promise: nil)
86+
}
87+
}
88+
89+
func run(identifier: String) {
90+
let loop = EmbeddedEventLoop()
91+
92+
measure(identifier: identifier) {
93+
var sumOfStreamIDs = 0
94+
95+
for _ in 0..<1000 {
96+
let clientChannel = EmbeddedChannel(loop: loop)
97+
let serverChannel = EmbeddedChannel(loop: loop)
98+
99+
let clientMultiplexer = try! clientChannel.configureHTTP2Pipeline(mode: .client).wait()
100+
_ = try! serverChannel.configureHTTP2Pipeline(mode: .server) { (channel, streamID) in
101+
return channel.pipeline.addHandler(ServerHandler())
102+
}.wait()
103+
try! clientChannel.connect(to: SocketAddress(ipAddress: "1.2.3.4", port: 5678)).wait()
104+
try! serverChannel.connect(to: SocketAddress(ipAddress: "1.2.3.4", port: 5678)).wait()
105+
106+
let promise = clientChannel.eventLoop.makePromise(of: Channel.self)
107+
clientMultiplexer.createStreamChannel(promise: promise) { (channel, streamID) in
108+
return channel.pipeline.addHandler(ClientHandler(streamID: streamID))
109+
}
110+
clientChannel.embeddedEventLoop.run()
111+
let child = try! promise.futureResult.wait()
112+
let streamID = try! Int(child.getOption(HTTP2StreamChannelOptions.streamID).wait())
113+
114+
sumOfStreamIDs += streamID
115+
try! interactInMemory(clientChannel, serverChannel)
116+
try! child.closeFuture.wait()
117+
118+
try! clientChannel.close().wait()
119+
try! serverChannel.close().wait()
120+
}
121+
122+
return sumOfStreamIDs
123+
}
124+
}
125+

Sources/NIOHTTP2/HTTP2StreamChannel.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -549,13 +549,13 @@ internal extension HTTP2StreamChannel {
549549
/// - frame: The `HTTP2Frame` to send to the network.
550550
/// - promise: The promise associated with the frame write.
551551
private func receiveOutboundFrame(_ frame: HTTP2Frame, promise: EventLoopPromise<Void>?) {
552-
guard let parent = self.parent, self.state != .closed else {
552+
guard self.state != .closed else {
553553
let error = ChannelError.alreadyClosed
554554
promise?.fail(error)
555555
self.errorEncountered(error: error)
556556
return
557557
}
558-
parent.write(frame, promise: promise)
558+
self.multiplexer.childChannelWrite(frame, promise: promise)
559559
}
560560

561561
/// Called when a stream closure is received from the network.
@@ -576,7 +576,7 @@ internal extension HTTP2StreamChannel {
576576
let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment))
577577
self.receiveOutboundFrame(frame, promise: nil)
578578
// This flush should really go away, but we need it for now until we sort out window management.
579-
self.parent?.flush()
579+
self.multiplexer.childChannelFlush()
580580
}
581581
}
582582

@@ -585,7 +585,7 @@ internal extension HTTP2StreamChannel {
585585
let frame = HTTP2Frame(streamID: self.streamID, payload: .windowUpdate(windowSizeIncrement: increment))
586586
self.receiveOutboundFrame(frame, promise: nil)
587587
// This flush should really go away, but we need it for now until we sort out window management.
588-
self.parent?.flush()
588+
self.multiplexer.childChannelFlush()
589589
}
590590
}
591591
}

Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,19 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
3030
private var streams: [HTTP2StreamID: HTTP2StreamChannel] = [:]
3131
private let inboundStreamStateInitializer: ((Channel, HTTP2StreamID) -> EventLoopFuture<Void>)?
3232
private let channel: Channel
33+
private var context: ChannelHandlerContext!
3334
private var nextOutboundStreamID: HTTP2StreamID
3435
private var connectionFlowControlManager: InboundWindowManager
3536

3637
public func handlerAdded(context: ChannelHandlerContext) {
3738
// We now need to check that we're on the same event loop as the one we were originally given.
3839
// If we weren't, this is a hard failure, as there is a thread-safety issue here.
3940
self.channel.eventLoop.preconditionInEventLoop()
41+
self.context = context
42+
}
43+
44+
public func handlerRemoved(context: ChannelHandlerContext) {
45+
self.context = nil
4046
}
4147

4248
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
@@ -122,10 +128,6 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
122128
context.fireUserInboundEventTriggered(event)
123129
}
124130

125-
internal func childChannelClosed(streamID: HTTP2StreamID) {
126-
self.streams.removeValue(forKey: streamID)
127-
}
128-
129131
private func newConnectionWindowSize(newSize: Int, context: ChannelHandlerContext) {
130132
guard let increment = self.connectionFlowControlManager.newWindowSize(newSize) else {
131133
return
@@ -189,3 +191,19 @@ extension HTTP2StreamMultiplexer {
189191
}
190192
}
191193
}
194+
195+
196+
// MARK:- Child to parent calls
197+
extension HTTP2StreamMultiplexer {
198+
internal func childChannelClosed(streamID: HTTP2StreamID) {
199+
self.streams.removeValue(forKey: streamID)
200+
}
201+
202+
internal func childChannelWrite(_ frame: HTTP2Frame, promise: EventLoopPromise<Void>?) {
203+
self.context.write(self.wrapOutboundOut(frame), promise: promise)
204+
}
205+
206+
internal func childChannelFlush() {
207+
self.context.flush()
208+
}
209+
}

docker/docker-compose.1804.50.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ services:
1515
environment:
1616
- MAX_ALLOCS_ALLOWED_create_client_stream_channel=70010
1717
- MAX_ALLOCS_ALLOWED_hpack_decoding=5050
18+
- MAX_ALLOCS_ALLOWED_client_server_request_response=360000
1819

1920
test:
2021
image: swift-nio-http2:18.04-5.0
2122
command: /bin/bash -cl "swift test -Xswiftc -warnings-as-errors && ./scripts/integration_tests.sh"
2223
environment:
2324
- MAX_ALLOCS_ALLOWED_create_client_stream_channel=70010
2425
- MAX_ALLOCS_ALLOWED_hpack_decoding=5050
26+
- MAX_ALLOCS_ALLOWED_client_server_request_response=360000
2527

2628
h2spec:
2729
image: swift-nio-http2:18.04-5.0

0 commit comments

Comments
 (0)