Skip to content

Commit 432ccd2

Browse files
committed
[HUDI-9381] Async instant time generation of Flink writers
The new instant time generation utilizes RPC request to coordinate creation of new instants. Each write task will send an RPC request to the coordinator for the instant time, the coordinator uses a global lock to guard the access from multiple tasks. Now one checkpoint id corresponds to one instant. Basic work flow: * write task send request: current ckp id * write task expected response: the instant time * coordinator mappings of checkpoint and instant: ckp-id → {instant → {write-task-id → write meta event}} Note that the ckp id used in the request is the last known id instead of the current checkpoint, for job restored from a state, it is the state checkpoint id, otherwise -1 for fresh new job.
1 parent 34ff039 commit 432ccd2

File tree

46 files changed

+1072
-1740
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1072
-1740
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,6 @@ private void startServer(TimelineServiceCreator timelineServiceCreator) throws I
170170
* writeConfig.getHoodieClientHeartbeatTolerableMisses());
171171
}
172172

173-
if (writeConfig.isTimelineServerBasedInstantStateEnabled()) {
174-
timelineServiceConfBuilder
175-
.instantStateForceRefreshRequestNumber(writeConfig.getTimelineServerBasedInstantStateForceRefreshRequestNumber())
176-
.enableInstantStateRequests(true);
177-
}
178-
179173
this.serviceConfig = timelineServiceConfBuilder.build();
180174

