Skip to content

feat:support config empty protection. #601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,18 @@ public interface ConnectorConfig extends ServerConnectorConfig {
* @return 连接器类型
*/
String getConnectorType();

/**
* 是否开启推空保护
*
* @return 是否开启推空保护
*/
Boolean isEmptyProtectionEnable();

/**
* 推空保护过期时间,单位毫秒
*
* @return 推空保护过期时间
*/
Long getEmptyProtectionExpiredInterval();
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ public class ConnectorConfigImpl extends ServerConnectorConfigImpl implements Co
@JsonProperty
private Integer configFileGroupThreadNum = 10;

@JsonProperty
private Boolean emptyProtectionEnable = true;

@JsonProperty
@JsonDeserialize(using = TimeStrJsonDeserializer.class)
private Long emptyProtectionExpiredInterval = 7 * 24 * 3600 * 1000L;

@Override
public void verify() {
ConfigUtils.validateString(connectorType, "configConnectorType");
Expand Down Expand Up @@ -91,6 +98,12 @@ public void setDefault(Object defaultObject) {
if (connectorType == null) {
this.connectorType = connectorConfig.getConnectorType();
}
if (emptyProtectionEnable == null) {
this.emptyProtectionEnable = connectorConfig.isEmptyProtectionEnable();
}
if (emptyProtectionExpiredInterval == null) {
this.emptyProtectionExpiredInterval = connectorConfig.getEmptyProtectionExpiredInterval();
}
}
}

Expand Down Expand Up @@ -159,4 +172,21 @@ public void setConfigFileGroupThreadNum(Integer configFileGroupThreadNum) {
this.configFileGroupThreadNum = configFileGroupThreadNum;
}

@Override
public Boolean isEmptyProtectionEnable() {
return emptyProtectionEnable;
}

public void setEmptyProtectionEnable(Boolean emptyProtectionEnable) {
this.emptyProtectionEnable = emptyProtectionEnable;
}

@Override
public Long getEmptyProtectionExpiredInterval() {
return emptyProtectionExpiredInterval;
}

