chore: parallel kafka consumer #17262

Open: wants to merge 14 commits into base: main

Conversation

Contributor

@fcjack fcjack commented Apr 16, 2025

What this PR does / why we need it:

Update our kafka consumer to process records in parallel and publish the highest offset received from the block.

Which issue(s) this PR fixes:
Fixes https://github.com/grafana/loki-private/issues/1547

Special notes for your reviewer:

Checklist

  • Reviewed the CONTRIBUTING.md guide (required)
  • Documentation added
  • Tests updated
  • Title matches the required conventional commits format, see here
    • Note that Promtail is considered to be feature complete, and future development for logs collection will be in Grafana Alloy. As such, feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
  • Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
  • If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR

@fcjack fcjack self-assigned this Apr 16, 2025
@fcjack fcjack force-pushed the fcjack/kafka-parallel-consumer branch from 46c394b to 9d1ff75 Compare April 16, 2025 09:47
@pull-request-size pull-request-size bot added size/L and removed size/M labels Apr 16, 2025
@github-actions github-actions bot added the type/docs Issues related to technical documentation; the Docs Squad uses this label across many repositories label Apr 16, 2025
Contributor

github-actions bot commented Apr 16, 2025

@fcjack fcjack marked this pull request as ready for review April 16, 2025 14:26
@fcjack fcjack requested a review from a team as a code owner April 16, 2025 14:26
// success keeps track of the records that were processed. It is expected to
// be sorted in ascending order of offset since the records themselves are
// ordered.
success := make([]int64, len(records))
Contributor

@grobinson-grafana grobinson-grafana Apr 17, 2025

One thing we need to be careful of, and I didn't consider this at the start, is that we don't set a limit on the maximum number of records that we can poll from Kafka. I would suggest testing this in dev (and also putting a custom image in ops with flux-ignore). In most cases, this slice should be less than 10KB.

Contributor Author

Yes, I was already planning to test on dev before we move forward.
I will consider taking a look at ops too, thanks.

Member

I suspect this will be fine considering an additional int per record is likely significantly smaller than the records themselves. Good to be aware of, but I doubt we'll see this in profiles at all.
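For a back-of-the-envelope check of the sizes discussed in this thread, the sketch below computes the memory taken by a `[]int64` with one entry per polled record. The helper name `offsetSliceBytes` is made up for the example, and the figure ignores the slice header and allocator overhead.

```go
package main

import "fmt"

// bytesPerOffset is the size of one int64 element.
const bytesPerOffset = 8

// offsetSliceBytes returns the backing-array size, in bytes, of a []int64
// holding one offset per record.
func offsetSliceBytes(numRecords int) int {
	return numRecords * bytesPerOffset
}

func main() {
	// 1280 records correspond to 10 KiB of offsets.
	fmt.Println(offsetSliceBytes(1280)) // 10240
}
```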

Member

@owen-d owen-d left a comment

one nit, then lgtm


// Find the highest offset before a gap, and commit that.
var highestOffset int64
for _, offset := range success {
Member

nit: you might want to make success []*int to handle the case where offset == 0, which iirc is still valid for the first record in a partition.
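To make the offset-zero concern concrete, here is a minimal sketch of finding the highest offset before a gap. It is an alternative to the pointer slice suggested above, not the PR's implementation: it assumes a hypothetical parallel `[]bool` marking which records were processed, and returns an explicit flag so that a valid offset of 0 is not mistaken for "nothing to commit". Both slices are assumed to have the same length.

```go
package main

import "fmt"

// highestContiguous returns the offset of the last record before the first
// unprocessed one. The boolean is false when there is nothing to commit
// (no records, or the first record was not processed), so a committable
// offset of 0 stays distinguishable from "no offset".
func highestContiguous(offsets []int64, processed []bool) (int64, bool) {
	var highest int64
	found := false
	for i, off := range offsets {
		if !processed[i] {
			break // gap: later successes must not be committed
		}
		highest, found = off, true
	}
	return highest, found
}

func main() {
	// Only the first record (offset 0) succeeded before the gap.
	fmt.Println(highestContiguous([]int64{0, 1, 2}, []bool{true, false, true})) // 0 true
	// The first record failed, so nothing can be committed.
	fmt.Println(highestContiguous([]int64{0, 1}, []bool{false, true})) // 0 false
	// No gaps: the last offset is committable.
	fmt.Println(highestContiguous([]int64{5, 6, 7}, []bool{true, true, true})) // 7 true
}
```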

@fcjack fcjack force-pushed the fcjack/kafka-parallel-consumer branch from e5528af to e6b2320 Compare May 9, 2025 09:18
Labels
size/L type/docs Issues related to technical documentation; the Docs Squad uses this label across many repositories
3 participants