Skip to content

Add metadata transformations with metadata tuples #1148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MetadataMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
implementation project(":coreUtilities")
implementation project(":RFS")
implementation project(':transformation')
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')

implementation group: 'org.jcommander', name: 'jcommander'
implementation group: 'org.slf4j', name: 'slf4j-api'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.models.DataFilterArgs;
import org.opensearch.migrations.transform.TransformerParams;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;
import lombok.Getter;

public class MigrateOrEvaluateArgs {
@Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool")
Expand Down Expand Up @@ -52,4 +54,39 @@ public class MigrateOrEvaluateArgs {

@Parameter(names = {"--source-version" }, description = "Version of the source cluster, for example: Elasticsearch 7.10 or OS 1.3.", converter = VersionConverter.class)
public Version sourceVersion = null;

@ParametersDelegate
public TransformerParams metadataTransformationParams = new MetadataTransformerParams();

@Getter
public static class MetadataTransformerParams implements TransformerParams {
public String getTransformerConfigParameterArgPrefix() {
return "";
}
@Parameter(
required = false,
names = "--transformer-config-base64",
arity = 1,
description = "Configuration of metadata transformers. The same contents as --transformer-config but " +
"Base64 encoded so that the configuration is easier to pass as a command line parameter.")
private String transformerConfigEncoded;

@Parameter(
required = false,
names = "--transformer-config",
arity = 1,
description = "Configuration of metadata transformers. Either as a string that identifies the "
+ "transformer that should be run (with default settings) or as json to specify options "
+ "as well as multiple transformers to run in sequence. "
+ "For json, keys are the (simple) names of the loaded transformers and values are the "
+ "configuration passed to each of the transformers.")
private String transformerConfig;

@Parameter(
required = false,
names = "--transformer-config-file",
arity = 1,
description = "Path to the JSON configuration file of metadata transformers.")
private String transformerConfigFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

import org.opensearch.migrations.MigrateOrEvaluateArgs;
import org.opensearch.migrations.MigrationMode;
import org.opensearch.migrations.bulkload.transformers.CompositeTransformer;
import org.opensearch.migrations.bulkload.transformers.TransformFunctions;
import org.opensearch.migrations.bulkload.transformers.Transformer;
import org.opensearch.migrations.bulkload.transformers.TransformerToIJsonTransformerAdapter;
import org.opensearch.migrations.bulkload.worker.IndexMetadataResults;
import org.opensearch.migrations.bulkload.worker.IndexRunner;
import org.opensearch.migrations.bulkload.worker.MetadataRunner;
Expand All @@ -17,12 +19,19 @@
import org.opensearch.migrations.metadata.CreationResult;
import org.opensearch.migrations.metadata.GlobalMetadataCreatorResults;
import org.opensearch.migrations.metadata.tracing.RootMetadataMigrationContext;
import org.opensearch.migrations.transform.TransformationLoader;
import org.opensearch.migrations.transform.TransformerConfigUtils;

import lombok.extern.slf4j.Slf4j;

/** Shared functionality between migration and evaluation commands */
@Slf4j
public abstract class MigratorEvaluatorBase {
public static final String NOOP_TRANSFORMATION_CONFIG = "[" +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets remove this, if there is no transformation config why create and use the factory?

" {" +
" \"NoopTransformerProvider\":\"\"" +
" }" +
"]";

static final int INVALID_PARAMETER_CODE = 999;
static final int UNEXPECTED_FAILURE_CODE = 888;
Expand All @@ -45,14 +54,30 @@ protected Clusters createClusters() {
return clusters.build();
}

protected Transformer getCustomTransformer() {
var transformerConfig = TransformerConfigUtils.getTransformerConfig(arguments.metadataTransformationParams);
if (transformerConfig != null) {
log.atInfo().setMessage("Metadata Transformations config string: {}")
.addArgument(transformerConfig).log();
} else {
log.atInfo().setMessage("Using Noop custom transformation config: {}")
.addArgument(NOOP_TRANSFORMATION_CONFIG).log();
transformerConfig = NOOP_TRANSFORMATION_CONFIG;
}
var transformer = new TransformationLoader().getTransformerFactoryLoader(transformerConfig);
return new TransformerToIJsonTransformerAdapter(transformer);
}

protected Transformer selectTransformer(Clusters clusters) {
var transformer = TransformFunctions.getTransformer(
var versionTransformer = TransformFunctions.getTransformer(
clusters.getSource().getVersion(),
clusters.getTarget().getVersion(),
arguments.minNumberOfReplicas
);
log.info("Selected transformer " + transformer.toString());
return transformer;
var customTransformer = getCustomTransformer();
var compositeTransformer = new CompositeTransformer(customTransformer, versionTransformer);
log.atInfo().setMessage("Selected transformer: {}").addArgument(compositeTransformer).log();
return compositeTransformer;
}

protected Items migrateAllItems(MigrationMode migrationMode, Clusters clusters, Transformer transformer, RootMetadataMigrationContext context) {
Expand Down
83 changes: 52 additions & 31 deletions MetadataMigration/src/main/resources/log4j2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,60 @@ status = WARN

property.logsDir = ${env:SHARED_LOGS_DIR_PATH:-./logs}
property.failedLoggerFileNamePrefix = ${logsDir}/${hostName}/failedRequests/failedRequests
property.metadataTuplesFileNamePrefix = ${logsDir}/${hostName}/metadataTuples/tuples
property.runTime = ${date:yyyy-MM-dd_HH-mm-ss}
property.metadataRunLoggerFileNamePrefix = ${logsDir}/${hostName}/metadata/metadata

appenders = console, MetadataTuples, FailedRequests, MetadataRun

appender.console.type = Console
appender.console.name = Console
appender.console.target = SYSTEM_OUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %m%n

rootLogger.level = ERROR
rootLogger.appenderRef.console.ref = MetadataRun

appenders = console, FailedRequests, MetadataRun
# Metadata Migration
logger.MetadataMigration.name = org.opensearch.migrations.MetadataMigration
logger.MetadataMigration.level = info
logger.MetadataMigration.additivity = false
logger.MetadataMigration.appenderRef.stdout.ref = Console
logger.MetadataMigration.appenderRef.MetadataRun.ref = MetadataRun

# Metadata Tuples
appender.MetadataTuples.type = RollingRandomAccessFile
appender.MetadataTuples.name = MetadataTuples
appender.MetadataTuples.fileName = ${metadataTuplesFileNamePrefix}.log
appender.MetadataTuples.filePattern = ${metadataTuplesFileNamePrefix}_${runTime}-%i.log
appender.MetadataTuples.layout.type = PatternLayout
appender.MetadataTuples.layout.pattern = %m%n
appender.MetadataTuples.policies.type = Policies
appender.MetadataTuples.policies.startup.type = OnStartupTriggeringPolicy
appender.MetadataTuples.policies.startup.minSize = 0
appender.MetadataTuples.strategy.type = DefaultRolloverStrategy
appender.MetadataTuples.immediateFlush = false

logger.OutputTransformationJsonLogger.name = OutputTransformationJsonLogger
logger.OutputTransformationJsonLogger.level = info
logger.OutputTransformationJsonLogger.additivity = false
logger.OutputTransformationJsonLogger.appenderRef.MetadataTuples.ref = MetadataTuples

# MetadataRun Logs
appender.MetadataRun.type = File
appender.MetadataRun.name = MetadataRun
appender.MetadataRun.fileName = ${metadataRunLoggerFileNamePrefix}${runTime}-%i.log
appender.MetadataRun.layout.type = PatternLayout
appender.MetadataRun.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} %p %c{1.} [%t] %m%n
appender.MetadataRun.immediateFlush = false

logger.MetadataLogger.name = MetadataLogger
logger.MetadataLogger.level = debug
logger.MetadataLogger.additivity = false
logger.MetadataLogger.appenderRef.MetadataRun.ref = MetadataRun

# Failed Requestss
appender.FailedRequests.type = RollingRandomAccessFile
appender.FailedRequests.name = FailedRequests
appender.FailedRequests.fileName = ${failedLoggerFileNamePrefix}.log
Expand All @@ -21,33 +72,3 @@ logger.FailedRequestsLogger.name = FailedRequestsLogger
logger.FailedRequestsLogger.level = info
logger.FailedRequestsLogger.additivity = false
logger.FailedRequestsLogger.appenderRef.FailedRequests.ref = FailedRequests

property.runTime = ${date:yyyy-MM-dd_HH-mm-ss}
property.metadataRunLoggerFileNamePrefix = ${logsDir}/${hostName}/metadata/metadata_

appender.MetadataRun.type = File
appender.MetadataRun.name = MetadataRun
appender.MetadataRun.fileName = ${metadataRunLoggerFileNamePrefix}${runTime}.log
appender.MetadataRun.layout.type = PatternLayout
appender.MetadataRun.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} %p %c{1.} [%t] %m%n
appender.MetadataRun.immediateFlush = true

logger.MetadataLogger.name = MetadataLogger
logger.MetadataLogger.level = debug
logger.MetadataLogger.additivity = false
logger.MetadataLogger.appenderRef.MetadataRun.ref = MetadataRun

appender.console.type = Console
appender.console.name = Console
appender.console.target = SYSTEM_OUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %m%n

rootLogger.level = info
rootLogger.appenderRef.console.ref = MetadataRun

logger.MetadataMigration.name = org.opensearch.migrations.MetadataMigration
logger.MetadataMigration.level = info
logger.MetadataMigration.additivity = false
logger.MetadataMigration.appenderRef.stdout.ref = Console
logger.MetadataMigration.appenderRef.MetadataRun.ref = MetadataRun
Loading
Loading