Skip to content

Commit 00a8580

Browse files
authored
ByteArrayPubSubSerDe - A better default for SerDe (#101)
1 parent 0b8b575 commit 00a8580

File tree

8 files changed

+141
-12
lines changed

8 files changed

+141
-12
lines changed

src/main/java/com/yahoo/bullet/common/BulletConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public class BulletConfig extends Config {
143143

144144
public static final String DEFAULT_PUBSUB_CONTEXT_NAME = Context.QUERY_PROCESSING.name();
145145
public static final String DEFAULT_PUBSUB_CLASS_NAME = "com.yahoo.bullet.pubsub.MockPubSub";
146-
public static final String DEFAULT_PUBSUB_MESSAGE_SERDE_CLASS_NAME = "com.yahoo.bullet.pubsub.IdentityPubSubMessageSerDe";
146+
public static final String DEFAULT_PUBSUB_MESSAGE_SERDE_CLASS_NAME = "com.yahoo.bullet.pubsub.ByteArrayPubSubMessageSerDe";
147147

148148
public static final String DEFAULT_RECORD_PROVIDER_CLASS_NAME = "com.yahoo.bullet.record.avro.TypedAvroBulletRecordProvider";
149149

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2021 Yahoo Inc.
3+
* Licensed under the terms of the Apache License, Version 2.0.
4+
* See the LICENSE file associated with the project for terms.
5+
*/
6+
package com.yahoo.bullet.pubsub;
7+
8+
import com.yahoo.bullet.common.BulletConfig;
9+
import com.yahoo.bullet.common.SerializerDeserializer;
10+
import com.yahoo.bullet.query.Query;
11+
12+
import java.io.Serializable;
13+
14+
/**
15+
* This SerDe is used to convert a provided {@link Query} into its serialized bytes in the {@link PubSubMessage}. When
16+
* invoking the {@link PubSubMessage#getContentAsByteArray()} or {@link PubSubMessage#getContentAsQuery()}, the payload
17+
* is lazily converted (and stored) as the appropriate type. This means that {@link PubSubMessage#getContent()} will
18+
* return a byte[] or {@link Query} depending on what was called before it. This can be used to send a {@link Query}
19+
* to the backend and if it is passed between multiple workers (i.e. serialized and deserialized multiple times), it
20+
* will not needlessly convert the {@link Query} object multiple times.
21+
*
22+
* This behaves like the {@link IdentityPubSubMessageSerDe} for all other operations.
23+
*/
24+
public class ByteArrayPubSubMessageSerDe extends IdentityPubSubMessageSerDe {
25+
private static final long serialVersionUID = -7648403271773714704L;
26+
27+
/**
28+
* A {@link PubSubMessage} that is sticky for converting the content between byte[] and {@link Query}.
29+
*/
30+
private static class LazyPubSubMessage extends PubSubMessage {
31+
private static final long serialVersionUID = -6516915913438279870L;
32+
33+
private LazyPubSubMessage(String id, byte[] content, Metadata metadata) {
34+
super(id, content, metadata);
35+
}
36+
37+
@Override
38+
public byte[] getContentAsByteArray() {
39+
if (content instanceof Query) {
40+
content = SerializerDeserializer.toBytes((Serializable) content);
41+
}
42+
return (byte[]) content;
43+
}
44+
45+
@Override
46+
public Query getContentAsQuery() {
47+
if (content instanceof byte[]) {
48+
content = SerializerDeserializer.fromBytes((byte[]) content);
49+
}
50+
return (Query) content;
51+
}
52+
}
53+
54+
/**
55+
* Constructor.
56+
*
57+
* @param config The {@link BulletConfig} to configure this class.
58+
*/
59+
public ByteArrayPubSubMessageSerDe(BulletConfig config) {
60+
super(config);
61+
}
62+
63+
@Override
64+
public PubSubMessage toMessage(String id, Query query, String queryString) {
65+
return toMessage(new LazyPubSubMessage(id, SerializerDeserializer.toBytes(query), new Metadata(null, queryString)));
66+
}
67+
}

src/main/java/com/yahoo/bullet/pubsub/Metadata.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ public enum Signal {
2222

2323
private static final long serialVersionUID = 7478596915692253699L;
2424
@Getter @Setter
25-
private Signal signal;
25+
protected Signal signal;
2626
// Serializable enforced through the constructor, getter, and setter. Is Object so GSON can reify an instance.
27-
private Object content;
27+
protected Object content;
2828
@Getter @Setter
29-
private long created;
29+
protected long created;
3030

3131
/**
3232
* Default constructor that creates an empty instance of metadata.

src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ public class PubSubMessage implements Serializable, JSONFormatter {
2828
public static final Charset CHARSET = StandardCharsets.UTF_8;
2929
private static final long serialVersionUID = 5096747716667851530L;
3030

31-
private String id;
31+
protected String id;
3232
// Serializable enforced through the constructors, getter and setter. Is Object so GSON can reify an instance.
33-
private Object content;
33+
protected Object content;
3434
@Setter
35-
private Metadata metadata;
35+
protected Metadata metadata;
3636

3737
/**
3838
* Constructor for a message having no information. Used internally. Not recommended for use.

src/main/resources/bullet_defaults.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,4 +250,4 @@ bullet.pubsub.class.name: "com.yahoo.bullet.pubsub.rest.RESTPubSub"
250250
# The current context. This can be QUERY_PROCESSING or QUERY_SUBMISSION. The PubSub implementation should use this to generate appropriate Publishers and Subscribers.
251251
bullet.pubsub.context.name: "QUERY_PROCESSING"
252252
# The class to use for converting and reading PubSubMessage queries sent to the backend.
253-
bullet.pubsub.message.serde.class.name: "com.yahoo.bullet.pubsub.IdentityPubSubMessageSerDe"
253+
bullet.pubsub.message.serde.class.name: "com.yahoo.bullet.pubsub.ByteArrayPubSubMessageSerDe"

src/test/java/com/yahoo/bullet/common/BulletConfigTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public void testPropertiesWithPrefix() {
178178
BulletConfig config = new BulletConfig("src/test/resources/test_config.yaml");
179179
String prefix = "bullet.pubsub";
180180
String pubSubClassValue = "com.yahoo.bullet.pubsub.MockPubSub";
181-
String pubSubMessageSerDeClassValue = "com.yahoo.bullet.pubsub.IdentityPubSubMessageSerDe";
181+
String pubSubMessageSerDeClassValue = "com.yahoo.bullet.pubsub.ByteArrayPubSubMessageSerDe";
182182

183183
int configSize = config.getAllWithPrefix(Optional.empty(), prefix, false).size();
184184
Assert.assertEquals(configSize, 5);
@@ -195,7 +195,7 @@ public void testPropertiesStripPrefix() {
195195
String pubsubClassKey = "class.name";
196196
String pubSubClassValue = "com.yahoo.bullet.pubsub.MockPubSub";
197197
String pubsubMessageSerDeClassKey = "message.serde.class.name";
198-
String pubSubMessageSerDeClassValue = "com.yahoo.bullet.pubsub.IdentityPubSubMessageSerDe";
198+
String pubSubMessageSerDeClassValue = "com.yahoo.bullet.pubsub.ByteArrayPubSubMessageSerDe";
199199

200200
int configSize = config.getAllWithPrefix(Optional.empty(), prefix, true).size();
201201
Assert.assertEquals(configSize, 5);
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2021 Yahoo Inc.
3+
* Licensed under the terms of the Apache License, Version 2.0.
4+
* See the LICENSE file associated with the project for terms.
5+
*/
6+
package com.yahoo.bullet.pubsub;
7+
8+
import com.yahoo.bullet.common.SerializerDeserializer;
9+
import com.yahoo.bullet.query.Projection;
10+
import com.yahoo.bullet.query.Query;
11+
import com.yahoo.bullet.query.Window;
12+
import com.yahoo.bullet.query.aggregations.AggregationType;
13+
import com.yahoo.bullet.query.aggregations.Raw;
14+
import org.testng.Assert;
15+
import org.testng.annotations.Test;
16+
17+
public class ByteArrayPubSubMessageSerDeTest {
18+
@Test
19+
public void testConvertingQuery() {
20+
ByteArrayPubSubMessageSerDe serDe = new ByteArrayPubSubMessageSerDe(null);
21+
22+
Query query = new Query(new Projection(), null, new Raw(1), null, new Window(), 1L);
23+
PubSubMessage actual = serDe.toMessage("id", query, "foo");
24+
Assert.assertEquals(actual.getId(), "id");
25+
Assert.assertEquals(actual.getContent(), SerializerDeserializer.toBytes(query));
26+
Assert.assertEquals(actual.getMetadata().getContent(), "foo");
27+
}
28+
29+
@Test
30+
public void testDoNothingPubSubMessage() {
31+
ByteArrayPubSubMessageSerDe serDe = new ByteArrayPubSubMessageSerDe(null);
32+
PubSubMessage expected = new PubSubMessage("foo", new byte[0], new Metadata(Metadata.Signal.KILL, null));
33+
Assert.assertSame(serDe.toMessage(expected), expected);
34+
Assert.assertSame(serDe.fromMessage(expected), expected);
35+
}
36+
37+
@Test
38+
public void testLazyMessage() {
39+
ByteArrayPubSubMessageSerDe serDe = new ByteArrayPubSubMessageSerDe(null);
40+
Query query = new Query(new Projection(), null, new Raw(1), null, new Window(), 1L);
41+
PubSubMessage converted = serDe.toMessage("id", query, "foo");
42+
PubSubMessage reverted = serDe.fromMessage(converted);
43+
44+
Assert.assertSame(reverted, converted);
45+
46+
// Starts off as byte[]
47+
Assert.assertEquals(reverted.getContent(), SerializerDeserializer.toBytes(query));
48+
49+
// Payload is now made a Query
50+
Query revertedQuery = reverted.getContentAsQuery();
51+
Assert.assertEquals(revertedQuery.getProjection().getType(), Projection.Type.PASS_THROUGH);
52+
Assert.assertEquals(revertedQuery.getAggregation().getType(), AggregationType.RAW);
53+
Assert.assertEquals((long) revertedQuery.getAggregation().getSize(), 1L);
54+
Assert.assertEquals((long) revertedQuery.getDuration(), 1L);
55+
Assert.assertSame(reverted.getContent(), revertedQuery);
56+
57+
// Payload is now made a byte[]
58+
byte[] revertedByteArray = reverted.getContentAsByteArray();
59+
Assert.assertEquals(revertedByteArray, SerializerDeserializer.toBytes(query));
60+
Assert.assertSame(reverted.getContent(), revertedByteArray);
61+
}
62+
}

src/test/java/com/yahoo/bullet/pubsub/rest/RESTPubSubConfigTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public void testPropertiesWithPrefix() {
123123
RESTPubSubConfig config = new RESTPubSubConfig("src/test/resources/test_config.yaml");
124124
String prefix = "bullet.pubsub";
125125
String pubSubClassValue = "com.yahoo.bullet.pubsub.MockPubSub";
126-
String pubSubMessageSerDeClassValue = "com.yahoo.bullet.pubsub.IdentityPubSubMessageSerDe";
126+
String pubSubMessageSerDeClassValue = "com.yahoo.bullet.pubsub.ByteArrayPubSubMessageSerDe";
127127

128128
int configSize = config.getAllWithPrefix(Optional.empty(), prefix, false).size();
129129
Assert.assertEquals(configSize, 10);
@@ -140,7 +140,7 @@ public void testPropertiesStripPrefix() {
140140
String pubsubClassKey = "class.name";
141141
String pubSubClassValue = "com.yahoo.bullet.pubsub.MockPubSub";
142142
String pubsubMessageSerDeClassKey = "message.serde.class.name";
143-
String pubSubMessageSerDeClassValue = "com.yahoo.bullet.pubsub.IdentityPubSubMessageSerDe";
143+
String pubSubMessageSerDeClassValue = "com.yahoo.bullet.pubsub.ByteArrayPubSubMessageSerDe";
144144

145145
int configSize = config.getAllWithPrefix(Optional.empty(), prefix, true).size();
146146
Assert.assertEquals(configSize, 10);

0 commit comments

Comments
 (0)