14
14
import java .time .Duration ;
15
15
import java .util .ArrayList ;
16
16
import java .util .EnumSet ;
17
+ import java .util .Iterator ;
17
18
import java .util .List ;
18
19
import java .util .Map ;
19
20
import java .util .concurrent .CompletableFuture ;
@@ -4997,9 +4998,19 @@ public <T> List<WatchResponse> chunkedPush(
4997
4998
) {
4998
4999
List <WatchResponse > responses = new ArrayList <>();
4999
5000
List <T > records = new ArrayList <>();
5001
+ int offset = 0 ;
5002
+ int waitBatchSize = batchSize / 10 ;
5003
+ if (waitBatchSize < 1 ) {
5004
+ waitBatchSize = batchSize ;
5005
+ }
5006
+
5007
+ Iterator <T > it = objects .iterator ();
5008
+ T current = it .next ();
5009
+
5010
+ while (true ) {
5011
+ records .add (current );
5000
5012
5001
- for (T item : objects ) {
5002
- if (records .size () == batchSize ) {
5013
+ if (records .size () == batchSize || !it .hasNext ()) {
5003
5014
WatchResponse watch = this .push (
5004
5015
indexName ,
5005
5016
new PushTaskPayload ().setAction (action ).setRecords (this .objectsToPushTaskRecords (records )),
@@ -5011,41 +5022,38 @@ public <T> List<WatchResponse> chunkedPush(
5011
5022
records .clear ();
5012
5023
}
5013
5024
5014
- records .add (item );
5015
- }
5025
+ if (waitForTasks && responses .size () > 0 && (responses .size () % waitBatchSize == 0 || !it .hasNext ())) {
5026
+ responses
5027
+ .subList (offset , Math .min (offset + waitBatchSize , responses .size ()))
5028
+ .forEach (response -> {
5029
+ TaskUtils .retryUntil (
5030
+ () -> {
5031
+ try {
5032
+ return this .getEvent (response .getRunID (), response .getEventID ());
5033
+ } catch (AlgoliaApiException e ) {
5034
+ if (e .getStatusCode () == 404 ) {
5035
+ return null ;
5036
+ }
5037
+
5038
+ throw e ;
5039
+ }
5040
+ },
5041
+ (Event resp ) -> {
5042
+ return resp != null ;
5043
+ },
5044
+ 50 ,
5045
+ null
5046
+ );
5047
+ });
5048
+
5049
+ offset += waitBatchSize ;
5050
+ }
5016
5051
5017
- if (records .size () > 0 ) {
5018
- WatchResponse watch = this .push (
5019
- indexName ,
5020
- new PushTaskPayload ().setAction (action ).setRecords (this .objectsToPushTaskRecords (records )),
5021
- waitForTasks ,
5022
- referenceIndexName ,
5023
- requestOptions
5024
- );
5025
- responses .add (watch );
5026
- }
5052
+ if (!it .hasNext ()) {
5053
+ break ;
5054
+ }
5027
5055
5028
- if (waitForTasks ) {
5029
- responses .forEach (response -> {
5030
- TaskUtils .retryUntil (
5031
- () -> {
5032
- try {
5033
- return this .getEvent (response .getRunID (), response .getEventID ());
5034
- } catch (AlgoliaApiException e ) {
5035
- if (e .getStatusCode () == 404 ) {
5036
- return null ;
5037
- }
5038
-
5039
- throw e ;
5040
- }
5041
- },
5042
- (Event resp ) -> {
5043
- return resp != null ;
5044
- },
5045
- 50 ,
5046
- null
5047
- );
5048
- });
5056
+ current = it .next ();
5049
5057
}
5050
5058
5051
5059
return responses ;
0 commit comments