Skip to content

Correctly handling download_database_on_pipeline_creation within a pipeline processor within a default or final pipeline #131236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/131236.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 131236
summary: Correctly handling `download_database_on_pipeline_creation` within a pipeline
processor within a default or final pipeline
area: Ingest Node
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -47,6 +48,7 @@
import org.elasticsearch.transport.RemoteTransportException;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -280,11 +282,14 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) {
return false;
}

return projectMetadata.indices().values().stream().anyMatch(indexMetadata -> {
for (IndexMetadata indexMetadata : projectMetadata.indices().values()) {
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings());
String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings());
return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline);
});
if (checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline)) {
return true;
}
}
return false;
}

/**
Expand All @@ -297,12 +302,26 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) {
@SuppressWarnings("unchecked")
private static Set<String> pipelinesWithGeoIpProcessor(ProjectMetadata projectMetadata, boolean downloadDatabaseOnPipelineCreation) {
List<PipelineConfiguration> configurations = IngestService.getPipelines(projectMetadata);
Map<String, PipelineConfiguration> pipelineConfigById = HashMap.newHashMap(configurations.size());
for (PipelineConfiguration configuration : configurations) {
pipelineConfigById.put(configuration.getId(), configuration);
}
// this map is used to keep track of pipelines that have already been checked
Map<String, Boolean> pipelineHasGeoProcessorById = HashMap.newHashMap(configurations.size());
Set<String> ids = new HashSet<>();
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (PipelineConfiguration configuration : configurations) {
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY);
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
ids.add(configuration.getId());
String pipelineName = configuration.getId();
if (pipelineHasGeoProcessorById.containsKey(pipelineName) == false) {
if (hasAtLeastOneGeoipProcessor(
processors,
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
)) {
ids.add(pipelineName);
}
}
}
return Collections.unmodifiableSet(ids);
Expand All @@ -312,13 +331,27 @@ private static Set<String> pipelinesWithGeoIpProcessor(ProjectMetadata projectMe
* Check if a list of processors contains at least one geoip processor.
* @param processors List of processors.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
* out (null).
* @return true if a geoip processor is found in the processor list.
*/
private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
private static boolean hasAtLeastOneGeoipProcessor(
List<Map<String, Object>> processors,
boolean downloadDatabaseOnPipelineCreation,
Map<String, PipelineConfiguration> pipelineConfigById,
Map<String, Boolean> pipelineHasGeoProcessorById
) {
if (processors != null) {
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (Map<String, Object> processor : processors) {
if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) {
if (hasAtLeastOneGeoipProcessor(
processor,
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
)) {
return true;
}
}
Expand All @@ -330,10 +363,19 @@ private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> pro
* Check if a processor config is a geoip processor or contains at least one geoip processor.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
* out (null).
* @return true if a geoip processor is found in the processor list.
*/
@SuppressWarnings("unchecked")
private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
private static boolean hasAtLeastOneGeoipProcessor(
Map<String, Object> processor,
boolean downloadDatabaseOnPipelineCreation,
Map<String, PipelineConfiguration> pipelineConfigById,
Map<String, Boolean> pipelineHasGeoProcessorById
) {
if (processor == null) {
return false;
}
Expand All @@ -352,27 +394,51 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
}
}

return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation)
|| isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation);
return isProcessorWithOnFailureGeoIpProcessor(
processor,
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
)
|| isForeachProcessorWithGeoipProcessor(
processor,
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
)
|| isPipelineProcessorWithGeoIpProcessor(
processor,
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
);
}

