@@ -35,6 +35,7 @@ import com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache
35
35
import scala .collection .JavaConversions ._
36
36
import scala .collection .mutable .ListBuffer
37
37
import scala .language .implicitConversions
38
+ import scala .util .control .Breaks ._
38
39
39
40
40
41
case class ClientConfiguration (host : String ,
@@ -125,6 +126,8 @@ object CosmosDBConnection extends CosmosDBLoggingTrait {
125
126
private [spark] case class CosmosDBConnection (config : Config ) extends CosmosDBLoggingTrait with Serializable {
126
127
127
128
private val databaseName = config.get[String ](CosmosDBConfig .Database ).get
129
+ private val maxPagesPerBatch =
130
+ config.getOrElse[String ](CosmosDBConfig .ChangeFeedMaxPagesPerBatch , CosmosDBConfig .DefaultChangeFeedMaxPagesPerBatch .toString).toInt
128
131
private val databaseLink = s " ${Paths .DATABASES_PATH_SEGMENT }/ $databaseName"
129
132
private val collectionName = config.get[String ](CosmosDBConfig .Collection ).get
130
133
val collectionLink = s " ${Paths .DATABASES_PATH_SEGMENT }/ $databaseName/ ${Paths .COLLECTIONS_PATH_SEGMENT }/ $collectionName"
@@ -246,33 +249,149 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
246
249
}
247
250
248
251
def readChangeFeed (changeFeedOptions : ChangeFeedOptions , isStreaming : Boolean , shouldInferStreamSchema : Boolean ): Tuple2 [Iterator [Document ], String ] = {
252
+ logDebug(s " --> readChangeFeed, PageSize: ${changeFeedOptions.getPageSize().toString()}, ContinuationToken: ${changeFeedOptions.getRequestContinuation()}, PartitionId: ${changeFeedOptions.getPartitionKeyRangeId()}, ShouldInferSchema: ${shouldInferStreamSchema.toString()}" )
253
+
254
+ // The ChangeFeed API in the SDK allows accessing the continuation token
255
+ // from the latest HTTP Response
256
+ // This is not sufficient to build a correct continuation token when
257
+ // the "ChangeFeedMaxPagesPerBatch" limit is reached, because "blocks" that
258
+ // can be retrieved from the SDK can span two or more underlying pages. So the first records in
259
+ // the block can only be retrieved with the previous continuation token - the last
260
+ // records would have the continuation token of the latest HTTP response that is retrievable
261
+ // The variables below are used to store context necessary to form a continuation token
262
+ // that allows bookmarking an individual record within the changefeed
263
+ // The continuation token that would need to be used to safely allow retrieving changerecords
264
+ // after a bookmark in the form of <blockStartContinuation>|<lastProcessedIdBookmark>
265
+ // Meaning the <blockStartContinuation> needs to be at a previous or the same page as the change record
266
+ // document with Id <lastProcessedIdBookmark>
267
+
268
+ // Indicator whether we found the first not yet processed change record
269
+ var foundBookmark = true
270
+
271
+ // The id of the last document that has been processed and returned to the caller
272
+ var lastProcessedIdBookmark = " "
273
+
274
+ // The original continuation that has been passed to this method by the caller
275
+ var originalContinuation = changeFeedOptions.getRequestContinuation()
276
+
277
+ // The next continuation token that is returned to the caller to continue
278
+ // processing the change feed
279
+ var nextContinuation = changeFeedOptions.getRequestContinuation()
280
+ if (originalContinuation != null &&
281
+ originalContinuation.contains(" |" ))
282
+ {
283
+ val continuationFragments = originalContinuation.split('|' )
284
+ originalContinuation = continuationFragments(0 )
285
+ changeFeedOptions.setRequestContinuation(originalContinuation)
286
+ lastProcessedIdBookmark = continuationFragments(1 )
287
+ foundBookmark = false
288
+ }
289
+
290
+ // The continuation token that would need to be used to safely allow retrieving changerecords
291
+ // after a bookmark in the form of <blockStartContinuation>|<lastProcessedIdBookmark>
292
+ // Meaning the <blockStartContinuation> needs to be at a previous or the same page as the change record
293
+ // document with Id <lastProcessedIdBookmark>
294
+ var previousBlockStartContinuation = originalContinuation
295
+
296
+ // blockStartContinuation is used as a place holder to store the feedResponse.getResponseContinuation()
297
+ // of the previous HTTP response to be able to apply it to previousBlockStartContinuation
298
+ // accordingly
299
+ var blockStartContinuation = originalContinuation
300
+
301
+ // This method can result in reading the next page of the changefeed and changing the continuation token header
249
302
val feedResponse = documentClient.queryDocumentChangeFeed(collectionLink, changeFeedOptions)
303
+ logDebug(s " readChangeFeed.InitialResponseContinuation: ${feedResponse.getResponseContinuation()}" )
304
+
305
+ // If processing from the beginning (no continuation token passed into this method)
306
+ // it is safe to increase previousBlockStartContinuation here because we always at least return
307
+ // one page
308
+ if (Option (originalContinuation).getOrElse(" " ).isEmpty)
309
+ {
310
+ blockStartContinuation = feedResponse.getResponseContinuation()
311
+ previousBlockStartContinuation = blockStartContinuation
312
+ }
313
+
250
314
if (isStreaming) {
315
+ var pageCount = 0 ;
316
+
317
+ var isFirstBlock = true ;
251
318
// In streaming scenario, the change feed need to be materialized in order to get the information of the continuation token
252
319
val cfDocuments : ListBuffer [Document ] = new ListBuffer [Document ]
253
- while (feedResponse.getQueryIterator.hasNext) {
254
- val feedItems = feedResponse.getQueryIterable.fetchNextBlock()
255
- if (shouldInferStreamSchema )
320
+ breakable {
321
+ // hasNext can result in reading the next page of the changefeed and changing the continuation token header
322
+ while (feedResponse.getQueryIterator.hasNext )
256
323
{
257
- cfDocuments.addAll(feedItems)
258
- } else {
259
- for (feedItem <- feedItems) {
260
- val streamDocument : Document = new Document ()
261
- streamDocument.set(" body" , feedItem.toJson)
262
- streamDocument.set(" id" , feedItem.get(" id" ))
263
- streamDocument.set(" _rid" , feedItem.get(" _rid" ))
264
- streamDocument.set(" _self" , feedItem.get(" _self" ))
265
- streamDocument.set(" _etag" , feedItem.get(" _etag" ))
266
- streamDocument.set(" _attachments" , feedItem.get(" _attachments" ))
267
- streamDocument.set(" _ts" , feedItem.get(" _ts" ))
268
- cfDocuments.add(streamDocument)
324
+ logDebug(s " readChangeFeed.InWhile ContinuationToken: ${blockStartContinuation}" )
325
+ // fetchNextBlock can result in reading the next page of the changefeed and changing the continuation token header
326
+ val feedItems = feedResponse.getQueryIterable.fetchNextBlock()
327
+
328
+ for (feedItem <- feedItems)
329
+ {
330
+ if (! foundBookmark)
331
+ {
332
+ if (feedItem.get(" id" ) == lastProcessedIdBookmark)
333
+ {
334
+ logDebug(" readChangeFeed.FoundBookmarkDueToIdMatch" )
335
+ foundBookmark = true
336
+ }
337
+ }
338
+ else
339
+ {
340
+ if (shouldInferStreamSchema)
341
+ {
342
+ cfDocuments.add(feedItem)
343
+ }
344
+ else
345
+ {
346
+ val streamDocument : Document = new Document ()
347
+ streamDocument.set(" body" , feedItem.toJson)
348
+ streamDocument.set(" id" , feedItem.get(" id" ))
349
+ streamDocument.set(" _rid" , feedItem.get(" _rid" ))
350
+ streamDocument.set(" _self" , feedItem.get(" _self" ))
351
+ streamDocument.set(" _etag" , feedItem.get(" _etag" ))
352
+ streamDocument.set(" _attachments" , feedItem.get(" _attachments" ))
353
+ streamDocument.set(" _ts" , feedItem.get(" _ts" ))
354
+
355
+ cfDocuments.add(streamDocument)
356
+ }
357
+ }
358
+ }
359
+ logDebug(s " Receving ${cfDocuments.length.toString()} change feed items ${if (cfDocuments.nonEmpty) cfDocuments(0 )}" )
360
+
361
+ if (cfDocuments.length > 0 )
362
+ {
363
+ pageCount += 1 ;
364
+ }
365
+
366
+ if (pageCount >= maxPagesPerBatch)
367
+ {
368
+ nextContinuation = previousBlockStartContinuation + " |" + feedItems.last.get(" id" )
369
+
370
+ logDebug(s " readChangeFeed.MaxPageCountExceeded NextContinuation: ${nextContinuation}" )
371
+ break;
372
+ }
373
+ else
374
+ {
375
+ // next Continuation Token is plain and simple the same as the latest HTTP response
376
+ // Expected when all records of the current page have been processed
377
+ // Will only get returned to the caller when the changefeed has been processed completely
378
+ // as a continuation token that the caller can use afterwards to see whether the changefeed
379
+ // contains new change record documents
380
+ nextContinuation = feedResponse.getResponseContinuation()
381
+
382
+ previousBlockStartContinuation = blockStartContinuation
383
+ blockStartContinuation = nextContinuation
384
+
385
+ logDebug(s " readChangeFeed.EndInWhile NextContinuation: ${nextContinuation}, blockStartContinuation: ${blockStartContinuation}, previousBlockStartContinuation: ${previousBlockStartContinuation}" )
269
386
}
270
387
}
271
- logDebug(s " Receving change feed items ${if (feedItems.nonEmpty) feedItems(0 )}" )
272
388
}
273
- Tuple2 .apply(cfDocuments.iterator(), feedResponse.getResponseContinuation)
274
- } else {
275
- Tuple2 .apply(feedResponse.getQueryIterator, feedResponse.getResponseContinuation)
389
+
390
+ logDebug(s " <-- readChangeFeed, Count: ${cfDocuments.length.toString()}, NextContinuation: ${nextContinuation}" )
391
+ Tuple2 .apply(cfDocuments.iterator(), nextContinuation)
392
+ } else
393
+ {
394
+ Tuple2 .apply(feedResponse.getQueryIterator, nextContinuation)
276
395
}
277
396
}
278
397
0 commit comments