Skip to content

Bugfix: set max_active_partitions on topic creation #512

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 27, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public CompletableFuture<Status> createTopic(String path, CreateTopicSettings se
if (partitioningSettings != null) {
requestBuilder.setPartitioningSettings(YdbTopic.PartitioningSettings.newBuilder()
.setMinActivePartitions(partitioningSettings.getMinActivePartitions())
.setMaxActivePartitions(partitioningSettings.getMaxActivePartitions())
.setPartitionCountLimit(partitioningSettings.getPartitionCountLimit())
.setAutoPartitioningSettings(YdbTopic.AutoPartitioningSettings.newBuilder()
.setStrategy(toProto(partitioningSettings.getAutoPartitioningStrategy()))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.FixMethodOrder;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.MethodSorters;
import org.slf4j.Logger;
Expand Down Expand Up @@ -54,7 +53,8 @@ public class YdbTopicsIntegrationTest {
@ClassRule
public final static GrpcTransportRule ydbTransport = new GrpcTransportRule();

private final static String TEST_TOPIC = "integration_test_topic";
private final static String TEST_TOPIC1 = "integration_test_topic";
private final static String TEST_TOPIC2 = "integration_test_other_topic";
private final static String TEST_CONSUMER1 = "consumer";
private final static String TEST_CONSUMER2 = "other_consumer";

Expand All @@ -70,10 +70,10 @@ public class YdbTopicsIntegrationTest {

@BeforeClass
public static void initTopic() {
logger.info("Create test topic {} ...", TEST_TOPIC);
logger.info("Create test topic {} ...", TEST_TOPIC1);

client = TopicClient.newClient(ydbTransport).build();
client.createTopic(TEST_TOPIC, CreateTopicSettings.newBuilder()
client.createTopic(TEST_TOPIC1, CreateTopicSettings.newBuilder()
.addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER1).build())
.addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER2).build())
.build()
Expand All @@ -82,16 +82,16 @@ public static void initTopic() {

@AfterClass
public static void dropTopic() {
logger.info("Drop test topic {} ...", TEST_TOPIC);
Status dropStatus = client.dropTopic(TEST_TOPIC).join();
logger.info("Drop test topic {} ...", TEST_TOPIC1);
Status dropStatus = client.dropTopic(TEST_TOPIC1).join();
client.close();
dropStatus.expectSuccess("can't drop test topic");
}

@Test
public void step01_writeWithoutDeduplication() throws InterruptedException, ExecutionException, TimeoutException {
WriterSettings settings = WriterSettings.newBuilder()
.setTopicPath(TEST_TOPIC)
.setTopicPath(TEST_TOPIC1)
.build();
SyncWriter writer = client.createSyncWriter(settings);
writer.init();
Expand All @@ -111,7 +111,7 @@ public void step01_writeWithoutDeduplication() throws InterruptedException, Exec
@Test
public void step02_readHalfWithoutCommit() throws InterruptedException {
ReaderSettings settings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build())
.setConsumerName(TEST_CONSUMER1)
.build();

Expand All @@ -129,7 +129,7 @@ public void step02_readHalfWithoutCommit() throws InterruptedException {
@Test
public void step03_readHalfWithCommit() throws InterruptedException {
ReaderSettings settings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build())
.setConsumerName(TEST_CONSUMER1)
.build();

Expand All @@ -148,7 +148,7 @@ public void step03_readHalfWithCommit() throws InterruptedException {
@Test
public void step03_readNextHalfWithoutCommit() throws InterruptedException {
ReaderSettings settings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build())
.setConsumerName(TEST_CONSUMER1)
.build();

Expand All @@ -170,7 +170,7 @@ public void step03_readNextHalfWithoutCommit() throws InterruptedException {
@Test
public void step04_readNextHalfWithCommit() throws InterruptedException {
ReaderSettings settings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build())
.setConsumerName(TEST_CONSUMER1)
.build();

Expand All @@ -194,7 +194,7 @@ public void step04_readNextHalfWithCommit() throws InterruptedException {

@Test
public void step05_describeTopic() {
TopicDescription description = client.describeTopic(TEST_TOPIC).join().getValue();
TopicDescription description = client.describeTopic(TEST_TOPIC1).join().getValue();

Assert.assertNull(description.getTopicStats());
List<Consumer> consumers = description.getConsumers();
Expand All @@ -207,7 +207,7 @@ public void step05_describeTopic() {
@Test
public void step06_readAllByAsyncReader() throws InterruptedException {
ReaderSettings settings = ReaderSettings.newBuilder()
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build())
.setConsumerName(TEST_CONSUMER2)
.build();

Expand Down Expand Up @@ -244,20 +244,21 @@ public void onMessages(DataReceivedEvent dre) {
}
}

@Ignore("remove ignore once :latest YDB container tag moves onto version 25.1")
@Test
public void step07_alterTopicWithAutoPartitioning() {
client.alterTopic(TEST_TOPIC, AlterTopicSettings.newBuilder()
client.alterTopic(TEST_TOPIC1, AlterTopicSettings.newBuilder()
.setAlterPartitioningSettings(AlterPartitioningSettings.newBuilder()
.setAutoPartitioningStrategy(AutoPartitioningStrategy.SCALE_UP)
.setMaxActivePartitions(10)
.setMinActivePartitions(5)
.setWriteStrategySettings(AlterAutoPartitioningWriteStrategySettings.newBuilder()
.setStabilizationWindow(Duration.ofMinutes(1))
.setUpUtilizationPercent(80)
.build())
.build())
.build()).join().expectSuccess("can't alter the topic");

TopicDescription description = client.describeTopic(TEST_TOPIC).join().getValue();
TopicDescription description = client.describeTopic(TEST_TOPIC1).join().getValue();

PartitioningSettings actualPartitioningSettings = description.getPartitioningSettings();
PartitioningSettings expectedPartitioningSettings = PartitioningSettings.newBuilder()
Expand All @@ -267,10 +268,34 @@ public void step07_alterTopicWithAutoPartitioning() {
.setUpUtilizationPercent(80)
.setDownUtilizationPercent(20)
.build())
.setMinActivePartitions(1)
.setMaxActivePartitions(1)
.setMinActivePartitions(5)
.setMaxActivePartitions(10)
.build();

Assert.assertEquals(expectedPartitioningSettings, actualPartitioningSettings);
}

@Test
public void step08_createTopicWithAutoPartitioning() {
PartitioningSettings expectedPartitioningSettings = PartitioningSettings.newBuilder()
.setMaxActivePartitions(8)
.setMinActivePartitions(4)
.setAutoPartitioningStrategy(AutoPartitioningStrategy.SCALE_UP)
.setWriteStrategySettings(AutoPartitioningWriteStrategySettings.newBuilder()
.setDownUtilizationPercent(5)
.setUpUtilizationPercent(75)
.setStabilizationWindow(Duration.ofMinutes(2))
.build())
.build();

CompletableFuture<Status> secondaryTopicCreated = client.createTopic(TEST_TOPIC2, CreateTopicSettings.newBuilder()
.setPartitioningSettings(expectedPartitioningSettings)
.build());

secondaryTopicCreated.join().expectSuccess("can't create the topic");

TopicDescription description = client.describeTopic(TEST_TOPIC2).join().getValue();

Assert.assertEquals(expectedPartitioningSettings, description.getPartitioningSettings());
}
}