/**
* Check if a processor config has an on_failure clause containing at least one geoip processor.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
* out (null).
* @return true if a geoip processor is found in the processor list.
*/
@SuppressWarnings("unchecked")
private static boolean isProcessorWithOnFailureGeoIpProcessor(
Map<String, Object> processor,
boolean downloadDatabaseOnPipelineCreation
boolean downloadDatabaseOnPipelineCreation,
Map<String, PipelineConfiguration> pipelineConfigById,
Map<String, Boolean> pipelineHasGeoProcessorById
) {
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
for (Object value : processor.values()) {
if (value instanceof Map
&& hasAtLeastOneGeoipProcessor(
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
downloadDatabaseOnPipelineCreation
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
)) {
return true;
}
Expand All @@ -384,13 +450,84 @@ && hasAtLeastOneGeoipProcessor(
* Check if a processor is a foreach processor containing at least one geoip processor.
* @param processor Processor config.
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
* out (null).
* @return true if a geoip processor is found in the processor list.
*/
@SuppressWarnings("unchecked")
private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
private static boolean isForeachProcessorWithGeoipProcessor(
Map<String, Object> processor,
boolean downloadDatabaseOnPipelineCreation,
Map<String, PipelineConfiguration> pipelineConfigById,
Map<String, Boolean> pipelineHasGeoProcessorById
) {
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get("foreach");
return processorConfig != null
&& hasAtLeastOneGeoipProcessor((Map<String, Object>) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation);
&& hasAtLeastOneGeoipProcessor(
(Map<String, Object>) processorConfig.get("processor"),
downloadDatabaseOnPipelineCreation,
pipelineConfigById,
pipelineHasGeoProcessorById
);
}

/**
 * Determines whether the given processor config is a {@code pipeline} processor whose referenced pipeline contains at
 * least one geoip processor. As a side effect, the result for the referenced pipeline is stored in
 * pipelineHasGeoProcessorById, so the same pipeline is not evaluated again by later calls that share that map.
 * @param processor Processor config.
 * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
 * @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
 * @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean: true if the pipeline references a geoip processor, false if
 *                                    it does not, and null while the answer for that pipeline is still being computed.
 * @return true if this processor is a pipeline processor and a geoip processor is found among the processors of the pipeline
 *         it references; false otherwise (including when this is not a pipeline processor or it names no pipeline).
 */
@SuppressWarnings("unchecked")
private static boolean isPipelineProcessorWithGeoIpProcessor(
    Map<String, Object> processor,
    boolean downloadDatabaseOnPipelineCreation,
    Map<String, PipelineConfiguration> pipelineConfigById,
    Map<String, Boolean> pipelineHasGeoProcessorById
) {
    final Map<String, Object> pipelineProcessorConfig = (Map<String, Object>) processor.get("pipeline");
    if (pipelineProcessorConfig == null) {
        // Not a pipeline processor at all.
        return false;
    }
    final String referencedPipeline = (String) pipelineProcessorConfig.get("name");
    if (referencedPipeline == null) {
        return false;
    }

    if (pipelineHasGeoProcessorById.containsKey(referencedPipeline) == false) {
        // First time we see this pipeline: resolve its processors (if the pipeline exists) and evaluate them.
        final PipelineConfiguration referencedConfig = pipelineConfigById.get(referencedPipeline);
        final List<Map<String, Object>> referencedProcessors = referencedConfig == null
            ? null
            : (List<Map<String, Object>>) referencedConfig.getConfig().get(Pipeline.PROCESSORS_KEY);
        // A null value marks the pipeline as "in progress" so that a recursive reference to it can be detected
        // below instead of recursing until the stack overflows.
        pipelineHasGeoProcessorById.put(referencedPipeline, null);
        final boolean containsGeoIp = hasAtLeastOneGeoipProcessor(
            referencedProcessors,
            downloadDatabaseOnPipelineCreation,
            pipelineConfigById,
            pipelineHasGeoProcessorById
        );
        pipelineHasGeoProcessorById.put(referencedPipeline, containsGeoIp);
    } else if (pipelineHasGeoProcessorById.get(referencedPipeline) == null) {
        /*
         * An "in progress" marker means we have re-entered this method for a pipeline that is still being evaluated
         * further up the call stack, i.e. the pipelines reference each other in a cycle. Such a pipeline fails with a
         * runtime error when it is executed, but existing behavior at server startup is deliberately left unchanged:
         * we simply stop descending. This may mean a geo database is not downloaded for the pipeline, which is
         * acceptable because the pipeline was going to fail anyway.
         */
        pipelineHasGeoProcessorById.put(referencedPipeline, false);
    }
    // Every path above leaves a non-null value in the map for this pipeline.
    return pipelineHasGeoProcessorById.get(referencedPipeline);
}

// starts GeoIP downloader task for a single project
Expand Down
Loading
Loading