Skip to content

Commit f7ae41e

Browse files
committed
[HUDI-9381] Async instant time generation of Flink writers
The new instant time generation utilizes RPC request to coordinate creation of new instants. Each write task will send a RPC request to the coordinator for the instant time, the coordinator uses a global lock to guard the access from multiple tasks. Now one checkpoint id corresponds to one instant. Basic work flow: * write task send request: current ckp id * write task expected response: the instant time * coordinator mappings of checkpoint and instant: ckp-id → {instant → {write-task-id → write meta event}} Note that the ckp id used in the request is the last known id instead of the current checkpoint, for job restored from a state, it is the state checkpoint id, otherwise -1 for fresh new job.
1 parent 34ff039 commit f7ae41e

37 files changed

+581
-1556
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,6 @@ private void startServer(TimelineServiceCreator timelineServiceCreator) throws I
170170
* writeConfig.getHoodieClientHeartbeatTolerableMisses());
171171
}
172172

173-
if (writeConfig.isTimelineServerBasedInstantStateEnabled()) {
174-
timelineServiceConfBuilder
175-
.instantStateForceRefreshRequestNumber(writeConfig.getTimelineServerBasedInstantStateForceRefreshRequestNumber())
176-
.enableInstantStateRequests(true);
177-
}
178-
179173
this.serviceConfig = timelineServiceConfBuilder.build();
180174

