Skip to content

Commit d286a44

Browse files
author
Bryan Marsh
committed
made change from PR 342 for scala 2.12 support and added logic for Spark 3.0.1 support
1 parent 7c56b44 commit d286a44

File tree

3 files changed

+18
-10
lines changed

3 files changed

+18
-10
lines changed

pom.xml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ limitations under the License.
1919
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2020
<modelVersion>4.0.0</modelVersion>
2121
<groupId>com.microsoft.azure</groupId>
22-
<artifactId>azure-cosmosdb-spark_2.4.0_2.11</artifactId>
22+
<artifactId>azure-cosmosdb-spark_3.0.1_2.12</artifactId>
2323
<packaging>jar</packaging>
2424
<version>3.3.1</version>
2525
<name>${project.groupId}:${project.artifactId}</name>
@@ -32,11 +32,11 @@ limitations under the License.
3232
</license>
3333
</licenses>
3434
<properties>
35-
<scala.version>2.11.12</scala.version>
36-
<scala.binary.version>2.11</scala.binary.version>
35+
<scala.version>2.12.10</scala.version>
36+
<scala.binary.version>2.12</scala.binary.version>
3737
<sonar.projectBaseDir>azure-cosmosdb-spark</sonar.projectBaseDir>
3838
<scala.test.version>3.1.1</scala.test.version>
39-
<spark.version>2.4.4</spark.version>
39+
<spark.version>3.0.1</spark.version>
4040
<slf4j.version>1.7.30</slf4j.version>
4141
<log4j.version>1.2.17</log4j.version>
4242
<tinkerpop.version>3.2.5</tinkerpop.version>

src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.apache.spark.rdd.RDD
3131
import org.apache.spark.sql.sources.Filter
3232
import org.apache.spark.sql.types.StructType
3333
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
34+
import org.apache.spark.util.TaskCompletionListener
3435
import org.apache.spark.{Partition, TaskContext}
3536

3637
import scala.collection.mutable
@@ -114,9 +115,11 @@ class CosmosDBRDD(
114115
case cosmosDBPartition: CosmosDBPartition =>
115116
logInfo(s"CosmosDBRDD:compute: Start CosmosDBRDD compute task for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
116117

117-
context.addTaskCompletionListener((ctx: TaskContext) => {
118-
logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
119-
})
118+
val completionListener: TaskCompletionListener = new TaskCompletionListener() {
119+
override def onTaskCompletion(context: TaskContext): Unit =
120+
logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
121+
}
122+
context.addTaskCompletionListener(completionListener)
120123

121124
new CosmosDBRDDIterator(
122125
hadoopConfig,

src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import com.microsoft.azure.documentdb.internal.HttpConstants.SubStatusCodes
4141
import org.apache.commons.lang3.StringUtils
4242
import org.apache.spark._
4343
import org.apache.spark.sql.sources.Filter
44+
import org.apache.spark.util.TaskCompletionListener
4445

4546
import scala.collection.mutable
4647
import org.joda.time.DateTimeZone
@@ -437,9 +438,13 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
437438
})
438439

439440
// Register an on-task-completion callback to close the input stream.
440-
taskContext.addTaskCompletionListener((_: TaskContext) => {
441-
closeIfNeeded()
442-
})
441+
val taskCompletionListerner = new TaskCompletionListener() {
442+
override def onTaskCompletion(context: TaskContext): Unit = {
443+
closeIfNeeded()
444+
}
445+
}
446+
447+
taskContext.addTaskCompletionListener(taskCompletionListerner)
443448

444449
if (!readingChangeFeed) {
445450
queryDocuments

0 commit comments

Comments
 (0)