Skip to content

Commit 0f3eb98

Browse files
committed
More fixes
1 parent ba48874 commit 0f3eb98

File tree

6 files changed

+35
-17
lines changed

6 files changed

+35
-17
lines changed

plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaPageSink.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,16 +146,16 @@ public CompletableFuture<Collection<Slice>> finish()
146146
throw new UncheckedIOException("Failed to close row encoders", e);
147147
}
148148

149+
if (producerCallback.getErrorCount() > 0) {
150+
throw new TrinoException(KAFKA_PRODUCER_ERROR, format("%d producer record(s) failed to send", producerCallback.getErrorCount()));
151+
}
152+
149153
checkArgument(
150154
producerCallback.getWrittenBytes() == expectedWrittenBytes,
151155
"Actual written bytes: '%s' not equal to expected written bytes: '%s'",
152156
producerCallback.getWrittenBytes(),
153157
expectedWrittenBytes);
154158

155-
if (producerCallback.getErrorCount() > 0) {
156-
throw new TrinoException(KAFKA_PRODUCER_ERROR, format("%d producer record(s) failed to send", producerCallback.getErrorCount()));
157-
}
158-
159159
return completedFuture(ImmutableList.of());
160160
}
161161

plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ClassLoaderSafeSchemaRegistryClient.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,4 +624,22 @@ public void close()
624624
delegate.close();
625625
}
626626
}
627+
628+
@Override
629+
public ParsedSchema getSchemaByGuid(String guid, String format)
630+
throws IOException, RestClientException
631+
{
632+
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
633+
return delegate.getSchemaByGuid(guid, format);
634+
}
635+
}
636+
637+
@Override
638+
public RegisterSchemaResponse getIdWithResponse(String subject, ParsedSchema schema, boolean normalize)
639+
throws IOException, RestClientException
640+
{
641+
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
642+
return delegate.getIdWithResponse(subject, schema, normalize);
643+
}
644+
}
627645
}

plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaIntegrationPushDown.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,9 @@ public void testPartitionPushDown()
100100
public void testOffsetPushDown()
101101
{
102102
createMessages(topicNameOffset);
103-
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _partition_offset between 2 and 10", topicNameOffset), 18);
104-
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _partition_offset > 2 and _partition_offset < 10", topicNameOffset), 14);
105-
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _partition_offset = 3", topicNameOffset), 2);
103+
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _partition_offset between 2 and 10", topicNameOffset), 9);
104+
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _partition_offset > 2 and _partition_offset < 10", topicNameOffset), 6);
105+
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _partition_offset = 3", topicNameOffset), 1);
106106
}
107107

