Skip to content

Commit 48f9287

Browse files
authored
Postquery (#104)
1 parent 06acc78 commit 48f9287

File tree

8 files changed

+146
-21
lines changed

8 files changed

+146
-21
lines changed

src/main/java/com/yahoo/bullet/common/BulletConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public class BulletConfig extends Config {
112112
makeMetadata(ImmutablePair.of(Concept.QUERY_METADATA, "Query"),
113113
ImmutablePair.of(Concept.QUERY_ID, "ID"),
114114
ImmutablePair.of(Concept.QUERY_STRING, "Query String"),
115+
ImmutablePair.of(Concept.SUBQUERY_METADATA, "Subquery"),
115116
ImmutablePair.of(Concept.QUERY_RECEIVE_TIME, "Receive Time"),
116117
ImmutablePair.of(Concept.QUERY_FINISH_TIME, "Finish Time"),
117118
ImmutablePair.of(Concept.SKETCH_METADATA, "Sketch"),

src/main/java/com/yahoo/bullet/query/Query.java

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,18 @@ public class Query implements Configurable, Serializable {
3232
private final Expression filter;
3333
private final Aggregation aggregation;
3434
private final List<PostAggregation> postAggregations;
35+
private final Query outerQuery;
3536
private Window window;
3637
private Long duration;
3738

3839
private static final BulletException ONLY_RAW_RECORD = new BulletException("Only RAW aggregation type can have window emit type RECORD.",
3940
"Change your aggregation type or your window emit type to TIME.");
4041
private static final BulletException NO_RAW_ALL = new BulletException("RAW aggregation type cannot have window include type ALL.",
41-
"Change your aggregation type or your window include type");
42+
"Change your aggregation type or your window include type.");
43+
private static final BulletException NO_OUTER_QUERY_WINDOW = new BulletException("Outer query cannot have a window.",
44+
"Remove the window.");
45+
private static final BulletException NO_NESTED_OUTER_QUERY = new BulletException("Outer query cannot have an outer query.",
46+
"Remove the nested outer query.");
4247

4348
/**
4449
* Constructor that creates the Bullet query.
@@ -48,19 +53,37 @@ public class Query implements Configurable, Serializable {
4853
* @param filter The filter expression records must pass before projection. Can be null.
4954
* @param aggregation The non-null aggregation that takes projected records.
5055
* @param postAggregations The list of post-aggregations that are executed on records before getting results. Can be null.
56+
* @param outerQuery The outer query that is executed on records before getting results. Can be null.
5157
* @param window The non-null window that decides when and how results are returned.
5258
* @param duration The duration of the query. Can be null.
5359
*/
54-
public Query(TableFunction tableFunction, Projection projection, Expression filter, Aggregation aggregation, List<PostAggregation> postAggregations, Window window, Long duration) {
60+
public Query(TableFunction tableFunction, Projection projection, Expression filter, Aggregation aggregation, List<PostAggregation> postAggregations, Query outerQuery, Window window, Long duration) {
5561
this.tableFunction = tableFunction;
5662
this.projection = Objects.requireNonNull(projection);
5763
this.filter = filter;
5864
this.aggregation = Objects.requireNonNull(aggregation);
5965
this.postAggregations = postAggregations;
66+
this.outerQuery = outerQuery;
6067
this.window = Objects.requireNonNull(window);
6168
this.duration = duration;
6269
// Required since there are window types that are not yet supported.
6370
validateWindow();
71+
validateOuterQuery();
72+
}
73+
74+
/**
75+
* Constructor that creates the Bullet query.
76+
*
77+
* @param tableFunction The table function that is applied to the input record before the rest of the query. Can be null.
78+
* @param projection The non-null projection that decides which fields are selected from a Bullet record before aggregation.
79+
* @param filter The filter expression records must pass before projection. Can be null.
80+
* @param aggregation The non-null aggregation that takes projected records.
81+
* @param postAggregations The list of post-aggregations that are executed on records before getting results. Can be null.
82+
* @param window The non-null window that decides when and how results are returned.
83+
* @param duration The duration of the query. Can be null.
84+
*/
85+
public Query(TableFunction tableFunction, Projection projection, Expression filter, Aggregation aggregation, List<PostAggregation> postAggregations, Window window, Long duration) {
86+
this(tableFunction, projection, filter, aggregation, postAggregations, null, window, duration);
6487
}
6588

6689
/**
@@ -88,6 +111,18 @@ private void validateWindow() {
88111
}
89112
}
90113

114+
private void validateOuterQuery() {
115+
if (outerQuery == null) {
116+
return;
117+
}
118+
if (outerQuery.getWindow().getType() != null) {
119+
throw NO_OUTER_QUERY_WINDOW;
120+
}
121+
if (outerQuery.getOuterQuery() != null) {
122+
throw NO_NESTED_OUTER_QUERY;
123+
}
124+
}
125+
91126
@Override
92127
@SuppressWarnings("unchecked")
93128
public void configure(BulletConfig config) {
@@ -104,11 +139,15 @@ public void configure(BulletConfig config) {
104139

105140
// Null or negative, then default, else min of duration and max.
106141
duration = (duration == null || duration <= 0) ? durationDefault : Math.min(duration, durationMax);
142+
143+
if (outerQuery != null) {
144+
outerQuery.configure(config);
145+
}
107146
}
108147

109148
@Override
110149
public String toString() {
111-
return "{projection: " + projection + ", filter: " + filter + ", aggregation: " + aggregation +
112-
", postAggregations: " + postAggregations + ", window: " + window + ", duration: " + duration + "}";
150+
return "{tableFunction: " + tableFunction + ", projection: " + projection + ", filter: " + filter + ", aggregation: " + aggregation +
151+
", postAggregations: " + postAggregations + ", window: " + window + ", duration: " + duration + ", outerQuery: " + outerQuery + "}";
113152
}
114153
}

src/main/java/com/yahoo/bullet/querying/Querier.java

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
*/
66
package com.yahoo.bullet.querying;
77

8+
import com.yahoo.bullet.pubsub.Metadata;
89
import com.yahoo.bullet.query.expressions.Expression;
910
import com.yahoo.bullet.query.tablefunctions.TableFunction;
1011
import com.yahoo.bullet.querying.aggregations.Strategy;
1112
import com.yahoo.bullet.common.BulletConfig;
1213
import com.yahoo.bullet.common.BulletError;
1314
import com.yahoo.bullet.common.Monoidal;
1415
import com.yahoo.bullet.query.Query;
15-
import com.yahoo.bullet.query.Window;
1616
import com.yahoo.bullet.querying.postaggregations.PostStrategy;
1717
import com.yahoo.bullet.query.postaggregations.PostAggregation;
1818
import com.yahoo.bullet.querying.tablefunctors.TableFunctor;
@@ -412,11 +412,7 @@ public void consume(BulletRecord record) {
412412
if (isDone()) {
413413
return;
414414
}
415-
if (tableFunctor == null) {
416-
consumeRecord(record);
417-
} else {
418-
tableFunctor.apply(record, provider).forEach(this::consumeRecord);
419-
}
415+
consumeRecord(record);
420416
}
421417

422418
/**
@@ -466,6 +462,7 @@ public List<BulletRecord> getRecords() {
466462
Clip result = new Clip();
467463
result.add(window.getRecords());
468464
result = postAggregate(result);
465+
result = outerQuery(result);
469466
return result.getRecords();
470467
} catch (RuntimeException e) {
471468
log.error("Unable to get serialized result for query {}", this);
@@ -503,6 +500,7 @@ public Clip getResult() {
503500
incrementRate();
504501
result = window.getResult();
505502
result = postAggregate(result);
503+
result = outerQuery(result);
506504
result.add(getResultMetadata());
507505
} catch (RuntimeException e) {
508506
log.error("Unable to get serialized data for query {}", this);
@@ -598,10 +596,8 @@ public RateLimitError getRateLimitError() {
598596
* @return A boolean that is true if the query results should be buffered in the Join phase.
599597
*/
600598
public boolean shouldBuffer() {
601-
Window window = runningQuery.getQuery().getWindow();
602-
boolean noWindow = window == null;
603-
// Only buffer if there is no window (including RawStrategy) or if it's a record based window.
604-
return noWindow || !window.isTimeBased();
599+
// Only buffer if the window is not time based (RawStrategy or if it's a record based window).
600+
return !runningQuery.getQuery().getWindow().isTimeBased();
605601
}
606602

607603
/**
@@ -623,6 +619,14 @@ public String toString() {
623619
// ********************************* Private helpers *********************************
624620

625621
private void consumeRecord(BulletRecord record) {
622+
if (tableFunctor == null) {
623+
process(record);
624+
} else {
625+
tableFunctor.apply(record, provider).forEach(this::process);
626+
}
627+
}
628+
629+
private void process(BulletRecord record) {
626630
// Ignore if record doesn't match filters
627631
if (!filter(record)) {
628632
return;
@@ -664,6 +668,23 @@ private Clip postAggregate(Clip clip) {
664668
return clip;
665669
}
666670

671+
private Clip outerQuery(Clip clip) {
672+
if (runningQuery.getQuery().getOuterQuery() == null) {
673+
return clip;
674+
}
675+
Querier querier = new Querier(Mode.ALL, new RunningQuery(runningQuery.getId(), runningQuery.getQuery().getOuterQuery(), new Metadata()), config);
676+
for (BulletRecord record : clip.getRecords()) {
677+
// A bit inefficient since this is only needed for RAW aggregation queries
678+
if (querier.isClosed()) {
679+
break;
680+
}
681+
querier.consumeRecord(record);
682+
}
683+
Clip result = querier.getResult();
684+
result.getMeta().add(getSubMetaKey(), clip.getMeta().asMap());
685+
return result;
686+
}
687+
667688
private Meta getResultMetadata() {
668689
String metaKey = getMetaKey();
669690
if (metaKey == null) {
@@ -701,6 +722,10 @@ private void incrementRate() {
701722
}
702723

703724
private String getMetaKey() {
704-
return metaKeys.getOrDefault(Meta.Concept.QUERY_METADATA.getName(), null);
725+
return metaKeys.getOrDefault(Concept.QUERY_METADATA.getName(), null);
726+
}
727+
728+
private String getSubMetaKey() {
729+
return metaKeys.getOrDefault(Concept.SUBQUERY_METADATA.getName(), null);
705730
}
706731
}

src/main/java/com/yahoo/bullet/querying/RunningQuery.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ public class RunningQuery {
2020
private final long startTime;
2121

2222
/**
23-
* Constructor that takes an id, query, query string, and start time. If the start time is missing, uses the
24-
* current system time.
23+
* Constructor that takes an id, query, and metadata that contains the query string and start time.
2524
*
2625
* @param id The query id.
2726
* @param query The query object.

src/main/java/com/yahoo/bullet/result/Meta.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public enum Concept {
3131
QUERY_ID("Query ID"),
3232
QUERY_OBJECT("Query Object"),
3333
QUERY_STRING("Query String"),
34+
SUBQUERY_METADATA("Subquery Metadata"),
3435

3536
// Sketching metadata
3637
SKETCH_METADATA("Sketch Metadata"),

src/main/resources/bullet_defaults.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ bullet.result.metadata.metrics:
166166
key: "Receive Time"
167167
- name: "Query Finish Time"
168168
key: "Finish Time"
169+
- name: "Subquery Metadata"
170+
key: "Subquery"
169171
- name: "Sketch Metadata"
170172
key: "Sketch"
171173
- name: "Sketch Estimated Result"

src/test/java/com/yahoo/bullet/query/QueryTest.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,19 @@ public void testValidateWindowNoRawAll() {
5151
new Query(new Projection(), null, new Raw(null), null, new Window(1, Window.Unit.TIME, Window.Unit.ALL, null), null);
5252
}
5353

54+
@Test(expectedExceptions = BulletException.class, expectedExceptionsMessageRegExp = "Outer query cannot have a window\\.")
55+
public void testValidateOuterQueryNoWindow() {
56+
Query outerQuery = new Query(new Projection(), null, new Raw(null), null, new Window(1, Window.Unit.RECORD), null);
57+
new Query(null, new Projection(), null, new Raw(null), null, outerQuery, new Window(), null);
58+
}
59+
60+
@Test(expectedExceptions = BulletException.class, expectedExceptionsMessageRegExp = "Outer query cannot have an outer query\\.")
61+
public void testValidateOuterQueryNoNestedOuterQuery() {
62+
Query nestedOuterQuery = new Query(new Projection(), null, new Raw(null), null, new Window(), null);
63+
Query outerQuery = new Query(null, new Projection(), null, new Raw(null), null, nestedOuterQuery, new Window(), null);
64+
new Query(null, new Projection(), null, new Raw(null), null, outerQuery, new Window(), null);
65+
}
66+
5467
@Test
5568
public void testDuration() {
5669
BulletConfig config = new BulletConfig();
@@ -151,9 +164,9 @@ public void testToString() {
151164
config.set(BulletConfig.QUERY_DEFAULT_DURATION, 30000L);
152165
Query query = new Query(new Projection(), null, new Raw(null), null, new Window(), null);
153166
query.configure(config.validate());
154-
Assert.assertEquals(query.toString(), "{projection: {fields: null, type: PASS_THROUGH}, filter: null, " +
167+
Assert.assertEquals(query.toString(), "{tableFunction: null, projection: {fields: null, type: PASS_THROUGH}, filter: null, " +
155168
"aggregation: {size: 1, type: RAW}, postAggregations: null, " +
156169
"window: {emitEvery: null, emitType: null, includeType: null, includeFirst: null}, " +
157-
"duration: 30000}");
170+
"duration: 30000, outerQuery: null}");
158171
}
159172
}

src/test/java/com/yahoo/bullet/querying/QuerierTest.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.yahoo.bullet.query.aggregations.AggregationType;
1616
import com.yahoo.bullet.query.aggregations.CountDistinct;
1717
import com.yahoo.bullet.query.aggregations.GroupAll;
18+
import com.yahoo.bullet.query.aggregations.GroupBy;
1819
import com.yahoo.bullet.query.aggregations.Raw;
1920
import com.yahoo.bullet.query.expressions.BinaryExpression;
2021
import com.yahoo.bullet.query.expressions.Expression;
@@ -56,6 +57,8 @@
5657
import java.util.stream.Stream;
5758

5859
import static com.yahoo.bullet.TestHelpers.getListBytes;
60+
import static java.util.Collections.singleton;
61+
import static java.util.Collections.singletonMap;
5962
import static org.mockito.Mockito.spy;
6063

6164
public class QuerierTest {
@@ -167,7 +170,7 @@ private static Query makeRawQuery() {
167170
}
168171

169172
private static RunningQuery makeCountQueryWithAllWindow(BulletConfig config, int emitInterval) {
170-
GroupAll groupAll = new GroupAll(Collections.singleton(new GroupOperation(GroupOperation.GroupOperationType.COUNT, null, "COUNT")));
173+
GroupAll groupAll = new GroupAll(singleton(new GroupOperation(GroupOperation.GroupOperationType.COUNT, null, "COUNT")));
171174
Window window = WindowUtils.makeWindow(Window.Unit.TIME, emitInterval, Window.Unit.ALL, null);
172175

173176
Query query = new Query(new Projection(), null, groupAll, null, window, null);
@@ -830,7 +833,7 @@ public void testComputationAndCulling() {
830833
Expression filter = new UnaryExpression(new FieldExpression("a"), Operation.IS_NOT_NULL);
831834
Expression expression = new BinaryExpression(new FieldExpression("a"), new ValueExpression(2L), Operation.ADD);
832835
Computation computation = new Computation(Collections.singletonList(new Field("newName", expression)));
833-
Culling culling = new Culling(Collections.singleton("a"));
836+
Culling culling = new Culling(singleton("a"));
834837
Query query = new Query(projection, filter, new Raw(500), Arrays.asList(computation, culling), new Window(), null);
835838

836839
Querier querier = make(Querier.Mode.ALL, query);
@@ -891,4 +894,46 @@ public void testTableFunction() {
891894
Assert.assertEquals(result.get(3).fieldCount(), 1);
892895
Assert.assertEquals(result.get(3).typedGet("abc").getValue(), 3);
893896
}
897+
898+
@Test
899+
public void testOuterQuery() {
900+
Expression outerQueryFilter = new BinaryExpression(new FieldExpression("count"), new ValueExpression(1), Operation.GREATER_THAN);
901+
Query outerQuery = new Query(new Projection(), outerQueryFilter, new Raw(3), null, new Window(), null);
902+
GroupBy groupBy = new GroupBy(null, singletonMap("color", "color"), singleton(new GroupOperation(GroupOperation.GroupOperationType.COUNT, null, "count")));
903+
Query query = new Query(null, new Projection(), null, groupBy, null, outerQuery, new Window(), null);
904+
905+
Querier querier = make(Querier.Mode.ALL, query);
906+
907+
BulletRecord red = RecordBox.get().add("color", "red").getRecord();
908+
BulletRecord orange = RecordBox.get().add("color", "orange").getRecord();
909+
BulletRecord yellow = RecordBox.get().add("color", "yellow").getRecord();
910+
BulletRecord green = RecordBox.get().add("color", "green").getRecord();
911+
BulletRecord blue = RecordBox.get().add("color", "blue").getRecord();
912+
BulletRecord indigo = RecordBox.get().add("color", "indigo").getRecord();
913+
BulletRecord violet = RecordBox.get().add("color", "violet").getRecord();
914+
915+
querier.consume(red);
916+
querier.consume(orange);
917+
querier.consume(yellow);
918+
querier.consume(green);
919+
querier.consume(blue);
920+
querier.consume(indigo);
921+
querier.consume(violet);
922+
querier.consume(yellow);
923+
querier.consume(green);
924+
querier.consume(blue);
925+
querier.consume(indigo);
926+
querier.consume(violet);
927+
928+
Clip result = querier.getResult();
929+
List<BulletRecord> records = result.getRecords();
930+
Assert.assertEquals(records.size(), 3);
931+
Assert.assertFalse(records.stream().anyMatch(record -> record.typedGet("color").getValue().equals("red") ||
932+
record.typedGet("color").getValue().equals("orange")));
933+
Assert.assertTrue(records.stream().allMatch(record -> record.typedGet("count").getValue().equals(2L)));
934+
935+
Map<String, Object> metadata = result.getMeta().asMap();
936+
937+
Assert.assertTrue(metadata.containsKey("Subquery"));
938+
}
894939
}

0 commit comments

Comments
 (0)