@@ -249,7 +249,7 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
249
249
}
250
250
251
251
def readChangeFeed (changeFeedOptions : ChangeFeedOptions , isStreaming : Boolean , shouldInferStreamSchema : Boolean ): Tuple2 [Iterator [Document ], String ] = {
252
- logDebug(" --> readChangeFeed, PageSize: " + changeFeedOptions.getPageSize().toString() + " , ContinuationToken: " + changeFeedOptions.getRequestContinuation() + " , PartitionId: " + changeFeedOptions.getPartitionKeyRangeId() + " , ShouldInferSchema: " + shouldInferStreamSchema.toString())
252
+ logDebug(s " --> readChangeFeed, PageSize: ${ changeFeedOptions.getPageSize().toString()} , ContinuationToken: ${ changeFeedOptions.getRequestContinuation()} , PartitionId: ${ changeFeedOptions.getPartitionKeyRangeId()} , ShouldInferSchema: ${ shouldInferStreamSchema.toString()} " )
253
253
254
254
// The ChangeFeed API in the SDK allows accessing the continuation token
255
255
// from the latest HTTP Response
@@ -293,12 +293,14 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
293
293
// document with Id <lastProcessedIdBookmark>
294
294
var previousBlockStartContinuation = originalContinuation
295
295
296
- // The continuation token that would need to be used to retrive the next block
296
+ // blockStartContinuation is used as a place holder to store the feedResponse.getResponseContinuation()
297
+ // of the previous HTTP response to be able to apply it to previousBlockStartContinuation
298
+ // accordingly
297
299
var blockStartContinuation = originalContinuation
298
300
299
301
// This method can result in reading the next page of the changefeed and changing the continuation token header
300
302
val feedResponse = documentClient.queryDocumentChangeFeed(collectionLink, changeFeedOptions)
301
- logDebug(" readChangeFeed.InitialResponseContinuation: " + feedResponse.getResponseContinuation())
303
+ logDebug(s " readChangeFeed.InitialResponseContinuation: ${ feedResponse.getResponseContinuation()} " )
302
304
303
305
// If processing from the beginning (no continuation token passed into this method)
304
306
// it is safe to increase previousBlockStartContinuation here because we always at least return
@@ -319,7 +321,7 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
319
321
// hasNext can result in reading the next page of the changefeed and changing the continuation token header
320
322
while (feedResponse.getQueryIterator.hasNext)
321
323
{
322
- logDebug(" readChangeFeed.InWhile ContinuationToken: " + blockStartContinuation)
324
+ logDebug(s " readChangeFeed.InWhile ContinuationToken: ${ blockStartContinuation} " )
323
325
// fetchNextBlock can result in reading the next page of the changefeed and changing the continuation token header
324
326
val feedItems = feedResponse.getQueryIterable.fetchNextBlock()
325
327
@@ -354,7 +356,7 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
354
356
}
355
357
}
356
358
}
357
- logDebug(s " Receving " + cfDocuments.length.toString() + " change feed items ${if (cfDocuments.nonEmpty) cfDocuments(0)}" )
359
+ logDebug(s " Receving ${ cfDocuments.length.toString()} change feed items ${if (cfDocuments.nonEmpty) cfDocuments(0 )}" )
358
360
359
361
if (cfDocuments.length > 0 )
360
362
{
@@ -365,7 +367,7 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
365
367
{
366
368
nextContinuation = previousBlockStartContinuation + " |" + feedItems.last.get(" id" )
367
369
368
- logDebug(" readChangeFeed.MaxPageCountExceeded NextContinuation: " + nextContinuation)
370
+ logDebug(s " readChangeFeed.MaxPageCountExceeded NextContinuation: ${ nextContinuation} " )
369
371
break;
370
372
}
371
373
else
@@ -380,12 +382,12 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
380
382
previousBlockStartContinuation = blockStartContinuation
381
383
blockStartContinuation = nextContinuation
382
384
383
- logDebug(" readChangeFeed.EndInWhile NextContinuation: " + nextContinuation + " , blockStartContinuation: " + blockStartContinuation + " , previousBlockStartContinuation: " + previousBlockStartContinuation)
385
+ logDebug(s " readChangeFeed.EndInWhile NextContinuation: ${ nextContinuation} , blockStartContinuation: ${ blockStartContinuation} , previousBlockStartContinuation: ${ previousBlockStartContinuation} " )
384
386
}
385
387
}
386
388
}
387
389
388
- logDebug(" <-- readChangeFeed, Count: " + cfDocuments.length.toString() + " , NextContinuation: " + nextContinuation)
390
+ logDebug(s " <-- readChangeFeed, Count: ${ cfDocuments.length.toString()} , NextContinuation: ${ nextContinuation} " )
389
391
Tuple2 .apply(cfDocuments.iterator(), nextContinuation)
390
392
} else
391
393
{
0 commit comments