108108
@Test
@@ -137,7 +137,7 @@ public void testTimestampLogAppendModePushDown()
137137
{
138138
RecordMessage recordMessage = createTimestampTestMessages(topicNameLogAppend);
139139
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp < timestamp '%s'", topicNameLogAppend, recordMessage.getEndTime()), 4);
140-
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp <= timestamp '%s'", topicNameLogAppend, recordMessage.getEndTime()), 4);
140+
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp <= timestamp '%s'", topicNameLogAppend, recordMessage.getEndTime()), 1000);
141141
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp > timestamp '%s'", topicNameLogAppend, recordMessage.getStartTime()), 997);
142142
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp >= timestamp '%s'", topicNameLogAppend, recordMessage.getStartTime()), 998);
143143
assertProcessedInputPositions(format("SELECT count(*) FROM default.%s WHERE _timestamp between timestamp '%s' and timestamp '%s'", topicNameLogAppend, recordMessage.getStartTime(), recordMessage.getEndTime()), 2);

plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/TestKafkaLatestConnectorSmokeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class TestKafkaLatestConnectorSmokeTest
2929
protected QueryRunner createQueryRunner()
3030
throws Exception
3131
{
32-
TestingKafka testingKafka = closeAfterClass(TestingKafka.create("7.1.1"));
32+
TestingKafka testingKafka = closeAfterClass(TestingKafka.create("8.0.0"));
3333
testingKafka.start();
3434
return KafkaQueryRunner.builder(testingKafka)
3535
.setTables(REQUIRED_TPCH_TABLES)

testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/common/Kafka.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
public class Kafka
3636
implements EnvironmentExtender
3737
{
38-
private static final String CONFLUENT_VERSION = "7.9.0";
38+
private static final String CONFLUENT_VERSION = "8.0.0";
3939
private static final int SCHEMA_REGISTRY_PORT = 8081;
4040
static final String KAFKA = "kafka";
4141
static final String SCHEMA_REGISTRY = "schema-registry";

testing/trino-testing-kafka/src/main/java/io/trino/testing/kafka/TestingKafka.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import org.apache.kafka.clients.producer.RecordMetadata;
2626
import org.apache.kafka.common.serialization.LongSerializer;
2727
import org.testcontainers.containers.GenericContainer;
28-
import org.testcontainers.containers.KafkaContainer;
2928
import org.testcontainers.containers.Network;
29+
import org.testcontainers.kafka.ConfluentKafkaContainer;
3030
import org.testcontainers.utility.DockerImageName;
3131
import org.testcontainers.utility.MountableFile;
3232

@@ -49,22 +49,21 @@
4949
import static com.google.common.base.Preconditions.checkState;
5050
import static java.lang.String.format;
5151
import static java.time.temporal.ChronoUnit.MILLIS;
52-
import static org.testcontainers.containers.KafkaContainer.KAFKA_PORT;
5352
import static org.testcontainers.utility.MountableFile.forClasspathResource;
5453

5554
public final class TestingKafka
5655
implements Closeable
5756
{
5857
private static final Logger log = Logger.get(TestingKafka.class);
5958

60-
private static final String DEFAULT_CONFLUENT_PLATFORM_VERSION = "7.9.0";
59+
private static final String DEFAULT_CONFLUENT_PLATFORM_VERSION = "8.0.0";
6160
private static final int SCHEMA_REGISTRY_PORT = 8081;
6261

6362
private static final DockerImageName KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
6463
private static final DockerImageName SCHEMA_REGISTRY_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-schema-registry");
6564

6665
private final Network network;
67-
private final KafkaContainer kafka;
66+
private final ConfluentKafkaContainer kafka;
6867
private final GenericContainer<?> schemaRegistry;
6968
private final boolean withSchemaRegistry;
7069
private final Closer closer = Closer.create();
@@ -95,18 +94,19 @@ private TestingKafka(String confluentPlatformVersion, boolean withSchemaRegistry
9594
// Modify the template directly instead.
9695
MountableFile kafkaLogTemplate = forClasspathResource("log4j-kafka.properties.template");
9796
MountableFile schemaRegistryLogTemplate = forClasspathResource("log4j-schema-registry.properties.template");
98-
kafka = new KafkaContainer(KAFKA_IMAGE_NAME.withTag(confluentPlatformVersion))
97+
kafka = new ConfluentKafkaContainer(KAFKA_IMAGE_NAME.withTag(confluentPlatformVersion))
9998
.withStartupAttempts(3)
10099
.withNetwork(network)
101100
.withNetworkAliases("kafka")
101+
.withListener("kafka:9095")
102102
.withCopyFileToContainer(
103103
kafkaLogTemplate,
104104
"/etc/confluent/docker/log4j.properties.template");
105105
schemaRegistry = new GenericContainer<>(SCHEMA_REGISTRY_IMAGE_NAME.withTag(confluentPlatformVersion))
106106
.withStartupAttempts(3)
107107
.withNetwork(network)
108108
.withNetworkAliases("schema-registry")
109-
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092")
109+
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "kafka:9095")
110110
.withEnv("SCHEMA_REGISTRY_HOST_NAME", "0.0.0.0")
111111
.withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + SCHEMA_REGISTRY_PORT)
112112
.withEnv("SCHEMA_REGISTRY_HEAP_OPTS", "-Xmx1G")
@@ -233,7 +233,7 @@ private <K, V> Future<RecordMetadata> send(KafkaProducer<K, V> producer, Produce
233233

234234
public String getConnectString()
235235
{
236-
return kafka.getHost() + ":" + kafka.getMappedPort(KAFKA_PORT);
236+
return kafka.getBootstrapServers();
237237
}
238238

239239
private <K, V> KafkaProducer<K, V> createProducer(Map<String, String> extraProperties)

0 commit comments

Comments (0)