Skip to content

Commit 06880b0

Browse files
authored
Minimise channelReadComplete traffic. (#136)
Motivation: Currently swift-nio-http2 is extremely naive with its use of channelReadComplete calls in HTTP2StreamChannel, firing one channelReadComplete call per frame read. In high frame load cases this leads to a lot of excessive channelReadComplete traffic, which can cause unnecessary time spent in flush calls. We should try to minimise the amount of time we spend on this bookkeeping and save the CPU cost. Modifications: - Store a linked-list of HTTP2StreamChannels that need to have channelReadComplete fired on them. - Fire channelReadComplete based on this linked list. Result: Moderate improvement in performance in high-throughput workloads.
1 parent 419cd4e commit 06880b0

File tree

5 files changed

+257
-5
lines changed

5 files changed

+257
-5
lines changed

Sources/NIOHTTP2/HTTP2StreamChannel.swift

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,9 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
316316
/// stream is flushed, at which time we deliver them all. This buffer holds the pending ones.
317317
private var pendingWrites: MarkedCircularBuffer<(HTTP2Frame, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 8)
318318

319+
/// A list node used to hold stream channels.
320+
internal var streamChannelListNode: StreamChannelListNode = StreamChannelListNode()
321+
319322
public func register0(promise: EventLoopPromise<Void>?) {
320323
fatalError("not implemented \(#function)")
321324
}
@@ -466,8 +469,6 @@ final class HTTP2StreamChannel: Channel, ChannelCore {
466469
return
467470
}
468471

469-
assert(self.pendingReads.count > 0, "tryToRead called without reads!")
470-
471472
// If we're not active, we will hold on to these reads.
472473
guard self.isActive else {
473474
return
@@ -538,8 +539,11 @@ internal extension HTTP2StreamChannel {
538539
return
539540
}
540541

541-
self.pendingReads.append(frame)
542-
self.tryToRead()
542+
if self.unsatisfiedRead {
543+
self.pipeline.fireChannelRead(NIOAny(frame))
544+
} else {
545+
self.pendingReads.append(frame)
546+
}
543547
}
544548

545549

@@ -563,6 +567,9 @@ internal extension HTTP2StreamChannel {
563567
/// - parameters:
564568
/// - reason: The reason received from the network, if any.
565569
func receiveStreamClosed(_ reason: HTTP2ErrorCode?) {
570+
// The stream is closed, we should aim to deliver any read frames we have for it.
571+
self.tryToRead()
572+
566573
if let reason = reason {
567574
let err = NIOHTTP2Errors.StreamClosed(streamID: self.streamID, errorCode: reason)
568575
self.errorEncountered(error: err)
@@ -588,5 +595,8 @@ internal extension HTTP2StreamChannel {
588595
self.multiplexer.childChannelFlush()
589596
}
590597
}
591-
}
592598

599+
func receiveParentChannelReadComplete() {
600+
self.tryToRead()
601+
}
602+
}

Sources/NIOHTTP2/HTTP2StreamMultiplexer.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
3434
private var nextOutboundStreamID: HTTP2StreamID
3535
private var connectionFlowControlManager: InboundWindowManager
3636
private var flushState: FlushState = .notReading
37+
private var didReadChannels: StreamChannelList = StreamChannelList()
3738

3839
public func handlerAdded(context: ChannelHandlerContext) {
3940
// We now need to check that we're on the same event loop as the one we were originally given.
@@ -44,6 +45,7 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
4445

4546
public func handlerRemoved(context: ChannelHandlerContext) {
4647
self.context = nil
48+
self.didReadChannels.removeAll()
4749
}
4850

4951
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
@@ -66,6 +68,9 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
6668

6769
if let channel = streams[streamID] {
6870
channel.receiveInboundFrame(frame)
71+
if !channel.inList {
72+
self.didReadChannels.append(channel)
73+
}
6974
} else if case .headers = frame.payload {
7075
let channel = HTTP2StreamChannel(allocator: self.channel.allocator,
7176
parent: self.channel,
@@ -75,6 +80,10 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
7580
self.streams[streamID] = channel
7681
channel.configure(initializer: self.inboundStreamStateInitializer, userPromise: nil)
7782
channel.receiveInboundFrame(frame)
83+
84+
if !channel.inList {
85+
self.didReadChannels.append(channel)
86+
}
7887
} else {
7988
// This frame is for a stream we know nothing about. We can't do much about it, so we
8089
// are going to fire an error and drop the frame.
@@ -84,6 +93,11 @@ public final class HTTP2StreamMultiplexer: ChannelInboundHandler, ChannelOutboun
8493
}
8594

8695
public func channelReadComplete(context: ChannelHandlerContext) {
96+
// Call channelReadComplete on the children until this has been propagated enough.
97+
while let channel = self.didReadChannels.removeFirst() {
98+
channel.receiveParentChannelReadComplete()
99+
}
100+
87101
if case .flushPending = self.flushState {
88102
self.flushState = .notReading
89103
context.flush()
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2019 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
16+
/// A linked list for storing HTTP2StreamChannels.
17+
///
18+
/// Note that while this object *could* conform to `Sequence`, there is minimal value in doing
19+
/// that here, as it's so single-use. If we find ourselves needing to expand on this data type
20+
/// in future we can revisit that idea.
21+
struct StreamChannelList {
22+
private var head: HTTP2StreamChannel?
23+
private var tail: HTTP2StreamChannel?
24+
}
25+
26+
/// A node for objects stored in an intrusive linked list.
27+
///
28+
/// Any object that wishes to be stored in a linked list must embed one of these nodes.
29+
struct StreamChannelListNode {
30+
fileprivate enum ListState {
31+
case inList(next: HTTP2StreamChannel?)
32+
case notInList
33+
}
34+
35+
fileprivate var state: ListState = .notInList
36+
37+
internal init() { }
38+
}
39+
40+
41+
extension StreamChannelList {
42+
/// Append an element to the linked list.
43+
mutating func append(_ element: HTTP2StreamChannel) {
44+
precondition(!element.inList)
45+
46+
guard case .notInList = element.streamChannelListNode.state else {
47+
preconditionFailure("Appended an element already in a list")
48+
}
49+
50+
element.streamChannelListNode.state = .inList(next: nil)
51+
52+
if let tail = self.tail {
53+
tail.streamChannelListNode.state = .inList(next: element)
54+
self.tail = element
55+
} else {
56+
assert(self.head == nil)
57+
self.head = element
58+
self.tail = element
59+
}
60+
}
61+
62+
mutating func removeFirst() -> HTTP2StreamChannel? {
63+
guard let head = self.head else {
64+
assert(self.tail == nil)
65+
return nil
66+
}
67+
68+
guard case .inList(let next) = head.streamChannelListNode.state else {
69+
preconditionFailure("Popped an element not in a list")
70+
}
71+
72+
self.head = next
73+
if self.head == nil {
74+
assert(self.tail === head)
75+
self.tail = nil
76+
}
77+
78+
head.streamChannelListNode = .init()
79+
return head
80+
}
81+
82+
mutating func removeAll() {
83+
while self.removeFirst() != nil { }
84+
}
85+
}
86+
87+
88+
// MARK:- IntrusiveLinkedListElement helpers.
89+
extension HTTP2StreamChannel {
90+
/// Whether this element is currently in a list.
91+
internal var inList: Bool {
92+
switch self.streamChannelListNode.state {
93+
case .inList:
94+
return true
95+
case .notInList:
96+
return false
97+
}
98+
}
99+
}

Tests/NIOHTTP2Tests/HTTP2StreamMultiplexerTests+XCTest.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ extension HTTP2StreamMultiplexerTests {
6565
("testCreatedChildChannelCanBeClosedImmediatelyWhenBaseIsActive", testCreatedChildChannelCanBeClosedImmediatelyWhenBaseIsActive),
6666
("testCreatedChildChannelCanBeClosedBeforeWritingHeadersWhenBaseIsActive", testCreatedChildChannelCanBeClosedBeforeWritingHeadersWhenBaseIsActive),
6767
("testMultiplexerCoalescesFlushCallsDuringChannelRead", testMultiplexerCoalescesFlushCallsDuringChannelRead),
68+
("testMultiplexerDoesntFireReadCompleteForEachFrame", testMultiplexerDoesntFireReadCompleteForEachFrame),
69+
("testMultiplexerCorrectlyTellsAllStreamsAboutReadComplete", testMultiplexerCorrectlyTellsAllStreamsAboutReadComplete),
6870
]
6971
}
7072
}

Tests/NIOHTTP2Tests/HTTP2StreamMultiplexerTests.swift

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,19 @@ final class FlushCounter: ChannelOutboundHandler {
124124
}
125125

126126

127+
final class ReadCompleteCounter: ChannelInboundHandler {
128+
typealias InboundIn = Any
129+
typealias InboundOut = Any
130+
131+
var readCompleteCount = 0
132+
133+
func channelReadComplete(context: ChannelHandlerContext) {
134+
self.readCompleteCount += 1
135+
context.fireChannelReadComplete()
136+
}
137+
}
138+
139+
127140
/// A channel handler that sends a response in response to a HEADERS frame.
128141
final class QuickResponseHandler: ChannelInboundHandler {
129142
typealias InboundIn = HTTP2Frame
@@ -1497,4 +1510,118 @@ final class HTTP2StreamMultiplexerTests: XCTestCase {
14971510
XCTAssertEqual(try self.channel.sentFrames().count, 10)
14981511
XCTAssertEqual(flushCounter.flushCount, 1)
14991512
}
1513+
1514+
func testMultiplexerDoesntFireReadCompleteForEachFrame() {
1515+
// We need to activate the underlying channel here.
1516+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait())
1517+
1518+
let frameRecorder = InboundFrameRecorder()
1519+
let readCompleteCounter = ReadCompleteCounter()
1520+
1521+
let multiplexer = HTTP2StreamMultiplexer(mode: .server, channel: self.channel) { (childChannel, _) in
1522+
return childChannel.pipeline.addHandler(frameRecorder).flatMap {
1523+
childChannel.pipeline.addHandler(readCompleteCounter)
1524+
}
1525+
}
1526+
XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait())
1527+
1528+
XCTAssertEqual(frameRecorder.receivedFrames.count, 0)
1529+
XCTAssertEqual(readCompleteCounter.readCompleteCount, 0)
1530+
1531+
// Wake up and activate the stream.
1532+
let requestHeaders = HPACKHeaders([(":path", "/"), (":method", "GET"), (":authority", "localhost"), (":scheme", "https")])
1533+
let requestFrame = HTTP2Frame(streamID: 1, payload: .headers(.init(headers: requestHeaders, endStream: false)))
1534+
self.channel.pipeline.fireChannelRead(NIOAny(requestFrame))
1535+
self.activateStream(1)
1536+
self.channel.embeddedEventLoop.run()
1537+
1538+
XCTAssertEqual(frameRecorder.receivedFrames.count, 1)
1539+
XCTAssertEqual(readCompleteCounter.readCompleteCount, 0)
1540+
1541+
// Now we're going to send 9 data frames.
1542+
var requestData = self.channel.allocator.buffer(capacity: 1024)
1543+
requestData.writeBytes("Hello world!".utf8)
1544+
let dataFrames = repeatElement(HTTP2Frame(streamID: 1, payload: .data(.init(data: .byteBuffer(requestData), endStream: false))), count: 9)
1545+
1546+
for frame in dataFrames {
1547+
self.channel.pipeline.fireChannelRead(NIOAny(frame))
1548+
}
1549+
1550+
// We should have 10 reads, and zero read completes.
1551+
XCTAssertEqual(frameRecorder.receivedFrames.count, 10)
1552+
XCTAssertEqual(readCompleteCounter.readCompleteCount, 0)
1553+
1554+
// Fire read complete on the parent and it'll propagate to the child.
1555+
self.channel.pipeline.fireChannelReadComplete()
1556+
1557+
// We should have 10 reads, and one read complete.
1558+
XCTAssertEqual(frameRecorder.receivedFrames.count, 10)
1559+
XCTAssertEqual(readCompleteCounter.readCompleteCount, 1)
1560+
1561+
// If we fire a new read complete on the parent, the child doesn't see it this time, as it received no frames.
1562+
self.channel.pipeline.fireChannelReadComplete()
1563+
XCTAssertEqual(frameRecorder.receivedFrames.count, 10)
1564+
XCTAssertEqual(readCompleteCounter.readCompleteCount, 1)
1565+
}
1566+
1567+
func testMultiplexerCorrectlyTellsAllStreamsAboutReadComplete() {
1568+
// We need to activate the underlying channel here.
1569+
XCTAssertNoThrow(try self.channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 80)).wait())
1570+
1571+
// These are deliberately getting inserted to all streams. The test above confirms the single-stream
1572+
// behaviour is correct.
1573+
let frameRecorder = InboundFrameRecorder()
1574+
let readCompleteCounter = ReadCompleteCounter()
1575+
1576+
let multiplexer = HTTP2StreamMultiplexer(mode: .server, channel: self.channel) { (childChannel, _) in
1577+
return childChannel.pipeline.addHandler(frameRecorder).flatMap {
1578+
childChannel.pipeline.addHandler(readCompleteCounter)
1579+
}
1580+
}
1581+
XCTAssertNoThrow(try self.channel.pipeline.addHandler(multiplexer).wait())
1582+
1583+
XCTAssertEqual(frameRecorder.receivedFrames.count, 0)
1584+
XCTAssertEqual(readCompleteCounter.readCompleteCount, 0)
1585+
1586+
// Wake up and activate the streams.
1587+
let requestHeaders = HPACKHeaders([(":path", "/"), (":method", "GET"), (":authority", "localhost"), (":scheme", "https")])
1588+
1589+
for streamID in [HTTP2StreamID(1), HTTP2StreamID(3), HTTP2StreamID(5)] {
1590+
let requestFrame = HTTP2Frame(streamID: streamID, payload: .headers(.init(headers: requestHeaders, endStream: false)))
1591+
self.channel.pipeline.fireChannelRead(NIOAny(requestFrame))
1592+
self.activateStream(streamID)
1593+
}
1594+
self.channel.embeddedEventLoop.run()
1595+
1596+
XCTAssertEqual(frameRecorder.receivedFrames.count, 3)
1597+
XCTAssertEqual(readCompleteCounter.readCompleteCount, 0)
1598+
1599+
// Firing in readComplete causes a readComplete for each stream.
1600+
self.channel.pipeline.fireChannelReadComplete()
1601+
XCTAssertEqual(frameRecorder.receivedFrames.count, 3)
1602+
XCTAssertEqual(readCompleteCounter.readCompleteCount, 3)
1603+
1604+
// Now we're going to send a data frame on stream 1.
1605+
var requestData = self.channel.allocator.buffer(capacity: 1024)
1606+
requestData.writeBytes("Hello world!".utf8)
1607+
let frame = HTTP2Frame(streamID: 1, payload: .data(.init(data: .byteBuffer(requestData), endStream: false)))
1608+
self.channel.pipeline.fireChannelRead(NIOAny(frame))
1609+
1610+
// We should have 4 reads, and 3 read completes.
1611+
XCTAssertEqual(frameRecorder.receivedFrames.count, 4)
1612+
XCTAssertEqual(readCompleteCounter.readCompleteCount, 3)
1613+
1614+
// Fire read complete on the parent and it'll propagate to the child, but only to the one
1615+
// that saw a frame.
1616+
self.channel.pipeline.fireChannelReadComplete()
1617+
1618+
// We should have 4 reads, and 4 read completes.
1619+
XCTAssertEqual(frameRecorder.receivedFrames.count, 4)
1620+
XCTAssertEqual(readCompleteCounter.readCompleteCount, 4)
1621+
1622+
// If we fire a new read complete on the parent, the children don't see it.
1623+
self.channel.pipeline.fireChannelReadComplete()
1624+
XCTAssertEqual(frameRecorder.receivedFrames.count, 4)
1625+
XCTAssertEqual(readCompleteCounter.readCompleteCount, 4)
1626+
}
15001627
}

0 commit comments

Comments
 (0)