From b95182e7a511b72847134e7cc2fb1e0d683879d0 Mon Sep 17 00:00:00 2001 From: endigma Date: Tue, 12 Aug 2025 15:15:20 +0100 Subject: [PATCH 1/7] Add dedicated heartbeat function to subscription writers --- v2/pkg/engine/resolve/resolve.go | 17 ++++------------- v2/pkg/engine/resolve/response.go | 1 + 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 0d0bed5f5..8e94813ff 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -21,10 +21,6 @@ const ( DefaultHeartbeatInterval = 5 * time.Second ) -var ( - multipartHeartbeat = []byte("{}") -) - // ConnectionIDs is used to create unique connection IDs for each subscription // Whenever a new connection is created, use this to generate a new ID // It is public because it can be used in more high level packages to instantiate a new connection @@ -330,7 +326,7 @@ func (s *sub) startWorkerWithHeartbeat() { return case <-heartbeatTicker.C: - s.resolver.handleHeartbeat(s, multipartHeartbeat) + s.resolver.handleHeartbeat(s) case work := <-s.workChan: work.fn() @@ -501,7 +497,7 @@ func (r *Resolver) handleEvent(event subscriptionEvent) { } // handleHeartbeat sends a heartbeat to the client. It needs to be executed on the same goroutine as the writer. -func (r *Resolver) handleHeartbeat(sub *sub, data []byte) { +func (r *Resolver) handleHeartbeat(sub *sub) { if r.options.Debug { fmt.Printf("resolver:heartbeat\n") } @@ -518,7 +514,7 @@ func (r *Resolver) handleHeartbeat(sub *sub, data []byte) { fmt.Printf("resolver:heartbeat:subscription:%d\n", sub.id.SubscriptionID) } - if _, err := sub.writer.Write(data); err != nil { + if err := sub.writer.Heartbeat(); err != nil { if errors.Is(err, context.Canceled) { // If Write fails (e.g. client disconnected), remove the subscription. _ = r.AsyncUnsubscribeSubscription(sub.id) @@ -526,16 +522,11 @@ func (r *Resolver) handleHeartbeat(sub *sub, data []byte) { } r.asyncErrorWriter.WriteError(sub.ctx, err, nil, sub.writer) } - err := sub.writer.Flush() - if err != nil { - // If flush fails (e.g. client disconnected), remove the subscription. - _ = r.AsyncUnsubscribeSubscription(sub.id) - return - } if r.options.Debug { fmt.Printf("resolver:heartbeat:subscription:flushed:%d\n", sub.id.SubscriptionID) } + if r.reporter != nil { r.reporter.SubscriptionUpdateSent() } diff --git a/v2/pkg/engine/resolve/response.go b/v2/pkg/engine/resolve/response.go index 926508f5c..d340908a8 100644 --- a/v2/pkg/engine/resolve/response.go +++ b/v2/pkg/engine/resolve/response.go @@ -67,6 +67,7 @@ type SubscriptionResponseWriter interface { ResponseWriter Flush() error Complete() + Heartbeat() error Close(kind SubscriptionCloseKind) } From 90aebcf27512a91c709475ef0bf25a7c6459223d Mon Sep 17 00:00:00 2001 From: endigma Date: Tue, 12 Aug 2025 15:22:42 +0100 Subject: [PATCH 2/7] Add Heartbeat method to EngineResultWriter --- execution/graphql/result_writer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/execution/graphql/result_writer.go b/execution/graphql/result_writer.go index dc1ce288d..f97cbf721 100644 --- a/execution/graphql/result_writer.go +++ b/execution/graphql/result_writer.go @@ -35,6 +35,10 @@ func (e *EngineResultWriter) Complete() { } +func (e *EngineResultWriter) Heartbeat() error { + return nil +} + func (e *EngineResultWriter) Close(_ resolve.SubscriptionCloseKind) { } From d295b5f3cb609481fa16cf79eb540414651c6991 Mon Sep 17 00:00:00 2001 From: endigma Date: Tue, 12 Aug 2025 15:37:38 +0100 Subject: [PATCH 3/7] Add stub heartbeat method to test subscription writers --- v2/pkg/engine/resolve/event_loop_test.go | 4 ++++ v2/pkg/engine/resolve/resolve_test.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/v2/pkg/engine/resolve/event_loop_test.go b/v2/pkg/engine/resolve/event_loop_test.go index 7acb69ca8..206442a18 100644 --- a/v2/pkg/engine/resolve/event_loop_test.go +++ b/v2/pkg/engine/resolve/event_loop_test.go @@ -51,6 +51,10 @@ func (f *FakeSubscriptionWriter) Complete() { f.messageCountOnComplete = len(f.writtenMessages) } +func (f *FakeSubscriptionWriter) Heartbeat() error { + return nil +} + func (f *FakeSubscriptionWriter) Close(SubscriptionCloseKind) { f.mu.Lock() defer f.mu.Unlock() diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index 67b83ab20..e908b68a6 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -4777,6 +4777,10 @@ func (s *SubscriptionRecorder) Complete() { s.complete.Store(true) } +func (s *SubscriptionRecorder) Heartbeat() error { + return nil +} + func (s *SubscriptionRecorder) Close(_ SubscriptionCloseKind) { s.closed.Store(true) } From b6522eb89f39048b0dcd4405e2bc449bbd075ee2 Mon Sep 17 00:00:00 2001 From: endigma Date: Thu, 14 Aug 2025 17:46:44 +0100 Subject: [PATCH 4/7] Rename multipart-specific field to be more general --- v2/pkg/engine/resolve/event_loop_test.go | 20 ++++++++++---------- v2/pkg/engine/resolve/resolve.go | 10 +++++----- v2/pkg/engine/resolve/resolve_test.go | 12 ++++++------ 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/v2/pkg/engine/resolve/event_loop_test.go b/v2/pkg/engine/resolve/event_loop_test.go index 206442a18..674636800 100644 --- a/v2/pkg/engine/resolve/event_loop_test.go +++ b/v2/pkg/engine/resolve/event_loop_test.go @@ -119,16 +119,16 @@ func TestEventLoop(t *testing.T) { testReporter := &TestReporter{} resolver := New(resolverCtx, ResolverOptions{ - MaxConcurrency: 1024, - Debug: false, - AsyncErrorWriter: ew, - PropagateSubgraphErrors: false, - PropagateSubgraphStatusCodes: false, - SubgraphErrorPropagationMode: SubgraphErrorPropagationModePassThrough, - DefaultErrorExtensionCode: "TEST", - MaxRecyclableParserSize: 1024 * 1024, - MultipartSubHeartbeatInterval: DefaultHeartbeatInterval, - Reporter: testReporter, + MaxConcurrency: 1024, + Debug: false, + AsyncErrorWriter: ew, + PropagateSubgraphErrors: false, + PropagateSubgraphStatusCodes: false, + SubgraphErrorPropagationMode: SubgraphErrorPropagationModePassThrough, + DefaultErrorExtensionCode: "TEST", + MaxRecyclableParserSize: 1024 * 1024, + SubHeartbeatInterval: DefaultHeartbeatInterval, + Reporter: testReporter, }) subscription := &GraphQLSubscription{ diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 8e94813ff..98d32375e 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -139,8 +139,8 @@ type ResolverOptions struct { ResolvableOptions ResolvableOptions // AllowedCustomSubgraphErrorFields defines which fields are allowed in the subgraph error when in passthrough mode AllowedSubgraphErrorFields []string - // MultipartSubHeartbeatInterval defines the interval in which a heartbeat is sent to all multipart subscriptions - MultipartSubHeartbeatInterval time.Duration + // SubHeartbeatInterval defines the interval in which a heartbeat is sent to all subscriptions (whether or not this does anything is determined by the subscription response writer) + SubHeartbeatInterval time.Duration // MaxSubscriptionFetchTimeout defines the maximum time a subscription fetch can take before it is considered timed out MaxSubscriptionFetchTimeout time.Duration // ApolloRouterCompatibilitySubrequestHTTPError is a compatibility flag for Apollo Router, it is used to handle HTTP errors in subrequests differently @@ -154,8 +154,8 @@ func New(ctx context.Context, options ResolverOptions) *Resolver { options.MaxConcurrency = 32 } - if options.MultipartSubHeartbeatInterval <= 0 { - options.MultipartSubHeartbeatInterval = DefaultHeartbeatInterval + if options.SubHeartbeatInterval <= 0 { + options.SubHeartbeatInterval = DefaultHeartbeatInterval } // We transform the allowed fields into a map for faster lookups @@ -198,7 +198,7 @@ func New(ctx context.Context, options ResolverOptions) *Resolver { triggerUpdateBuf: bytes.NewBuffer(make([]byte, 0, 1024)), allowedErrorExtensionFields: allowedExtensionFields, allowedErrorFields: allowedErrorFields, - heartbeatInterval: options.MultipartSubHeartbeatInterval, + heartbeatInterval: options.SubHeartbeatInterval, maxSubscriptionFetchTimeout: options.MaxSubscriptionFetchTimeout, } resolver.maxConcurrency = make(chan struct{}, options.MaxConcurrency) diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index e908b68a6..100b052e4 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -90,12 +90,12 @@ var multipartSubHeartbeatInterval = 100 * time.Millisecond func newResolver(ctx context.Context) *Resolver { return New(ctx, ResolverOptions{ - MaxConcurrency: 1024, - Debug: false, - PropagateSubgraphErrors: true, - PropagateSubgraphStatusCodes: true, - AsyncErrorWriter: &TestErrorWriter{}, - MultipartSubHeartbeatInterval: multipartSubHeartbeatInterval, + MaxConcurrency: 1024, + Debug: false, + PropagateSubgraphErrors: true, + PropagateSubgraphStatusCodes: true, + AsyncErrorWriter: &TestErrorWriter{}, + SubHeartbeatInterval: multipartSubHeartbeatInterval, }) } From 73c50f5fd162dd0b53eddbab13894b6c5f4b56d0 Mon Sep 17 00:00:00 2001 From: endigma Date: Thu, 14 Aug 2025 17:49:21 +0100 Subject: [PATCH 5/7] Add heartbeat implementation to test writer --- v2/pkg/engine/resolve/event_loop_test.go | 4 ++++ v2/pkg/engine/resolve/resolve_test.go | 3 +++ 2 files changed, 7 insertions(+) diff --git a/v2/pkg/engine/resolve/event_loop_test.go b/v2/pkg/engine/resolve/event_loop_test.go index 674636800..a19ba9e7f 100644 --- a/v2/pkg/engine/resolve/event_loop_test.go +++ b/v2/pkg/engine/resolve/event_loop_test.go @@ -51,7 +51,11 @@ func (f *FakeSubscriptionWriter) Complete() { f.messageCountOnComplete = len(f.writtenMessages) } +// Heartbeat writes directly to the writtenMessages slice, as the real implementations implicitly flush func (f *FakeSubscriptionWriter) Heartbeat() error { + f.mu.Lock() + defer f.mu.Unlock() + f.writtenMessages = append(f.writtenMessages, string("heartbeat")) return nil } diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index 100b052e4..fe8b46610 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -4778,6 +4778,9 @@ func (s *SubscriptionRecorder) Complete() { } func (s *SubscriptionRecorder) Heartbeat() error { + s.mux.Lock() + defer s.mux.Unlock() + s.messages = append(s.messages, "heartbeat") return nil } From c461a8a58ce760a417e42ec255485a2a3b2d0bf6 Mon Sep 17 00:00:00 2001 From: endigma Date: Mon, 18 Aug 2025 10:34:50 +0100 Subject: [PATCH 6/7] Update subscription heartbeat handling and logging --- v2/pkg/engine/resolve/resolve.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 98d32375e..347c253b4 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -65,7 +65,7 @@ type Resolver struct { propagateSubgraphErrors bool propagateSubgraphStatusCodes bool - // Multipart heartbeat interval + // Subscription heartbeat interval heartbeatInterval time.Duration // maxSubscriptionFetchTimeout defines the maximum time a subscription fetch can take before it is considered timed out maxSubscriptionFetchTimeout time.Duration @@ -306,8 +306,8 @@ func (s *sub) startWorker() { s.startWorkerWithoutHeartbeat() } -// startWorkerWithHeartbeat is similar to startWorker but sends heartbeats to the client when -// subscription over multipart is used. It sends a heartbeat to the client every heartbeatInterval. +// startWorkerWithHeartbeat is similar to startWorker but sends heartbeats to the client when enabled. +// It sends a heartbeat to the client every heartbeatInterval. Heartbeats are handled by the SubscriptionResponseWriter interface. // TODO: Implement a shared timer implementation to avoid creating a new ticker for each subscription. func (s *sub) startWorkerWithHeartbeat() { heartbeatTicker := time.NewTicker(s.resolver.heartbeatInterval) @@ -515,16 +515,13 @@ func (r *Resolver) handleHeartbeat(sub *sub) { } if err := sub.writer.Heartbeat(); err != nil { - if errors.Is(err, context.Canceled) { - // If Write fails (e.g. client disconnected), remove the subscription. - _ = r.AsyncUnsubscribeSubscription(sub.id) - return - } - r.asyncErrorWriter.WriteError(sub.ctx, err, nil, sub.writer) + // If heartbeat fails (e.g. client disconnected), remove the subscription. + _ = r.AsyncUnsubscribeSubscription(sub.id) + return } if r.options.Debug { - fmt.Printf("resolver:heartbeat:subscription:flushed:%d\n", sub.id.SubscriptionID) + fmt.Printf("resolver:heartbeat:subscription:done:%d\n", sub.id.SubscriptionID) } if r.reporter != nil { From fadc6bc858a1f3f29359d96a9f9b034139ceb84e Mon Sep 17 00:00:00 2001 From: endigma Date: Fri, 22 Aug 2025 10:09:44 +0100 Subject: [PATCH 7/7] Rename SubHeartbeatInterval to SubscriptionHeartbeatInterval --- v2/pkg/engine/resolve/event_loop_test.go | 20 ++++++++++---------- v2/pkg/engine/resolve/resolve.go | 10 +++++----- v2/pkg/engine/resolve/resolve_test.go | 14 +++++++------- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/v2/pkg/engine/resolve/event_loop_test.go b/v2/pkg/engine/resolve/event_loop_test.go index a19ba9e7f..11389630a 100644 --- a/v2/pkg/engine/resolve/event_loop_test.go +++ b/v2/pkg/engine/resolve/event_loop_test.go @@ -123,16 +123,16 @@ func TestEventLoop(t *testing.T) { testReporter := &TestReporter{} resolver := New(resolverCtx, ResolverOptions{ - MaxConcurrency: 1024, - Debug: false, - AsyncErrorWriter: ew, - PropagateSubgraphErrors: false, - PropagateSubgraphStatusCodes: false, - SubgraphErrorPropagationMode: SubgraphErrorPropagationModePassThrough, - DefaultErrorExtensionCode: "TEST", - MaxRecyclableParserSize: 1024 * 1024, - SubHeartbeatInterval: DefaultHeartbeatInterval, - Reporter: testReporter, + MaxConcurrency: 1024, + Debug: false, + AsyncErrorWriter: ew, + PropagateSubgraphErrors: false, + PropagateSubgraphStatusCodes: false, + SubgraphErrorPropagationMode: SubgraphErrorPropagationModePassThrough, + DefaultErrorExtensionCode: "TEST", + MaxRecyclableParserSize: 1024 * 1024, + SubscriptionHeartbeatInterval: DefaultHeartbeatInterval, + Reporter: testReporter, }) subscription := &GraphQLSubscription{ diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 347c253b4..20a606fce 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -139,8 +139,8 @@ type ResolverOptions struct { ResolvableOptions ResolvableOptions // AllowedCustomSubgraphErrorFields defines which fields are allowed in the subgraph error when in passthrough mode AllowedSubgraphErrorFields []string - // SubHeartbeatInterval defines the interval in which a heartbeat is sent to all subscriptions (whether or not this does anything is determined by the subscription response writer) - SubHeartbeatInterval time.Duration + // SubscriptionHeartbeatInterval defines the interval in which a heartbeat is sent to all subscriptions (whether or not this does anything is determined by the subscription response writer) + SubscriptionHeartbeatInterval time.Duration // MaxSubscriptionFetchTimeout defines the maximum time a subscription fetch can take before it is considered timed out MaxSubscriptionFetchTimeout time.Duration // ApolloRouterCompatibilitySubrequestHTTPError is a compatibility flag for Apollo Router, it is used to handle HTTP errors in subrequests differently @@ -154,8 +154,8 @@ func New(ctx context.Context, options ResolverOptions) *Resolver { options.MaxConcurrency = 32 } - if options.SubHeartbeatInterval <= 0 { - options.SubHeartbeatInterval = DefaultHeartbeatInterval + if options.SubscriptionHeartbeatInterval <= 0 { + options.SubscriptionHeartbeatInterval = DefaultHeartbeatInterval } // We transform the allowed fields into a map for faster lookups @@ -198,7 +198,7 @@ func New(ctx context.Context, options ResolverOptions) *Resolver { triggerUpdateBuf: bytes.NewBuffer(make([]byte, 0, 1024)), allowedErrorExtensionFields: allowedExtensionFields, allowedErrorFields: allowedErrorFields, - heartbeatInterval: options.SubHeartbeatInterval, + heartbeatInterval: options.SubscriptionHeartbeatInterval, maxSubscriptionFetchTimeout: options.MaxSubscriptionFetchTimeout, } resolver.maxConcurrency = make(chan struct{}, options.MaxConcurrency) diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index fe8b46610..cc0d16271 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -86,16 +86,16 @@ func (t *TestErrorWriter) WriteError(ctx *Context, err error, res *GraphQLRespon } } -var multipartSubHeartbeatInterval = 100 * time.Millisecond +var subscriptionHeartbeatInterval = 100 * time.Millisecond func newResolver(ctx context.Context) *Resolver { return New(ctx, ResolverOptions{ - MaxConcurrency: 1024, - Debug: false, - PropagateSubgraphErrors: true, - PropagateSubgraphStatusCodes: true, - AsyncErrorWriter: &TestErrorWriter{}, - SubHeartbeatInterval: multipartSubHeartbeatInterval, + MaxConcurrency: 1024, + Debug: false, + PropagateSubgraphErrors: true, + PropagateSubgraphStatusCodes: true, + AsyncErrorWriter: &TestErrorWriter{}, + SubscriptionHeartbeatInterval: subscriptionHeartbeatInterval, }) }