public void setEmptyProtectionExpiredInterval(Long emptyProtectionExpiredInterval) {
this.emptyProtectionExpiredInterval = emptyProtectionExpiredInterval;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.tencent.polaris.configuration.client.internal;

import com.tencent.polaris.annonation.JustForTest;
import com.tencent.polaris.api.control.Destroyable;
import com.tencent.polaris.api.exception.ServerCodes;
import com.tencent.polaris.api.plugin.configuration.ConfigFile;
Expand All @@ -27,11 +28,11 @@
import com.tencent.polaris.client.api.SDKContext;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.configuration.api.core.ConfigFileMetadata;
import com.tencent.polaris.configuration.client.util.ConfigFileUtils;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -47,6 +48,8 @@ public class RemoteConfigFileRepo extends AbstractConfigFileRepo {

private static ScheduledExecutorService pullExecutorService;

private static Set<String> configFileInitSet = new HashSet<>();

private final AtomicReference<ConfigFile> remoteConfigFile;
//服务端通知的版本号,此版本号有可能落后于服务端
private final AtomicLong notifiedVersion;
Expand All @@ -58,6 +61,17 @@ public class RemoteConfigFileRepo extends AbstractConfigFileRepo {

private String token;

private final boolean emptyProtection;

private final long emptyProtectionExpiredInterval;

/**
* 淘汰线程
*/
private final ScheduledExecutorService emptyProtectionExpireExecutor;

private ScheduledFuture<?> emptyProtectionExpireFuture;

static {
createPullExecutorService();
}
Expand All @@ -80,6 +94,9 @@ public RemoteConfigFileRepo(SDKContext sdkContext,
//获取远程调用插件实现类
this.configFileConnector = connector;
this.fallbackToLocalCache = sdkContext.getConfig().getConfigFile().getServerConnector().getFallbackToLocalCache();
this.emptyProtection = sdkContext.getConfig().getConfigFile().getServerConnector().isEmptyProtectionEnable();
this.emptyProtectionExpiredInterval = sdkContext.getConfig().getConfigFile().getServerConnector().getEmptyProtectionExpiredInterval();
this.emptyProtectionExpireExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("polaris-config-empty-protection"));
//注册 destroy hook
registerRepoDestroyHook(sdkContext);
//同步从远程仓库拉取一次
Expand Down Expand Up @@ -163,32 +180,51 @@ protected void doPull() {
} else {
shouldUpdateLocalCache = remoteConfigFile.get() == null || pulledConfigFile.getVersion() != remoteConfigFile.get().getVersion();
}
if (shouldUpdateLocalCache) {
// 构造更新回调动作
Runnable runnable = () -> {
ConfigFile copiedConfigFile = deepCloneConfigFile(pulledConfigFile);
remoteConfigFile.set(copiedConfigFile);
//配置有更新,触发回调
fireChangeEvent(copiedConfigFile);

// update local file cache
this.configFilePersistHandler.asyncSaveConfigFile(pulledConfigFile);
};
if (shouldUpdateLocalCache && checkEmptyProtect(response)) {
shouldUpdateLocalCache = false;
fallbackIfNecessaryWhenStartingUp(pullConfigFileReq);
submitEmptyProtectionExpireTask(runnable);
}
if (shouldUpdateLocalCache) {
runnable.run();
cancelEmptyProtectionExpireTask();
}
return;
}

//远端没有此配置文件
if (response.getCode() == ServerCodes.NOT_FOUND_RESOURCE) {
LOGGER.warn("[Config] config file not found, please check whether config file released. {}",
configFileMetadata);
//delete local file cache
this.configFilePersistHandler
.asyncDeleteConfigFile(new ConfigFile(configFileMetadata.getNamespace(),
configFileMetadata.getFileGroup(), configFileMetadata.getFileName()));

//删除配置文件
if (remoteConfigFile.get() != null) {
remoteConfigFile.set(null);
//删除配置文件也需要触发通知
fireChangeEvent(null);
// 构造更新回调动作
Runnable runnable = () -> {
//delete local file cache
this.configFilePersistHandler
.asyncDeleteConfigFile(new ConfigFile(configFileMetadata.getNamespace(),
configFileMetadata.getFileGroup(), configFileMetadata.getFileName()));

//删除配置文件
if (remoteConfigFile.get() != null) {
remoteConfigFile.set(null);
//删除配置文件也需要触发通知
fireChangeEvent(null);
}
};
if (checkEmptyProtect(response)) {
fallbackIfNecessaryWhenStartingUp(pullConfigFileReq);
submitEmptyProtectionExpireTask(runnable);
} else {
LOGGER.warn("[Config] config file not found, please check whether config file released. {}",
configFileMetadata);
runnable.run();
}
return;
}
Expand All @@ -213,16 +249,39 @@ protected void doPull() {
}

private void fallbackIfNecessary(final int retryTimes, ConfigFile configFileReq) {
if (retryTimes >= PULL_CONFIG_RETRY_TIMES && fallbackToLocalCache) {
if (retryTimes >= PULL_CONFIG_RETRY_TIMES) {
LOGGER.info("[Config] failed to pull config file from remote.");
//重试次数超过上限,从本地缓存拉取
loadLocalCache(configFileReq);
}
}

private void fallbackIfNecessaryWhenStartingUp(ConfigFile configFileReq) {
String identifier = getIdentifier();
boolean initFlag = false;
if (configFileInitSet.contains(identifier)) {
initFlag = true;
} else {
configFileInitSet.add(identifier);
}
if (!initFlag) {
// 第一次启动的时候,如果拉取到空配置,则尝试从缓存中获取
LOGGER.info("[Config] load local cache because of empty config when starting up.");
loadLocalCache(configFileReq);
}
}

private void loadLocalCache(ConfigFile configFileReq) {
if (fallbackToLocalCache) {
ConfigFile configFileRes = configFilePersistHandler.loadPersistedConfigFile(configFileReq);
if (configFileRes != null) {
LOGGER.info("[Config] failed to pull config file from remote,fallback to local cache success.{}.", configFileRes);
LOGGER.info("[Config] load local cache success.{}.", configFileRes);
remoteConfigFile.set(configFileRes);
//配置有更新,触发回调
fireChangeEvent(configFileRes);
return;
}
LOGGER.info("[Config] failed to pull config file from remote,fallback to local cache fail.{}.", configFileReq);
LOGGER.info("[Config] load local cache fail.{}.", configFileReq);
}
}

Expand Down Expand Up @@ -293,4 +352,39 @@ protected void doDestroy() {
static void destroyPullExecutor() {
ThreadPoolUtils.waitAndStopThreadPools(new ExecutorService[]{pullExecutorService});
}

/**
* 若配置为空,则推空保护开启,则不刷新配置
*
* @param configFileResponse
* @return
*/
private boolean checkEmptyProtect(ConfigFileResponse configFileResponse) {
if (emptyProtection && ConfigFileUtils.checkConfigContentEmpty(configFileResponse)) {
LOGGER.warn("Empty response from remote with {}, will not refresh config.", getIdentifier());
return true;
}
return false;
}

private void submitEmptyProtectionExpireTask(Runnable runnable) {
if (emptyProtectionExpireFuture == null || emptyProtectionExpireFuture.isCancelled() || emptyProtectionExpireFuture.isDone()) {
LOGGER.info("Empty protection expire task of {} submit.", getIdentifier());
emptyProtectionExpireFuture = emptyProtectionExpireExecutor.schedule(runnable, emptyProtectionExpiredInterval, TimeUnit.MILLISECONDS);
}
}

private void cancelEmptyProtectionExpireTask() {
if (emptyProtectionExpireFuture != null && !emptyProtectionExpireFuture.isCancelled() && !emptyProtectionExpireFuture.isDone()) {
emptyProtectionExpireFuture.cancel(true);
LOGGER.info("Empty protection expire task of {} cancel.", getIdentifier());
}
}

@JustForTest
String getIdentifier() {
return configFileMetadata.getNamespace() + "."
+ configFileMetadata.getFileGroup() + "."
+ configFileMetadata.getFileName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
package com.tencent.polaris.configuration.client.util;

import com.google.common.collect.Maps;
import com.tencent.polaris.api.exception.ServerCodes;
import com.tencent.polaris.api.plugin.configuration.ConfigFile;
import com.tencent.polaris.api.plugin.configuration.ConfigFileResponse;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.configuration.api.core.ConfigFileMetadata;
import com.tencent.polaris.logging.LoggerFactory;
import org.slf4j.Logger;

import java.util.Collections;
import java.util.Map;
Expand All @@ -31,6 +36,8 @@
*/
public class ConfigFileUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigFileUtils.class);

public static void checkConfigFileMetadata(ConfigFileMetadata configFileMetadata) {
if (StringUtils.isBlank(configFileMetadata.getNamespace())) {
throw new IllegalArgumentException("namespace cannot be empty.");
Expand All @@ -57,4 +64,25 @@ public static Set<String> stringPropertyNames(Properties properties) {
}
return map.keySet();
}

public static boolean checkConfigContentEmpty(ConfigFileResponse configFileResponse) {
if (configFileResponse == null) {
LOGGER.debug("config file response is null.");
return true;
}
if (configFileResponse.getCode() == ServerCodes.NOT_FOUND_RESOURCE) {
LOGGER.debug("config file not found. maybe not exist or deleted.");
return true;
}
ConfigFile configFile = configFileResponse.getConfigFile();
if (configFile == null) {
LOGGER.debug("config file is null.");
return true;
}
if (StringUtils.isBlank(configFile.getContent())) {
LOGGER.debug("config file content is empty.");
return true;
}
return false;
}
}
Loading