diff --git a/pom.xml b/pom.xml
index cfbfd9f8..0571f5db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@ limitations under the License.
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-spark_2.4.0_2.11</artifactId>
<packaging>jar</packaging>
- <version>1.4.0</version>
+ <version>1.4.4-SNAPSHOT</version>
<name>${project.groupId}:${project.artifactId}</name>
<description>Spark Connector for Microsoft Azure CosmosDB</description>
<url>http://azure.microsoft.com/en-us/services/documentdb/</url>
@@ -126,6 +126,11 @@ limitations under the License.
<artifactId>jackson-databind</artifactId>
<version>2.9.8</version>
</dependency>
+ <dependency>
+   <groupId>com.fasterxml.jackson.module</groupId>
+   <artifactId>jackson-module-scala_2.11</artifactId>
+   <version>2.9.8</version>
+ </dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
index 8827cee5..644fd6cc 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
@@ -23,6 +23,6 @@
package com.microsoft.azure.cosmosdb.spark
object Constants {
- val currentVersion = "2.4.0_2.11-1.3.5"
+ val currentVersion = "2.4.0_2.11-1.4.4"
val userAgentSuffix = s" SparkConnector/$currentVersion"
}
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
index c2de3f53..9ad81e07 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala
@@ -23,7 +23,10 @@
package com.microsoft.azure.cosmosdb.spark
import java.lang.management.ManagementFactory
import com.microsoft.azure.cosmosdb.spark.config._
+import com.microsoft.azure.cosmosdb.spark.util.JacksonWrapper
import com.microsoft.azure.documentdb._
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor
import com.microsoft.azure.documentdb.internal._
@@ -192,6 +195,22 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
feedResponse.getQueryIterable.iterator()
}
+ def readSchema(schemaType: String): ItemSchema = {
+   val partitionKeyDefinition = getCollection.getPartitionKey
+   val partitionKeyPath = partitionKeyDefinition.getPaths
+   val partitionKeyProperty = partitionKeyPath.iterator.next.replaceFirst("^/", "")
+
+   val feedOptions = new FeedOptions()
+   feedOptions.setEnableCrossPartitionQuery(true)
+   // The schema document lives in the collection itself, pinned to the
+   // reserved partition key value "__schema__<schemaType>"
+   val query = s"SELECT * FROM c WHERE c.schemaType = '$schemaType' AND c.$partitionKeyProperty = '__schema__$schemaType'"
+   val response = documentClient.queryDocuments(collectionLink, new SqlQuerySpec(query), feedOptions)
+   val schemaResponse = response.getQueryIterable.fetchNextBlock()
+   var schemaDocument: ItemSchema = null
+   if (schemaResponse != null && !schemaResponse.isEmpty) {
+     schemaDocument = JacksonWrapper.deserialize[ItemSchema](schemaResponse.get(0).toJson())
+   }
+   schemaDocument
+ }
+
def readDocuments(feedOptions: FeedOptions): Iterator[Document] = {
documentClient.readDocuments(collectionLink, feedOptions).getQueryIterable.iterator()
}
@@ -234,6 +253,14 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog
documentClient.upsertDocument(collectionLink, document, requestOptions, false)
}
+ def insertDocument(collectionLink: String,
+                    document: Document,
+                    requestOptions: RequestOptions): Unit = {
+   logTrace(s"Inserting document $document")
+   documentClient.createDocument(collectionLink, document, requestOptions, false)
+ }
+
def isDocumentCollectionEmpty: Boolean = {
logDebug(s"Reading collection $collectionLink")
var requestOptions = new RequestOptions
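Taken together, readSchema and insertDocument define the schema-document convention used throughout this change: the schema for a given schemaType is stored in the data collection itself, pinned to the reserved partition key value "__schema__<schemaType>". A minimal sketch of seeding such a document through this API (the /tenantId partition key and all field values are illustrative assumptions, not part of the change):

    // Sketch only: assumes a collection partitioned on /tenantId
    val schema = ItemSchema(
      columns = Array(
        ItemColumn("orderId", "String", null),
        ItemColumn("quantity", "Integer", Int.box(0))),
      schemaType = "order")
    val schemaDoc = new Document(JacksonWrapper.serialize(schema))
    schemaDoc.set("tenantId", "__schema__order") // partition key value = "__schema__" + schemaType
    connection.insertDocument(connection.collectionLink, schemaDoc, null)
    val readBack = connection.readSchema("order") // resolves the document written above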
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
index f4d6dce4..310ce610 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
@@ -25,9 +25,11 @@ package com.microsoft.azure.cosmosdb.spark
import java.util.UUID
import java.util.concurrent.TimeUnit
import com.microsoft.azure.cosmosdb.spark.config._
import com.microsoft.azure.cosmosdb.spark.rdd.{CosmosDBRDD, _}
import com.microsoft.azure.cosmosdb.spark.schema._
+import com.microsoft.azure.cosmosdb.spark.util.JacksonWrapper
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
import rx.Observable
import com.microsoft.azure.documentdb._
@@ -265,7 +270,9 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
rootPropertyToSave: Option[String],
partitionKeyDefinition: Option[String],
upsert: Boolean,
- maxConcurrencyPerPartitionRange: Integer): Unit = {
+ maxConcurrencyPerPartitionRange: Integer,
+ config: Config,
+ executePreSave: (ItemSchema, String, Option[String], Document) => Unit): Unit = {
// Set retry options high for initialization (default values)
connection.setDefaultClientRetryPolicy
@@ -275,6 +282,16 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
// Set retry options to 0 to pass control to BulkExecutor
// connection.setZeroClientRetryPolicy
+ var schemaDocument: ItemSchema = null
+ var schemaWriteRequired = false
+ if (config.get[String](CosmosDBConfig.SchemaType).isDefined) {
+   schemaDocument = connection.readSchema(config.get[String](CosmosDBConfig.SchemaType).get)
+   if (schemaDocument == null) {
+     // Writing data whose schema type has no schema document yet; one will be
+     // derived from the first document and written below
+     schemaWriteRequired = true
+   }
+ }
val documents = new java.util.ArrayList[String](writingBatchSize)
@@ -293,6 +310,103 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
if (document.getId == null) {
document.setId(UUID.randomUUID().toString)
}
+
+ if (schemaWriteRequired) {
+   // Create the schema document by reading columns from the first document;
+   // this only happens once per schema type
+   val schemaType = config.get[String](CosmosDBConfig.SchemaType).get
+   val schemaCols = new ListBuffer[ItemColumn]()
+   val keys = document.getHashMap().keySet().toArray
+
+   val partitionKeyDefinition = connection.getCollection.getPartitionKey
+   val partitionKeyPath = partitionKeyDefinition.getPaths
+   val partitionKeyProperty = partitionKeyPath.iterator.next.replaceFirst("^/", "")
+
+   // Values treated as column defaults: blank strings, zero, fixed-width runs
+   // of zeros (1 to 18 digits), plus any user-supplied values from the
+   // pipe-separated knownDefaultValues option
+   var knownDefaults: List[Any] = List("", " ", 0) ::: (1 to 18).map("0" * _).toList
+   if (config.get[String](CosmosDBConfig.KnownDefaultValues).isDefined) {
+     knownDefaults = knownDefaults ::: config.get[String](CosmosDBConfig.KnownDefaultValues).get.split('|').toList
+   }
+
+   // Don't add system properties (or the schema property itself) to the schema
+   val documentSchemaProperty = config.getOrElse[String](CosmosDBConfig.SchemaPropertyColumn, CosmosDBConfig.DefaultSchemaPropertyColumn)
+   val systemProperties = documentSchemaProperty :: List("_rid", "id", "_self", "_etag", "_attachments", "_ts")
+
+   keys.foreach(
+     key => {
+       if (!systemProperties.contains(key)) {
+         var defaultVal: Object = null
+         var columnType = "String"
+         val value = document.get(key.toString)
+
+         if (knownDefaults.contains(value) || value == null) {
+           // Currently only known default values are recorded as column defaults
+           defaultVal = value
+         }
+
+         if (value != null) {
+           columnType = value.getClass().toString.split('.').last
+         }
+         schemaCols += new ItemColumn(key.toString, columnType, defaultVal)
+       }
+     }
+   )
+   schemaDocument = new ItemSchema(schemaCols.toArray, schemaType)
+   val schemaDoc = new Document(JacksonWrapper.serialize(schemaDocument))
+
+   schemaDoc.set(partitionKeyProperty, "__schema__" + schemaType)
+   try {
+     logInfo("Writing schema")
+     connection.insertDocument(connection.collectionLink, schemaDoc, null)
+     logInfo(s"Successfully wrote schema $schemaDoc")
+   }
+   catch {
+     // 409 Conflict means another writer created the schema document first;
+     // fall back to reading the existing one
+     case ex: DocumentClientException if ex.getStatusCode == 409 =>
+       schemaDocument = null
+
+       val maxSchemaReadTime = 5000
+       val startTime = System.currentTimeMillis()
+       var elapsed: Long = 0
+
+       while (schemaDocument == null && elapsed < maxSchemaReadTime) {
+         logInfo("Schema already present. Retrieving from collection.")
+         schemaDocument = connection.readSchema(config.get[String](CosmosDBConfig.SchemaType).get)
+         if (schemaDocument == null) Thread.sleep(100) // avoid a hot retry loop
+         elapsed = System.currentTimeMillis() - startTime
+       }
+
+       if (schemaDocument == null) {
+         throw new Exception("Unable to fetch schemaDocument after multiple attempts")
+       }
+
+       logInfo("Successfully retrieved schema from collection: " + new Document(JacksonWrapper.serialize(schemaDocument)))
+
+     case ex: DocumentClientException =>
+       throw new Exception("Unable to insert the schemaDocument", ex)
+   }
+
+ schemaWriteRequired = false
+ }
+
+ if (config.get[String](CosmosDBConfig.SchemaType).isDefined) {
+   executePreSave(
+     schemaDocument,
+     config.getOrElse[String](CosmosDBConfig.SchemaPropertyColumn, CosmosDBConfig.DefaultSchemaPropertyColumn),
+     config.get[String](CosmosDBConfig.IgnoreSchemaDefaults),
+     document)
+ }
+
documents.add(document.toJson())
if (documents.size() >= writingBatchSize) {
bulkImportResponse = importer.importAll(documents, upsert, false, maxConcurrencyPerPartitionRange)
@@ -408,6 +523,32 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
savePartition(connection, iter, config, partitionCount, offerThroughput)
}
+
+ private def executePreSave(schemaDocument: ItemSchema,
+                            documentSchemaProperty: String,
+                            ignoreDefaults: Option[String],
+                            item: Document): Unit = {
+   // Tag the document with its schema type
+   item.set(documentSchemaProperty, schemaDocument.schemaType)
+   val skipDefaults = ignoreDefaults.isDefined && ignoreDefaults.get.toBoolean
+
+   if (!skipDefaults) {
+     val docColumns = item.getHashMap().keySet().toArray
+     val schemaColumns = schemaDocument.columns.map(col => (col.name, col.defaultValue))
+
+     // Remove columns whose value equals the schema default; executePostRead
+     // in CosmosDBRDDIterator restores them on the read path
+     schemaColumns.foreach(
+       col => if (docColumns.contains(col._1) && item.get(col._1) == col._2) {
+         item.remove(col._1)
+       }
+     )
+   }
+ }
+
private def savePartition[D: ClassTag](connection: CosmosDBConnection,
iter: Iterator[D],
config: Config,
@@ -445,6 +586,8 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
toInt
val partitionKeyDefinition = config
.get[String](CosmosDBConfig.PartitionKeyDefinition)
+ val documentSchemaProperty = config
+ .getOrElse[String](CosmosDBConfig.SchemaPropertyColumn, CosmosDBConfig.DefaultSchemaPropertyColumn)
val maxConcurrencyPerPartitionRangeStr = config.get[String](CosmosDBConfig.BulkImportMaxConcurrencyPerPartitionRange)
val maxConcurrencyPerPartitionRange = if (maxConcurrencyPerPartitionRangeStr.nonEmpty)
@@ -465,7 +608,8 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
} else if (isBulkImporting) {
logDebug(s"Writing partition with bulk import")
bulkImport(iter, connection, offerThroughput, writingBatchSize, rootPropertyToSave,
- partitionKeyDefinition, upsert, maxConcurrencyPerPartitionRange)
+ partitionKeyDefinition, upsert, maxConcurrencyPerPartitionRange, config, executePreSave)
} else {
logDebug(s"Writing partition with rxjava")
asyncConnection.importWithRxJava(iter, asyncConnection, writingBatchSize, writingBatchDelayMs, rootPropertyToSave, upsert)
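To make the write path concrete: for documents carrying a schemaType, executePreSave tags each document with the schema property and strips columns whose value equals the schema default. A sketch of the net effect on one document (the schema and values are assumed for illustration; executePreSave itself is private to this object):

    val schema = ItemSchema(Array(ItemColumn("middleName", "String", "")), "person")
    val doc = new Document("""{"id": "1", "firstName": "Ada", "middleName": ""}""")
    executePreSave(schema, "documentSchema", None, doc)
    // doc now carries documentSchema = "person" and no longer stores middleName,
    // because its value matched the schema default; CosmosDBRDDIterator's
    // executePostRead restores the column when the document is read back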
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/ItemSchema.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/ItemSchema.scala
new file mode 100644
index 00000000..ee1cda0f
--- /dev/null
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/ItemSchema.scala
@@ -0,0 +1,9 @@
+package com.microsoft.azure.cosmosdb.spark
+
+/**
+ * Class encapsulating the schema for a document type.
+ */
+case class ItemSchema(columns: Array[ItemColumn], schemaType: String)
+
+case class ItemColumn(name: String, dataType: String, defaultValue: Object)
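The Jackson-serialized form of these case classes is what readSchema's "c.schemaType = ..." filter matches on, since the schemaType field serializes under its own name. A sketch of the expected wire shape (via the JacksonWrapper added below; column values are illustrative):

    val json = JacksonWrapper.serialize(
      ItemSchema(Array(ItemColumn("quantity", "Integer", Int.box(0))), "order"))
    // json should come out roughly as:
    // {"columns":[{"name":"quantity","dataType":"Integer","defaultValue":0}],"schemaType":"order"}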
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
index 61faacf3..ccd05905 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
@@ -118,6 +118,11 @@ object CosmosDBConfig {
val ApplicationName = "application_name"
+ val SchemaType = "schemaType"
+ val KnownDefaultValues = "knownDefaultValues"
+ val SchemaPropertyColumn = "schemapropertycolumn"
+ val IgnoreSchemaDefaults = "ignoreschemadefaults"
+
// When the streaming source is slow, there will be times when getting data from a specific continuation token
// returns no results and therefore no information on the next continuation token set is available.
// In those cases, the connector gives a delay and then trigger the next batch.
@@ -169,6 +174,8 @@ object CosmosDBConfig {
val DefaultMaxConnectionPoolSize = 500
+ val DefaultSchemaPropertyColumn = "documentSchema"
+
def parseParameters(parameters: Map[String, String]): Map[String, Any] = {
return parameters.map { case (x, v) => x -> v }
}
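For reference, a write exercising the new options might be configured as below. This is a sketch: account details and option values are placeholders, the last four keys are the ones added above, and Config/CosmosDBSpark.save are the connector's existing entry points:

    val writeConfig = Config(Map(
      "Endpoint"             -> "https://myaccount.documents.azure.com:443/",
      "Masterkey"            -> "<master-key>",
      "Database"             -> "mydb",
      "Collection"           -> "orders",
      "schemaType"           -> "order",
      "knownDefaultValues"   -> "N/A|UNKNOWN",
      "schemapropertycolumn" -> "documentSchema",
      "ignoreschemadefaults" -> "false"))
    CosmosDBSpark.save(df, writeConfig)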
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
index 74e483eb..daacfaa3 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala
@@ -87,7 +87,10 @@ class CosmosDBPartitioner() extends Partitioner[Partition] with CosmosDBLoggingT
partitions.toArray
} else {
// CosmosDB source
- var query: String = FilterConverter.createQueryString(requiredColumns, filters)
+ val schemaTypeName = config.get[String](CosmosDBConfig.SchemaType)
+ val documentSchemaProperty = config.getOrElse[String](CosmosDBConfig.SchemaPropertyColumn, CosmosDBConfig.DefaultSchemaPropertyColumn)
+
+ var query: String = FilterConverter.createQueryString(requiredColumns, filters, schemaTypeName, documentSchemaProperty)
var partitionKeyRanges = connection.getAllPartitions(query)
logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
Array.tabulate(partitionKeyRanges.length) {
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
index 6b26d184..37cfef09 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
@@ -29,8 +29,9 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}
import com.microsoft.azure.cosmosdb.spark.partitioner.CosmosDBPartition
import com.microsoft.azure.cosmosdb.spark.schema._
+import com.microsoft.azure.cosmosdb.spark.util.JacksonWrapper
+import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnection, CosmosDBLoggingTrait, ItemColumn, ItemSchema}
import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
-import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnection, CosmosDBLoggingTrait}
import com.microsoft.azure.documentdb._
import org.apache.commons.lang3.StringUtils
import org.apache.spark._
@@ -138,6 +142,8 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
private var initialized = false
private var itemCount: Long = 0
+ private var schemaDocument: ItemSchema = _
+ // Kept per-iterator: a companion-object flag would be shared by every task in
+ // the executor JVM, which is unsafe when partitions use different configs
+ private var schemaCheckRequired = false
+
lazy val reader: Iterator[Document] = {
initialized = true
var connection: CosmosDBConnection = new CosmosDBConnection(config)
@@ -191,12 +197,21 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
feedOpts.setPartitionKeyRangeIdInternal(partition.partitionKeyRangeId.toString)
CosmosDBRDDIterator.lastFeedOptions = feedOpts
+ val schemaTypeName = config.get[String](CosmosDBConfig.SchemaType)
+ val documentSchemaProperty = config.getOrElse[String](CosmosDBConfig.SchemaPropertyColumn, CosmosDBConfig.DefaultSchemaPropertyColumn)
val queryString = config
.get[String](CosmosDBConfig.QueryCustom)
- .getOrElse(FilterConverter.createQueryString(requiredColumns, filters))
+ .getOrElse(FilterConverter.createQueryString(requiredColumns, filters, schemaTypeName, documentSchemaProperty))
logInfo(s"CosmosDBRDDIterator::LazyReader, created query string: $queryString")
+ if (schemaTypeName.isDefined) {
+   schemaDocument = connection.readSchema(schemaTypeName.get)
+   if (schemaDocument != null) {
+     schemaCheckRequired = true
+   }
+ }
+
if (queryString == FilterConverter.defaultQuery) {
// If there is no filters, read feed should be used
connection.readDocuments(feedOpts)
@@ -339,6 +354,29 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
}
}
+ private def executePostRead(item: Document): Unit = {
+   if (schemaDocument != null) {
+     // Re-add every schema column missing from the stored document, using the
+     // schema default (executePreSave trimmed such columns at write time)
+     val docColumns = item.getHashMap().keySet().toArray
+     schemaDocument.columns
+       .filterNot(col => docColumns.contains(col.name))
+       .foreach(col => item.set(col.name, col.defaultValue))
+   }
+ }
+
// Register an on-task-completion callback to close the input stream.
taskContext.addTaskCompletionListener((context: TaskContext) => closeIfNeeded())
@@ -354,7 +392,11 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
throw new NoSuchElementException("End of stream")
}
itemCount = itemCount + 1
- reader.next()
+ val doc = reader.next()
+ if (schemaCheckRequired) {
+   executePostRead(doc)
+ }
+ doc
}
def closeIfNeeded(): Unit = {
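The read-side repair mirrors the write-side trimming. A sketch of the net effect of executePostRead on one stored document (schema assumed as in the earlier examples):

    val stored = new Document("""{"id": "1", "documentSchema": "order"}""")
    // With schemaDocument = ItemSchema(Array(ItemColumn("quantity", "Integer", Int.box(0))), "order"),
    // executePostRead(stored) re-adds the trimmed column, so that afterwards
    //   stored.get("quantity") == 0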
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/schema/FilterConverter.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/schema/FilterConverter.scala
index a863953a..e2626989 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/schema/FilterConverter.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/schema/FilterConverter.scala
@@ -23,6 +23,7 @@
package com.microsoft.azure.cosmosdb.spark.schema
import com.microsoft.azure.cosmosdb.spark.CosmosDBLoggingTrait
+import com.microsoft.azure.cosmosdb.spark.config.CosmosDBConfig
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.sources._
@@ -32,7 +33,7 @@ private [spark] object FilterConverter extends CosmosDBLoggingTrait {
def createQueryString(
requiredColumns: Array[String],
- filters: Array[Filter]): String = {
+ filters: Array[Filter],
+ schemaTypeName: Option[String],
+ documentSchemaProperty: String): String = {
var selectClause = "*"
//Note: for small document, the projection will transport less data but it might be slower because server
@@ -41,6 +42,13 @@ private [spark] object FilterConverter extends CosmosDBLoggingTrait {
var whereClause = StringUtils.EMPTY
if (filters.nonEmpty) whereClause = s"where ${createWhereClause(filters)}"
+ if (schemaTypeName.isDefined) {
+   // Restrict reads to documents tagged with the requested schema type
+   val schemaFilter = s"c.$documentSchemaProperty = '${schemaTypeName.get}'"
+   whereClause =
+     if (whereClause.nonEmpty) s"$whereClause AND $schemaFilter"
+     else s"where $schemaFilter"
+ }
String.format(queryTemplate, selectClause, whereClause)
}
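Assuming the existing queryTemplate has the usual "SELECT %s FROM c %s" shape, the schema filter composes as follows (sketch):

    // With no pushed-down filters, the schema filter becomes the whole predicate:
    createQueryString(Array.empty, Array.empty, Some("order"), "documentSchema")
    // => "SELECT * FROM c where c.documentSchema = 'order'"
    // With filters present, it is appended to the existing clause as
    // "where <filters> AND c.documentSchema = 'order'"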
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/JacksonWrapper.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/JacksonWrapper.scala
new file mode 100644
index 00000000..b370cb7c
--- /dev/null
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/JacksonWrapper.scala
@@ -0,0 +1,38 @@
+package com.microsoft.azure.cosmosdb.spark.util
+
+import java.io.StringWriter
+import java.lang.reflect.{ParameterizedType, Type}
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+object JacksonWrapper {
+  val mapper = new ObjectMapper()
+  mapper.registerModule(DefaultScalaModule)
+  mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+
+  def serialize(value: Any): String = {
+    val writer = new StringWriter()
+    mapper.writeValue(writer, value)
+    writer.toString
+  }
+
+  def deserialize[T: Manifest](value: String): T =
+    mapper.readValue(value, typeReference[T])
+
+  private[this] def typeReference[T: Manifest] = new TypeReference[T] {
+    override def getType = typeFromManifest(manifest[T])
+  }
+
+  private[this] def typeFromManifest(m: Manifest[_]): Type = {
+    if (m.typeArguments.isEmpty) { m.runtimeClass }
+    else new ParameterizedType {
+      def getRawType = m.runtimeClass
+      def getActualTypeArguments = m.typeArguments.map(typeFromManifest).toArray
+      def getOwnerType = null
+    }
+  }
+}
\ No newline at end of file
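A usage sketch for the wrapper, including a parameterized type, which is what the Manifest-driven typeReference machinery is for (Batch is a hypothetical case class, used here only for illustration):

    case class Batch(items: List[ItemColumn])

    val json = JacksonWrapper.serialize(Batch(List(ItemColumn("id", "String", null))))
    val back = JacksonWrapper.deserialize[Batch](json)
    // back.items.head.name == "id"; the List's element type is recovered via Manifest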