From 92cce0cfcd5e77622d7e7f8062761aec24a34bcc Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 14 Jul 2025 14:15:04 -0500 Subject: [PATCH 1/9] Correctly handling download_database_on_pipeline_creation within a pipeline processor within a default or final pipeline --- .../geoip/GeoIpDownloaderTaskExecutor.java | 70 ++++++-- .../GeoIpDownloaderTaskExecutorTests.java | 155 ++++++++++++++++++ 2 files changed, 215 insertions(+), 10 deletions(-) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 087a5b4e6296c..463d568f39bd2 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -54,6 +54,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; @@ -297,11 +298,13 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) { @SuppressWarnings("unchecked") private static Set pipelinesWithGeoIpProcessor(ProjectMetadata projectMetadata, boolean downloadDatabaseOnPipelineCreation) { List configurations = IngestService.getPipelines(projectMetadata); + Map pipelineConfigurations = configurations.stream() + .collect(Collectors.toMap(PipelineConfiguration::getId, pipelineConfiguration -> pipelineConfiguration)); Set ids = new HashSet<>(); // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (PipelineConfiguration configuration : configurations) { List> processors = (List>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY); - if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) { + if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation, pipelineConfigurations)) { ids.add(configuration.getId()); } } @@ -312,13 +315,18 @@ private static Set pipelinesWithGeoIpProcessor(ProjectMetadata projectMe * Check if a list of processor contains at least a geoip processor. * @param processors List of processors. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @param pipelineConfigurations A Map of pipeline id to PipelineConfiguration * @return true if a geoip processor is found in the processor list. */ - private static boolean hasAtLeastOneGeoipProcessor(List> processors, boolean downloadDatabaseOnPipelineCreation) { + private static boolean hasAtLeastOneGeoipProcessor( + List> processors, + boolean downloadDatabaseOnPipelineCreation, + Map pipelineConfigurations + ) { if (processors != null) { // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (Map processor : processors) { - if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) { + if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigurations)) { return true; } } @@ -330,10 +338,15 @@ private static boolean hasAtLeastOneGeoipProcessor(List> pro * Check if a processor config is a geoip processor or contains at least a geoip processor. * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @param pipelineConfigurations A Map of pipeline id to PipelineConfiguration * @return true if a geoip processor is found in the processor list. */ @SuppressWarnings("unchecked") - private static boolean hasAtLeastOneGeoipProcessor(Map processor, boolean downloadDatabaseOnPipelineCreation) { + private static boolean hasAtLeastOneGeoipProcessor( + Map processor, + boolean downloadDatabaseOnPipelineCreation, + Map pipelineConfigurations + ) { if (processor == null) { return false; } @@ -352,27 +365,31 @@ private static boolean hasAtLeastOneGeoipProcessor(Map processor } } - return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation) - || isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation); + return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigurations) + || isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigurations) + || isPipelineProcessorWithGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigurations); } /** * Check if a processor config has an on_failure clause containing at least a geoip processor. * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @param pipelineConfigurations A Map of pipeline id to PipelineConfiguration * @return true if a geoip processor is found in the processor list. */ @SuppressWarnings("unchecked") private static boolean isProcessorWithOnFailureGeoIpProcessor( Map processor, - boolean downloadDatabaseOnPipelineCreation + boolean downloadDatabaseOnPipelineCreation, + Map pipelineConfigurations ) { // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (Object value : processor.values()) { if (value instanceof Map && hasAtLeastOneGeoipProcessor( ((Map>>) value).get("on_failure"), - downloadDatabaseOnPipelineCreation + downloadDatabaseOnPipelineCreation, + pipelineConfigurations )) { return true; } @@ -384,13 +401,46 @@ && hasAtLeastOneGeoipProcessor( * Check if a processor is a foreach processor containing at least a geoip processor. * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @param pipelineConfigurations A Map of pipeline id to PipelineConfiguration * @return true if a geoip processor is found in the processor list. */ @SuppressWarnings("unchecked") - private static boolean isForeachProcessorWithGeoipProcessor(Map processor, boolean downloadDatabaseOnPipelineCreation) { + private static boolean isForeachProcessorWithGeoipProcessor( + Map processor, + boolean downloadDatabaseOnPipelineCreation, + Map pipelineConfigurations + ) { final Map processorConfig = (Map) processor.get("foreach"); return processorConfig != null - && hasAtLeastOneGeoipProcessor((Map) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation); + && hasAtLeastOneGeoipProcessor( + (Map) processorConfig.get("processor"), + downloadDatabaseOnPipelineCreation, + pipelineConfigurations + ); + } + + /** + * Check if a processor is a pipeline processor containing at least a geoip processor. + * @param processor Processor config. + * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. + * @param pipelineConfigurations A Map of pipeline id to PipelineConfiguration + * @return true if a geoip processor is found in the processors of this processor if this processor is a pipeline processor. + */ + @SuppressWarnings("unchecked") + private static boolean isPipelineProcessorWithGeoIpProcessor( + Map processor, + boolean downloadDatabaseOnPipelineCreation, + Map pipelineConfigurations + ) { + final Map processorConfig = (Map) processor.get("pipeline"); + return processorConfig != null + && hasAtLeastOneGeoipProcessor( + (List>) pipelineConfigurations.get((String) processorConfig.get("name")) + .getConfig() + .get(Pipeline.PROCESSORS_KEY), + downloadDatabaseOnPipelineCreation, + pipelineConfigurations + ); } // starts GeoIP downloader task for a single project diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java index eccc29d22277f..1dfacc7861825 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java @@ -52,6 +52,53 @@ public void testHasAtLeastOneGeoipProcessorWhenDownloadDatabaseOnPipelineCreatio } + /* + * This tests that if a default or final pipeline has a pipeline processor that has a geoip processor that has + * download_database_on_pipeline_creation set to false, then we will correctly acknowledge that the pipeline has a geoip processor so + * that we download it appropriately. + */ + public void testHasAtLeastOneGeoipProcessorInPipelineProcessorWhenDownloadDatabaseOnPipelineCreationIsFalse() throws IOException { + String innerInnerPipelineJson = """ + { + "processors":[""" + getGeoIpProcessor(false) + """ + ] + } + """; + String innerPipelineJson = """ + { + "processors":[{"pipeline": {"name": "innerInnerPipeline"}} + ] + } + """; + String outerPipelineJson = """ + { + "processors":[{"pipeline": {"name": "innerPipeline"}} + ] + } + """; + IngestMetadata ingestMetadata = new IngestMetadata( + Map.of( + "innerInnerPipeline", + new PipelineConfiguration("innerInnerPipeline", new BytesArray(innerInnerPipelineJson), XContentType.JSON), + "innerPipeline", + new PipelineConfiguration("innerPipeline", new BytesArray(innerPipelineJson), XContentType.JSON), + "outerPipeline", + new PipelineConfiguration("outerPipeline", new BytesArray(outerPipelineJson), XContentType.JSON) + ) + ); + // The pipeline is not used in any index, expected to return false. + var projectMetadata = projectMetadataWithIndex(b -> {}, ingestMetadata); + assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); + + // The pipeline is set as default pipeline in an index, expected to return true. + projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "outerPipeline"), ingestMetadata); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); + + // The pipeline is set as final pipeline in an index, expected to return true. + projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.FINAL_PIPELINE.getKey(), "outerPipeline"), ingestMetadata); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); + } + public void testHasAtLeastOneGeoipProcessor() throws IOException { var projectId = Metadata.DEFAULT_PROJECT_ID; List expectHitsInputs = getPipelinesWithGeoIpProcessors(true); @@ -207,6 +254,114 @@ private List getPipelinesWithGeoIpProcessors(boolean downloadDatabaseOnP ); } + private List getPipelinesWithGeoIpProcessorsInsidePipelineProcessors(boolean downloadDatabaseOnPipelineCreation) + throws IOException { + String simpleGeoIpProcessor = """ + { + "processors":[""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ + ] + } + """; + String onFailureWithGeoIpProcessor = """ + { + "processors":[ + { + "rename":{ + "field":"provider", + "target_field":"cloud.provider", + "on_failure":[""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ + ] + } + } + ] + } + """; + String foreachWithGeoIpProcessor = """ + { + "processors":[ + { + "foreach":{ + "field":"values", + "processor":""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ + } + } + ] + } + """; + String nestedForeachWithGeoIpProcessor = """ + { + "processors":[ + { + "foreach":{ + "field":"values", + "processor": + { + "foreach":{ + "field":"someField", + "processor":""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ + } + } + } + } + ] + } + """; + String nestedForeachWithOnFailureWithGeoIpProcessor = """ + { + "processors":[ + { + "foreach":{ + "field":"values", + "processor": + { + "foreach":{ + "field":"someField", + "processor": + { + "rename":{ + "field":"provider", + "target_field":"cloud.provider", + "on_failure":[""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ + ] + } + } + } + } + } + } + ] + } + """; + String onFailureWithForeachWithGeoIp = """ + { + "processors":[ + { + "rename":{ + "field":"provider", + "target_field":"cloud.provider", + "on_failure":[ + { + "foreach":{ + "field":"values", + "processor":""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ + } + } + ] + } + } + ] + } + """; + return List.of( + simpleGeoIpProcessor, + onFailureWithGeoIpProcessor, + foreachWithGeoIpProcessor, + nestedForeachWithGeoIpProcessor, + nestedForeachWithOnFailureWithGeoIpProcessor, + onFailureWithForeachWithGeoIp + ); + } + /* * This method returns an assorted list of pipelines that _do not_ have geoip processors -- ones that ought to cause * hasAtLeastOneGeoipProcessor to return false. From 647e0491bfeb2317dee7fb5fd294b5756d83e47f Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 14 Jul 2025 14:19:13 -0500 Subject: [PATCH 2/9] Update docs/changelog/131236.yaml --- docs/changelog/131236.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/131236.yaml diff --git a/docs/changelog/131236.yaml b/docs/changelog/131236.yaml new file mode 100644 index 0000000000000..13e3190098404 --- /dev/null +++ b/docs/changelog/131236.yaml @@ -0,0 +1,6 @@ +pr: 131236 +summary: Correctly handling `download_database_on_pipeline_creation` within a pipeline + processor within a default or final pipeline +area: Ingest Node +type: bug +issues: [] From 9b905808bed10cc34bc3763daea1f55b089a802d Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 14 Jul 2025 14:29:56 -0500 Subject: [PATCH 3/9] copilot-suggested cleanup --- .../geoip/GeoIpDownloaderTaskExecutor.java | 62 +++++++++++-------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 463d568f39bd2..8c8ffb5ec1c15 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -54,6 +54,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; @@ -298,13 +299,13 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) { @SuppressWarnings("unchecked") private static Set pipelinesWithGeoIpProcessor(ProjectMetadata projectMetadata, boolean downloadDatabaseOnPipelineCreation) { List configurations = IngestService.getPipelines(projectMetadata); - Map pipelineConfigurations = configurations.stream() - .collect(Collectors.toMap(PipelineConfiguration::getId, pipelineConfiguration -> pipelineConfiguration)); + Map pipelineConfigById = configurations.stream() + .collect(Collectors.toMap(PipelineConfiguration::getId, Function.identity())); Set ids = new HashSet<>(); // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (PipelineConfiguration configuration : configurations) { List> processors = (List>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY); - if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation, pipelineConfigurations)) { + if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation, pipelineConfigById)) { ids.add(configuration.getId()); } } @@ -315,18 +316,18 @@ private static Set pipelinesWithGeoIpProcessor(ProjectMetadata projectMe * Check if a list of processor contains at least a geoip processor. * @param processors List of processors. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. - * @param pipelineConfigurations A Map of pipeline id to PipelineConfiguration + * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration * @return true if a geoip processor is found in the processor list. */ private static boolean hasAtLeastOneGeoipProcessor( List> processors, boolean downloadDatabaseOnPipelineCreation, - Map pipelineConfigurations + Map pipelineConfigById ) { if (processors != null) { // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (Map processor : processors) { - if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigurations)) { + if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigById)) { return true; } } @@ -338,14 +339,14 @@ private static boolean hasAtLeastOneGeoipProcessor( * Check if a processor config is a geoip processor or contains at least a geoip processor. * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. - * @param pipelineConfigurations A Map of pipeline id to PipelineConfiguration + * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration * @return true if a geoip processor is found in the processor list. */ @SuppressWarnings("unchecked") private static boolean hasAtLeastOneGeoipProcessor( Map processor, boolean downloadDatabaseOnPipelineCreation, - Map pipelineConfigurations + Map pipelineConfigById ) { if (processor == null) { return false; @@ -365,23 +366,23 @@ private static boolean hasAtLeastOneGeoipProcessor( } } - return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigurations) - || isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigurations) - || isPipelineProcessorWithGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigurations); + return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigById) + || isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigById) + || isPipelineProcessorWithGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigById); } /** * Check if a processor config has an on_failure clause containing at least a geoip processor. * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. - * @param pipelineConfigurations A Map of pipeline id to PipelineConfiguration + * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration * @return true if a geoip processor is found in the processor list. */ @SuppressWarnings("unchecked") private static boolean isProcessorWithOnFailureGeoIpProcessor( Map processor, boolean downloadDatabaseOnPipelineCreation, - Map pipelineConfigurations + Map pipelineConfigById ) { // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (Object value : processor.values()) { @@ -389,7 +390,7 @@ private static boolean isProcessorWithOnFailureGeoIpProcessor( && hasAtLeastOneGeoipProcessor( ((Map>>) value).get("on_failure"), downloadDatabaseOnPipelineCreation, - pipelineConfigurations + pipelineConfigById )) { return true; } @@ -401,21 +402,21 @@ && hasAtLeastOneGeoipProcessor( * Check if a processor is a foreach processor containing at least a geoip processor. * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. - * @param pipelineConfigurations A Map of pipeline id to PipelineConfiguration + * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration * @return true if a geoip processor is found in the processor list. */ @SuppressWarnings("unchecked") private static boolean isForeachProcessorWithGeoipProcessor( Map processor, boolean downloadDatabaseOnPipelineCreation, - Map pipelineConfigurations + Map pipelineConfigById ) { final Map processorConfig = (Map) processor.get("foreach"); return processorConfig != null && hasAtLeastOneGeoipProcessor( (Map) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation, - pipelineConfigurations + pipelineConfigById ); } @@ -423,24 +424,31 @@ && hasAtLeastOneGeoipProcessor( * Check if a processor is a pipeline processor containing at least a geoip processor. * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. - * @param pipelineConfigurations A Map of pipeline id to PipelineConfiguration + * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration * @return true if a geoip processor is found in the processors of this processor if this processor is a pipeline processor. */ @SuppressWarnings("unchecked") private static boolean isPipelineProcessorWithGeoIpProcessor( Map processor, boolean downloadDatabaseOnPipelineCreation, - Map pipelineConfigurations + Map pipelineConfigById ) { final Map processorConfig = (Map) processor.get("pipeline"); - return processorConfig != null - && hasAtLeastOneGeoipProcessor( - (List>) pipelineConfigurations.get((String) processorConfig.get("name")) - .getConfig() - .get(Pipeline.PROCESSORS_KEY), - downloadDatabaseOnPipelineCreation, - pipelineConfigurations - ); + if (processorConfig != null) { + String pipelineName = (String) processorConfig.get("name"); + if (pipelineName != null) { + PipelineConfiguration pipelineConfiguration = pipelineConfigById.get(pipelineName); + if (pipelineConfiguration != null) { + List> childProcessors = (List>) pipelineConfiguration.getConfig() + .get(Pipeline.PROCESSORS_KEY); + if (childProcessors != null && childProcessors.isEmpty() == false) { + return hasAtLeastOneGeoipProcessor(childProcessors, downloadDatabaseOnPipelineCreation, pipelineConfigById); + } + } + } + + } + return false; } // starts GeoIP downloader task for a single project From e01276b8a9ab252b4fdbf8bfb98e72ecd940d625 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 14 Jul 2025 16:35:32 -0500 Subject: [PATCH 4/9] handling cycles and improving performance --- .../geoip/GeoIpDownloaderTaskExecutor.java | 128 +++++++++++++++--- .../GeoIpDownloaderTaskExecutorTests.java | 48 +++++++ 2 files changed, 155 insertions(+), 21 deletions(-) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index 8c8ffb5ec1c15..a3712e1a2c5a3 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -47,6 +47,7 @@ import org.elasticsearch.transport.RemoteTransportException; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -301,12 +302,23 @@ private static Set pipelinesWithGeoIpProcessor(ProjectMetadata projectMe List configurations = IngestService.getPipelines(projectMetadata); Map pipelineConfigById = configurations.stream() .collect(Collectors.toMap(PipelineConfiguration::getId, Function.identity())); + Map pipelineHasGeoProcessorById = new HashMap<>(); // used to keep track of pipelines we've checked before Set ids = new HashSet<>(); // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (PipelineConfiguration configuration : configurations) { List> processors = (List>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY); - if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation, pipelineConfigById)) { - ids.add(configuration.getId()); + String pipelineName = configuration.getId(); + if (pipelineHasGeoProcessorById.containsKey(pipelineName) == false) { + // We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors: + pipelineHasGeoProcessorById.put(pipelineName, null); + if (hasAtLeastOneGeoipProcessor( + processors, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + )) { + ids.add(pipelineName); + } } } return Collections.unmodifiableSet(ids); @@ -317,17 +329,26 @@ private static Set pipelinesWithGeoIpProcessor(ProjectMetadata projectMe * @param processors List of processors. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration + * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor + * (true), does not reference a geoip processor (false), or we are currently trying to figure that + * out (null). * @return true if a geoip processor is found in the processor list. */ private static boolean hasAtLeastOneGeoipProcessor( List> processors, boolean downloadDatabaseOnPipelineCreation, - Map pipelineConfigById + Map pipelineConfigById, + Map pipelineHasGeoProcessorById ) { if (processors != null) { // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (Map processor : processors) { - if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigById)) { + if (hasAtLeastOneGeoipProcessor( + processor, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + )) { return true; } } @@ -340,13 +361,17 @@ private static boolean hasAtLeastOneGeoipProcessor( * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration + * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor + * (true), does not reference a geoip processor (false), or we are currently trying to figure that + * out (null). * @return true if a geoip processor is found in the processor list. */ @SuppressWarnings("unchecked") private static boolean hasAtLeastOneGeoipProcessor( Map processor, boolean downloadDatabaseOnPipelineCreation, - Map pipelineConfigById + Map pipelineConfigById, + Map pipelineHasGeoProcessorById ) { if (processor == null) { return false; @@ -366,9 +391,24 @@ private static boolean hasAtLeastOneGeoipProcessor( } } - return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigById) - || isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigById) - || isPipelineProcessorWithGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation, pipelineConfigById); + return isProcessorWithOnFailureGeoIpProcessor( + processor, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + ) + || isForeachProcessorWithGeoipProcessor( + processor, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + ) + || isPipelineProcessorWithGeoIpProcessor( + processor, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + ); } /** @@ -376,13 +416,17 @@ private static boolean hasAtLeastOneGeoipProcessor( * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration + * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor + * (true), does not reference a geoip processor (false), or we are currently trying to figure that + * out (null). * @return true if a geoip processor is found in the processor list. */ @SuppressWarnings("unchecked") private static boolean isProcessorWithOnFailureGeoIpProcessor( Map processor, boolean downloadDatabaseOnPipelineCreation, - Map pipelineConfigById + Map pipelineConfigById, + Map pipelineHasGeoProcessorById ) { // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (Object value : processor.values()) { @@ -390,7 +434,8 @@ private static boolean isProcessorWithOnFailureGeoIpProcessor( && hasAtLeastOneGeoipProcessor( ((Map>>) value).get("on_failure"), downloadDatabaseOnPipelineCreation, - pipelineConfigById + pipelineConfigById, + pipelineHasGeoProcessorById )) { return true; } @@ -403,50 +448,91 @@ && hasAtLeastOneGeoipProcessor( * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration + * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor + * (true), does not reference a geoip processor (false), or we are currently trying to figure that + * out (null). * @return true if a geoip processor is found in the processor list. */ @SuppressWarnings("unchecked") private static boolean isForeachProcessorWithGeoipProcessor( Map processor, boolean downloadDatabaseOnPipelineCreation, - Map pipelineConfigById + Map pipelineConfigById, + Map pipelineHasGeoProcessorById ) { final Map processorConfig = (Map) processor.get("foreach"); return processorConfig != null && hasAtLeastOneGeoipProcessor( (Map) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation, - pipelineConfigById + pipelineConfigById, + pipelineHasGeoProcessorById ); } /** - * Check if a processor is a pipeline processor containing at least a geoip processor. + * Check if a processor is a pipeline processor containing at least a geoip processor. This method also updates + * pipelineHasGeoProcessorById with a result for any pipelines it looks at. * @param processor Processor config. * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false. * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration + * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor + * (true), does not reference a geoip processor (false), or we are currently trying to figure that + * out (null). * @return true if a geoip processor is found in the processors of this processor if this processor is a pipeline processor. */ @SuppressWarnings("unchecked") private static boolean isPipelineProcessorWithGeoIpProcessor( Map processor, boolean downloadDatabaseOnPipelineCreation, - Map pipelineConfigById + Map pipelineConfigById, + Map pipelineHasGeoProcessorById ) { final Map processorConfig = (Map) processor.get("pipeline"); if (processorConfig != null) { String pipelineName = (String) processorConfig.get("name"); if (pipelineName != null) { - PipelineConfiguration pipelineConfiguration = pipelineConfigById.get(pipelineName); - if (pipelineConfiguration != null) { - List> childProcessors = (List>) pipelineConfiguration.getConfig() - .get(Pipeline.PROCESSORS_KEY); - if (childProcessors != null && childProcessors.isEmpty() == false) { - return hasAtLeastOneGeoipProcessor(childProcessors, downloadDatabaseOnPipelineCreation, pipelineConfigById); + if (pipelineHasGeoProcessorById.containsKey(pipelineName)) { + Boolean hasGeoProcessor = pipelineHasGeoProcessorById.get(pipelineName); + if (hasGeoProcessor == null) { + /* + * If the value is null here, it indicates that this method has been called recursively with the same pipeline name. + * This will cause a runtime error when the pipeline is executed, but we're avoiding changing existing behavior at + * server startup time. Instead, we just log the problem and bail out as quickly as possible. It is possible that + * this could lead to a geo database not being downloaded for the pipeline, but it doesn't really matter since the + * pipeline was going to fail anyway. + */ + logger.warn("Detected that pipeline [" + pipelineName + "] is called recursively."); + pipelineHasGeoProcessorById.put(pipelineName, false); + return false; + } else { + return hasGeoProcessor; } + } else { + PipelineConfiguration pipelineConfiguration = pipelineConfigById.get(pipelineName); + final boolean pipelineHasGeoProcessor; + if (pipelineConfiguration != null) { + List> childProcessors = (List>) pipelineConfiguration.getConfig() + .get(Pipeline.PROCESSORS_KEY); + if (childProcessors != null && childProcessors.isEmpty() == false) { + // We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors: + pipelineHasGeoProcessorById.put(pipelineName, null); + pipelineHasGeoProcessor = hasAtLeastOneGeoipProcessor( + childProcessors, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + ); + } else { + pipelineHasGeoProcessor = false; + } + } else { + pipelineHasGeoProcessor = false; + } + pipelineHasGeoProcessorById.put(pipelineName, pipelineHasGeoProcessor); } + return pipelineHasGeoProcessorById.get(pipelineName); } - } return false; } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java index 1dfacc7861825..ec5e3d72ec1be 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java @@ -99,6 +99,54 @@ public void testHasAtLeastOneGeoipProcessorInPipelineProcessorWhenDownloadDataba assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); } + public void testHasAtLeastOneGeoipProcessorRecursion() throws IOException { + /* + * The pipeline in this test is invalid -- it has a cycle from outerPipeline -> innerPipeline -> innerInnerPipeline -> + * innerPipeline. Since this method is called at server startup, we want to make sure that we don't get a StackOverFlowError and + * that we don't throw any kind of validation exception (since that would be an unexpected change of behavior). + */ + String innerInnerPipelineJson = """ + { + "processors":[""" + getGeoIpProcessor(false) + """ + , {"pipeline": {"name": "innerPipeline"}} + ] + } + """; + String innerPipelineJson = """ + { + "processors":[{"pipeline": {"name": "innerInnerPipeline"}} + ] + } + """; + String outerPipelineJson = """ + { + "processors":[{"pipeline": {"name": "innerPipeline"}} + ] + } + """; + IngestMetadata ingestMetadata = new IngestMetadata( + Map.of( + "innerInnerPipeline", + new PipelineConfiguration("innerInnerPipeline", new BytesArray(innerInnerPipelineJson), XContentType.JSON), + "innerPipeline", + new PipelineConfiguration("innerPipeline", new BytesArray(innerPipelineJson), XContentType.JSON), + "outerPipeline", + new PipelineConfiguration("outerPipeline", new BytesArray(outerPipelineJson), XContentType.JSON) + ) + ); + // The pipeline is not used in any index, expected to return false. + var projectMetadata = projectMetadataWithIndex(b -> {}, ingestMetadata); + assertFalse(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); + + // The pipeline is set as default pipeline in an index, expected to return true. + projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.DEFAULT_PIPELINE.getKey(), "outerPipeline"), ingestMetadata); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); + + // The pipeline is set as final pipeline in an index, expected to return true. + projectMetadata = projectMetadataWithIndex(b -> b.put(IndexSettings.FINAL_PIPELINE.getKey(), "outerPipeline"), ingestMetadata); + assertTrue(GeoIpDownloaderTaskExecutor.hasAtLeastOneGeoipProcessor(projectMetadata)); + } + public void testHasAtLeastOneGeoipProcessor() throws IOException { var projectId = Metadata.DEFAULT_PROJECT_ID; List expectHitsInputs = getPipelinesWithGeoIpProcessors(true); From 51a826134921309c194ce4f1fa48b9f9bde4b83b Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 14 Jul 2025 17:18:48 -0500 Subject: [PATCH 5/9] more cleanup --- .../geoip/GeoIpDownloaderTaskExecutor.java | 48 ++++++++----------- 1 file changed, 19 insertions(+), 29 deletions(-) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index a3712e1a2c5a3..bb1aff44709ba 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -309,8 +310,6 @@ private static Set pipelinesWithGeoIpProcessor(ProjectMetadata projectMe List> processors = (List>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY); String pipelineName = configuration.getId(); if (pipelineHasGeoProcessorById.containsKey(pipelineName) == false) { - // We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors: - pipelineHasGeoProcessorById.put(pipelineName, null); if (hasAtLeastOneGeoipProcessor( processors, downloadDatabaseOnPipelineCreation, @@ -493,8 +492,7 @@ private static boolean isPipelineProcessorWithGeoIpProcessor( String pipelineName = (String) processorConfig.get("name"); if (pipelineName != null) { if (pipelineHasGeoProcessorById.containsKey(pipelineName)) { - Boolean hasGeoProcessor = pipelineHasGeoProcessorById.get(pipelineName); - if (hasGeoProcessor == null) { + if (pipelineHasGeoProcessorById.get(pipelineName) == null) { /* * If the value is null here, it indicates that this method has been called recursively with the same pipeline name. * This will cause a runtime error when the pipeline is executed, but we're avoiding changing existing behavior at @@ -502,34 +500,26 @@ private static boolean isPipelineProcessorWithGeoIpProcessor( * this could lead to a geo database not being downloaded for the pipeline, but it doesn't really matter since the * pipeline was going to fail anyway. */ - logger.warn("Detected that pipeline [" + pipelineName + "] is called recursively."); + logger.warn("Detected that pipeline [{}] is called recursively.", pipelineName); pipelineHasGeoProcessorById.put(pipelineName, false); - return false; - } else { - return hasGeoProcessor; } } else { - PipelineConfiguration pipelineConfiguration = pipelineConfigById.get(pipelineName); - final boolean pipelineHasGeoProcessor; - if (pipelineConfiguration != null) { - List> childProcessors = (List>) pipelineConfiguration.getConfig() - .get(Pipeline.PROCESSORS_KEY); - if (childProcessors != null && childProcessors.isEmpty() == false) { - // We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors: - pipelineHasGeoProcessorById.put(pipelineName, null); - pipelineHasGeoProcessor = hasAtLeastOneGeoipProcessor( - childProcessors, - downloadDatabaseOnPipelineCreation, - pipelineConfigById, - pipelineHasGeoProcessorById - ); - } else { - pipelineHasGeoProcessor = false; - } - } else { - pipelineHasGeoProcessor = false; - } - pipelineHasGeoProcessorById.put(pipelineName, pipelineHasGeoProcessor); + List> childProcessors = Optional.ofNullable((String) processorConfig.get("name")) + .map(pipelineConfigById::get) + .map(PipelineConfiguration::getConfig) + .map(config -> (List>) config.get(Pipeline.PROCESSORS_KEY)) + .orElse(Collections.emptyList()); + // We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors: + pipelineHasGeoProcessorById.put(pipelineName, null); + pipelineHasGeoProcessorById.put( + pipelineName, + hasAtLeastOneGeoipProcessor( + childProcessors, + downloadDatabaseOnPipelineCreation, + pipelineConfigById, + pipelineHasGeoProcessorById + ) + ); } return pipelineHasGeoProcessorById.get(pipelineName); } From 2cd20baf920eaa14a07cb72b52f405b525a97e34 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 14 Jul 2025 17:20:47 -0500 Subject: [PATCH 6/9] removing unused test code --- .../GeoIpDownloaderTaskExecutorTests.java | 108 ------------------ 1 file changed, 108 deletions(-) diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java index ec5e3d72ec1be..09eb7fb65585d 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutorTests.java @@ -302,114 +302,6 @@ private List getPipelinesWithGeoIpProcessors(boolean downloadDatabaseOnP ); } - private List getPipelinesWithGeoIpProcessorsInsidePipelineProcessors(boolean downloadDatabaseOnPipelineCreation) - throws IOException { - String simpleGeoIpProcessor = """ - { - "processors":[""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ - ] - } - """; - String onFailureWithGeoIpProcessor = """ - { - "processors":[ - { - "rename":{ - "field":"provider", - "target_field":"cloud.provider", - "on_failure":[""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ - ] - } - } - ] - } - """; - String foreachWithGeoIpProcessor = """ - { - "processors":[ - { - "foreach":{ - "field":"values", - "processor":""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ - } - } - ] - } - """; - String nestedForeachWithGeoIpProcessor = """ - { - "processors":[ - { - "foreach":{ - "field":"values", - "processor": - { - "foreach":{ - "field":"someField", - "processor":""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ - } - } - } - } - ] - } - """; - String nestedForeachWithOnFailureWithGeoIpProcessor = """ - { - "processors":[ - { - "foreach":{ - "field":"values", - "processor": - { - "foreach":{ - "field":"someField", - "processor": - { - "rename":{ - "field":"provider", - "target_field":"cloud.provider", - "on_failure":[""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ - ] - } - } - } - } - } - } - ] - } - """; - String onFailureWithForeachWithGeoIp = """ - { - "processors":[ - { - "rename":{ - "field":"provider", - "target_field":"cloud.provider", - "on_failure":[ - { - "foreach":{ - "field":"values", - "processor":""" + getGeoIpProcessor(downloadDatabaseOnPipelineCreation) + """ - } - } - ] - } - } - ] - } - """; - return List.of( - simpleGeoIpProcessor, - onFailureWithGeoIpProcessor, - foreachWithGeoIpProcessor, - nestedForeachWithGeoIpProcessor, - nestedForeachWithOnFailureWithGeoIpProcessor, - onFailureWithForeachWithGeoIp - ); - } - /* * This method returns an assorted list of pipelines that _do not_ have geoip processors -- ones that ought to cause * hasAtLeastOneGeoipProcessor to return false. From ff4f3839691486e631b212bb6098044c9d51f1a7 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 15 Jul 2025 16:11:40 -0500 Subject: [PATCH 7/9] copilot-suggested cleanup --- .../elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index bb1aff44709ba..f49f44cf37667 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -504,7 +504,7 @@ private static boolean isPipelineProcessorWithGeoIpProcessor( pipelineHasGeoProcessorById.put(pipelineName, false); } } else { - List> childProcessors = Optional.ofNullable((String) processorConfig.get("name")) + List> childProcessors = Optional.ofNullable(pipelineName) .map(pipelineConfigById::get) .map(PipelineConfiguration::getConfig) .map(config -> (List>) config.get(Pipeline.PROCESSORS_KEY)) From 8aa4f7c3aac679e49922e8d0e7a95de9da506767 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 16 Jul 2025 17:41:01 -0400 Subject: [PATCH 8/9] Unroll some loops --- .../geoip/GeoIpDownloaderTaskExecutor.java | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index f49f44cf37667..b697749841e01 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -52,12 +53,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; -import java.util.stream.Collectors; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER; @@ -284,11 +282,14 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) { return false; } - return projectMetadata.indices().values().stream().anyMatch(indexMetadata -> { + for (IndexMetadata indexMetadata : projectMetadata.indices().values()) { String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings()); String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings()); - return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline); - }); + if (checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline)) { + return true; + } + } + return false; } /** @@ -301,9 +302,12 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) { @SuppressWarnings("unchecked") private static Set pipelinesWithGeoIpProcessor(ProjectMetadata projectMetadata, boolean downloadDatabaseOnPipelineCreation) { List configurations = IngestService.getPipelines(projectMetadata); - Map pipelineConfigById = configurations.stream() - .collect(Collectors.toMap(PipelineConfiguration::getId, Function.identity())); - Map pipelineHasGeoProcessorById = new HashMap<>(); // used to keep track of pipelines we've checked before + Map pipelineConfigById = HashMap.newHashMap(configurations.size()); + for (PipelineConfiguration configuration : configurations) { + pipelineConfigById.put(configuration.getId(), configuration); + } + // this map is used to keep track of pipelines that have already been checked + Map pipelineHasGeoProcessorById = HashMap.newHashMap(configurations.size()); Set ids = new HashSet<>(); // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph for (PipelineConfiguration configuration : configurations) { @@ -504,11 +508,11 @@ private static boolean isPipelineProcessorWithGeoIpProcessor( pipelineHasGeoProcessorById.put(pipelineName, false); } } else { - List> childProcessors = Optional.ofNullable(pipelineName) - .map(pipelineConfigById::get) - .map(PipelineConfiguration::getConfig) - .map(config -> (List>) config.get(Pipeline.PROCESSORS_KEY)) - .orElse(Collections.emptyList()); + List> childProcessors = null; + PipelineConfiguration config = pipelineConfigById.get(pipelineName); + if (config != null) { + childProcessors = (List>) config.getConfig().get(Pipeline.PROCESSORS_KEY); + } // We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors: pipelineHasGeoProcessorById.put(pipelineName, null); pipelineHasGeoProcessorById.put( From 444dfd1e53a8b03daccecb6f07f56c5723bdc9a1 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 18 Jul 2025 16:19:10 -0500 Subject: [PATCH 9/9] removing verbose log message --- .../ingest/geoip/GeoIpDownloaderTaskExecutor.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java index b697749841e01..6d55e94b0a23d 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java @@ -500,11 +500,10 @@ private static boolean isPipelineProcessorWithGeoIpProcessor( /* * If the value is null here, it indicates that this method has been called recursively with the same pipeline name. * This will cause a runtime error when the pipeline is executed, but we're avoiding changing existing behavior at - * server startup time. Instead, we just log the problem and bail out as quickly as possible. It is possible that - * this could lead to a geo database not being downloaded for the pipeline, but it doesn't really matter since the - * pipeline was going to fail anyway. + * server startup time. Instead, we just bail out as quickly as possible. It is possible that this could lead to a + * geo database not being downloaded for the pipeline, but it doesn't really matter since the pipeline was going to + * fail anyway. */ - logger.warn("Detected that pipeline [{}] is called recursively.", pipelineName); pipelineHasGeoProcessorById.put(pipelineName, false); } } else {