Skip to content

Commit 462ded7

Browse files
authored
HTTP/2 Async API (#424)
We previously developed an async API for NIO HTTP/2 which was guarded under SPI. Now that the swift-nio async API is released we can reintroduce this code promoted to SPI. This change introduces: * `AsyncStreamMultiplexer` - an async variant of the HTTP/2 stream multiplexer which can be used to create outbound streams and provide access to an async sequence (`NIOHTTP2AsyncSequence`) of inbound streams * New pipeline configuration functions (e.g. `configureAsyncHTTP2Pipeline`) to support the new async mode
1 parent d6f9a3b commit 462ded7

6 files changed

+817
-3
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ let package = Package(
2121
.library(name: "NIOHTTP2", targets: ["NIOHTTP2"]),
2222
],
2323
dependencies: [
24-
.package(url: "https://github.com/apple/swift-nio.git", from: "2.58.0"),
24+
.package(url: "https://github.com/apple/swift-nio.git", from: "2.60.0"),
2525
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
2626
.package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"),
2727
],

Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,3 +211,42 @@ extension InlineStreamMultiplexer {
211211
self.commonStreamMultiplexer.setChannelContinuation(streamChannels)
212212
}
213213
}
214+
215+
extension NIOHTTP2Handler {
216+
/// A variant of `NIOHTTP2Handler.StreamMultiplexer` which creates a child channel for each HTTP/2 stream and
217+
/// provides access to inbound HTTP/2 streams.
218+
///
219+
/// In general in NIO applications it is helpful to consider each HTTP/2 stream as an
220+
/// independent stream of HTTP/2 frames. This multiplexer achieves this by creating a
221+
/// number of in-memory `HTTP2StreamChannel` objects, one for each stream. These operate
222+
/// on ``HTTP2Frame/FramePayload`` objects as their base communication
223+
/// atom, as opposed to the regular NIO `SelectableChannel` objects which use `ByteBuffer`
224+
/// and `IOData`.
225+
///
226+
/// Inbound (remotely-initiated) streams are accessible via the ``inbound`` property, having been initialized and
227+
/// returned as the `InboundStreamOutput` type.
228+
///
229+
/// You can open a stream by calling ``openStream(_:)``. Locally-initiated stream channel objects are initialized upon creation using the supplied `initializer` which returns a type
230+
/// `Output`. This type may be `HTTP2Frame` or changed to any other type.
231+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
232+
public struct AsyncStreamMultiplexer<InboundStreamOutput> {
233+
private let inlineStreamMultiplexer: InlineStreamMultiplexer
234+
public let inbound: NIOHTTP2AsyncSequence<InboundStreamOutput>
235+
236+
// Cannot be created by users.
237+
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2AsyncSequence<InboundStreamOutput>) {
238+
self.inlineStreamMultiplexer = inlineStreamMultiplexer
239+
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
240+
self.inbound = inboundStreamChannels
241+
}
242+
243+
244+
/// Create a stream channel initialized with the provided closure
245+
/// - Parameter initializer: A closure that will be called upon the created stream which is responsible for
246+
/// initializing the stream's `Channel`.
247+
/// - Returns: The result of the `initializer`.
248+
public func openStream<Output: Sendable>(_ initializer: @escaping NIOChannelInitializerWithOutput<Output>) async throws -> Output {
249+
return try await self.inlineStreamMultiplexer.createStreamChannel(initializer).get()
250+
}
251+
}
252+
}

Sources/NIOHTTP2/HTTP2ChannelHandler.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,4 +1180,16 @@ extension NIOHTTP2Handler {
11801180
throw NIOHTTP2Errors.missingMultiplexer()
11811181
}
11821182
}
1183+
1184+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
1185+
internal func syncAsyncStreamMultiplexer<Output: Sendable>(continuation: any AnyContinuation, inboundStreamChannels: NIOHTTP2AsyncSequence<Output>) throws -> AsyncStreamMultiplexer<Output> {
1186+
self.eventLoop!.preconditionInEventLoop()
1187+
1188+
switch self.inboundStreamMultiplexer {
1189+
case let .some(.inline(multiplexer)):
1190+
return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels)
1191+
case .some(.legacy), .none:
1192+
throw NIOHTTP2Errors.missingMultiplexer()
1193+
}
1194+
}
11831195
}

Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,10 +444,120 @@ extension HTTP2CommonInboundStreamMultiplexer {
444444
}
445445
}
446446