181175
server = timelineServiceCreator.create(context, storageConf.newInstance(), serviceConfig, viewManager);
@@ -191,10 +185,10 @@ TimelineService create(HoodieEngineContext context, StorageConfiguration<?> stor
191185

192186
private void setHostAddr(String embeddedTimelineServiceHostAddr) {
193187
if (embeddedTimelineServiceHostAddr != null) {
194-
LOG.info("Overriding hostIp to (" + embeddedTimelineServiceHostAddr + ") found in spark-conf. It was " + this.hostAddr);
188+
LOG.info("Overriding hostIp to (" + embeddedTimelineServiceHostAddr + ") found in write conf. It was " + this.hostAddr);
195189
this.hostAddr = embeddedTimelineServiceHostAddr;
196190
} else {
197-
LOG.warn("Unable to find driver bind address from spark config");
191+
LOG.warn("Unable to find driver bind address from write config, use current host name");
198192
this.hostAddr = NetworkUtils.getHostname();
199193
}
200194
}
@@ -258,23 +252,23 @@ public void stopForBasePath(String basePath) {
258252

259253
private static TimelineServiceIdentifier getTimelineServiceIdentifier(String hostAddr, HoodieWriteConfig writeConfig) {
260254
return new TimelineServiceIdentifier(hostAddr, writeConfig.getMarkersType(), writeConfig.isMetadataTableEnabled(),
261-
writeConfig.isEarlyConflictDetectionEnable(), writeConfig.isTimelineServerBasedInstantStateEnabled());
255+
writeConfig.isEarlyConflictDetectionEnable());
262256
}
263257

264258
static class TimelineServiceIdentifier {
265259
private final String hostAddr;
266260
private final MarkerType markerType;
267261
private final boolean isMetadataEnabled;
268262
private final boolean isEarlyConflictDetectionEnable;
269-
private final boolean isTimelineServerBasedInstantStateEnabled;
270263

271-
public TimelineServiceIdentifier(String hostAddr, MarkerType markerType, boolean isMetadataEnabled, boolean isEarlyConflictDetectionEnable,
272-
boolean isTimelineServerBasedInstantStateEnabled) {
264+
public TimelineServiceIdentifier(String hostAddr,
265+
MarkerType markerType,
266+
boolean isMetadataEnabled,
267+
boolean isEarlyConflictDetectionEnable) {
273268
this.hostAddr = hostAddr;
274269
this.markerType = markerType;
275270
this.isMetadataEnabled = isMetadataEnabled;
276271
this.isEarlyConflictDetectionEnable = isEarlyConflictDetectionEnable;
277-
this.isTimelineServerBasedInstantStateEnabled = isTimelineServerBasedInstantStateEnabled;
278272
}
279273

280274
@Override
@@ -288,15 +282,15 @@ public boolean equals(Object o) {
288282
TimelineServiceIdentifier that = (TimelineServiceIdentifier) o;
289283
if (this.hostAddr != null && that.hostAddr != null) {
290284
return isMetadataEnabled == that.isMetadataEnabled && isEarlyConflictDetectionEnable == that.isEarlyConflictDetectionEnable
291-
&& isTimelineServerBasedInstantStateEnabled == that.isTimelineServerBasedInstantStateEnabled && hostAddr.equals(that.hostAddr) && markerType == that.markerType;
285+
&& hostAddr.equals(that.hostAddr) && markerType == that.markerType;
292286
} else {
293287
return (hostAddr == null && that.hostAddr == null);
294288
}
295289
}
296290

297291
@Override
298292
public int hashCode() {
299-
return Objects.hash(hostAddr, markerType, isMetadataEnabled, isEarlyConflictDetectionEnable, isTimelineServerBasedInstantStateEnabled);
293+
return Objects.hash(hostAddr, markerType, isMetadataEnabled, isEarlyConflictDetectionEnable);
300294
}
301295
}
302296
}

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252

5353
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY;
5454
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
55+
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
5556
import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;
5657

5758
/**
@@ -208,6 +209,7 @@ public static TypedProperties getLockConfig(String tablePath) {
208209
TypedProperties props = new TypedProperties();
209210
props.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), FileSystemBasedLockProvider.class.getName());
210211
props.put(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(), "2000");
212+
props.put(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "200");
211213
props.put(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key(), "1");
212214
props.put(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(), "30");
213215
props.put(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key(), defaultLockPath(tablePath));

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -506,18 +506,6 @@ public class HoodieWriteConfig extends HoodieConfig {
506506
.sinceVersion("0.9.0")
507507
.withDocumentation(MarkerType.class);
508508

509-
public static final ConfigProperty<Boolean> INSTANT_STATE_TIMELINE_SERVER_BASED = ConfigProperty
510-
.key("hoodie.instant_state.timeline_server_based.enabled")
511-
.defaultValue(false)
512-
.sinceVersion("1.0.0")
513-
.withDocumentation("If enabled, writers get instant state from timeline server rather than requesting DFS directly");
514-
515-
public static final ConfigProperty<Integer> INSTANT_STATE_TIMELINE_SERVER_BASED_FORCE_REFRESH_REQUEST_NUMBER = ConfigProperty
516-
.key("hoodie.instant_state.timeline_server_based.force_refresh.request.number")
517-
.defaultValue(100)
518-
.sinceVersion("1.0.0")
519-
.withDocumentation("Number of requests to trigger instant state cache refreshing");
520-
521509
public static final ConfigProperty<Integer> MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS = ConfigProperty
522510
.key("hoodie.markers.timeline_server_based.batch.num_threads")
523511
.defaultValue(20)
@@ -1526,14 +1514,6 @@ public int getFinalizeWriteParallelism() {
15261514
return getInt(FINALIZE_WRITE_PARALLELISM_VALUE);
15271515
}
15281516

1529-
public boolean isTimelineServerBasedInstantStateEnabled() {
1530-
return getBoolean(INSTANT_STATE_TIMELINE_SERVER_BASED);
1531-
}
1532-
1533-
public int getTimelineServerBasedInstantStateForceRefreshRequestNumber() {
1534-
return getInt(INSTANT_STATE_TIMELINE_SERVER_BASED_FORCE_REFRESH_REQUEST_NUMBER);
1535-
}
1536-
15371517
public MarkerType getMarkersType() {
15381518
String markerType = getString(MARKERS_TYPE);
15391519
return MarkerType.valueOf(markerType.toUpperCase());
@@ -3366,11 +3346,6 @@ public Builder withEmbeddedTimelineServerPort(int port) {
33663346
return this;
33673347
}
33683348

3369-
public Builder withTimelineServerBasedInstantStateEnable(boolean enable) {
3370-
writeConfig.setValue(INSTANT_STATE_TIMELINE_SERVER_BASED, String.valueOf(enable));
3371-
return this;
3372-
}
3373-
33743349
public Builder withBulkInsertSortMode(String mode) {
33753350
writeConfig.setValue(BULK_INSERT_SORT_MODE, mode);
33763351
return this;

hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimeGeneratorBase.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.Arrays;
3434
import java.util.concurrent.TimeUnit;
3535

36-
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS;
3736
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES;
3837
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_WAIT_TIMEOUT_MS;
3938
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY;
@@ -76,10 +75,9 @@ public TimeGeneratorBase(HoodieTimeGeneratorConfig config, StorageConfiguration<
7675
Integer.parseInt(DEFAULT_LOCK_ACQUIRE_NUM_RETRIES));
7776
lockAcquireWaitTimeInMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,
7877
DEFAULT_LOCK_ACQUIRE_WAIT_TIMEOUT_MS);
79-
// The maximum time to wait for each time generation to resolve the clock skew issue on distributed hosts.
80-
long maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
81-
Long.parseLong(DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS));
82-
lockRetryHelper = new RetryHelper<>(maxWaitTimeInMs, maxRetries, maxWaitTimeInMs,
78+
// The initial time to wait for each time generation to resolve the clock skew issue on distributed hosts.
79+
long initialRetryInternal = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, 200);
80+
lockRetryHelper = new RetryHelper<>(initialRetryInternal, maxRetries, initialRetryInternal,
8381
Arrays.asList(HoodieLockException.class, InterruptedException.class), "acquire timeGenerator lock");
8482
}
8583

hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/InstantStateDTO.java

Lines changed: 0 additions & 68 deletions
This file was deleted.

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,7 @@ private boolean flushBucket(RowDataBucket bucket) {
350350
final List<WriteStatus> writeStatus = writeRecords(instant, bucket);
351351
final WriteMetadataEvent event = WriteMetadataEvent.builder()
352352
.taskID(taskID)
353+
.checkpointId(this.checkpointId)
353354
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
354355
.writeStatus(writeStatus)
355356
.lastBatch(false)
@@ -386,6 +387,7 @@ private void flushRemaining(boolean endInput) {
386387
}
387388
final WriteMetadataEvent event = WriteMetadataEvent.builder()
388389
.taskID(taskID)
390+
.checkpointId(checkpointId)
389391
.instantTime(currentInstant)
390392
.writeStatus(writeStatus)
391393
.lastBatch(true)
@@ -397,8 +399,6 @@ private void flushRemaining(boolean endInput) {
397399
this.tracer.reset();
398400
this.writeClient.cleanHandles();
399401
this.writeStatuses.addAll(writeStatus);
400-
// blocks flushing until the coordinator starts a new instant
401-
this.confirming = true;
402402

403403
writeMetrics.endDataFlush();
404404
writeMetrics.resetAfterCommit();

0 commit comments

Comments
 (0)