Skip to content

Commit 959538e

Browse files
authored
Merge pull request #334 from ydb-platform/no-finalizer-encoder
Use NoFinalizer Zstd IO streams for topic encoder
2 parents 5e1392d + 84e9845 commit 959538e

File tree

2 files changed

+44
-39
lines changed

2 files changed

+44
-39
lines changed

topic/src/main/java/tech/ydb/topic/utils/Encoder.java

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import java.util.zip.GZIPInputStream;
99
import java.util.zip.GZIPOutputStream;
1010

11-
import com.github.luben.zstd.ZstdInputStream;
12-
import com.github.luben.zstd.ZstdOutputStream;
11+
import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
12+
import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
1313
import org.anarres.lzo.LzoAlgorithm;
1414
import org.anarres.lzo.LzoCompressor;
1515
import org.anarres.lzo.LzoLibrary;
@@ -25,32 +25,13 @@ public class Encoder {
2525

2626
private Encoder() { }
2727

28-
public static byte[] encode(Codec codec, byte[] input) {
28+
public static byte[] encode(Codec codec, byte[] input) throws IOException {
2929
if (codec == Codec.RAW) {
3030
return input;
3131
}
3232
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
33-
OutputStream os;
34-
try {
35-
switch (codec) {
36-
case GZIP:
37-
os = new GZIPOutputStream(byteArrayOutputStream);
38-
break;
39-
case ZSTD:
40-
os = new ZstdOutputStream(byteArrayOutputStream);
41-
break;
42-
case LZOP:
43-
LzoCompressor lzoCompressor = LzoLibrary.getInstance().newCompressor(LzoAlgorithm.LZO1X, null);
44-
os = new LzoOutputStream(byteArrayOutputStream, lzoCompressor);
45-
break;
46-
case CUSTOM:
47-
default:
48-
throw new RuntimeException("Unsupported codec: " + codec);
49-
}
33+
try (OutputStream os = makeOutputStream(codec, byteArrayOutputStream)) {
5034
os.write(input);
51-
os.close();
52-
} catch (IOException exception) {
53-
throw new RuntimeException(exception);
5435
}
5536
return byteArrayOutputStream.toByteArray();
5637
}
@@ -60,30 +41,49 @@ public static byte[] decode(Codec codec, byte[] input) throws IOException {
6041
return input;
6142
}
6243

63-
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
64-
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(input);
65-
InputStream is;
44+
try (
45+
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
46+
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(input);
47+
InputStream is = makeInputStream(codec, byteArrayInputStream)
48+
) {
49+
byte[] buffer = new byte[1024];
50+
int length;
51+
while ((length = is.read(buffer)) != -1) {
52+
byteArrayOutputStream.write(buffer, 0, length);
53+
}
54+
return byteArrayOutputStream.toByteArray();
55+
}
56+
}
57+
58+
private static OutputStream makeOutputStream(Codec codec,
59+
ByteArrayOutputStream byteArrayOutputStream) throws IOException {
6660
switch (codec) {
6761
case GZIP:
68-
is = new GZIPInputStream(byteArrayInputStream);
69-
break;
62+
return new GZIPOutputStream(byteArrayOutputStream);
7063
case ZSTD:
71-
is = new ZstdInputStream(byteArrayInputStream);
72-
break;
64+
return new ZstdOutputStreamNoFinalizer(byteArrayOutputStream);
7365
case LZOP:
74-
is = new LzopInputStream(byteArrayInputStream);
75-
break;
66+
LzoCompressor lzoCompressor = LzoLibrary.getInstance().newCompressor(LzoAlgorithm.LZO1X, null);
67+
return new LzoOutputStream(byteArrayOutputStream, lzoCompressor);
7668
case CUSTOM:
7769
default:
7870
throw new RuntimeException("Unsupported codec: " + codec);
7971
}
80-
byte[] buffer = new byte[1024];
81-
int length;
82-
while ((length = is.read(buffer)) != -1) {
83-
byteArrayOutputStream.write(buffer, 0, length);
72+
}
73+
74+
private static InputStream makeInputStream(Codec codec,
75+
ByteArrayInputStream byteArrayInputStream) throws IOException {
76+
switch (codec) {
77+
case GZIP:
78+
return new GZIPInputStream(byteArrayInputStream);
79+
case ZSTD:
80+
return new ZstdInputStreamNoFinalizer(byteArrayInputStream);
81+
case LZOP:
82+
return new LzopInputStream(byteArrayInputStream);
83+
case CUSTOM:
84+
default:
85+
throw new RuntimeException("Unsupported codec: " + codec);
8486
}
85-
is.close();
86-
return byteArrayOutputStream.toByteArray();
8787
}
8888

8989
}

topic/src/main/java/tech/ydb/topic/write/impl/WriterImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package tech.ydb.topic.write.impl;
22

3+
import java.io.IOException;
34
import java.util.Deque;
45
import java.util.LinkedList;
56
import java.util.List;
@@ -172,7 +173,11 @@ private void encode(EnqueuedMessage message) {
172173
if (settings.getCodec() == Codec.RAW) {
173174
return;
174175
}
175-
message.getMessage().setData(Encoder.encode(settings.getCodec(), message.getMessage().getData()));
176+
try {
177+
message.getMessage().setData(Encoder.encode(settings.getCodec(), message.getMessage().getData()));
178+
} catch (IOException exception) {
179+
throw new RuntimeException("Couldn't encode a message", exception);
180+
}
176181
message.setCompressedSizeBytes(message.getMessage().getData().length);
177182
message.setCompressed(true);
178183
logger.trace("[{}] Successfully finished encoding message", id);

0 commit comments

Comments
 (0)