447-
/// `ChannelContinuation` is used to generic async-sequence-like objects to deal with `Channel`s. This is so that they may be held
448-
/// by the `HTTP2ChannelHandler` without causing it to become generic itself.
447+
/// `AnyContinuation` is used to generic async-sequence-like objects to deal with the generic element types without
448+
/// the holding type becoming generic itself.
449+
///
450+
/// This is useful in in the case of the `HTTP2ChannelHandler` which must deal with types which hold stream initializers
451+
/// which have a generic return type.
449452
internal protocol AnyContinuation {
450453
func yield(any: Any)
451454
func finish()
452455
func finish(throwing error: Error)
453456
}
457+
458+
459+
/// `NIOHTTP2AsyncSequence` is an implementation of the `AsyncSequence` protocol which allows iteration over a generic
460+
/// element type `Output`.
461+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
462+
public struct NIOHTTP2AsyncSequence<Output>: AsyncSequence {
463+
public struct AsyncIterator: AsyncIteratorProtocol {
464+
public typealias Element = Output
465+
466+
private var iterator: AsyncThrowingStream<Output, Error>.AsyncIterator
467+
468+
init(wrapping iterator: AsyncThrowingStream<Output, Error>.AsyncIterator) {
469+
self.iterator = iterator
470+
}
471+
472+
public mutating func next() async throws -> Output? {
473+
try await self.iterator.next()
474+
}
475+
}
476+
477+
public typealias Element = Output
478+
479+
private let asyncThrowingStream: AsyncThrowingStream<Output, Error>
480+
481+
private init(_ asyncThrowingStream: AsyncThrowingStream<Output, Error>) {
482+
self.asyncThrowingStream = asyncThrowingStream
483+
}
484+
485+
public func makeAsyncIterator() -> AsyncIterator {
486+
AsyncIterator(wrapping: self.asyncThrowingStream.makeAsyncIterator())
487+
}
488+
}
489+
490+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
491+
extension NIOHTTP2AsyncSequence {
492+
/// `Continuation` is a wrapper for a generic `AsyncThrowingStream` to which the products of the initializers of
493+
/// inbound (remotely-initiated) HTTP/2 stream channels are yielded.
494+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
495+
struct Continuation: AnyContinuation {
496+
private var continuation: AsyncThrowingStream<Output, Error>.Continuation
497+
498+
internal init(wrapping continuation: AsyncThrowingStream<Output, Error>.Continuation) {
499+
self.continuation = continuation
500+
}
501+
502+
/// `yield` takes a channel as outputted by the stream initializer and yields the wrapped `AsyncThrowingStream`.
503+
///
504+
/// It takes channels as as `Any` type to allow wrapping by the stream initializer.
505+
func yield(any: Any) {
506+
let yieldResult = self.continuation.yield(any as! Output)
507+
switch yieldResult {
508+
case .enqueued:
509+
break // success, nothing to do
510+
case .dropped:
511+
preconditionFailure("Attempted to yield when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.")
512+
case .terminated:
513+
preconditionFailure("Attempted to yield to AsyncThrowingStream in terminated state.")
514+
default:
515+
preconditionFailure("Attempt to yield to AsyncThrowingStream failed for unhandled reason.")
516+
}
517+
}
518+
519+
/// `finish` marks the continuation as finished.
520+
func finish() {
521+
self.continuation.finish()
522+
}
523+
524+
/// `finish` marks the continuation as finished with the supplied error.
525+
func finish(throwing error: Error) {
526+
self.continuation.finish(throwing: error)
527+
}
528+
}
529+
530+
531+
/// `initialize` creates a new `Continuation` object and returns it along with its backing ``NIOHTTP2AsyncSequence``.
532+
/// The `Continuation` provides the ability to yield to the backing .``NIOHTTP2AsyncSequence``.
533+
///
534+
/// - Parameters:
535+
/// - inboundStreamInitializerOutput: The type which is returned by the initializer operating on the inbound
536+
/// (remotely-initiated) HTTP/2 streams.
537+
static func initialize(inboundStreamInitializerOutput: Output.Type = Output.self) -> (NIOHTTP2AsyncSequence<Output>, Continuation) {
538+
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self)
539+
return (.init(stream), Continuation(wrapping: continuation))
540+
}
541+
}
542+
543+
@available(*, unavailable)
544+
extension NIOHTTP2AsyncSequence.AsyncIterator: Sendable {}
545+
546+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
547+
extension NIOHTTP2AsyncSequence: Sendable where Output: Sendable {}
548+
549+
#if swift(<5.9)
550+
// this should be available in the std lib from 5.9 onwards
551+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
552+
extension AsyncThrowingStream {
553+
static func makeStream(
554+
of elementType: Element.Type = Element.self,
555+
throwing failureType: Failure.Type = Failure.self,
556+
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
557+
) -> (stream: AsyncThrowingStream<Element, Failure>, continuation: AsyncThrowingStream<Element, Failure>.Continuation) where Failure == Error {
558+
var continuation: AsyncThrowingStream<Element, Failure>.Continuation!
559+
let stream = AsyncThrowingStream<Element, Failure>(bufferingPolicy: limit) { continuation = $0 }
560+
return (stream: stream, continuation: continuation!)
561+
}
562+
}
563+
#endif

0 commit comments

Comments
 (0)