algoliasearch/ingestion/client.py: 94 additions & 76 deletions
@@ -217,8 +217,12 @@ async def chunked_push(
         """
         Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
         """
+        offset = 0
         records: List[PushTaskRecords] = []
         responses: List[WatchResponse] = []
+        wait_batch_size = batch_size // 10
+        if wait_batch_size < 1:
+            wait_batch_size = batch_size
         for i, obj in enumerate(objects):
             records.append(obj)  # pyright: ignore
             if len(records) == batch_size or i == len(objects) - 1:
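The four added lines above set the cadence for waiting on push events: one wait wave every batch_size // 10 accumulated responses, falling back to batch_size whenever the integer division would reach zero. A minimal sketch of that derivation (the function name and sample values are illustrative, not part of the diff):

def wait_batch_size_for(batch_size: int) -> int:
    # Poll for events once every (batch_size // 10) pushed responses...
    wait_batch_size = batch_size // 10
    if wait_batch_size < 1:
        # ...but never less than 1: tiny batch sizes wait after every push.
        wait_batch_size = batch_size
    return wait_batch_size

# e.g. wait_batch_size_for(1000) == 100, wait_batch_size_for(5) == 5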
@@ -234,44 +238,49 @@ async def chunked_push(
                     )
                 )
                 records = []
-        if wait_for_tasks:
-            for response in responses:
-
-                async def _func(_: Optional[Event]) -> Event:
-                    if response.event_id is None:
-                        raise ValueError(
-                            "received unexpected response from the push endpoint, eventID must not be undefined"
-                        )
-                    try:
-                        return await self.get_event(
-                            run_id=response.run_id,
-                            event_id=response.event_id,
-                            request_options=request_options,
-                        )
-                    except RequestException as e:
-                        if e.status_code == 404:
-                            return None  # pyright: ignore
-                        raise e
-
-                _retry_count = 0
-
-                def _aggregator(_: Event | None) -> None:
-                    nonlocal _retry_count
-                    _retry_count += 1
-
-                def _validate(_resp: Event | None) -> bool:
-                    return _resp is not None
-
-                timeout = RetryTimeout()
-
-                await create_iterable(
-                    func=_func,
-                    validate=_validate,
-                    aggregator=_aggregator,
-                    timeout=lambda: timeout(_retry_count),
-                    error_validate=lambda _: _retry_count >= 50,
-                    error_message=lambda _: f"The maximum number of retries exceeded. (${_retry_count}/${50})",
-                )
+            if (
+                wait_for_tasks
+                and len(responses) > 0
+                and (len(responses) % wait_batch_size == 0 or i == len(objects) - 1)
[… added lines +245 to +250 are missing from this rendering …]
+                            "received unexpected response from the push endpoint, eventID must not be undefined"
+                        )
+                    try:
+                        return await self.get_event(
+                            run_id=response.run_id,
+                            event_id=response.event_id,
+                            request_options=request_options,
+                        )
+                    except RequestException as e:
+                        if e.status_code == 404:
+                            return None  # pyright: ignore
+                        raise e
+
+                    _retry_count = 0
+
+                    def _aggregator(_: Event | None) -> None:
+                        nonlocal _retry_count
+                        _retry_count += 1
+
+                    def _validate(_resp: Event | None) -> bool:
+                        return _resp is not None
+
+                    timeout = RetryTimeout()
+
+                    await create_iterable(
+                        func=_func,
+                        validate=_validate,
+                        aggregator=_aggregator,
+                        timeout=lambda: timeout(_retry_count),
+                        error_validate=lambda _: _retry_count >= 50,
+                        error_message=lambda _: f"The maximum number of retries exceeded. (${_retry_count}/${50})",
+                    )
+                offset += wait_batch_size
         return responses
 
     async def create_authentication_with_http_info(
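Net effect of the hunk above: instead of polling every pushed chunk's event only after the whole loop, the client now polls in waves as soon as wait_batch_size responses have accumulated (and once more after the last object), advancing an offset past the responses already waited on. A condensed, standalone sketch of that control flow; poll_event, the dict placeholder for push responses, and the responses[offset:] slice are assumptions standing in for the client internals, not the confirmed implementation:

from typing import Any, List


def poll_event(response: Any) -> None:
    # Hypothetical stand-in for retrying self.get_event until the Event exists.
    print(f"waiting on event for {response!r}")


def chunked_wait_sketch(objects: List[Any], batch_size: int = 1000) -> List[Any]:
    # Sketch of the batched-wait pattern introduced by the diff (names illustrative).
    offset = 0
    records: List[Any] = []
    responses: List[Any] = []
    wait_batch_size = batch_size // 10
    if wait_batch_size < 1:
        wait_batch_size = batch_size
    for i, obj in enumerate(objects):
        records.append(obj)
        if len(records) == batch_size or i == len(objects) - 1:
            responses.append({"chunk": list(records)})  # placeholder for the real push() call
            records = []
        if responses and (len(responses) % wait_batch_size == 0 or i == len(objects) - 1):
            # Poll only the responses not yet waited on, then advance by one wait wave.
            for response in responses[offset:]:
                poll_event(response)
            offset += wait_batch_size
    return responses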
@@ -5326,8 +5335,12 @@ def chunked_push(
         """
         Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit in `push` requests by leveraging the Transformation pipeline setup in the Push connector (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
         """
+        offset = 0
         records: List[PushTaskRecords] = []
         responses: List[WatchResponse] = []
+        wait_batch_size = batch_size // 10
+        if wait_batch_size < 1:
+            wait_batch_size = batch_size
         for i, obj in enumerate(objects):
             records.append(obj)  # pyright: ignore
             if len(records) == batch_size or i == len(objects) - 1:
@@ -5343,44 +5356,49 @@ def chunked_push(
                     )
                 )
                 records = []
-        if wait_for_tasks:
-            for response in responses:
-
-                def _func(_: Optional[Event]) -> Event:
-                    if response.event_id is None:
-                        raise ValueError(
-                            "received unexpected response from the push endpoint, eventID must not be undefined"
-                        )
-                    try:
-                        return self.get_event(
-                            run_id=response.run_id,
-                            event_id=response.event_id,
-                            request_options=request_options,
-                        )
-                    except RequestException as e:
-                        if e.status_code == 404:
-                            return None  # pyright: ignore
-                        raise e
-
-                _retry_count = 0
-
-                def _aggregator(_: Event | None) -> None:
-                    nonlocal _retry_count
-                    _retry_count += 1
-
-                def _validate(_resp: Event | None) -> bool:
-                    return _resp is not None
-
-                timeout = RetryTimeout()
-
-                create_iterable_sync(
-                    func=_func,
-                    validate=_validate,
-                    aggregator=_aggregator,
-                    timeout=lambda: timeout(_retry_count),
-                    error_validate=lambda _: _retry_count >= 50,
-                    error_message=lambda _: f"The maximum number of retries exceeded. (${_retry_count}/${50})",
-                )
+            if (
+                wait_for_tasks
+                and len(responses) > 0
+                and (len(responses) % wait_batch_size == 0 or i == len(objects) - 1)
[… the remaining added lines of this hunk, which mirror the async version above with the synchronous get_event and create_iterable_sync calls, are missing from this rendering …]
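For context, a hedged usage sketch of the helper these hunks modify. The diff does not show the full chunked_push signature, so the client class, constructor arguments, and keyword names below are assumptions rather than the confirmed API:

from algoliasearch.ingestion.client import IngestionClientSync

# Assumed constructor arguments; check the client's actual signature before relying on this.
client = IngestionClientSync("YOUR_APP_ID", "YOUR_API_KEY", "us")

objects = [{"objectID": str(i), "name": f"record {i}"} for i in range(2500)]

# With batch_size=1000 the helper pushes 3 chunks of records. With the change in
# this diff it polls pushed events in waves of batch_size // 10 responses (and
# once more after the last object) instead of only after the whole loop.
responses = client.chunked_push(
    index_name="products",  # assumed parameter name
    objects=objects,
    wait_for_tasks=True,
    batch_size=1000,
)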