Skip to content

feat: improved subscription heartbeats #1269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

endigma
Copy link
Member

@endigma endigma commented Aug 12, 2025

Summary by CodeRabbit

  • Bug Fixes

    • Heartbeats now stop promptly on client disconnect or shutdown, reducing spurious errors.
  • Performance

    • Heartbeat handling streamlined to a single writer call, lowering overhead and improving idle stability.
  • Refactor

    • Unified and renamed subscription heartbeat configuration for clearer setup and consistent behavior.
  • Tests

    • Subscription tests updated to record and verify heartbeat events.

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.

Copy link

coderabbitai bot commented Aug 12, 2025

Walkthrough

Adds a Heartbeat() error method to subscription writers and implementations, renames ResolverOptions.MultipartSubHeartbeatInterval to SubscriptionHeartbeatInterval, and updates resolver heartbeat flow to call writer.Heartbeat() with early-cancellation checks and ticker reset instead of writing multipart payloads.

Changes

Cohort / File(s) Summary of changes
Execution result writer
execution/graphql/result_writer.go
Added func (e *EngineResultWriter) Heartbeat() error (no-op); Complete/Close unchanged.
Subscription response API
v2/pkg/engine/resolve/response.go
Added Heartbeat() error to the SubscriptionResponseWriter interface.
Resolver heartbeat logic
v2/pkg/engine/resolve/resolve.go
Removed multipart heartbeat payload constant; renamed ResolverOptions.MultipartSubHeartbeatIntervalResolverOptions.SubscriptionHeartbeatInterval and defaulted to DefaultHeartbeatInterval; initialization now uses SubscriptionHeartbeatInterval; startWorkerWithHeartbeat and handleHeartbeat updated to use writer.Heartbeat() (signature changed to handleHeartbeat(sub *sub)), added early-cancellation checks, ticker stop/reset, unsubscribe on writer error, and adjusted logging.
Tests — fake writers & recorders
v2/pkg/engine/resolve/event_loop_test.go, v2/pkg/engine/resolve/resolve_test.go
Added Heartbeat() error to FakeSubscriptionWriter and SubscriptionRecorder; updated tests to use ResolverOptions.SubscriptionHeartbeatInterval instead of MultipartSubHeartbeatInterval.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch jesse/eng-7547-heartbeat-on-sse

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@endigma endigma marked this pull request as draft August 12, 2025 14:23
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🔭 Outside diff range comments (2)
v2/pkg/engine/resolve/response.go (1)

58-64: Clarify Heartbeat() contract (flush semantics, error signaling, concurrency).

Right now it's unclear whether Heartbeat() must also flush, and which error to return on client disconnect. Given the new usage path depends on Heartbeat() to replace Write+Flush, please document and standardize:

  • Implementations must write and flush a protocol-specific heartbeat immediately.
  • On client disconnect, return an error for which errors.Is(err, context.Canceled) is true (or broaden resolver handling; see related comment).
  • For protocols without heartbeats, it may be a no-op returning nil.
  • Must be called from the writer goroutine only.

Apply this diff to document expectations:

 type SubscriptionResponseWriter interface {
   ResponseWriter
   Flush() error
   Complete()
-  Heartbeat() error
+  // Heartbeat sends a protocol-specific heartbeat frame to keep the connection alive.
+  // Implementations should:
+  //  - write the heartbeat frame and flush it immediately,
+  //  - return an error comparable with context.Canceled (errors.Is(err, context.Canceled) == true)
+  //    when the client connection is gone,
+  //  - return nil for protocols where heartbeats are not applicable (no-op).
+  // Note: Heartbeat is invoked on the writer goroutine and must not block for long.
+  Heartbeat() error
   Close(kind SubscriptionCloseKind)
 }
v2/pkg/engine/resolve/resolve.go (1)

499-525: Improve heartbeat error handling in handleHeartbeat

  • Unsubscribe on common network‐closure errors, not just context.Canceled. For example: io.EOF, syscall.EPIPE, net.ErrClosed, etc.
  • Avoid calling WriteError with a nil *GraphQLResponse (current test writers ignore it, but future implementations may dereference it). If you still want to report heartbeat errors, pass sub.resolve.Response when non-nil.
  • In many cases it’s better to simply unsubscribe and stop heartbeating rather than attempting to send an error over a broken connection.

