Skip to content

Commit e1ddc2d

Browse files
authored
Closes #40. Support full sliding window (#44)
1 parent 21d7753 commit e1ddc2d

File tree

16 files changed

+25
-326
lines changed

16 files changed

+25
-326
lines changed

src/main/java/com/yahoo/bullet/parsing/Window.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ public enum Classification {
7373
"Please remove include or match it to emit");
7474
public static final BulletError IMPROPER_FIRST = makeError("The first field should not be set for \"ALL\"",
7575
"Please remove the first field");
76-
public static final BulletError NOT_ONE_RECORD_EMIT = makeError("The emit type was \"RECORD\" but every was not 1",
77-
"Please set every to 1 or change the emit type");
7876
public static final BulletError NO_RECORD_ALL = makeError("The emit type was \"RECORD\" and include type was \"ALL\"",
7977
"Please set emit type to \"TIME\" or match include to emit");
8078

@@ -121,9 +119,6 @@ public Optional<List<BulletError>> initialize() {
121119
if (every == null || every.intValue() <= 0) {
122120
return Optional.of(singletonList(IMPROPER_EVERY));
123121
}
124-
if (emitType == Unit.RECORD && every.intValue() != 1) {
125-
return Optional.of(singletonList(NOT_ONE_RECORD_EMIT));
126-
}
127122

128123
if (include == null) {
129124
return Optional.empty();

src/main/java/com/yahoo/bullet/pubsub/rest/RESTPublisher.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public abstract class RESTPublisher implements Publisher {
2828
* Create a RESTQueryPublisher from a {@link CloseableHttpClient}.
2929
*
3030
* @param client The client.
31+
* @param connectTimeout The minimum time (ms) to wait for a connection to be made.
3132
*/
3233
public RESTPublisher(CloseableHttpClient client, int connectTimeout) {
3334
this.client = client;

src/main/java/com/yahoo/bullet/pubsub/rest/RESTQueryPublisher.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class RESTQueryPublisher extends RESTPublisher {
2525
* @param client The client.
2626
* @param queryURL The URL to which to POST queries.
2727
* @param resultURL The URL that will be added to the Metadata (results will be sent to this URL from the backend).
28+
* @param connectTimeout The minimum time (ms) to wait for a connection to be made.
2829
*/
2930
public RESTQueryPublisher(CloseableHttpClient client, String queryURL, String resultURL, int connectTimeout) {
3031
super(client, connectTimeout);

src/main/java/com/yahoo/bullet/pubsub/rest/RESTResultPublisher.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public class RESTResultPublisher extends RESTPublisher {
1515
* Create a RESTQueryPublisher from a {@link CloseableHttpClient}.
1616
*
1717
* @param client The client.
18+
* @param connectTimeout The minimum time (ms) to wait for a connection to be made.
1819
*/
1920
public RESTResultPublisher(CloseableHttpClient client, int connectTimeout) {
2021
super(client, connectTimeout);

src/main/java/com/yahoo/bullet/querying/Querier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ public enum Mode {
290290
private Scheme window;
291291

292292
// For testing convenience
293-
@Getter(AccessLevel.PACKAGE) @Setter(AccessLevel.PACKAGE)
293+
@Getter(AccessLevel.PACKAGE)
294294
private RunningQuery runningQuery;
295295

296296
private BulletConfig config;

src/main/java/com/yahoo/bullet/querying/RateLimiter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@
1818
* the rate limit within your time interval but you neglect to check it for a long enough time interval where the burst
1919
* is spread out over bringing the overall rate lower than your configured maximum and yielding a false negative.
2020
*/
21-
@Getter
2221
public class RateLimiter {
22+
@Getter
2323
private final int maximum;
24+
@Getter
2425
private final int timeInterval;
2526
private final double absoluteRateLimit;
2627

2728
private long count = 0;
2829
private long lastCount = 0;
2930
private long lastCheckTime;
31+
@Getter
3032
private boolean exceededRate = false;
3133

3234
public static final int SECOND = 1000;

src/main/java/com/yahoo/bullet/querying/WindowingOperations.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import com.yahoo.bullet.parsing.Window;
1212
import com.yahoo.bullet.windowing.AdditiveTumbling;
1313
import com.yahoo.bullet.windowing.Basic;
14-
import com.yahoo.bullet.windowing.Reactive;
1514
import com.yahoo.bullet.windowing.Scheme;
15+
import com.yahoo.bullet.windowing.SlidingRecord;
1616
import com.yahoo.bullet.windowing.Tumbling;
1717

1818
public class WindowingOperations {
@@ -29,7 +29,7 @@ public static Scheme findScheme(Query query, Strategy strategy, BulletConfig con
2929
* TODO: Support other windows
3030
* The windows we support at the moment:
3131
* 1. No window -> Basic
32-
* 2. Window is emit RECORD and include RECORD -> Reactive
32+
* 2. Window is emit RECORD and include RECORD -> SlidingRecord
3333
* 3. Window is emit TIME and include ALL -> Additive Tumbling
3434
* 4. All other windows -> Tumbling (RAW can be Tumbling too)
3535
*/
@@ -40,7 +40,7 @@ public static Scheme findScheme(Query query, Strategy strategy, BulletConfig con
4040

4141
Window.Classification classification = window.getType();
4242
if (classification == Window.Classification.RECORD_RECORD) {
43-
return new Reactive(strategy, window, config);
43+
return new SlidingRecord(strategy, window, config);
4444
}
4545
if (classification == Window.Classification.TIME_ALL) {
4646
return new AdditiveTumbling(strategy, window, config);

src/main/java/com/yahoo/bullet/windowing/Reactive.java

Lines changed: 0 additions & 53 deletions
This file was deleted.

src/main/java/com/yahoo/bullet/windowing/SlidingRecord.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ public class SlidingRecord extends Basic {
3030

3131
public static final BulletError NOT_RECORD = makeError("The \"type\" for \"emit\" was not \"RECORD\"",
3232
"Please set \"type\" to \"RECORD\"");
33-
protected int maxCount;
34-
protected int recordCount;
33+
private int maxCount;
34+
private int recordCount;
3535

3636
@AllArgsConstructor @Getter
3737
public static class Data implements Serializable {

src/test/java/com/yahoo/bullet/parsing/QueryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,13 @@ public void testCustomDuration() {
137137
public void testWindowForced() {
138138
BulletConfig config = new BulletConfig();
139139
Query query = new Query();
140-
query.setWindow(WindowUtils.makeReactiveWindow());
140+
query.setWindow(WindowUtils.makeSlidingWindow(1));
141141
query.configure(config);
142142
Assert.assertNotNull(query.getWindow());
143143

144144
config.set(BulletConfig.WINDOW_DISABLE, true);
145145
config.validate();
146-
query.setWindow(WindowUtils.makeReactiveWindow());
146+
query.setWindow(WindowUtils.makeSlidingWindow(1));
147147
query.configure(config);
148148
Assert.assertNull(query.getWindow());
149149
}

0 commit comments

Comments
 (0)