From 4309f3238579ce1b9d1f81aab4b38d9b49fe3145 Mon Sep 17 00:00:00 2001 From: maasg Date: Fri, 15 Nov 2019 14:52:43 +0100 Subject: [PATCH 1/2] update the code to correctly handle SAM types --- .../microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala | 6 ++++-- .../azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala | 8 +++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala index 63f0db55..de712337 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala @@ -31,6 +31,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import org.apache.spark.util.TaskCompletionListener import org.apache.spark.{Partition, TaskContext} import scala.collection.mutable @@ -114,9 +115,10 @@ class CosmosDBRDD( case cosmosDBPartition: CosmosDBPartition => logInfo(s"CosmosDBRDD:compute: Start CosmosDBRDD compute task for partition key range id ${cosmosDBPartition.partitionKeyRangeId}") - context.addTaskCompletionListener((ctx: TaskContext) => { + val taskCompletionListener:TaskCompletionListener = (ctx: TaskContext) => { logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}") - }) + } + context.addTaskCompletionListener(taskCompletionListener) new CosmosDBRDDIterator( hadoopConfig, diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala index 2edb3ab3..56a24d78 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala @@ -41,6 +41,7 @@ import com.microsoft.azure.documentdb.internal.HttpConstants.SubStatusCodes import org.apache.commons.lang3.StringUtils import org.apache.spark._ import org.apache.spark.sql.sources.Filter +import org.apache.spark.util.TaskCompletionListener import scala.collection.mutable @@ -424,9 +425,10 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String], }) // Register an on-task-completion callback to close the input stream. - taskContext.addTaskCompletionListener((_: TaskContext) => { - closeIfNeeded() - }) + val taskCompletionListener: TaskCompletionListener = new TaskCompletionListener() { + override def onTaskCompletion(context: TaskContext): Unit = closeIfNeeded() + } + taskContext.addTaskCompletionListener(taskCompletionListener) if (!readingChangeFeed) { queryDocuments From 83f96befdbeee9279b87777bfaa1c352507d2b19 Mon Sep 17 00:00:00 2001 From: maasg Date: Mon, 6 Jul 2020 15:53:27 +0200 Subject: [PATCH 2/2] refresh branch from upstream --- pom.xml | 6 +++--- .../microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala | 7 ++++--- .../azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala | 9 ++++++--- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index c1bd7a9e..be8abd5d 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ limitations under the License. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 com.microsoft.azure - azure-cosmosdb-spark_2.4.0_2.11 + azure-cosmosdb-spark_2.4.0_2.12 jar 3.0.6 ${project.groupId}:${project.artifactId} @@ -32,8 +32,8 @@ limitations under the License. - 2.11.12 - 2.11 + 2.12.11 + 2.12 azure-cosmosdb-spark 3.1.1 2.4.4 diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala index de712337..f58a033b 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala @@ -115,10 +115,11 @@ class CosmosDBRDD( case cosmosDBPartition: CosmosDBPartition => logInfo(s"CosmosDBRDD:compute: Start CosmosDBRDD compute task for partition key range id ${cosmosDBPartition.partitionKeyRangeId}") - val taskCompletionListener:TaskCompletionListener = (ctx: TaskContext) => { - logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}") + val completionListener: TaskCompletionListener = new TaskCompletionListener() { + override def onTaskCompletion(context: TaskContext): Unit = + logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}") } - context.addTaskCompletionListener(taskCompletionListener) + context.addTaskCompletionListener(completionListener) new CosmosDBRDDIterator( hadoopConfig, diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala index 56a24d78..bdd05e84 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala @@ -425,10 +425,13 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String], }) // Register an on-task-completion callback to close the input stream. - val taskCompletionListener: TaskCompletionListener = new TaskCompletionListener() { - override def onTaskCompletion(context: TaskContext): Unit = closeIfNeeded() + val taskCompletionListerner = new TaskCompletionListener() { + override def onTaskCompletion(context: TaskContext): Unit = { + closeIfNeeded() + } } - taskContext.addTaskCompletionListener(taskCompletionListener) + + taskContext.addTaskCompletionListener(taskCompletionListerner) if (!readingChangeFeed) { queryDocuments