Commit c78ef01

Closes #22. (BREAK) Adding a new partition mode for querier (#23)
1 parent 1a9a0d7 commit c78ef01

10 files changed: +372 -138 lines changed

src/main/java/com/yahoo/bullet/querying/Querier.java

Lines changed: 82 additions & 42 deletions
@@ -61,8 +61,11 @@
  * <li>
  * For each Query message from the PubSub, check to see if it is has a KILL or COMPLETE signal.
  * If yes, remove any existing {@link Querier} objects for that query (identified by the ID)
- * If no, create an instance of {@link Querier} for the query. If any exceptions or errors initializing, throw away
- * the querier.
+ * If no, create an instance of {@link Querier} for the query in {@link Mode#PARTITION} mode if and only if you are
+ * going to be persisting the querier for the duration of the query. If you are throwing away the querier, such as
+ * after processing your partitioned data in mini-batches and recreating it every new mini-batch, then you need not
+ * change the mode. If any exceptions or errors initializing, throw away the querier since the errors are handled in
+ * the Join stage below.
  * </li>
  * <li>
  * For every {@link BulletRecord}, call {@link #consume(BulletRecord)} on all the {@link Querier} objects
@@ -72,27 +75,29 @@
  * If {@link #isDone()}, call {@link #getData()} and also remove the {@link Querier}.
  * </li>
  * <li>
- * If {@link #isClosedForPartition()}, use {@link #getData()} to emit the intermediate data to the Join stage for
+ * If {@link #isClosed()}, use {@link #getData()} to emit the intermediate data to the Join stage for
  * the query ID. Then, call {@link #reset()}.
  * </li>
  * <li>
- * <em>Optional</em>: if you are processing record by record (instead of micro-batches) and honoring {@link #isClosedForPartition()},
+ * <em>Optional</em>: if you are processing record by record (instead of micro-batches) and honoring {@link #isClosed()},
  * you should check if {@link #isExceedingRateLimit()} is true after calling {@link #getData()}. If yes, you should
  * cancel the query and emit a RateLimitError to the Join stage to kill the query. You can use {@link #getRateLimitError()}
  * to get the {@link RateLimitError} to pass to the Join stage.
  * </li>
  * <li>
  * <em>Optional</em>: If your data volume is very, very small (Heuristic: less than 1 per your 0.1 *
  * bullet.query.window.min.emit.every.ms). across your partitions), you should run the {@link #isDone()} and
- * {@link #isClosedForPartition()} and do the emits either on a timer or at fixed intervals so that your queries
+ * {@link #isClosed()} and do the emits either on a timer or at fixed intervals so that your queries
  * are checked for results and maintain their windowing guarantees.
  * </li>
  * </ol>
  *
- * You can also use {@link #hasData()} to check if there is any data to emit if you need. If you do not want to call
- * {@link #getData()}, you can serialize Querier using non-native serialization frameworks and use {@link #merge(Monoidal)}
- * in the Join stage to merge them into an empty Querier for the query. This will be equivalent to calling
- * {@link #combine(byte[])} on {@link #getData()}. Just remember to not call {@link #initialize()}
+ * You can also use {@link #hasNewData()} to check if there is any new data to emit if you need to know a successful
+ * consumption or combining happened.
+ *
+ * If you do not want to call {@link #getData()}, you can serialize Querier using non-native serialization frameworks
+ * and use {@link #merge(Monoidal)} in the Join stage to merge them into an empty Querier for the query. This will be
+ * equivalent to calling {@link #combine(byte[])} on {@link #getData()}. Just remember to not call {@link #initialize()}
  * on the reified querier objects on the Join side since that will wipe the existing results stored in them.
  *
  * <h4>Pseudo Code</h4>
@@ -115,7 +120,7 @@
  * emit(q.getData())
  * remove q
  * else
- * if (q.isClosedForPartition())
+ * if (q.isClosed())
  * emit(q.getData())
  * q.reset()
  * q.consume(record)
@@ -131,7 +136,7 @@
  * if (q.isDone())
  * emit(q.getData())
  * remove q
- * if (q.isClosedForPartition())
+ * if (q.isClosed())
  * emit(q.getData())
  * q.reset()
  * if (q.isExceedingRateLimit())
@@ -144,8 +149,8 @@
  * <ol>
  * <li>
  * For each Query message from the PubSub, if it is a KILL signal similar to the Filter stage, kill the query and
- * return. Otherwise create an instance of {@link Querier} for the query. If any exceptions or errors initializing it,
- * make BulletError objects from them and return them as a {@link Clip} back through the PubSub.
+ * return. Otherwise create an instance of {@link Querier} for the query in {@link Mode#ALL} mode. If any exceptions
+ * or errors initializing it, make BulletError objects from them and return them as a {@link Clip} back through the PubSub.
  * </li>
  * <li>
  * For each KILL message from the Filter stage, call {@link #finish()}, and add to the {@link Meta} a
@@ -258,6 +263,20 @@
  */
 @Slf4j
 public class Querier implements Monoidal {
+    /**
+     * This is used to determine if this operates in partitioned mode or not. If the Querier is operating in
+     * {@link Mode#PARTITION}, it is assumed there are multiple queriers running in parallel and consuming parts of the
+     * data for the query. Use this if you are distributing the {@link #consume(BulletRecord)} calls across multiple
+     * machines. This fixes the semantics of the {@link #reset()} and the {@link #isClosed()} methods to keep the
+     * correct windowing semantics.
+     *
+     * If you are not distributing the data or recreating the querier instance in your parallelized step, you can
+     * leave this at the default of {@link Mode#ALL}.
+     */
+    public enum Mode {
+        PARTITION, ALL
+    }
+
     public static final String TRY_AGAIN_LATER = "Please try again later";
 
     // For testing convenience
@@ -271,11 +290,14 @@ public class Querier implements Monoidal {
     private BulletConfig config;
     private Map<String, String> metaKeys;
     private String timestampKey;
-    private boolean hasData = false;
+    private boolean hasNewData = false;
 
     // This is counting the number of times we get the data out of the query.
     private RateLimiter rateLimit;
 
+    // Mode for the querier
+    private Mode mode;
+
     /**
      * Constructor that takes a String representation of the query and a configuration to use. This also starts the
      * query.
@@ -286,9 +308,22 @@ public class Querier implements Monoidal {
      * @throws JsonParseException if there was an issue parsing the query.
      */
     public Querier(String id, String queryString, BulletConfig config) throws JsonParseException {
-        this(new RunningQuery(id, queryString, config), config);
+        this(Mode.ALL, new RunningQuery(id, queryString, config), config);
     }
 
+    /**
+     * Constructor that takes a String representation of the query and a configuration to use. This also starts the
+     * query.
+     *
+     * @param mode The mode for this querier.
+     * @param id The query ID.
+     * @param queryString The query as a string.
+     * @param config The validated {@link BulletConfig} configuration to use.
+     * @throws JsonParseException if there was an issue parsing the query.
+     */
+    public Querier(Mode mode, String id, String queryString, BulletConfig config) throws JsonParseException {
+        this(mode, new RunningQuery(id, queryString, config), config);
+    }
     /**
      * Constructor that takes a {@link RunningQuery} instance and a configuration to use. This also starts executing
      * the query.
@@ -297,6 +332,19 @@ public Querier(String id, String queryString, BulletConfig config) throws JsonPa
      * @param config The validated {@link BulletConfig} configuration to use.
      */
     public Querier(RunningQuery query, BulletConfig config) {
+        this(Mode.ALL, query, config);
+    }
+
+    /**
+     * Constructor that takes a {@link Querier.Mode}, {@link RunningQuery} instance and a configuration to use.
+     * This also starts executing the query.
+     *
+     * @param mode The mode for this querier.
+     * @param query The running query.
+     * @param config The validated {@link BulletConfig} configuration to use.
+     */
+    public Querier(Mode mode, RunningQuery query, BulletConfig config) {
+        this.mode = mode;
         this.runningQuery = query;
         this.config = config;
     }
@@ -362,7 +410,7 @@ public void consume(BulletRecord record) {
         BulletRecord projected = project(record);
         try {
             window.consume(projected);
-            hasData = true;
+            hasNewData = true;
         } catch (RuntimeException e) {
             log.error("Unable to consume {} for query {}", record, this);
             log.error("Skipping due to", e);
@@ -379,7 +427,7 @@ public void consume(BulletRecord record) {
     public void combine(byte[] data) {
         try {
             window.combine(data);
-            hasData = true;
+            hasNewData = true;
         } catch (RuntimeException e) {
             log.error("Unable to aggregate {} for query {}", data, this);
             log.error("Skipping due to", e);
@@ -458,41 +506,32 @@ public Clip getResult() {
     }
 
     /**
-     * Returns true if the query window is closed and you should emit the result at this time. If you have partitioned
-     * the data, use {@link #isClosedForPartition()}.
+     * Depending on the {@link Mode#ALL} mode this is operating in, returns true if and only if the query window is
+     * closed and you should emit the result at this time.
      *
-     * @return boolean denoting if query has expired.
+     * @return boolean denoting if query has closed.
      */
     @Override
     public boolean isClosed() {
-        return window.isClosed();
+        return mode == Mode.PARTITION ? window.isClosedForPartition() : window.isClosed();
     }
 
     /**
      * Resets this object. You should call this if you have called {@link #getResult()} or {@link #getData()} after
-     * verifying whether this is {@link #isClosed()} or {@link #isClosedForPartition()}.
+     * verifying whether this is {@link #isClosed()}.
      */
     @Override
     public void reset() {
-        window.reset();
-        hasData = false;
+        if (mode == Mode.PARTITION) {
+            window.resetForPartition();
+        } else {
+            window.reset();
+        }
+        hasNewData = false;
     }
 
     // ********************************* Public helpers *********************************
 
-    /**
-     * Returns true if the query has been consuming parts of the data (parallelized) and should emit the result
-     * for that partition of data when operating that way. Use this if you have distributed the
-     * {@link #consume(BulletRecord)} calls across multiple machines and you want to know if, for this particular kind
-     * of query, whether it is necessary to emit results now. While not necessary to use, it would keep the
-     * windowing semantics for the query correct to adhere to emitting when this is true.
-     *
-     * @return A boolean denoting whether there is data to emit for this query if it was reading part of the data.
-     */
-    public boolean isClosedForPartition() {
-        return window.isClosedForPartition();
-    }
-
     /**
      * Returns true if the query has expired and will never accept any more data.
      *
@@ -504,13 +543,14 @@ public boolean isDone() {
     }
 
     /**
-     * Returns whether there is any data to emit at all. Use this method if you are driving how data is consumed by this
-     * instance (for instance, microbatches) and need to emit data outside the windowing standards.
+     * Returns whether there is any new data to emit at all since the last {@link #reset()}. Use this method if you are
+     * driving how data is consumed by this instance (for instance, microbatches) and need to emit data outside the
+     * windowing standards.
      *
-     * @return A boolean denoting whether we have any data that can be emitted.
+     * @return A boolean denoting whether we have any new data that can be emitted.
      */
-    public boolean hasData() {
-        return hasData;
+    public boolean hasNewData() {
+        return hasNewData;
     }
 
     /**
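
Note on the Querier.java changes: the mode-dependent dispatch in isClosed() and reset() can be illustrated with a minimal, self-contained sketch. Everything below is a hypothetical stand-in written for illustration and is not part of the commit: FakeWindow replaces the real windowing Scheme and only records which of its methods were called, and SketchQuerier mirrors just the three members the diff touches (mode, isClosed(), reset(), hasNewData).

```java
import java.util.ArrayList;
import java.util.List;

public class QuerierModeSketch {
    // Mirrors the Mode enum added in the diff.
    enum Mode { PARTITION, ALL }

    // Hypothetical stand-in for the windowing Scheme; it only records the calls it receives.
    static class FakeWindow {
        final List<String> calls = new ArrayList<>();
        boolean isClosed() { calls.add("isClosed"); return false; }
        boolean isClosedForPartition() { calls.add("isClosedForPartition"); return true; }
        void reset() { calls.add("reset"); }
        void resetForPartition() { calls.add("resetForPartition"); }
    }

    // Hypothetical, reduced Querier: only the mode dispatch and the hasNewData flag.
    static class SketchQuerier {
        private final Mode mode;
        final FakeWindow window = new FakeWindow();
        private boolean hasNewData = false;

        SketchQuerier(Mode mode) { this.mode = mode; }

        // Assumption: consuming always succeeds here; the real method projects and aggregates.
        void consume(String record) { hasNewData = true; }

        // Same ternary as the diff: the mode picks which window check is used.
        boolean isClosed() {
            return mode == Mode.PARTITION ? window.isClosedForPartition() : window.isClosed();
        }

        // Same branching as the diff: the mode picks which window reset is used.
        void reset() {
            if (mode == Mode.PARTITION) {
                window.resetForPartition();
            } else {
                window.reset();
            }
            hasNewData = false;
        }

        boolean hasNewData() { return hasNewData; }
    }

    public static void main(String[] args) {
        SketchQuerier q = new SketchQuerier(Mode.PARTITION);
        q.consume("record");
        if (q.isClosed()) {
            // A real Filter stage would emit q.getData() to the Join stage here.
            q.reset();
        }
        System.out.println(q.window.calls); // [isClosedForPartition, resetForPartition]
    }
}
```

Callers that previously chose between isClosed() and isClosedForPartition() themselves now make that choice once, at construction time, which appears to be why the commit is flagged as breaking.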

src/main/java/com/yahoo/bullet/windowing/AdditiveTumbling.java

Lines changed: 7 additions & 1 deletion
@@ -23,8 +23,14 @@ public AdditiveTumbling(Strategy aggregation, Window window, BulletConfig config
 
     @Override
     public void reset() {
-        // Don't reset the strategy.
         startedAt = System.currentTimeMillis();
         windowCount++;
     }
+
+    @Override
+    public void resetForPartition() {
+        // Do reset the strategy.
+        aggregation.reset();
+        reset();
+    }
 }

src/main/java/com/yahoo/bullet/windowing/Basic.java

Lines changed: 5 additions & 0 deletions
@@ -93,6 +93,11 @@ public void reset() {
         windowCount++;
     }
 
+    @Override
+    public void resetForPartition() {
+        reset();
+    }
+
     @Override
     public boolean isClosed() {
         return aggregation.isClosed();

src/main/java/com/yahoo/bullet/windowing/Scheme.java

Lines changed: 6 additions & 0 deletions
@@ -59,6 +59,12 @@ public Scheme(Strategy aggregation, Window window, BulletConfig config) {
      */
     public abstract boolean isClosedForPartition();
 
+    /**
+     * Resets the window when operating in partition mode. If this window has been consuming slices of data (partitions)
+     * instead of the full data, you should use this method to reset the window to maintain the windowing invariant.
+     */
+    public abstract void resetForPartition();
+
     /**
      * Return any {@link Meta} for this windowing scheme and the {@link Strategy}.
      *
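
Note on the windowing changes: the difference between reset() and resetForPartition() in the AdditiveTumbling hunk can be seen in a small sketch. The classes below are hypothetical stand-ins, not the library's types: CountingAggregation replaces the real Strategy with a simple counter, and AdditiveWindow copies only the two reset methods shown in the diff. Presumably the partition reset clears the aggregate because the partition's data has already been emitted to the Join stage, which does the accumulating; the diff itself does not state this.

```java
public class WindowResetSketch {
    // Hypothetical stand-in for a Strategy; it only counts consumed records.
    static class CountingAggregation {
        long count = 0;
        void consume() { count++; }
        void reset() { count = 0; }
    }

    // Hypothetical stand-in modeled on the two reset methods in the AdditiveTumbling hunk.
    static class AdditiveWindow {
        final CountingAggregation aggregation = new CountingAggregation();
        long startedAt = System.currentTimeMillis();
        long windowCount = 0;

        void consume() { aggregation.consume(); }

        // Full-data reset: start a new window but keep the accumulated aggregate.
        void reset() {
            startedAt = System.currentTimeMillis();
            windowCount++;
        }

        // Partition reset: clear the aggregate first, then start a new window.
        void resetForPartition() {
            aggregation.reset();
            reset();
        }
    }

    public static void main(String[] args) {
        AdditiveWindow window = new AdditiveWindow();
        window.consume();
        window.consume();
        window.consume();

        window.reset();
        System.out.println(window.aggregation.count); // 3, the aggregate survives reset()

        window.resetForPartition();
        System.out.println(window.aggregation.count); // 0, the aggregate is cleared
        System.out.println(window.windowCount);       // 2, both resets advanced the window
    }
}
```

By contrast, the Basic hunk simply delegates resetForPartition() to reset(), so for that scheme the two modes reset identically.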
