Skip to content

Commit 92f0d3f

Browse files
authored
Merge pull request #1148 from AndreKurait/MetadataTransforms
Add metadata transformations with metadata tuples
2 parents 1c23ab4 + 18e83bb commit 92f0d3f

26 files changed

+846
-148
lines changed

MetadataMigration/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ dependencies {
1111
implementation project(":coreUtilities")
1212
implementation project(":RFS")
1313
implementation project(':transformation')
14+
implementation project(':transformation:transformationPlugins:jsonMessageTransformers:jsonMessageTransformerLoaders')
1415

1516
implementation group: 'org.jcommander', name: 'jcommander'
1617
implementation group: 'org.slf4j', name: 'slf4j-api'

MetadataMigration/src/main/java/org/opensearch/migrations/MigrateOrEvaluateArgs.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
66
import org.opensearch.migrations.bulkload.models.DataFilterArgs;
7+
import org.opensearch.migrations.transform.TransformerParams;
78

89
import com.beust.jcommander.Parameter;
910
import com.beust.jcommander.ParametersDelegate;
11+
import lombok.Getter;
1012

1113
public class MigrateOrEvaluateArgs {
1214
@Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool")
@@ -52,4 +54,39 @@ public class MigrateOrEvaluateArgs {
5254

5355
@Parameter(names = {"--source-version" }, description = "Version of the source cluster, for example: Elasticsearch 7.10 or OS 1.3.", converter = VersionConverter.class)
5456
public Version sourceVersion = null;
57+
58+
@ParametersDelegate
59+
public TransformerParams metadataTransformationParams = new MetadataTransformerParams();
60+
61+
@Getter
62+
public static class MetadataTransformerParams implements TransformerParams {
63+
public String getTransformerConfigParameterArgPrefix() {
64+
return "";
65+
}
66+
@Parameter(
67+
required = false,
68+
names = "--transformer-config-base64",
69+
arity = 1,
70+
description = "Configuration of metadata transformers. The same contents as --transformer-config but " +
71+
"Base64 encoded so that the configuration is easier to pass as a command line parameter.")
72+
private String transformerConfigEncoded;
73+
74+
@Parameter(
75+
required = false,
76+
names = "--transformer-config",
77+
arity = 1,
78+
description = "Configuration of metadata transformers. Either as a string that identifies the "
79+
+ "transformer that should be run (with default settings) or as json to specify options "
80+
+ "as well as multiple transformers to run in sequence. "
81+
+ "For json, keys are the (simple) names of the loaded transformers and values are the "
82+
+ "configuration passed to each of the transformers.")
83+
private String transformerConfig;
84+
85+
@Parameter(
86+
required = false,
87+
names = "--transformer-config-file",
88+
arity = 1,
89+
description = "Path to the JSON configuration file of metadata transformers.")
90+
private String transformerConfigFile;
91+
}
5592
}

MetadataMigration/src/main/java/org/opensearch/migrations/commands/MigratorEvaluatorBase.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55

66
import org.opensearch.migrations.MigrateOrEvaluateArgs;
77
import org.opensearch.migrations.MigrationMode;
8+
import org.opensearch.migrations.bulkload.transformers.CompositeTransformer;
89
import org.opensearch.migrations.bulkload.transformers.TransformFunctions;
910
import org.opensearch.migrations.bulkload.transformers.Transformer;
11+
import org.opensearch.migrations.bulkload.transformers.TransformerToIJsonTransformerAdapter;
1012
import org.opensearch.migrations.bulkload.worker.IndexMetadataResults;
1113
import org.opensearch.migrations.bulkload.worker.IndexRunner;
1214
import org.opensearch.migrations.bulkload.worker.MetadataRunner;
@@ -17,12 +19,19 @@
1719
import org.opensearch.migrations.metadata.CreationResult;
1820
import org.opensearch.migrations.metadata.GlobalMetadataCreatorResults;
1921
import org.opensearch.migrations.metadata.tracing.RootMetadataMigrationContext;
22+
import org.opensearch.migrations.transform.TransformationLoader;
23+
import org.opensearch.migrations.transform.TransformerConfigUtils;
2024

2125
import lombok.extern.slf4j.Slf4j;
2226

2327
/** Shared functionality between migration and evaluation commands */
2428
@Slf4j
2529
public abstract class MigratorEvaluatorBase {
30+
public static final String NOOP_TRANSFORMATION_CONFIG = "[" +
31+
" {" +
32+
" \"NoopTransformerProvider\":\"\"" +
33+
" }" +
34+
"]";
2635

2736
static final int INVALID_PARAMETER_CODE = 999;
2837
static final int UNEXPECTED_FAILURE_CODE = 888;
@@ -45,14 +54,30 @@ protected Clusters createClusters() {
4554
return clusters.build();
4655
}
4756

57+
protected Transformer getCustomTransformer() {
58+
var transformerConfig = TransformerConfigUtils.getTransformerConfig(arguments.metadataTransformationParams);
59+
if (transformerConfig != null) {
60+
log.atInfo().setMessage("Metadata Transformations config string: {}")
61+
.addArgument(transformerConfig).log();
62+
} else {
63+
log.atInfo().setMessage("Using Noop custom transformation config: {}")
64+
.addArgument(NOOP_TRANSFORMATION_CONFIG).log();
65+
transformerConfig = NOOP_TRANSFORMATION_CONFIG;
66+
}
67+
var transformer = new TransformationLoader().getTransformerFactoryLoader(transformerConfig);
68+
return new TransformerToIJsonTransformerAdapter(transformer);
69+
}
70+
4871
protected Transformer selectTransformer(Clusters clusters) {
49-
var transformer = TransformFunctions.getTransformer(
72+
var versionTransformer = TransformFunctions.getTransformer(
5073
clusters.getSource().getVersion(),
5174
clusters.getTarget().getVersion(),
5275
arguments.minNumberOfReplicas
5376
);
54-
log.info("Selected transformer " + transformer.toString());
55-
return transformer;
77+
var customTransformer = getCustomTransformer();
78+
var compositeTransformer = new CompositeTransformer(customTransformer, versionTransformer);
79+
log.atInfo().setMessage("Selected transformer: {}").addArgument(compositeTransformer).log();
80+
return compositeTransformer;
5681
}
5782

5883
protected Items migrateAllItems(MigrationMode migrationMode, Clusters clusters, Transformer transformer, RootMetadataMigrationContext context) {

MetadataMigration/src/main/resources/log4j2.properties

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,60 @@ status = WARN
22

33
property.logsDir = ${env:SHARED_LOGS_DIR_PATH:-./logs}
44
property.failedLoggerFileNamePrefix = ${logsDir}/${hostName}/failedRequests/failedRequests
5+
property.metadataTuplesFileNamePrefix = ${logsDir}/${hostName}/metadataTuples/tuples
6+
property.runTime = ${date:yyyy-MM-dd_HH-mm-ss}
7+
property.metadataRunLoggerFileNamePrefix = ${logsDir}/${hostName}/metadata/metadata
8+
9+
appenders = console, MetadataTuples, FailedRequests, MetadataRun
10+
11+
appender.console.type = Console
12+
appender.console.name = Console
13+
appender.console.target = SYSTEM_OUT
14+
appender.console.layout.type = PatternLayout
15+
appender.console.layout.pattern = %m%n
16+
17+
rootLogger.level = ERROR
18+
rootLogger.appenderRef.console.ref = MetadataRun
519

6-
appenders = console, FailedRequests, MetadataRun
20+
# Metadata Migration
21+
logger.MetadataMigration.name = org.opensearch.migrations.MetadataMigration
22+
logger.MetadataMigration.level = info
23+
logger.MetadataMigration.additivity = false
24+
logger.MetadataMigration.appenderRef.stdout.ref = Console
25+
logger.MetadataMigration.appenderRef.MetadataRun.ref = MetadataRun
26+
27+
# Metadata Tuples
28+
appender.MetadataTuples.type = RollingRandomAccessFile
29+
appender.MetadataTuples.name = MetadataTuples
30+
appender.MetadataTuples.fileName = ${metadataTuplesFileNamePrefix}.log
31+
appender.MetadataTuples.filePattern = ${metadataTuplesFileNamePrefix}_${runTime}-%i.log
32+
appender.MetadataTuples.layout.type = PatternLayout
33+
appender.MetadataTuples.layout.pattern = %m%n
34+
appender.MetadataTuples.policies.type = Policies
35+
appender.MetadataTuples.policies.startup.type = OnStartupTriggeringPolicy
36+
appender.MetadataTuples.policies.startup.minSize = 0
37+
appender.MetadataTuples.strategy.type = DefaultRolloverStrategy
38+
appender.MetadataTuples.immediateFlush = false
39+
40+
logger.OutputTransformationJsonLogger.name = OutputTransformationJsonLogger
41+
logger.OutputTransformationJsonLogger.level = info
42+
logger.OutputTransformationJsonLogger.additivity = false
43+
logger.OutputTransformationJsonLogger.appenderRef.MetadataTuples.ref = MetadataTuples
44+
45+
# MetadataRun Logs
46+
appender.MetadataRun.type = File
47+
appender.MetadataRun.name = MetadataRun
48+
appender.MetadataRun.fileName = ${metadataRunLoggerFileNamePrefix}${runTime}-%i.log
49+
appender.MetadataRun.layout.type = PatternLayout
50+
appender.MetadataRun.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} %p %c{1.} [%t] %m%n
51+
appender.MetadataRun.immediateFlush = false
52+
53+
logger.MetadataLogger.name = MetadataLogger
54+
logger.MetadataLogger.level = debug
55+
logger.MetadataLogger.additivity = false
56+
logger.MetadataLogger.appenderRef.MetadataRun.ref = MetadataRun
757

58+
# Failed Requestss
859
appender.FailedRequests.type = RollingRandomAccessFile
960
appender.FailedRequests.name = FailedRequests
1061
appender.FailedRequests.fileName = ${failedLoggerFileNamePrefix}.log
@@ -21,33 +72,3 @@ logger.FailedRequestsLogger.name = FailedRequestsLogger
2172
logger.FailedRequestsLogger.level = info
2273
logger.FailedRequestsLogger.additivity = false
2374
logger.FailedRequestsLogger.appenderRef.FailedRequests.ref = FailedRequests
24-
25-
property.runTime = ${date:yyyy-MM-dd_HH-mm-ss}
26-
property.metadataRunLoggerFileNamePrefix = ${logsDir}/${hostName}/metadata/metadata_
27-
28-
appender.MetadataRun.type = File
29-
appender.MetadataRun.name = MetadataRun
30-
appender.MetadataRun.fileName = ${metadataRunLoggerFileNamePrefix}${runTime}.log
31-
appender.MetadataRun.layout.type = PatternLayout
32-
appender.MetadataRun.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} %p %c{1.} [%t] %m%n
33-
appender.MetadataRun.immediateFlush = true
34-
35-
logger.MetadataLogger.name = MetadataLogger
36-
logger.MetadataLogger.level = debug
37-
logger.MetadataLogger.additivity = false
38-
logger.MetadataLogger.appenderRef.MetadataRun.ref = MetadataRun
39-
40-
appender.console.type = Console
41-
appender.console.name = Console
42-
appender.console.target = SYSTEM_OUT
43-
appender.console.layout.type = PatternLayout
44-
appender.console.layout.pattern = %m%n
45-
46-
rootLogger.level = info
47-
rootLogger.appenderRef.console.ref = MetadataRun
48-
49-
logger.MetadataMigration.name = org.opensearch.migrations.MetadataMigration
50-
logger.MetadataMigration.level = info
51-
logger.MetadataMigration.additivity = false
52-
logger.MetadataMigration.appenderRef.stdout.ref = Console
53-
logger.MetadataMigration.appenderRef.MetadataRun.ref = MetadataRun

0 commit comments

Comments
 (0)