dq/runtime/output_channel: relax requirement on double watermark/checkpoint #21459

Open: wants to merge 3 commits into base: main
18 changes: 11 additions & 7 deletions ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -81,6 +81,10 @@ class TDqOutputChannel : public IDqOutputChannel {
}

EDqFillLevel GetFillLevel() const override {
+ if (Checkpoints) {
+ // prevent adding data into channel that already contains untransferred checkpoint (we can tolerate multiple checkpoints without any data between)
+ return EDqFillLevel::HardLimit;
+ }
return FillLevel;
}

@@ -226,13 +230,13 @@ class TDqOutputChannel : public IDqOutputChannel {
}

void Push(NDqProto::TWatermark&& watermark) override {
- YQL_ENSURE(!Watermark);
+ // if there were already watermark in-fly replace it with latest one
+ YQL_ENSURE(!Watermark || Watermark->GetTimestampUs() <= watermark.GetTimestampUs());
Watermark.ConstructInPlace(std::move(watermark));
}

void Push(NDqProto::TCheckpoint&& checkpoint) override {
- YQL_ENSURE(!Checkpoint);
- Checkpoint.ConstructInPlace(std::move(checkpoint));
+ Checkpoints.push_back(std::move(checkpoint));
}

[[nodiscard]]
@@ -304,9 +308,9 @@ class TDqOutputChannel : public IDqOutputChannel {

[[nodiscard]]
bool Pop(NDqProto::TCheckpoint& checkpoint) override {
- if (!HasData() && Checkpoint) {
- checkpoint = std::move(*Checkpoint);
- Checkpoint = Nothing();
+ if (!HasData() && Checkpoints) {
+ checkpoint = std::move(Checkpoints.front());
+ Checkpoints.pop_front();
return true;
}
return false;
@@ -478,7 +482,7 @@ class TDqOutputChannel : public IDqOutputChannel {
bool Finished = false;

TMaybe<NDqProto::TWatermark> Watermark;
- TMaybe<NDqProto::TCheckpoint> Checkpoint;
+ TDeque<NDqProto::TCheckpoint> Checkpoints;
Review comment (Collaborator):

Two checkpoints are possible only if inflight in the config is greater than 1, right? At the moment only one checkpoint travels through the graph at a time.

Reply from @yumkam (Collaborator, Author), Jul 23, 2025:

There is an edge case: in queries that do not write to a topic, the result is written to a channel.
A new checkpoint is started once the previous checkpoint has passed through all tasks and all sinks have reported their writes.
In this case there are no sinks: the last task has written to the channel, and if the channel gets stuck for some reason, a second checkpoint can arrive in it.
It is a bit of a pity to bring in a TDeque just for this, but[...]

std::shared_ptr<TDqFillAggregator> Aggregator;
EDqFillLevel FillLevel = NoLimit;
};
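
To make the relaxed behaviour easier to follow, here is a minimal standalone sketch of the three changes in this diff: queued checkpoints, a fill level forced to the hard limit while a checkpoint is pending, and watermark replacement. It is not the YDB implementation. `ToyOutputChannel`, `Watermark`, `Checkpoint`, `PushData`, `PopData` and `PendingWatermark` are hypothetical names, `std::optional` and `std::deque` stand in for `TMaybe` and `TDeque`, and a thrown `std::logic_error` stands in for `YQL_ENSURE`.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-ins for NDqProto::TWatermark and NDqProto::TCheckpoint.
struct Watermark { std::uint64_t timestampUs = 0; };
struct Checkpoint { std::uint64_t id = 0; };

enum class FillLevel { NoLimit, SoftLimit, HardLimit };

// Toy model of the output channel; only the parts touched by the diff are modelled.
class ToyOutputChannel {
public:
    void PushData(std::string row) { data_.push_back(std::move(row)); }

    // A newer watermark replaces the one still in flight; moving backwards is an error.
    void Push(Watermark wm) {
        if (watermark_ && watermark_->timestampUs > wm.timestampUs) {
            throw std::logic_error("watermark moved backwards");
        }
        watermark_ = wm;
    }

    // Checkpoints are queued instead of requiring that none is pending.
    void Push(Checkpoint cp) { checkpoints_.push_back(cp); }

    // While any checkpoint is pending, report HardLimit so producers stop adding data.
    FillLevel GetFillLevel() const {
        return checkpoints_.empty() ? fillLevel_ : FillLevel::HardLimit;
    }

    bool PopData(std::string& row) {
        if (data_.empty()) return false;
        row = std::move(data_.front());
        data_.pop_front();
        return true;
    }

    // Checkpoints leave in FIFO order, and only after all data has been drained.
    bool Pop(Checkpoint& cp) {
        if (!data_.empty() || checkpoints_.empty()) return false;
        cp = checkpoints_.front();
        checkpoints_.pop_front();
        return true;
    }

    std::optional<Watermark> PendingWatermark() const { return watermark_; }

private:
    std::deque<std::string> data_;
    std::deque<Checkpoint> checkpoints_;
    std::optional<Watermark> watermark_;
    FillLevel fillLevel_ = FillLevel::NoLimit;  // assumed default, as in the diff
};
```

In this model the edge case from the thread looks like this: a stuck channel holding one row accepts `Checkpoint{1}` and then `Checkpoint{2}` without failing, reports `HardLimit` in the meantime, and releases the two checkpoints in order once the row has been popped.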