[HUDI-9381] Async instant time generation of Flink writers #13285


Merged
merged 1 commit into from
May 16, 2025

Conversation

Contributor

@danny0405 danny0405 commented May 11, 2025

Change Logs

The new instant time generation utilizes RPC requests to coordinate the creation of new instants. Each write task sends an RPC request to the coordinator for the instant time, and the coordinator uses a global lock to guard access from multiple tasks.

Now one checkpoint id corresponds to one instant.

Basic workflow:

  • write task sends request: the current ckp id
  • write task expected response: the instant time
  • coordinator mapping of checkpoint to instant: ckp-id → {instant → {write-task-id → write meta event}}

Note that the ckp id used in the request is the last known id instead of the current checkpoint's: for a task restored from state, it is the restored checkpoint id, otherwise -1 for a fresh new job.
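The workflow above can be sketched with a hypothetical minimal illustration (class and method names are invented, not the actual Hudi coordinator API), assuming one instant per checkpoint id and a global lock guarding instant creation so that concurrent write-task requests for the same checkpoint receive the same instant time:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the coordinator side: one instant per checkpoint id,
// created under a global lock so concurrent task requests are serialized.
class InstantTimeCoordinatorSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private final Map<Long, String> ckpToInstant = new TreeMap<>();
  private long counter = 0;

  // Each write task sends its last known checkpoint id (-1 for a fresh job);
  // the coordinator creates the instant for that checkpoint or reuses it.
  String requestInstantTime(long lastKnownCkpId) {
    lock.lock();
    try {
      return ckpToInstant.computeIfAbsent(lastKnownCkpId, id -> newInstantTime());
    } finally {
      lock.unlock();
    }
  }

  private String newInstantTime() {
    // stand-in for a real timestamp-based instant time
    return String.format("instant-%03d", ++counter);
  }
}
```

Two tasks asking with the same last known ckp id get the same instant; a later checkpoint id yields a new one.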

Impact

none

Risk level (write none, low, medium or high below)

low

Documentation Update

none

@github-actions github-actions bot added the size:XL (PR with lines of changes > 1000) label May 11, 2025
@danny0405 danny0405 force-pushed the HUDI-9381 branch 2 times, most recently from 1562d54 to f7ae41e Compare May 11, 2025 14:40
@danny0405 danny0405 added the flink, transaction, streaming labels May 12, 2025
@zhangyue19921010 zhangyue19921010 self-assigned this May 12, 2025
Contributor

@cshuo cshuo commented May 13, 2025

🚀 I'll take a look.

Member

@voonhous voonhous left a comment

This PR is a little difficult to review without first having a Spec or high level overview of how states between coordinator and operator are handled.

Can we take this time to document the old spec + new spec so it's easier to maintain this moving forward?

@@ -226,6 +197,7 @@ private void reloadWriteMetaState() throws Exception {
this.writeMetadataState.clear();
WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.checkpointId(checkpointId)
.instantTime(currentInstant)
.writeStatus(new ArrayList<>(writeStatuses))
.bootstrap(true)
Member

bootstrapEvents are used to signal the coordinator that it can start a new instant.
Since we are removing this signalling mechanism, is this attribute still needed?

Or is it left behind for migration purposes?

Contributor Author

@danny0405 danny0405 May 13, 2025

this is needed for the ckp_id -> instant time mapping.

@@ -108,7 +105,7 @@ public BootstrapOperator(Configuration conf) {

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
lastInstantTime = this.ckpMetadata.lastPendingInstant();
lastInstantTime = StreamerUtil.getLastCompletedInstant(StreamerUtil.createMetaClient(this.conf));
Member

Will this incur large FS scans for every checkpoint? This might cause rate limiting/high FS pressure for jobs with high parallelism.

Is it possible for us to use a correspondent and delegate the fsView-like operation to the coordinator?

Contributor Author

we can, just need to register the coordinator also for BootstrapOperator

// upgrade downgrade
this.writeClient.upgradeDowngrade(this.instant, this.metaClient);
}

private void handleBootstrapEvent(WriteMetadataEvent event) {
this.eventBuffer[event.getTaskID()] = event;
cleanLegacyEvent(event);
Member

Just curious: if all sendBootstrapEvent calls are removed, no bootstrapEvents should be sent, so in what scenarios will there be bootstrapEvents to clean?

Contributor

bootstrapEvent will also be sent during subtask restoring from state, see restoreWriteMetadata

Member

@voonhous voonhous commented May 13, 2025

IIUC, there are 6 scenarios prior to this PR's changes:

  1. Happy path 1: No failures
  2. Happy path 2: Empty Checkpoint (No Data from Any Operator)
    • Flink triggers a checkpoint chkId, but no data has arrived at any operator since the last one. All operators send WriteMetadataEvent with empty WriteStatus lists. allowCommitOnEmptyBatch is false.
  3. Edge case 1: Operator Restarts (Bootstrap Path) where:
    • A specific operator task fails and is restarted by Flink. The Coordinator and other operators remain running. The restarted operator needs to sync up.
  4. Edge case 2: Checkpoint Fails Globally After Operator Sent Metadata where:
    • Operator snapshots successfully, sends WriteMetadataEvent for chkId and InstX. Flink begins committing the checkpoint chkId, but it fails globally (e.g., another operator fails, JM fails).
    • Flink notifies Coordinator of failure (implicitly via lack of notifyCheckpointComplete). Flink triggers a rollback/restart. Operator restarts, restores state (including the sent WriteMetadataEvent), checks timeline, and potentially re-sends the event if InstX is still pending.
  5. Edge case 3: Commit Fails in Coordinator where:
    • Coordinator receives all WriteMetadataEvents for chkId, Flink calls notifyCheckpointComplete(chkId). Coordinator attempts writeClient.commit(InstX) but it fails
    • Coordinator catches the exception, attempts rollback (if possible), throw error, fails the Flink job via context.failJob(). Operators remain in confirming=true? state until the job restarts.
  6. Edge case 4: Commit Acknowledgment (ACK) Lost where:
    • Coordinator successfully commits InstX, sends CommitAckEvent to Operator N, but the network fails or the Operator processes it slowly and Flink triggers the next checkpoint (chkId+1) before the Operator unblocks. Operator proceeds, and does an empty commit iirc
    • Operator remains confirming=true. When the next checkpoint chkId+1 triggers snapshotState, the Operator calls instantToWrite(). This method checks lastPendingInstant() from CkpMetadata. If it sees a new instant (InstY) is now pending (meaning InstX was committed), it sets confirming=false even without the ACK and proceeds with InstY

Including this as a reference to ensure that this change covers all of them. I may have missed a few edge cases.

@@ -111,9 +111,6 @@ public static class Config implements Serializable {
@Parameter(names = {"--enable-marker-requests", "-em"}, description = "Enable handling of marker-related requests")
public boolean enableMarkerRequests = false;

@Parameter(names = {"--enable-instant-state-requests"}, description = "Enable handling of instant state requests")
public boolean enableInstantStateRequests = false;

Contributor

also remove instantStateForceRefreshRequestNumber?

@@ -254,35 +219,6 @@ protected String lastPendingInstant() {
* @return The instant time
*/
protected String instantToWrite(boolean hasData) {
Contributor

argument hasData is not needed anymore.

Contributor Author

yeah, but I want to keep it in case it may be used in the future.

}

protected void sendBootstrapEvent() {
this.eventGateway.sendEventToCoordinator(WriteMetadataEvent.emptyBootstrap(taskID));
Contributor

emptyBootstrap can also be removed from WriteMetadataEvent

Contributor Author

We need it to clean the legacy events.

sendCommitAckEvents(-1L);
return;
}
// start to recommit the instant.
Contributor

For partial failover, this condition will not be satisfied, will the event buffer be committed later? e.g., during next ckp complete

Contributor Author

yes, it will be recommitted later.

Contributor

@zhangyue19921010 zhangyue19921010 left a comment

Review up to BucketStreamWriteFunctionWrapper.java left some comments :)

@@ -350,6 +350,7 @@ private boolean flushBucket(RowDataBucket bucket) {
final List<WriteStatus> writeStatus = writeRecords(instant, bucket);
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.checkpointId(this.checkpointId)
Contributor

this.checkpointId -> checkpointId, to unify the coding style

Pair<String, WriteMetadataEvent[]> instantTimeAndEventBuffer = this.eventBuffers.get(checkpointId);
final String instantTime;
if (instantTimeAndEventBuffer == null) {
synchronized (this.eventBuffers) {
Contributor

Synchronization on the non-final field 'this.eventBuffers': if the reference of the variable changes, the synchronization no longer takes effect.

Although the eventBuffers reference will not change in the current implementation, it is still recommended to use a final variable to avoid the risk of missing this in subsequent feature iterations.

Contributor Author

I can not make it final because I don't want to serialize it in the job graph.
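One way to keep lock safety without making the field final (a hypothetical sketch with invented names, not the PR's actual code) is to synchronize on a separate final lock object, so the re-assignable buffer reference never doubles as the monitor:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the buffer field stays non-final and transient (e.g. to
// keep it out of job-graph serialization), so a dedicated final lock object is
// used as the monitor instead of the mutable reference itself.
class EventBuffersHolder {
  private final Object bufferLock = new Object();
  private transient Map<Long, String> eventBuffers; // re-assignable, lazily created

  void open() {
    synchronized (bufferLock) {
      if (eventBuffers == null) {
        eventBuffers = new HashMap<>();
      }
    }
  }

  void put(long ckpId, String instant) {
    synchronized (bufferLock) {
      eventBuffers.put(ckpId, instant);
    }
  }

  String get(long ckpId) {
    synchronized (bufferLock) {
      return eventBuffers.get(ckpId);
    }
  }
}
```

The monitor object stays stable even if `eventBuffers` is re-assigned, which avoids the hazard the reviewer describes.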

this.eventBuffer[event.getTaskID()].mergeWith(event);
private WriteMetadataEvent[] addEventToBuffer(WriteMetadataEvent event) {
WriteMetadataEvent[] eventBuffer = this.eventBuffers.get(event.getCheckpointId()).getRight();
if (eventBuffer[event.getTaskID()] != null
Contributor

do we need to lock eventBuffers here?

Contributor Author

no, the executor runs in a single-thread execution mode.
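The single-thread execution mode mentioned above can be illustrated with a hedged sketch (invented names, not Hudi's actual coordinator executor): funneling every mutation through one single-threaded executor serializes buffer access without explicit locks:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: all event handling is submitted to one single-threaded
// executor, so mutations of the counter below are serialized by the executor
// and need no explicit synchronization.
class SingleThreadExecutorSketch {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private int handledEvents = 0; // only ever touched from the executor thread

  int handleEventAndWait() {
    try {
      // submit() queues the task; the single worker thread runs tasks one by one
      return executor.submit(() -> ++handledEvents).get();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  void shutdown() {
    executor.shutdown();
  }
}
```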

} else {
LOG.info("Recommit instant {}", instant);
// Recommit should start heartbeat for lazy failed writes clean policy to avoid aborting for heartbeat expired.
// Recommit should start heartbeat for lazy failed writes clean policy to avoid aborting for heartbeat expired;
// The following up checkpoints would recommit the instant.
Contributor

writeClient.getHeartbeatClient().start(instant); cannot update the heartbeat directly, because there is an instantToHeartbeatMap cache in HoodieHeartbeatClient which may return directly instead of updating the heartbeat time.

So we need to:

      if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
        if (writeClient.getHeartbeatClient().getHeartbeat(instant) != null) {
          LOG.info("Removing heartbeat for instant " + instant);
          writeClient.getHeartbeatClient().stop(instant);
          writeClient.getHeartbeatClient().removeHeartbeat(instant);
        }
        writeClient.getHeartbeatClient().start(instant);
      }

Contributor Author

Because there is instantToHeartbeatMap cache in HoodieHeartBeatClient which may return directly instead of update heart beat time.

It's okay if the instant heartbeat timer is already there? BTW, this is legacy logic and I do not change it in this PR.


private void cleanLegacyEvent(WriteMetadataEvent event) {
this.eventBuffers.entrySet().stream()
.filter(entry -> entry.getKey().compareTo(event.getCheckpointId()) > 0)
Contributor

@zhangyue19921010 zhangyue19921010 May 13, 2025

event.getCheckpointId() > 0? I understand that this is to clean up all the remaining buffers before the current event. Maybe we need to clean up the smaller checkpoint IDs?

Contributor Author

smaller checkpoint IDs should already be taken care of by the coordinator; the follow-up checkpoint ack events would take care of them anyway if any are still there.

private boolean commitInstants(long checkpointId) {
// use < instead of <= because the write metadata event sends the last known checkpoint id which is smaller than the current one.
List<Boolean> result = this.eventBuffers.entrySet().stream().filter(entry -> entry.getKey() < checkpointId)
.map(entry -> commitInstant(entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight())).collect(Collectors.toList());
Contributor

It looks like a one-time commit of all prior instants. We need to ensure here that all historical commits are complete and intact.

Contributor Author

See the cleaning of the bootstrap event and the strategy for committing an empty instant.

@@ -38,6 +38,7 @@ public class WriteMetadataEvent implements OperatorEvent {

private List<WriteStatus> writeStatuses;
private int taskID;
private long checkpointId;
Contributor

Upgrading existing Flink ingestion job with the current PR may cause compatibility issues, such as state restore failure

Contributor Author

yeah, it looks like there is no good way to make it compatible. If the recovered checkpoint id is -1, it still works for most cases.

Contributor

For WriteMetadataEvent state, PojoSerializer is used as the state serializer, which supports basic state schema evolution, including adding new fields; but newly added fields will take their default value when recovering from legacy state, i.e., checkpointId will always be -1.

Contributor Author

After analyzing again: if the job is recovered with some inflight instant events for the first time, these events will be recommitted into the table, and the new job's checkpoint id will start from -1.

@@ -512,12 +512,6 @@ public class HoodieWriteConfig extends HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("If enabled, writers get instant state from timeline server rather than requesting DFS directly");
Contributor

INSTANT_STATE_TIMELINE_SERVER_BASED can be removed. Maybe the changes introduced in PR #9651 can all be cleaned up?

Contributor Author

+1

public String requestInstantTime(long checkpointId) {
try {
InstantTimeResponse response = (InstantTimeResponse) this.gateway.sendRequestToCoordinator(this.operatorID,
new SerializedValue<>(InstantTimeRequest.getInstance(checkpointId))).get();
Contributor

Should we use get(long timeout, TimeUnit unit) here, and throw an exception for fast failing, just in case of an unexpected stall?
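The reviewer's suggestion could look roughly like this sketch (a hypothetical helper with invented names, not the actual Hudi code), bounding the wait on the coordinator response future and failing fast on timeout:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: wait on the coordinator response with a bounded timeout
// instead of the unbounded get(), and surface failures fast.
class BoundedRpcSketch {
  static String getWithTimeout(CompletableFuture<String> responseFuture, long timeoutMs) {
    try {
      return responseFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      throw new RuntimeException("Timed out waiting for instant time from coordinator", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while waiting for instant time", e);
    } catch (ExecutionException e) {
      throw new RuntimeException("Failed to fetch instant time", e);
    }
  }
}
```

A never-completing future then fails after the timeout instead of blocking the write task indefinitely.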


return dataStream.addSink(Pipelines.DummySink.INSTANCE)
.setParallelism(1)
// sets the same parallelism to upstream operators to enable partial failover.
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
Contributor

No need to set the parallelism explicitly; the parallelism will be the same as that of the upstream write operator if not set, and they will automatically be chained together.

if (attemptId <= 0) {
HoodieTimeline pendingTimeline = this.metaClient.getActiveTimeline().filterPendingExcludingCompaction();
// if the task is initially started, resend the pending event.
for (WriteMetadataEvent event : this.writeMetadataState.get()) {
Contributor

Write events from writeMetadataState should also be sent to the coordinator for cases where attemptId > 0, i.e., task failover?

Contributor

Ignore this. It seems ok here as the write event for failover tasks should be kept on coordinator.

@github-project-automation github-project-automation bot moved this from 🆕 New to 🛬 Near landing in Hudi PR Support May 15, 2025
Member

@voonhous voonhous left a comment

Possible to fix the PR template? Thanks

@@ -168,6 +148,7 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw
snapshotState();
// Reload the snapshot state as the current state.
reloadWriteMetaState();
this.checkpointId = functionSnapshotContext.getCheckpointId();
Contributor

If the user sets tolerable checkpoint failures:

Step 0: TaskA, TaskB and TaskC are writing data using instant0.

Step 1: Checkpoint ID 1 triggers ==> TaskA does not finish (hangs), TaskB and TaskC finish snapshotState and get the next instant2 based on ckp id 1.

Step 2: The checkpoint expires due to a timeout.

Step 3: Checkpoint ID 2 triggers ==> TaskA still hangs, TaskB and TaskC finish snapshotState and get the next instant3 based on ckp id 2.

Step 4: TaskA finishes its snapshot and gets the next instant3 based on ckp id 2.

So ckp id 1 will never finish and instant2 will never commit, which means the related data is lost.

Contributor

okay, I see the related logic in

  private boolean commitInstants(long checkpointId) {
    // use < instead of <= because the write metadata event sends the last known checkpoint id which is smaller than the current one.
    List<Boolean> result = this.eventBuffers.getEventBufferStream().filter(entry -> entry.getKey() < checkpointId)
        .map(entry -> commitInstant(entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight())).collect(Collectors.toList());
    return result.stream().anyMatch(i -> i);
  }

which means the coordinator will commit all the instants before checkpointId together.

Maybe the situation just mentioned doesn't exist, but please double check here.

*/
public void cleanLegacyEvents(WriteMetadataEvent event) {
this.eventBuffers.entrySet().stream()
.filter(entry -> entry.getKey().compareTo(event.getCheckpointId()) >= 0)
Contributor

The cleanLegacyEvents operation may encounter performance issues in high-parallelism scenarios, such as with 6000 parallel tasks, due to repeated traversals of eventBuffers to identify the largest CheckpointID. To optimize this, could replacing the current data structure with an ordered one like TreeSet (which maintains sorted elements) be beneficial?

Contributor

like

public void cleanLegacyEvents(WriteMetadataEvent event) {
    // tailMap O(log n + k)
    eventBuffers.tailMap(event.getCheckpointId(), true) 
                .values()
                .stream()
                .map(Pair::getRight)
                .forEach(eventBuffer -> resetBufferAt(eventBuffer, event.getTaskID()));
}

* The coordinator reuses the instant if there is no data for this round of checkpoint,
* sends the commit ack events to unblock the flushing.
*/
private void sendCommitAckEvents(long checkpointId) {
Contributor

Looks like the CommitAckEvent related design is now all deprecated.
It would be better to remove CommitAckEvent and the related public abstract void handleOperatorEvent(OperatorEvent event); code.

Contributor Author

I want to keep it now in case it can be used in the future.

Contributor

@cshuo cshuo left a comment

Generally LGTM, left some minor comments.


/**
* Utilities for coordinator event buffer.
*/
Contributor

Align indentation of Line-33

@@ -485,9 +485,9 @@ public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object
return cleanCommitDataStream;
}

public static DataStreamSink<Object> dummySink(DataStream<Object> dataStream) {
public static DataStreamSink<Object> dummySink(Configuration conf, DataStream<Object> dataStream) {
Contributor

conf is unnecessary

@@ -112,7 +112,7 @@ public static void main(String[] args) throws Exception {
// add clean function to rollback failed writes for lazy failed writes cleaning policy
Pipelines.clean(conf, pipeline);
} else {
Pipelines.dummySink(pipeline);
Pipelines.dummySink(conf, pipeline);
Contributor

conf is unnecessary

@@ -107,7 +107,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
// add clean function to rollback failed writes for lazy failed writes cleaning policy
return Pipelines.clean(conf, pipeline);
} else {
return Pipelines.dummySink(pipeline);
return Pipelines.dummySink(conf, pipeline);
Contributor

conf is unnecessary

Contributor

@zhangyue19921010 zhangyue19921010 left a comment

LGTM. A great improvement that effectively addresses potential stream interruptions during checkpoint periods in traditional Flink write jobs.

@danny0405 danny0405 force-pushed the HUDI-9381 branch 2 times, most recently from db4ac2a to 850fe64 Compare May 16, 2025 08:14
@danny0405 danny0405 merged commit e34a7ab into apache:master May 16, 2025
56 of 58 checks passed
@github-project-automation github-project-automation bot moved this from 🛬 Near landing to ✅ Done in Hudi PR Support May 16, 2025
@vinothchandar vinothchandar self-assigned this May 28, 2025
Labels: flink, size:XL, streaming, transaction