181175
server = timelineServiceCreator.create(context, storageConf.newInstance(), serviceConfig, viewManager);
@@ -191,10 +185,10 @@ TimelineService create(HoodieEngineContext context, StorageConfiguration<?> stor
191185

192186
private void setHostAddr(String embeddedTimelineServiceHostAddr) {
193187
if (embeddedTimelineServiceHostAddr != null) {
194-
LOG.info("Overriding hostIp to (" + embeddedTimelineServiceHostAddr + ") found in spark-conf. It was " + this.hostAddr);
188+
LOG.info("Overriding hostIp to (" + embeddedTimelineServiceHostAddr + ") found in write conf. It was " + this.hostAddr);
195189
this.hostAddr = embeddedTimelineServiceHostAddr;
196190
} else {
197-
LOG.warn("Unable to find driver bind address from spark config");
191+
LOG.warn("Unable to find driver bind address from write config, use current host name");
198192
this.hostAddr = NetworkUtils.getHostname();
199193
}
200194
}
@@ -258,23 +252,23 @@ public void stopForBasePath(String basePath) {
258252

259253
private static TimelineServiceIdentifier getTimelineServiceIdentifier(String hostAddr, HoodieWriteConfig writeConfig) {
260254
return new TimelineServiceIdentifier(hostAddr, writeConfig.getMarkersType(), writeConfig.isMetadataTableEnabled(),
261-
writeConfig.isEarlyConflictDetectionEnable(), writeConfig.isTimelineServerBasedInstantStateEnabled());
255+
writeConfig.isEarlyConflictDetectionEnable());
262256
}
263257

264258
static class TimelineServiceIdentifier {
265259
private final String hostAddr;
266260
private final MarkerType markerType;
267261
private final boolean isMetadataEnabled;
268262
private final boolean isEarlyConflictDetectionEnable;
269-
private final boolean isTimelineServerBasedInstantStateEnabled;
270263

271-
public TimelineServiceIdentifier(String hostAddr, MarkerType markerType, boolean isMetadataEnabled, boolean isEarlyConflictDetectionEnable,
272-
boolean isTimelineServerBasedInstantStateEnabled) {
264+
public TimelineServiceIdentifier(String hostAddr,
265+
MarkerType markerType,
266+
boolean isMetadataEnabled,
267+
boolean isEarlyConflictDetectionEnable) {
273268
this.hostAddr = hostAddr;
274269
this.markerType = markerType;
275270
this.isMetadataEnabled = isMetadataEnabled;
276271
this.isEarlyConflictDetectionEnable = isEarlyConflictDetectionEnable;
277-
this.isTimelineServerBasedInstantStateEnabled = isTimelineServerBasedInstantStateEnabled;
278272
}
279273

280274
@Override
@@ -288,15 +282,15 @@ public boolean equals(Object o) {
288282
TimelineServiceIdentifier that = (TimelineServiceIdentifier) o;
289283
if (this.hostAddr != null && that.hostAddr != null) {
290284
return isMetadataEnabled == that.isMetadataEnabled && isEarlyConflictDetectionEnable == that.isEarlyConflictDetectionEnable
291-
&& isTimelineServerBasedInstantStateEnabled == that.isTimelineServerBasedInstantStateEnabled && hostAddr.equals(that.hostAddr) && markerType == that.markerType;
285+
&& hostAddr.equals(that.hostAddr) && markerType == that.markerType;
292286
} else {
293287
return (hostAddr == null && that.hostAddr == null);
294288
}
295289
}
296290

297291
@Override
298292
public int hashCode() {
299-
return Objects.hash(hostAddr, markerType, isMetadataEnabled, isEarlyConflictDetectionEnable, isTimelineServerBasedInstantStateEnabled);
293+
return Objects.hash(hostAddr, markerType, isMetadataEnabled, isEarlyConflictDetectionEnable);
300294
}
301295
}
302296
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252

5353
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
5454
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
55+
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
5556
import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
5657

5758
/**
@@ -208,6 +209,7 @@ public static TypedProperties getLockConfig(String tablePath) {
208209
TypedProperties props = new TypedProperties();
209210
props.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), FileSystemBasedLockProvider.class.getName());
210211
props.put(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(), "2000");
212+
props.put(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "200");
211213
props.put(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key(), "1");
212214
props.put(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(), "30");
213215
props.put(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key(), defaultLockPath(tablePath));

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -512,12 +512,6 @@ public class HoodieWriteConfig extends HoodieConfig {
512512
.sinceVersion("1.0.0")
513513
.withDocumentation("If enabled, writers get instant state from timeline server rather than requesting DFS directly");
514514

515-
public static final ConfigProperty<Integer> INSTANT_STATE_TIMELINE_SERVER_BASED_FORCE_REFRESH_REQUEST_NUMBER = ConfigProperty
516-
.key("hoodie.instant_state.timeline_server_based.force_refresh.request.number")
517-
.defaultValue(100)
518-
.sinceVersion("1.0.0")
519-
.withDocumentation("Number of requests to trigger instant state cache refreshing");
520-
521515
public static final ConfigProperty<Integer> MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS = ConfigProperty
522516
.key("hoodie.markers.timeline_server_based.batch.num_threads")
523517
.defaultValue(20)
@@ -1526,14 +1520,6 @@ public int getFinalizeWriteParallelism() {
15261520
return getInt(FINALIZE_WRITE_PARALLELISM_VALUE);
15271521
}
15281522

1529-
public boolean isTimelineServerBasedInstantStateEnabled() {
1530-
return getBoolean(INSTANT_STATE_TIMELINE_SERVER_BASED);
1531-
}
1532-
1533-
public int getTimelineServerBasedInstantStateForceRefreshRequestNumber() {
1534-
return getInt(INSTANT_STATE_TIMELINE_SERVER_BASED_FORCE_REFRESH_REQUEST_NUMBER);
1535-
}
1536-
15371523
public MarkerType getMarkersType() {
15381524
String markerType = getString(MARKERS_TYPE);
15391525
return MarkerType.valueOf(markerType.toUpperCase());
@@ -3366,11 +3352,6 @@ public Builder withEmbeddedTimelineServerPort(int port) {
33663352
return this;
33673353
}
33683354

3369-
public Builder withTimelineServerBasedInstantStateEnable(boolean enable) {
3370-
writeConfig.setValue(INSTANT_STATE_TIMELINE_SERVER_BASED, String.valueOf(enable));
3371-
return this;
3372-
}
3373-
33743355
public Builder withBulkInsertSortMode(String mode) {
33753356
writeConfig.setValue(BULK_INSERT_SORT_MODE, mode);
33763357
return this;

hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGeneratorBase.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.Arrays;
3434
import java.util.concurrent.TimeUnit;
3535

36-
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS;
3736
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES;
3837
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_WAIT_TIMEOUT_MS;
3938
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY;
@@ -76,10 +75,9 @@ public TimeGeneratorBase(HoodieTimeGeneratorConfig config, StorageConfiguration<
7675
Integer.parseInt(DEFAULT_LOCK_ACQUIRE_NUM_RETRIES));
7776
lockAcquireWaitTimeInMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
7877
DEFAULT_LOCK_ACQUIRE_WAIT_TIMEOUT_MS);
79-
// The maximum time to wait for each time generation to resolve the clock skew issue on distributed hosts.
80-
long maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
81-
Long.parseLong(DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS));
82-
lockRetryHelper = new RetryHelper<>(maxWaitTimeInMs, maxRetries, maxWaitTimeInMs,
78+
// The initial time to wait for each time generation to resolve the clock skew issue on distributed hosts.
79+
long initialRetryInternal = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, 200);
80+
lockRetryHelper = new RetryHelper<>(initialRetryInternal, maxRetries, initialRetryInternal,
8381
Arrays.asList(HoodieLockException.class, InterruptedException.class), "acquire timeGenerator lock");
8482
}
8583

hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/InstantStateDTO.java

Lines changed: 0 additions & 68 deletions
This file was deleted.

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,7 @@ private boolean flushBucket(RowDataBucket bucket) {
350350
final List<WriteStatus> writeStatus = writeRecords(instant, bucket);
351351
final WriteMetadataEvent event = WriteMetadataEvent.builder()
352352
.taskID(taskID)
353+
.checkpointId(this.checkpointId)
353354
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
354355
.writeStatus(writeStatus)
355356
.lastBatch(false)
@@ -386,6 +387,7 @@ private void flushRemaining(boolean endInput) {
386387
}
387388
final WriteMetadataEvent event = WriteMetadataEvent.builder()
388389
.taskID(taskID)
390+
.checkpointId(checkpointId)
389391
.instantTime(currentInstant)
390392
.writeStatus(writeStatus)
391393
.lastBatch(true)
@@ -397,8 +399,6 @@ private void flushRemaining(boolean endInput) {
397399
this.tracer.reset();
398400
this.writeClient.cleanHandles();
399401
this.writeStatuses.addAll(writeStatus);
400-
// blocks flushing until the coordinator starts a new instant
401-
this.confirming = true;
402402

403403
writeMetrics.endDataFlush();
404404
writeMetrics.resetAfterCommit();

0 commit comments

Comments
 (0)