From 87d01fa02bf44bc6e830d4cd288dcc8e40e6f90c Mon Sep 17 00:00:00 2001 From: Daniil Zulin Date: Fri, 27 Jun 2025 15:24:56 +0700 Subject: [PATCH 1/3] Bugfix: set max_active_partitions on topic creation --- .../main/java/tech/ydb/topic/impl/TopicClientImpl.java | 1 + .../tech/ydb/topic/impl/YdbTopicsIntegrationTest.java | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java index e3d34664..0bd4080e 100644 --- a/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java +++ b/topic/src/main/java/tech/ydb/topic/impl/TopicClientImpl.java @@ -107,6 +107,7 @@ public CompletableFuture createTopic(String path, CreateTopicSettings se if (partitioningSettings != null) { requestBuilder.setPartitioningSettings(YdbTopic.PartitioningSettings.newBuilder() .setMinActivePartitions(partitioningSettings.getMinActivePartitions()) + .setMaxActivePartitions(partitioningSettings.getMaxActivePartitions()) .setPartitionCountLimit(partitioningSettings.getPartitionCountLimit()) .setAutoPartitioningSettings(YdbTopic.AutoPartitioningSettings.newBuilder() .setStrategy(toProto(partitioningSettings.getAutoPartitioningStrategy())))); diff --git a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java index 543b04d3..9f06123b 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java @@ -13,7 +13,6 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.FixMethodOrder; -import org.junit.Ignore; import org.junit.Test; import org.junit.runners.MethodSorters; import org.slf4j.Logger; @@ -244,12 +243,13 @@ public void onMessages(DataReceivedEvent dre) { } } - @Ignore("remove ignore once :latest YDB container tag moves onto version 25.1") @Test public void step07_alterTopicWithAutoPartitioning() { client.alterTopic(TEST_TOPIC, AlterTopicSettings.newBuilder() .setAlterPartitioningSettings(AlterPartitioningSettings.newBuilder() .setAutoPartitioningStrategy(AutoPartitioningStrategy.SCALE_UP) + .setMaxActivePartitions(10) + .setMinActivePartitions(5) .setWriteStrategySettings(AlterAutoPartitioningWriteStrategySettings.newBuilder() .setStabilizationWindow(Duration.ofMinutes(1)) .setUpUtilizationPercent(80) @@ -267,8 +267,8 @@ public void step07_alterTopicWithAutoPartitioning() { .setUpUtilizationPercent(80) .setDownUtilizationPercent(20) .build()) - .setMinActivePartitions(1) - .setMaxActivePartitions(1) + .setMinActivePartitions(5) + .setMaxActivePartitions(10) .build(); Assert.assertEquals(expectedPartitioningSettings, actualPartitioningSettings); From 03acd842db120f392564c8c3add4ceb476745109 Mon Sep 17 00:00:00 2001 From: Daniil Zulin Date: Fri, 27 Jun 2025 15:32:32 +0700 Subject: [PATCH 2/3] Add integration test for topic creation --- .../topic/impl/YdbTopicsIntegrationTest.java | 53 ++++++++++++++----- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java index 9f06123b..7f39d8e5 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java @@ -53,7 +53,8 @@ public class YdbTopicsIntegrationTest { @ClassRule public final static GrpcTransportRule ydbTransport = new GrpcTransportRule(); - private final static String TEST_TOPIC = "integration_test_topic"; + private final static String TEST_TOPIC1 = "integration_test_topic"; + private final static String TEST_TOPIC2 = "integration_test_other_topic"; private final static String TEST_CONSUMER1 = "consumer"; private final static String TEST_CONSUMER2 = "other_consumer"; @@ -69,10 +70,10 @@ public class YdbTopicsIntegrationTest { @BeforeClass public static void initTopic() { - logger.info("Create test topic {} ...", TEST_TOPIC); + logger.info("Create test topic {} ...", TEST_TOPIC1); client = TopicClient.newClient(ydbTransport).build(); - client.createTopic(TEST_TOPIC, CreateTopicSettings.newBuilder() + client.createTopic(TEST_TOPIC1, CreateTopicSettings.newBuilder() .addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER1).build()) .addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER2).build()) .build() @@ -81,8 +82,8 @@ public static void initTopic() { @AfterClass public static void dropTopic() { - logger.info("Drop test topic {} ...", TEST_TOPIC); - Status dropStatus = client.dropTopic(TEST_TOPIC).join(); + logger.info("Drop test topic {} ...", TEST_TOPIC1); + Status dropStatus = client.dropTopic(TEST_TOPIC1).join(); client.close(); dropStatus.expectSuccess("can't drop test topic"); } @@ -90,7 +91,7 @@ public static void dropTopic() { @Test public void step01_writeWithoutDeduplication() throws InterruptedException, ExecutionException, TimeoutException { WriterSettings settings = WriterSettings.newBuilder() - .setTopicPath(TEST_TOPIC) + .setTopicPath(TEST_TOPIC1) .build(); SyncWriter writer = client.createSyncWriter(settings); writer.init(); @@ -110,7 +111,7 @@ public void step01_writeWithoutDeduplication() throws InterruptedException, Exec @Test public void step02_readHalfWithoutCommit() throws InterruptedException { ReaderSettings settings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build()) + .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build()) .setConsumerName(TEST_CONSUMER1) .build(); @@ -128,7 +129,7 @@ public void step02_readHalfWithoutCommit() throws InterruptedException { @Test public void step03_readHalfWithCommit() throws InterruptedException { ReaderSettings settings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build()) + .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build()) .setConsumerName(TEST_CONSUMER1) .build(); @@ -147,7 +148,7 @@ public void step03_readHalfWithCommit() throws InterruptedException { @Test public void step03_readNextHalfWithoutCommit() throws InterruptedException { ReaderSettings settings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build()) + .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build()) .setConsumerName(TEST_CONSUMER1) .build(); @@ -169,7 +170,7 @@ public void step03_readNextHalfWithoutCommit() throws InterruptedException { @Test public void step04_readNextHalfWithCommit() throws InterruptedException { ReaderSettings settings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build()) + .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build()) .setConsumerName(TEST_CONSUMER1) .build(); @@ -193,7 +194,7 @@ public void step04_readNextHalfWithCommit() throws InterruptedException { @Test public void step05_describeTopic() { - TopicDescription description = client.describeTopic(TEST_TOPIC).join().getValue(); + TopicDescription description = client.describeTopic(TEST_TOPIC1).join().getValue(); Assert.assertNull(description.getTopicStats()); List consumers = description.getConsumers(); @@ -206,7 +207,7 @@ public void step05_describeTopic() { @Test public void step06_readAllByAsyncReader() throws InterruptedException { ReaderSettings settings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build()) + .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build()) .setConsumerName(TEST_CONSUMER2) .build(); @@ -245,7 +246,7 @@ public void onMessages(DataReceivedEvent dre) { @Test public void step07_alterTopicWithAutoPartitioning() { - client.alterTopic(TEST_TOPIC, AlterTopicSettings.newBuilder() + client.alterTopic(TEST_TOPIC1, AlterTopicSettings.newBuilder() .setAlterPartitioningSettings(AlterPartitioningSettings.newBuilder() .setAutoPartitioningStrategy(AutoPartitioningStrategy.SCALE_UP) .setMaxActivePartitions(10) @@ -257,7 +258,7 @@ public void step07_alterTopicWithAutoPartitioning() { .build()) .build()).join().expectSuccess("can't alter the topic"); - TopicDescription description = client.describeTopic(TEST_TOPIC).join().getValue(); + TopicDescription description = client.describeTopic(TEST_TOPIC1).join().getValue(); PartitioningSettings actualPartitioningSettings = description.getPartitioningSettings(); PartitioningSettings expectedPartitioningSettings = PartitioningSettings.newBuilder() @@ -273,4 +274,28 @@ public void step07_alterTopicWithAutoPartitioning() { Assert.assertEquals(expectedPartitioningSettings, actualPartitioningSettings); } + + @Test + public void step08_createTopicWithAutoPartitioning() { + PartitioningSettings expectedPartitioningSettings = PartitioningSettings.newBuilder() + .setMaxActivePartitions(8) + .setMinActivePartitions(4) + .setAutoPartitioningStrategy(AutoPartitioningStrategy.SCALE_UP) + .setWriteStrategySettings(AutoPartitioningWriteStrategySettings.newBuilder() + .setDownUtilizationPercent(5) + .setUpUtilizationPercent(75) + .setStabilizationWindow(Duration.ofMinutes(2)) + .build()) + .build(); + + CompletableFuture secondaryTopicCreated = client.createTopic(TEST_TOPIC2, CreateTopicSettings.newBuilder() + .setPartitioningSettings(expectedPartitioningSettings) + .build()); + + secondaryTopicCreated.join().expectSuccess("can't create the topic"); + + TopicDescription description = client.describeTopic(TEST_TOPIC2).join().getValue(); + + Assert.assertEquals(expectedPartitioningSettings, description.getPartitioningSettings()); + } } From 6930733ec7bd0b98dc25352ff3c90c8bd1ea6dd6 Mon Sep 17 00:00:00 2001 From: Daniil Zulin Date: Fri, 27 Jun 2025 18:27:58 +0700 Subject: [PATCH 3/3] Revert rename --- .../topic/impl/YdbTopicsIntegrationTest.java | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java index 7f39d8e5..cc20a4fa 100644 --- a/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java +++ b/topic/src/test/java/tech/ydb/topic/impl/YdbTopicsIntegrationTest.java @@ -53,8 +53,9 @@ public class YdbTopicsIntegrationTest { @ClassRule public final static GrpcTransportRule ydbTransport = new GrpcTransportRule(); - private final static String TEST_TOPIC1 = "integration_test_topic"; - private final static String TEST_TOPIC2 = "integration_test_other_topic"; + private final static String TEST_TOPIC = "integration_test_topic"; + private final static String TEST_OTHER_TOPIC = "integration_test_other_topic"; + private final static String TEST_CONSUMER1 = "consumer"; private final static String TEST_CONSUMER2 = "other_consumer"; @@ -70,10 +71,10 @@ public class YdbTopicsIntegrationTest { @BeforeClass public static void initTopic() { - logger.info("Create test topic {} ...", TEST_TOPIC1); + logger.info("Create test topic {} ...", TEST_TOPIC); client = TopicClient.newClient(ydbTransport).build(); - client.createTopic(TEST_TOPIC1, CreateTopicSettings.newBuilder() + client.createTopic(TEST_TOPIC, CreateTopicSettings.newBuilder() .addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER1).build()) .addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER2).build()) .build() @@ -82,8 +83,8 @@ public static void initTopic() { @AfterClass public static void dropTopic() { - logger.info("Drop test topic {} ...", TEST_TOPIC1); - Status dropStatus = client.dropTopic(TEST_TOPIC1).join(); + logger.info("Drop test topic {} ...", TEST_TOPIC); + Status dropStatus = client.dropTopic(TEST_TOPIC).join(); client.close(); dropStatus.expectSuccess("can't drop test topic"); } @@ -91,7 +92,7 @@ public static void dropTopic() { @Test public void step01_writeWithoutDeduplication() throws InterruptedException, ExecutionException, TimeoutException { WriterSettings settings = WriterSettings.newBuilder() - .setTopicPath(TEST_TOPIC1) + .setTopicPath(TEST_TOPIC) .build(); SyncWriter writer = client.createSyncWriter(settings); writer.init(); @@ -111,7 +112,7 @@ public void step01_writeWithoutDeduplication() throws InterruptedException, Exec @Test public void step02_readHalfWithoutCommit() throws InterruptedException { ReaderSettings settings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build()) + .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build()) .setConsumerName(TEST_CONSUMER1) .build(); @@ -129,7 +130,7 @@ public void step02_readHalfWithoutCommit() throws InterruptedException { @Test public void step03_readHalfWithCommit() throws InterruptedException { ReaderSettings settings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build()) + .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build()) .setConsumerName(TEST_CONSUMER1) .build(); @@ -148,7 +149,7 @@ public void step03_readHalfWithCommit() throws InterruptedException { @Test public void step03_readNextHalfWithoutCommit() throws InterruptedException { ReaderSettings settings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build()) + .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build()) .setConsumerName(TEST_CONSUMER1) .build(); @@ -170,7 +171,7 @@ public void step03_readNextHalfWithoutCommit() throws InterruptedException { @Test public void step04_readNextHalfWithCommit() throws InterruptedException { ReaderSettings settings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build()) + .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build()) .setConsumerName(TEST_CONSUMER1) .build(); @@ -194,7 +195,7 @@ public void step04_readNextHalfWithCommit() throws InterruptedException { @Test public void step05_describeTopic() { - TopicDescription description = client.describeTopic(TEST_TOPIC1).join().getValue(); + TopicDescription description = client.describeTopic(TEST_TOPIC).join().getValue(); Assert.assertNull(description.getTopicStats()); List consumers = description.getConsumers(); @@ -207,7 +208,7 @@ public void step05_describeTopic() { @Test public void step06_readAllByAsyncReader() throws InterruptedException { ReaderSettings settings = ReaderSettings.newBuilder() - .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC1).build()) + .addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build()) .setConsumerName(TEST_CONSUMER2) .build(); @@ -246,7 +247,7 @@ public void onMessages(DataReceivedEvent dre) { @Test public void step07_alterTopicWithAutoPartitioning() { - client.alterTopic(TEST_TOPIC1, AlterTopicSettings.newBuilder() + client.alterTopic(TEST_TOPIC, AlterTopicSettings.newBuilder() .setAlterPartitioningSettings(AlterPartitioningSettings.newBuilder() .setAutoPartitioningStrategy(AutoPartitioningStrategy.SCALE_UP) .setMaxActivePartitions(10) @@ -258,7 +259,7 @@ public void step07_alterTopicWithAutoPartitioning() { .build()) .build()).join().expectSuccess("can't alter the topic"); - TopicDescription description = client.describeTopic(TEST_TOPIC1).join().getValue(); + TopicDescription description = client.describeTopic(TEST_TOPIC).join().getValue(); PartitioningSettings actualPartitioningSettings = description.getPartitioningSettings(); PartitioningSettings expectedPartitioningSettings = PartitioningSettings.newBuilder() @@ -288,13 +289,13 @@ public void step08_createTopicWithAutoPartitioning() { .build()) .build(); - CompletableFuture secondaryTopicCreated = client.createTopic(TEST_TOPIC2, CreateTopicSettings.newBuilder() + CompletableFuture secondaryTopicCreated = client.createTopic(TEST_OTHER_TOPIC, CreateTopicSettings.newBuilder() .setPartitioningSettings(expectedPartitioningSettings) .build()); secondaryTopicCreated.join().expectSuccess("can't create the topic"); - TopicDescription description = client.describeTopic(TEST_TOPIC2).join().getValue(); + TopicDescription description = client.describeTopic(TEST_OTHER_TOPIC).join().getValue(); Assert.assertEquals(expectedPartitioningSettings, description.getPartitioningSettings()); }