Locations to update:

  • v2/pkg/engine/resolve/resolve.go → func (*Resolver) handleHeartbeat (around lines 499–525)

Suggested diff sketch:

 func (r *Resolver) handleHeartbeat(sub *sub) {
     …
     if err := sub.writer.Heartbeat(); err != nil {
-        if errors.Is(err, context.Canceled) {
+        // Treat common disconnect signals as end-of-subscription
+        if errors.Is(err, context.Canceled) ||
+           errors.Is(err, io.EOF) ||
+           errors.Is(err, syscall.EPIPE) /* add other errno errors as needed */ {
             _ = r.AsyncUnsubscribeSubscription(sub.id)
             return
         }
-        r.asyncErrorWriter.WriteError(sub.ctx, err, nil, sub.writer)
+        // Optionally report the error if you really need it—pass a non-nil response to avoid panics:
+        if sub.resolve != nil && sub.resolve.Response != nil {
+            r.asyncErrorWriter.WriteError(sub.ctx, err, sub.resolve.Response, sub.writer)
+        }
     }
 }
🧹 Nitpick comments (4)
execution/graphql/result_writer.go (1)

38-40: Document the no-op Heartbeat to avoid future misuse.

This satisfies the interface but makes heartbeats a silent no-op for this writer. If EngineResultWriter is never used for streaming subscriptions that require heartbeats, this is fine; otherwise it will appear as if heartbeats are being sent when they aren’t.

Add a short comment to clarify intent:

 func (e *EngineResultWriter) Heartbeat() error {
+	// No-op: EngineResultWriter is a buffer-based writer and does not stream heartbeats.
+	// Protocol-specific writers (e.g. multipart, SSE, WS) must implement the actual heartbeat behavior.
 	return nil
 }
v2/pkg/engine/resolve/resolve.go (3)

526-529: Misleading debug message: “flushed” though no explicit flush here.

Since flush responsibility moved into writer.Heartbeat(), this log line can mislead when an implementation is a no-op. Consider renaming to “sent” or logging only after Heartbeat returns nil for implementations that actually flushed.

-	fmt.Printf("resolver:heartbeat:subscription:flushed:%d\n", sub.id.SubscriptionID)
+	fmt.Printf("resolver:heartbeat:subscription:sent:%d\n", sub.id.SubscriptionID)

530-533: Heartbeats counted as “SubscriptionUpdateSent” may skew metrics.

If metrics are intended to reflect data updates, counting heartbeats will inflate numbers. Consider a dedicated metric (e.g., SubscriptionHeartbeatSent) or skip reporter calls for heartbeats.

I can refactor Reporter to include a separate heartbeat counter and update call sites—want me to draft that?


312-341: Timer per subscription scalability: keep TODO visible and consider a shared scheduler.

Acknowledged in the TODO. For many concurrent subscriptions, a ticker per sub is costly. A shared timer wheel or sharded scheduler can reduce overhead substantially.

Happy to draft a sharded ticker utility that feeds interested subs at interval ticks.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e473aa7 and 3c64a87.

📒 Files selected for processing (3)
  • execution/graphql/result_writer.go (1 hunks)
  • v2/pkg/engine/resolve/resolve.go (3 hunks)
  • v2/pkg/engine/resolve/response.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build and test (go 1.23 / ubuntu-latest)
  • GitHub Check: Build and test (go 1.23 / windows-latest)
🔇 Additional comments (2)
v2/pkg/engine/resolve/resolve.go (2)

329-339: LGTM: updated call site matches new signature.

Switching to handleHeartbeat(s) aligns with the new writer-driven heartbeat mechanism.


500-516: Early-cancellation checks are good; consider combining with the select to avoid redundant checks.

The select already listens to r.ctx.Done() and sub.ctx.Done() in the worker loop. These guards are fine, but redundant. Leaving as-is is acceptable due to negligible cost.

@endigma endigma force-pushed the jesse/eng-7547-heartbeat-on-sse branch 2 times, most recently from 0f9f721 to bf139fd Compare August 14, 2025 16:29
@endigma endigma marked this pull request as ready for review August 14, 2025 16:29
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
v2/pkg/engine/resolve/resolve_test.go (1)

4780-4782: Optional: instrument heartbeats for assertions

If you want to assert that heartbeats were triggered without affecting payloads, increment a counter inside Heartbeat():

 func (s *SubscriptionRecorder) Heartbeat() error {
-	return nil
+	s.mux.Lock()
+	defer s.mux.Unlock()
+	// track heartbeats for test assertions
+	if s.onFlush != nil {
+		// keep behavior no-op for payloads
+	}
+	return nil
 }

Additions outside the selected range to support this (if desired):

// add to SubscriptionRecorder struct:
heartbeats int

// add getter for tests:
func (s *SubscriptionRecorder) HeartbeatCount() int {
	s.mux.Lock()
	defer s.mux.Unlock()
	return s.heartbeats
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these settings in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 3c64a87 and bf139fd.

📒 Files selected for processing (5)
  • execution/graphql/result_writer.go (1 hunks)
  • v2/pkg/engine/resolve/event_loop_test.go (1 hunks)
  • v2/pkg/engine/resolve/resolve.go (3 hunks)
  • v2/pkg/engine/resolve/resolve_test.go (1 hunks)
  • v2/pkg/engine/resolve/response.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • v2/pkg/engine/resolve/response.go
  • execution/graphql/result_writer.go
  • v2/pkg/engine/resolve/resolve.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build and test (go 1.23 / ubuntu-latest)
  • GitHub Check: Build and test (go 1.23 / ubuntu-latest)
🔇 Additional comments (2)
v2/pkg/engine/resolve/resolve_test.go (1)

4780-4782: Add no-op Heartbeat to SubscriptionRecorder: LGTM

No-op Heartbeat() keeps tests from emitting heartbeat payloads while satisfying the updated SubscriptionResponseWriter interface. The method is concurrency-safe as implemented (does nothing) and aligns with tests that assert no heartbeats are written.

v2/pkg/engine/resolve/event_loop_test.go (1)

54-56: Add no-op Heartbeat to FakeSubscriptionWriter: LGTM

The no-op Heartbeat() maintains interface compliance and avoids introducing extra messages in tests. This is appropriate given the resolver heartbeat path now calls writer.Heartbeat() and tests don’t assert on heartbeat payloads.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (2)
v2/pkg/engine/resolve/resolve.go (2)

201-202: Nit: field comment still says “Multipart heartbeat interval”

Now that the option is generalized, update the field comment for accuracy.

Outside this range, update the comment at the field declaration:

// Subscription heartbeat interval
heartbeatInterval time.Duration

309-339: Nit: outdated comment mentions “multipart” only

Heartbeat is now delegated to the writer. Tweak the comment accordingly.

-// startWorkerWithHeartbeat is similar to startWorker but sends heartbeats to the client when
-// subscription over multipart is used. It sends a heartbeat to the client every heartbeatInterval.
+// startWorkerWithHeartbeat is similar to startWorker but sends heartbeats to the client when
+// the writer supports heartbeats. It sends a heartbeat to the client every heartbeatInterval.
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these settings in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between bf139fd and 097167a.

📒 Files selected for processing (3)
  • v2/pkg/engine/resolve/event_loop_test.go (2 hunks)
  • v2/pkg/engine/resolve/resolve.go (6 hunks)
  • v2/pkg/engine/resolve/resolve_test.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • v2/pkg/engine/resolve/event_loop_test.go
  • v2/pkg/engine/resolve/resolve_test.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build and test (go 1.23 / windows-latest)
  • GitHub Check: Build and test (go 1.23 / ubuntu-latest)
🔇 Additional comments (2)
v2/pkg/engine/resolve/resolve.go (2)

505-511: LGTM: early-cancellation checks are in the right place and prevent unnecessary writes

The early returns on resolver and subscription contexts are correct and avoid unnecessary heartbeats on shutdown/client disconnect.


530-533: Confirm metric semantics: are heartbeats “updates”?

SubscriptionUpdateSent increments on heartbeats too. If this metric is used to count actual data updates, this will inflate it. Either:

  • use a dedicated HeartbeatSent metric, or
  • pass a parameter/flag to distinguish heartbeat vs. data.

Please confirm desired behavior.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
v2/pkg/engine/resolve/resolve.go (2)

142-144: Breaking API: keep deprecated alias for ResolverOptions to avoid downstream breaks

Renaming MultipartSubHeartbeatInterval to SubHeartbeatInterval is a breaking change for external callers. Keep the old exported field as deprecated and document the migration.

Apply this diff to restore backward compatibility:

-   // SubHeartbeatInterval defines the interval in which a heartbeat is sent to all subscriptions (whether or not this does anything is determined by the subscription response writer)
-   SubHeartbeatInterval time.Duration
+   // Deprecated: use SubHeartbeatInterval. Kept for backward compatibility with older integrations.
+   MultipartSubHeartbeatInterval time.Duration
+   // SubHeartbeatInterval defines the interval in which a heartbeat is sent to all subscriptions (whether or not this does anything is determined by the subscription response writer)
+   SubHeartbeatInterval time.Duration

157-159: Honor deprecated MultipartSubHeartbeatInterval when defaulting

If users still set the deprecated field, use it before falling back to DefaultHeartbeatInterval.

Apply this diff:

-   if options.SubHeartbeatInterval <= 0 {
-       options.SubHeartbeatInterval = DefaultHeartbeatInterval
-   }
+   if options.SubHeartbeatInterval <= 0 {
+       if options.MultipartSubHeartbeatInterval > 0 {
+           options.SubHeartbeatInterval = options.MultipartSubHeartbeatInterval
+       } else {
+           options.SubHeartbeatInterval = DefaultHeartbeatInterval
+       }
+   }
🧹 Nitpick comments (3)
v2/pkg/engine/resolve/resolve.go (3)

309-311: Heartbeat worker structure looks good; consider a one-shot timer to avoid tick backlogs

The design is clear and contained. Optional: replace the ticker with a time.Timer that you reset after each write/heartbeat to avoid accumulating ticks during long writes and to reduce resets per loop.


517-521: Differentiate common disconnects; optionally log non-disconnect errors once

Currently any Heartbeat() error triggers unsubscribe, which is acceptable for SSE. Optional: treat context.Canceled and io.EOF as expected disconnects and only log other errors once in debug mode for easier diagnosis.

Apply this diff:

-   if err := sub.writer.Heartbeat(); err != nil {
-       // If heartbeat fails (e.g. client disconnected), remove the subscription.
-       _ = r.AsyncUnsubscribeSubscription(sub.id)
-       return
-   }
+   if err := sub.writer.Heartbeat(); err != nil {
+       // Treat expected disconnects as terminal.
+       if err == context.Canceled || err == io.EOF {
+           _ = r.AsyncUnsubscribeSubscription(sub.id)
+           return
+       }
+       // For other errors, also unsubscribe but surface in debug logs.
+       if r.options.Debug {
+           fmt.Printf("resolver:heartbeat:subscription:error:%d:%v\n", sub.id.SubscriptionID, err)
+       }
+       _ = r.AsyncUnsubscribeSubscription(sub.id)
+       return
+   }

523-526: Nit: clarify debug text to reflect action

“done” is ambiguous; “sent” makes the intent clearer.

Apply this diff:

-   if r.options.Debug {
-       fmt.Printf("resolver:heartbeat:subscription:done:%d\n", sub.id.SubscriptionID)
-   }
+   if r.options.Debug {
+       fmt.Printf("resolver:heartbeat:subscription:sent:%d\n", sub.id.SubscriptionID)
+   }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 097167a and 5a1ebf7.

📒 Files selected for processing (1)
  • v2/pkg/engine/resolve/resolve.go (8 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build and test (go 1.23 / ubuntu-latest)
  • GitHub Check: Build and test (go 1.23 / windows-latest)
🔇 Additional comments (5)
v2/pkg/engine/resolve/resolve.go (5)

68-72: LGTM: dedicated heartbeat interval on the resolver

The private field and placement are sensible; per-resolver configuration is appropriate.


201-203: LGTM: wire options into resolver state

Using options.SubHeartbeatInterval to initialize the resolver’s heartbeatInterval is correct.


329-339: LGTM: reset heartbeat after each write to avoid immediate heartbeats

Resetting the ticker after work keeps heartbeats strictly idle-time. Good choice.


500-516: LGTM: early-cancel checks before sending heartbeats

Short-circuiting on resolver/client cancellation prevents unnecessary writes.


527-529: Please review how heartbeats are emitted and whether they invoke SubscriptionUpdateSent. The reporter callback is wired to every send—including heartbeats—so heartbeat “pings” will increment your subscription‐update metric. If that is undesirable, you’ll need to filter out heartbeat sends (for example by checking the message payload or wrapping the report call) rather than unconditionally deleting the reporter invocation here.

@endigma endigma force-pushed the jesse/eng-7547-heartbeat-on-sse branch from 5a1ebf7 to 0827865 Compare August 20, 2025 09:07
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
v2/pkg/engine/resolve/resolve_test.go (1)

93-99: API rename reflected in tests; keep a deprecated alias to avoid breaking external callers

Renaming MultipartSubHeartbeatInterval to SubHeartbeatInterval is fine here, but it’s a breaking change for integrators using ResolverOptions. Recommend keeping a deprecated alias on ResolverOptions and honoring it during defaulting.

Apply in v2/pkg/engine/resolve/resolve.go (ResolverOptions):

+    // Deprecated: use SubHeartbeatInterval. Kept for backward compatibility with older integrations.
+    MultipartSubHeartbeatInterval time.Duration
     // SubHeartbeatInterval defines the interval in which a heartbeat is sent to all subscriptions (whether or not this does anything is determined by the subscription response writer)
     SubHeartbeatInterval time.Duration

And defaulting:

-    if options.SubHeartbeatInterval <= 0 {
-        options.SubHeartbeatInterval = DefaultHeartbeatInterval
-    }
+    if options.SubHeartbeatInterval <= 0 {
+        if options.MultipartSubHeartbeatInterval > 0 {
+            options.SubHeartbeatInterval = options.MultipartSubHeartbeatInterval
+        } else {
+            options.SubHeartbeatInterval = DefaultHeartbeatInterval
+        }
+    }
v2/pkg/engine/resolve/resolve.go (2)

142-144: Preserve backward compatibility for renamed option

Keeping only SubHeartbeatInterval breaks existing users. Retain the old exported field as deprecated and map it during initialization.

Apply:

 type ResolverOptions struct {
@@
-    // SubHeartbeatInterval defines the interval in which a heartbeat is sent to all subscriptions (whether or not this does anything is determined by the subscription response writer)
-    SubHeartbeatInterval time.Duration
+    // Deprecated: use SubHeartbeatInterval. Kept for backward compatibility with older integrations.
+    MultipartSubHeartbeatInterval time.Duration
+    // SubHeartbeatInterval defines the interval in which a heartbeat is sent to all subscriptions (whether or not this does anything is determined by the subscription response writer)
+    SubHeartbeatInterval time.Duration

157-159: Honor the deprecated field during defaulting to prevent silent behavior changes

If users still set MultipartSubHeartbeatInterval, prefer that value before falling back to DefaultHeartbeatInterval.

- if options.SubHeartbeatInterval <= 0 {
-     options.SubHeartbeatInterval = DefaultHeartbeatInterval
- }
+ if options.SubHeartbeatInterval <= 0 {
+     if options.MultipartSubHeartbeatInterval > 0 {
+         options.SubHeartbeatInterval = options.MultipartSubHeartbeatInterval
+     } else {
+         options.SubHeartbeatInterval = DefaultHeartbeatInterval
+     }
+ }
🧹 Nitpick comments (3)
execution/graphql/result_writer.go (1)

38-40: No-op Heartbeat aligns with the new interface; consider documenting intent and returning a pointer from constructors

This satisfies resolve.SubscriptionResponseWriter without changing behavior. If EngineResultWriter is ever used on transports that expect an actual heartbeat frame, document explicitly that this writer does nothing on Heartbeat.

Optional: both constructors return a value while all methods have pointer receivers. Returning a pointer eliminates ambiguity and avoids accidental copies.

Outside the selected lines, consider:

// Prefer returning a pointer to satisfy interfaces without taking the address later.
func NewEngineResultWriter() *EngineResultWriter {
	return &EngineResultWriter{buf: &bytes.Buffer{}}
}

func NewEngineResultWriterFromBuffer(buf *bytes.Buffer) *EngineResultWriter {
	return &EngineResultWriter{buf: buf}
}
v2/pkg/engine/resolve/resolve_test.go (1)

4780-4785: Add an assertion to actually enforce “no heartbeats sent”

Heartbeat() appends the literal "heartbeat" into messages. The test “should successfully get result from upstream” states that we shouldn’t see heartbeats, but doesn’t assert it. Add an explicit negative assertion to catch regressions.

Outside the selected lines, in the test block around Lines 5126–5135 after reading messages:

assert.NotContains(t, messages, "heartbeat")
v2/pkg/engine/resolve/resolve.go (1)

499-530: Consider excluding heartbeats from “update sent” metrics

Currently, reporter.SubscriptionUpdateSent() increments for heartbeats as well as real updates. If this metric is intended to track client-visible payload updates, counting heartbeats will inflate it.

Do you intend to count heartbeats as “updates”? If not, remove the call in handleHeartbeat or introduce a dedicated Reporter hook (e.g., HeartbeatSent()).

Optionally, narrow error handling to treat common closure signals as disconnects (context.Canceled, io.EOF, io.ErrClosedPipe, net.ErrClosed) before unsubscribing, though unsubscribing on any error is acceptable.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 5a1ebf7 and 0827865.

📒 Files selected for processing (5)
  • execution/graphql/result_writer.go (1 hunks)
  • v2/pkg/engine/resolve/event_loop_test.go (2 hunks)
  • v2/pkg/engine/resolve/resolve.go (8 hunks)
  • v2/pkg/engine/resolve/resolve_test.go (2 hunks)
  • v2/pkg/engine/resolve/response.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • v2/pkg/engine/resolve/response.go
  • v2/pkg/engine/resolve/event_loop_test.go
🧰 Additional context used
🧬 Code Graph Analysis (1)
v2/pkg/engine/resolve/resolve_test.go (1)
v2/pkg/engine/resolve/resolve.go (1)
  • AsyncErrorWriter (42-44)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Build and test (go 1.23 / ubuntu-latest)
  • GitHub Check: Build and test (go 1.23 / windows-latest)
  • GitHub Check: Build and test (go 1.23 / windows-latest)
🔇 Additional comments (3)
v2/pkg/engine/resolve/resolve.go (3)

68-70: Good: dedicated heartbeat interval on the resolver

Clear separation of concerns; no issues.


201-203: LGTM: resolver uses the configured heartbeat interval

Initialization correctly wires the chosen interval.


309-341: Heartbeat worker flow is sound

  • Dedicated ticker per subscription, stopped via defer.
  • Early returns on resolver/client cancellation.
  • Ticker reset after each write to avoid spurious heartbeats during activity.

No issues.

@@ -518,24 +514,16 @@ func (r *Resolver) handleHeartbeat(sub *sub, data []byte) {
fmt.Printf("resolver:heartbeat:subscription:%d\n", sub.id.SubscriptionID)
}

if _, err := sub.writer.Write(data); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the approach to move it to the router 👍

if err != nil {
// If flush fails (e.g. client disconnected), remove the subscription.
if err := sub.writer.Heartbeat(); err != nil {
// If heartbeat fails (e.g. client disconnected), remove the subscription.
_ = r.AsyncUnsubscribeSubscription(sub.id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question here is, should we really abort the whole subscription for a heartbeat write error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just retains the error from before, a write error is almost always EOF or context cancelled or etc, meaning the client is gone

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before, we unsubscribed only on client cancellation or flush error.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
v2/pkg/engine/resolve/resolve.go (2)

142-145: Breaking API: keep deprecated alias for external callers and document it

Renaming the exported field breaks consumers initializing ResolverOptions. Keep a deprecated alias and prefer the new field.

Apply this diff to preserve backward compatibility:

-	// SubscriptionHeartbeatInterval defines the interval in which a heartbeat is sent to all subscriptions (whether or not this does anything is determined by the subscription response writer)
-	SubscriptionHeartbeatInterval time.Duration
+	// Deprecated: use SubscriptionHeartbeatInterval. Kept for backward compatibility with older integrations.
+	MultipartSubHeartbeatInterval time.Duration
+	// SubscriptionHeartbeatInterval defines the interval in which a heartbeat is sent to all subscriptions (whether or not this does anything is determined by the subscription response writer)
+	SubscriptionHeartbeatInterval time.Duration

157-159: Honor deprecated field during defaulting to avoid silent behavior changes

If legacy integrations still set MultipartSubHeartbeatInterval, prefer it before falling back to DefaultHeartbeatInterval.

-	if options.SubscriptionHeartbeatInterval <= 0 {
-		options.SubscriptionHeartbeatInterval = DefaultHeartbeatInterval
-	}
+	if options.SubscriptionHeartbeatInterval <= 0 {
+		if options.MultipartSubHeartbeatInterval > 0 {
+			options.SubscriptionHeartbeatInterval = options.MultipartSubHeartbeatInterval
+		} else {
+			options.SubscriptionHeartbeatInterval = DefaultHeartbeatInterval
+		}
+	}

Would you like me to add a unit test that asserts the deprecated field is honored when the new field is unset?

🧹 Nitpick comments (4)
v2/pkg/engine/resolve/resolve.go (4)

68-72: Nit: clarify this is an internal, derived interval

Minor doc clarity: make it explicit that this is the internal interval, derived from options during construction.

-	// Subscription heartbeat interval
+	// Internal subscription heartbeat interval (set from options during construction)
 	heartbeatInterval time.Duration

309-311: Nit: align comment with actual behavior (ticker resets after writes)

The worker resets the ticker after each data write to avoid immediate heartbeats. Capture that in the comment.

-// It sends a heartbeat to the client every heartbeatInterval. Heartbeats are handled by the SubscriptionResponseWriter interface.
+// Sends a heartbeat every heartbeatInterval and resets the interval after each data write
+// to avoid immediate heartbeats following updates. Heartbeats are handled by SubscriptionResponseWriter.

523-525: Nit: use ':sent' to better reflect the event

“done” is a little ambiguous in logs; “sent” reads clearer.

-		fmt.Printf("resolver:heartbeat:subscription:done:%d\n", sub.id.SubscriptionID)
+		fmt.Printf("resolver:heartbeat:subscription:sent:%d\n", sub.id.SubscriptionID)

527-529: Don’t treat heartbeats as “updates” in SubscriptionUpdateSent

I confirmed that in v2/pkg/engine/resolve/resolve.go, the handleHeartbeat method (around line 548) calls

if r.reporter != nil {
    r.reporter.SubscriptionUpdateSent()
}

which means every heartbeat is counted as an “update” in our metrics, inflating dashboard numbers.

To keep update metrics meaningful, I recommend one of the following optional refactors:

Option A (preferred): stop counting heartbeats as updates. Remove the SubscriptionUpdateSent() call from handleHeartbeat.

func (r *Resolver) handleHeartbeat(sub *sub) {
    …
    if r.options.Debug {
        fmt.Printf("resolver:heartbeat:subscription:done:%d\n", sub.id.SubscriptionID)
    }

-   if r.reporter != nil {
-       r.reporter.SubscriptionUpdateSent()
-   }
}

Option B: if you still want visibility into heartbeats but keep update-metrics clean, introduce a new hook via a type-asserted interface instead of changing Reporter:

// Define in the same package
type HeartbeatReporter interface {
    HeartbeatSent()
}

// In handleHeartbeat:
if r.reporter != nil {
    if hr, ok := r.reporter.(HeartbeatReporter); ok {
        hr.HeartbeatSent()
    }
}

This avoids breaking the existing Reporter interface and lets implementers opt into heartbeat metrics.

Impacted locations:

  • v2/pkg/engine/resolve/resolve.gohandleHeartbeat
  • Tests in v2/pkg/engine/resolve/event_loop_test.go may need adjustments if they assert on heartbeat metrics.
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 0827865 and 8c167f3.

📒 Files selected for processing (3)
  • v2/pkg/engine/resolve/event_loop_test.go (2 hunks)
  • v2/pkg/engine/resolve/resolve.go (8 hunks)
  • v2/pkg/engine/resolve/resolve_test.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • v2/pkg/engine/resolve/resolve_test.go
  • v2/pkg/engine/resolve/event_loop_test.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Build and test (go 1.23 / ubuntu-latest)
  • GitHub Check: Build and test (go 1.23 / windows-latest)
  • GitHub Check: Build and test (go 1.23 / windows-latest)
🔇 Additional comments (4)
v2/pkg/engine/resolve/resolve.go (4)

201-202: LGTM: resolver carries the configured heartbeat interval

Using the normalized options.SubscriptionHeartbeatInterval here is correct.


329-329: LGTM: heartbeats executed on the writer goroutine

Calling handleHeartbeat from the worker keeps writer access single-threaded.


499-516: LGTM: early-cancel checks avoid unnecessary work

Returning early on resolver/client context cancellation before touching the writer is correct.


517-521: Heartbeat write errors: unsubscribe is acceptable; verify writer semantics across implementations

Unsubscribing on any Heartbeat() error is a reasonable default given typical EOF or context-canceled cases. Please double-check that every SubscriptionResponseWriter implementation you’ve registered returns a non-nil error when its underlying transport is closed—otherwise this loop branch could spin indefinitely:

  • v2/pkg/engine/resolve/response.go (interface definition)
  • v2/pkg/engine/resolve/event_loop_test.go → FakeSubscriptionWriter.Heartbeat()
  • v2/pkg/engine/resolve/resolve_test.go → SubscriptionRecorder.Heartbeat()
  • execution/graphql/result_writer.go → EngineResultWriter.Heartbeat()

If any of these stubs or real writers swallow transport-closed errors (for example by always returning nil), please update them so that a closed connection surfaces as an error here.

@endigma endigma force-pushed the jesse/eng-7547-heartbeat-on-sse branch from 8c167f3 to fadc6bc Compare August 22, 2025 09:17
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
v2/pkg/engine/resolve/event_loop_test.go (1)

54-60: Remove redundant string conversion in Heartbeat().

The cast string("heartbeat") is unnecessary; the literal is already a string. Minor cleanup.

Apply this diff:

 func (f *FakeSubscriptionWriter) Heartbeat() error {
 	f.mu.Lock()
 	defer f.mu.Unlock()
-	f.writtenMessages = append(f.writtenMessages, string("heartbeat"))
+	f.writtenMessages = append(f.writtenMessages, "heartbeat")
 	return nil
 }

Optional: consider defining a shared const (e.g., heartbeatMsg = "heartbeat") across test writers to keep the sentinel consistent.

v2/pkg/engine/resolve/resolve_test.go (2)

89-89: Prefer const for the test heartbeat interval.

This value is immutable and used as configuration; define it as a const to avoid accidental mutation.

Apply this diff:

-var subscriptionHeartbeatInterval = 100 * time.Millisecond
+const subscriptionHeartbeatInterval = 100 * time.Millisecond

4780-4785: Heartbeat() test writer implementation is fine; consider two small hardening tweaks.

  • Optional guard: avoid appending after Complete() or Close() to reduce flakiness in racy tests.
  • Optional assertion: in tests where heartbeats must not appear (e.g., “should successfully get result from upstream”), add an explicit NotContains check for "heartbeat" to clearly encode intent.

Example guard (optional):

 func (s *SubscriptionRecorder) Heartbeat() error {
 	s.mux.Lock()
 	defer s.mux.Unlock()
+	if s.complete.Load() || s.closed.Load() {
+		return nil
+	}
 	s.messages = append(s.messages, "heartbeat")
 	return nil
 }

Example assertion addition in the “should successfully get result from upstream” test (outside the changed lines):

messages := recorder.Messages()
assert.NotContains(t, messages, "heartbeat")
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 8c167f3 and fadc6bc.

📒 Files selected for processing (5)
  • execution/graphql/result_writer.go (1 hunks)
  • v2/pkg/engine/resolve/event_loop_test.go (2 hunks)
  • v2/pkg/engine/resolve/resolve.go (8 hunks)
  • v2/pkg/engine/resolve/resolve_test.go (3 hunks)
  • v2/pkg/engine/resolve/response.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • v2/pkg/engine/resolve/response.go
  • execution/graphql/result_writer.go
  • v2/pkg/engine/resolve/resolve.go
🧰 Additional context used
🧬 Code graph analysis (1)
v2/pkg/engine/resolve/event_loop_test.go (1)
v2/pkg/engine/resolve/resolve.go (1)
  • DefaultHeartbeatInterval (21-21)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Build and test (go 1.23 / windows-latest)
  • GitHub Check: Build and test (go 1.23 / windows-latest)
  • GitHub Check: Build and test (go 1.23 / ubuntu-latest)
🔇 Additional comments (2)
v2/pkg/engine/resolve/event_loop_test.go (1)

134-134: Field rename to SubscriptionHeartbeatInterval looks correct.

The test wiring matches the ResolverOptions rename and uses DefaultHeartbeatInterval as expected.

v2/pkg/engine/resolve/resolve_test.go (1)

98-98: API rename applied consistently.

Using SubscriptionHeartbeatInterval in New(...) aligns with the public API change.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants