Skip to content

Make restore support multi-project #131661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotInfo;
Expand Down Expand Up @@ -1394,7 +1395,7 @@ public void testWarningHeaderOnRestoreWithoutTemplates() throws Exception {
.get();

RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
SnapshotId snapshotId = createSnapshotResponse.getSnapshotInfo().snapshotId();
Snapshot snapshot = createSnapshotResponse.getSnapshotInfo().snapshot();
assertEquals(RestStatus.OK, status);

assertEquals(Collections.singletonList(dsBackingIndexName), getSnapshot(REPO, SNAPSHOT).indices());
Expand Down Expand Up @@ -1423,7 +1424,7 @@ public void testWarningHeaderOnRestoreWithoutTemplates() throws Exception {
client,
request,
"Snapshot ["
+ snapshotId
+ snapshot
+ "] contains data stream ["
+ datastreamName
+ "] but custer does not have a matching index "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.snapshots.RestoreService;
Expand All @@ -29,14 +30,16 @@
public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<RestoreSnapshotRequest, RestoreSnapshotResponse> {
public static final ActionType<RestoreSnapshotResponse> TYPE = new ActionType<>("cluster:admin/snapshot/restore");
private final RestoreService restoreService;
private final ProjectResolver projectResolver;

@Inject
public TransportRestoreSnapshotAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
RestoreService restoreService,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
TYPE.name(),
Expand All @@ -49,13 +52,15 @@ public TransportRestoreSnapshotAction(
threadPool.executor(ThreadPool.Names.SNAPSHOT_META)
);
this.restoreService = restoreService;
this.projectResolver = projectResolver;
}

@Override
protected ClusterBlockException checkBlock(RestoreSnapshotRequest request, ClusterState state) {
// Restoring a snapshot might change the global state and create/change an index,
// so we need to check for METADATA_WRITE and WRITE blocks
ClusterBlockException blockException = state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
ClusterBlockException blockException = state.blocks()
.globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_WRITE);
if (blockException != null) {
return blockException;
}
Expand All @@ -70,17 +75,21 @@ protected void masterOperation(
final ClusterState state,
final ActionListener<RestoreSnapshotResponse> listener
) {
restoreService.restoreSnapshot(request, listener.delegateFailure((delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.restoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
threadPool.getThreadContext()
);
} else {
delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.restoreInfo()));
}
}));
restoreService.restoreSnapshot(
projectResolver.getProjectId(),
request,
listener.delegateFailure((delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.restoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(
clusterService,
restoreCompletionResponse,
delegatedListener,
threadPool.getThreadContext()
);
} else {
delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.restoreInfo()));
}
})
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -43,14 +44,16 @@
public class TransportRecoveryAction extends TransportBroadcastByNodeAction<RecoveryRequest, RecoveryResponse, RecoveryState> {

private final IndicesService indicesService;
private final ProjectResolver projectResolver;

@Inject
public TransportRecoveryAction(
ClusterService clusterService,
TransportService transportService,
IndicesService indicesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
ProjectResolver projectResolver
) {
super(
RecoveryAction.NAME,
Expand All @@ -62,6 +65,7 @@ public TransportRecoveryAction(
transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT)
);
this.indicesService = indicesService;
this.projectResolver = projectResolver;
}

@Override
Expand Down Expand Up @@ -110,16 +114,16 @@ protected void shardOperation(RecoveryRequest request, ShardRouting shardRouting

@Override
protected ShardsIterator shards(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
return state.routingTable().allShardsIncludingRelocationTargets(concreteIndices);
return state.routingTable(projectResolver.getProjectId()).allShardsIncludingRelocationTargets(concreteIndices);
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, RecoveryRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, RecoveryRequest request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
return state.blocks().indicesBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ, concreteIndices);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,8 @@ public Map<String, String> queryFields() {
systemIndices,
indicesService,
fileSettingsService,
threadPool
threadPool,
projectResolver.supportsMultipleProjects()
);

DiscoveryModule discoveryModule = createDiscoveryModule(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.LeakTracker;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
Expand Down Expand Up @@ -377,7 +378,10 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
METADATA_NAME_FORMAT,
(repoName, parser) -> ProjectMetadata.Builder.fromXContent(parser),
projectMetadata -> ChunkedToXContent.wrapAsToXContent(
params -> Iterators.concat(Iterators.single((builder, ignored) -> builder.field("id", projectMetadata.id())))
params -> Iterators.concat(
Iterators.single((ToXContent) (builder, p) -> builder.field("id", projectMetadata.id())),
projectMetadata.toXContentChunked(params)
)
Comment on lines -380 to +384
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing a silly bug from #130000 ...

)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -126,8 +127,9 @@ public Path watchedFile() {
* file based settings from the cluster state.
* @param clusterState the cluster state before snapshot restore
* @param mdBuilder the current metadata builder for the new cluster state
* @param projectId the project associated with the restore
*/
public void handleSnapshotRestore(ClusterState clusterState, Metadata.Builder mdBuilder) {
public void handleSnapshotRestore(ClusterState clusterState, Metadata.Builder mdBuilder, ProjectId projectId) {
assert clusterState.nodes().isLocalNodeElectedMaster();

ReservedStateMetadata fileSettingsMetadata = clusterState.metadata().reservedStateMetadata().get(NAMESPACE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,10 @@ private class FetchingSnapshotShardSizeRunnable extends AbstractRunnable {

@Override
protected void doRun() throws Exception {
final Repository repository = repositoriesService.repository(snapshotShard.snapshot.getRepository());
final Repository repository = repositoriesService.repository(
snapshotShard.snapshot().getProjectId(),
snapshotShard.snapshot.getRepository()
);

logger.debug("fetching snapshot shard size for {}", snapshotShard);
final long snapshotShardSize = repository.getShardSnapshotStatus(
Expand Down
Loading