From af8dc16a4a507910bdf539f5b7a216dce4a808bc Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 27 Apr 2021 00:24:19 +0200 Subject: [PATCH 1/3] Adding more extensive logging in Spark connector --- pom.xml | 6 +- .../azure/cosmosdb/spark/IteratorLogger.java | 381 ++++++++++++++++++ .../cosmosdb/spark/BulkExecutorSettings.scala | 4 +- .../cosmosdb/spark/ClientConfiguration.scala | 34 +- .../azure/cosmosdb/spark/Constants.scala | 2 +- .../cosmosdb/spark/CosmosDBConnection.scala | 16 +- .../spark/CosmosDBConnectionCache.scala | 40 +- .../azure/cosmosdb/spark/CosmosDBSpark.scala | 77 +++- .../azure/cosmosdb/spark/DefaultSource.scala | 3 +- .../azure/cosmosdb/spark/HdfsLogWriter.scala | 93 +++++ .../azure/cosmosdb/spark/LogginRDD.scala | 28 ++ .../cosmosdb/spark/LoggingIterator.scala | 103 +++++ .../spark/config/CosmosDBConfig.scala | 9 +- .../partitioner/CosmosDBPartitioner.scala | 7 +- .../cosmosdb/spark/rdd/CosmosDBRDD.scala | 7 +- .../spark/rdd/CosmosDBRDDIterator.scala | 15 +- .../azure/cosmosdb/spark/util/HdfsUtils.scala | 14 +- 17 files changed, 802 insertions(+), 37 deletions(-) create mode 100644 src/main/java/com/microsoft/azure/cosmosdb/spark/IteratorLogger.java create mode 100644 src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala create mode 100644 src/main/scala/com/microsoft/azure/cosmosdb/spark/LogginRDD.scala create mode 100644 src/main/scala/com/microsoft/azure/cosmosdb/spark/LoggingIterator.scala diff --git a/pom.xml b/pom.xml index 38626972..d3ec76f6 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ limitations under the License. com.microsoft.azure azure-cosmosdb-spark_2.4.0_2.11 jar - 3.6.9 + 3.6.14-SNAPSHOT ${project.groupId}:${project.artifactId} Spark Connector for Microsoft Azure CosmosDB http://azure.microsoft.com/en-us/services/documentdb/ @@ -51,7 +51,7 @@ limitations under the License. com.microsoft.azure azure-documentdb - 2.6.0 + 2.7.2-SNAPSHOT org.scala-lang @@ -77,7 +77,7 @@ limitations under the License. com.microsoft.azure documentdb-bulkexecutor - 2.12.0 + 2.13.2-SNAPSHOT com.microsoft.azure diff --git a/src/main/java/com/microsoft/azure/cosmosdb/spark/IteratorLogger.java b/src/main/java/com/microsoft/azure/cosmosdb/spark/IteratorLogger.java new file mode 100644 index 00000000..f2c5fa65 --- /dev/null +++ b/src/main/java/com/microsoft/azure/cosmosdb/spark/IteratorLogger.java @@ -0,0 +1,381 @@ +/** + * The MIT License (MIT) + * Copyright (c) 2017 Microsoft Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.microsoft.azure.cosmosdb.spark; + +import com.microsoft.azure.documentdb.*; +import com.microsoft.azure.documentdb.internal.routing.PartitionKeyInternal; +import org.joda.time.Instant; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.io.Serializable; +import java.io.StringWriter; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class IteratorLogger implements Serializable { + private final static String headerLine = "Timestamp|Level|Event|PKRangeId|UserAgentSuffix|Message|Exception|" + + "IteratedDocuments|Count"; + + private final static String LOG_LEVEL_INFO = "I"; + private final static String LOG_LEVEL_ERROR = "E"; + + private final static String EVENT_NAME_LOG = "Log"; + private final static String EVENT_NAME_ITERATOR_NEXT = "ConsumedFromIterator"; + private final CosmosLogWriter writer; + private final String userAgentSuffix; + private final String pkRangeId; + private final String iteratorName; + private final StringBuilder iteratedDocuments = new StringBuilder().append("["); + private final AtomicInteger iteratedDocumentCount = new AtomicInteger(0); + + public IteratorLogger( + CosmosLogWriter writer, + String userAgentSuffix, + String pkRangeId, + String iteratorName) { + this.userAgentSuffix = userAgentSuffix; + this.pkRangeId = pkRangeId; + this.iteratorName = iteratorName; + this.writer = writer; + + if (writer != null) { + this.writer.writeLine(headerLine); + } + } + + public void logError(String message, Throwable throwable) { + if (writer == null) { + return; + } + + logLogEvent(LOG_LEVEL_ERROR, message, throwable); + } + + public void onIteratorNext(T document, PartitionKeyDefinition pkDefinition) { + if (writer == null) { + return; + } + + String contentToFlush = null; + int countSnapshot; + + synchronized (this.iteratedDocuments) { + if (this.iteratedDocuments.length() > 1) { + this.iteratedDocuments.append(", "); + } + this.iteratedDocuments.append( + formatDocumentIdentity( + DocumentAnalyzer.extractDocumentIdentity( + document, + pkDefinition))); + countSnapshot = this.iteratedDocumentCount.incrementAndGet(); + + if (this.iteratedDocuments.length() > 1024) { + this.iteratedDocuments.append("]"); + contentToFlush = this.iteratedDocuments.toString(); + this.iteratedDocuments.setLength(0); + this.iteratedDocuments.append("["); + this.iteratedDocumentCount.set(0); + } + } + + if (contentToFlush != null) { + this.logLine( + LOG_LEVEL_INFO, + EVENT_NAME_ITERATOR_NEXT, + this.iteratorName, + null, + contentToFlush, + countSnapshot); + } + } + + public void flush() { + if (this.writer == null) { + return; + } + + synchronized (this.iteratedDocuments) { + if (this.iteratedDocuments.length() > 1) { + this.logLine( + LOG_LEVEL_INFO, + EVENT_NAME_ITERATOR_NEXT, + this.iteratorName, + null, + this.iteratedDocuments.toString() + "]", + this.iteratedDocumentCount.get()); + } + } + } + + private String extractDocumentIdentities(List resources, PartitionKeyDefinition pkDefinition) { + StringBuilder sb = new StringBuilder(); + sb.append("["); + for (int i = 0; i < resources.size(); i++) { + if (i > 0) { + sb.append(", "); + } + + String[] identity = 
DocumentAnalyzer.extractDocumentIdentity( + resources.get(i), + pkDefinition); + sb.append(formatDocumentIdentity(identity)); + } + sb.append("]"); + + return sb.toString(); + } + + private String formatDocumentIdentity(String[] identity) { + return "(" + identity[0] + "/" + identity[1] + ")"; + } + + private void logLogEvent( + String logLevel, + String message, + Throwable exception) { + + logLine( + logLevel, + EVENT_NAME_LOG, + message, + exception, + null, + null); + } + + // "Timestamp|Level|Event|PKRangeId|UserAgentSuffix|Message|Exception|" + + // "IteratedDocuments|Count"; + private void logLine( + String logLevel, + String eventName, + String message, + Throwable exception, + String iteratedDocuments, + Integer count) { + + writer.writeLine( + join( + Instant.now().toString(), + logLevel, + eventName, + pkRangeId, + this.userAgentSuffix, + message, + throwableToString(exception), + iteratedDocuments, + count != null ? count.toString() : "") + ); + } + + private String join(String... args) { + StringBuilder sb = new StringBuilder(); + String separator = "|"; + for (String c : args) { + if (sb.length() > 0) { + sb.append(separator); + } + + sb.append(c); + } + + return sb.toString(); + } + + private String throwableToString(Throwable throwable) { + String exceptionText = null; + if (throwable == null) { + return null; + } + + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + throwable.printStackTrace(pw); + return sw.toString(); + } + + private static class DocumentAnalyzer { + private final static Logger LOGGER = LoggerFactory.getLogger(DocumentAnalyzer.class); + + /** + * Extracts effective {@link PartitionKeyInternal} from serialized document. + * @param partitionKeyDefinition Information about partition key. 
+ * @return PartitionKeyInternal + */ + public static String[] extractDocumentIdentity( + Resource root, + PartitionKeyDefinition partitionKeyDefinition) { + + String pk = "n/a"; + if (partitionKeyDefinition != null && partitionKeyDefinition.getPaths().size() > 0) { + pk = DocumentAnalyzer + .extractPartitionKeyValueInternal( + root, + partitionKeyDefinition).toJson(); + } + + String id = "n/a"; + if (root.getId() != null){ + id = root.getId(); + } + + return new String[] { pk, id }; + } + + private static PartitionKeyInternal extractPartitionKeyValueInternal( + Resource resource, + PartitionKeyDefinition partitionKeyDefinition) { + if (partitionKeyDefinition != null) { + String path = partitionKeyDefinition.getPaths().iterator().next(); + Collection parts = com.microsoft.azure.documentdb.internal.PathParser.getPathParts(path); + if (parts.size() >= 1) { + Object value = resource.getObjectByPath(parts); + if (value == null || value.getClass() == JSONObject.class) { + value = Undefined.Value(); + } + + return PartitionKeyInternal.fromObjectArray(Arrays.asList(value), false); + } + } + + return null; + } + + public static PartitionKeyInternal fromPartitionKeyvalue(Object partitionKeyValue) { + try { + return PartitionKeyInternal.fromObjectArray(Collections.singletonList(partitionKeyValue), true); + } catch (Exception e) { + LOGGER.error("Failed to instantiate ParitionKeyInternal from {}", partitionKeyValue, e); + throw toRuntimeException(e); + } + } + + public static RuntimeException toRuntimeException(Exception e) { + if (e instanceof RuntimeException) { + return (RuntimeException) e; + } else { + return new RuntimeException(e); + } + } + } + + final static class PathParser + { + private final static char SEGMENT_SEPARATOR = '/'; + private final static String ERROR_MESSAGE_FORMAT = "Invalid path \"%s\", failed at %d"; + + /** + * Extract parts from a given path for '/' as the separator. + *

+ * This code doesn't do as much validation as the backend, as it assumes that IndexingPolicy path coming from the backend is valid. + * + * @param path specifies a partition key given as a path. + * @return a list of all the parts for '/' as the separator. + */ + public static List getPathParts(String path) + { + List tokens = new ArrayList(); + AtomicInteger currentIndex = new AtomicInteger(); + + while (currentIndex.get() < path.length()) + { + char currentChar = path.charAt(currentIndex.get()); + if (currentChar != SEGMENT_SEPARATOR) + { + throw new IllegalArgumentException( + String.format(ERROR_MESSAGE_FORMAT, path, currentIndex.get())); + } + + if (currentIndex.incrementAndGet() == path.length()) + { + break; + } + + currentChar = path.charAt(currentIndex.get()); + if (currentChar == '\"' || currentChar == '\'') + { + // Handles the partial path given in quotes such as "'abc/def'" + tokens.add(getEscapedToken(path, currentIndex)); + } + else + { + tokens.add(getToken(path, currentIndex)); + } + } + + return tokens; + } + + private static String getEscapedToken(String path, AtomicInteger currentIndex) + { + char quote = path.charAt(currentIndex.get()); + int newIndex = currentIndex.incrementAndGet(); + + while (true) + { + newIndex = path.indexOf(quote, newIndex); + if (newIndex == -1) + { + throw new IllegalArgumentException( + String.format(ERROR_MESSAGE_FORMAT, path, currentIndex.get())); + } + + // Ignore escaped quote in the partial path we look at such as "'abc/def \'12\'/ghi'" + if (path.charAt(newIndex - 1) != '\\') + { + break; + } + + ++newIndex; + } + + String token = path.substring(currentIndex.get(), newIndex); + currentIndex.set(newIndex + 1); + + return token; + } + + private static String getToken(String path, AtomicInteger currentIndex) + { + int newIndex = path.indexOf(SEGMENT_SEPARATOR, currentIndex.get()); + String token = null; + if (newIndex == -1) + { + token = path.substring(currentIndex.get()); + currentIndex.set(path.length()); + } + else + { + token = path.substring(currentIndex.get(), newIndex); + currentIndex.set(newIndex); + } + + token = token.trim(); + return token; + } + } +} diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/BulkExecutorSettings.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/BulkExecutorSettings.scala index 9d2e2cb1..307a23e3 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/BulkExecutorSettings.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/BulkExecutorSettings.scala @@ -32,4 +32,6 @@ package com.microsoft.azure.cosmosdb.spark */ private[spark] case class BulkExecutorSettings( maxMiniBatchUpdateCount: Int, - partitionKeyOption: Option[String]) \ No newline at end of file + partitionKeyOption: Option[String], + bulkLoggingPath: Option[String], + bulkLoggingCorrelationId: Option[String]) \ No newline at end of file diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala index 02e0cd71..8afeeb11 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala @@ -25,10 +25,10 @@ package com.microsoft.azure.cosmosdb.spark import com.microsoft.azure.cosmosdb.spark.config._ import com.microsoft.azure.documentdb._ import com.microsoft.azure.documentdb.internal._ - import java.lang.management.ManagementFactory import scala.collection.JavaConversions._ +import 
scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.language.implicitConversions @@ -50,7 +50,10 @@ private[spark] case class ClientConfiguration( consistencyLevel: String, database: String, container: String, - bulkConfig: BulkExecutorSettings) { + bulkConfig: BulkExecutorSettings, + queryLoggingPath: Option[String], + queryLoggingCorrelationId: Option[String], + hadoopConfig: mutable.Map[String, String]) { def getCollectionLink(): String = { ClientConfiguration.getCollectionLink(database, container) @@ -59,14 +62,26 @@ private[spark] case class ClientConfiguration( def getDatabaseLink() : String = { ClientConfiguration.getDatabaseLink(database) } + + def getQueryLoggingPath(): Option[String] = { + queryLoggingPath match { + case Some(basePath) => queryLoggingCorrelationId match { + case Some(correlationId) => Some(basePath + queryLoggingCorrelationId.get + "/") + case None => Some(basePath) + } + case None => None + } + } } object ClientConfiguration extends CosmosDBLoggingTrait { - def apply(config: Config): ClientConfiguration = { + def apply(config: Config, hadoopConfig: mutable.Map[String, String]): ClientConfiguration = { val database : String = config.get(CosmosDBConfig.Database).get val collection : String = config.get(CosmosDBConfig.Collection).get val authConfig : AuthConfig = validateAndCreateAuthConfig(config, database, collection) val connectionPolicySettings : ConnectionPolicySettings = createConnectionPolicySettings(config) + val queryLoggingPath = config.get(CosmosDBConfig.QueryLoggingPath) + val queryLoggingCorrelationId = config.get(CosmosDBConfig.QueryLoggingCorrelationId) val bulkExecutorSettings : BulkExecutorSettings = createBulkExecutorSettings(config) // Get consistency level @@ -81,7 +96,11 @@ object ClientConfiguration extends CosmosDBLoggingTrait { consistencyLevel, database, collection, - bulkExecutorSettings) + bulkExecutorSettings, + queryLoggingPath, + queryLoggingCorrelationId, + hadoopConfig + ) } private def validateAndCreateAuthConfig(config: Config, database: String, collection: String) : AuthConfig = { @@ -110,9 +129,14 @@ object ClientConfiguration extends CosmosDBLoggingTrait { val maxMiniBatchUpdateCount: Int = config .getOrElse(CosmosDBConfig.MaxMiniBatchUpdateCount, CosmosDBConfig.DefaultMaxMiniBatchUpdateCount) + val bulkLoggingPath = config.get(CosmosDBConfig.BulkLoggingPath) + val bulkLoggingCorrelationId = config.get(CosmosDBConfig.BulkLoggingCorrelationId) + BulkExecutorSettings( maxMiniBatchUpdateCount, - pkDef) + pkDef, + bulkLoggingPath, + bulkLoggingCorrelationId) } private def createConnectionPolicySettings(config: Config) : ConnectionPolicySettings = { diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala index ca0e9ffe..51e4b076 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala @@ -23,6 +23,6 @@ package com.microsoft.azure.cosmosdb.spark object Constants { - val currentVersion = "2.4.0_2.11-3.6.9" + val currentVersion = "2.4.0_2.11-3.6.14-SNAPSHOT" val userAgentSuffix = s" SparkConnector/$currentVersion" } \ No newline at end of file diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala index bbd4243e..1911c932 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala +++ 
b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnection.scala @@ -23,6 +23,7 @@ package com.microsoft.azure.cosmosdb.spark import java.net.SocketTimeoutException +import java.util.concurrent.Callable import com.microsoft.azure.cosmosdb.spark.config._ import com.microsoft.azure.documentdb._ @@ -34,6 +35,9 @@ import scala.language.implicitConversions import scala.reflect.ClassTag import scala.util.control.Breaks._ import com.microsoft.azure.cosmosdb.rx.internal.NotFoundException +import org.apache.hadoop.conf.Configuration + +import scala.collection.mutable private object CosmosDBConnection { private val rnd = scala.util.Random @@ -43,10 +47,10 @@ private object CosmosDBConnection { } } -private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLoggingTrait with Serializable { +private[spark] case class CosmosDBConnection(config: Config, hadoopConfig: mutable.Map[String, String]) extends CosmosDBLoggingTrait with Serializable { private val maxPagesPerBatch = config.getOrElse[String](CosmosDBConfig.ChangeFeedMaxPagesPerBatch, CosmosDBConfig.DefaultChangeFeedMaxPagesPerBatch.toString).toInt - private val clientConfig = ClientConfiguration(config) + val clientConfig = ClientConfiguration(config, hadoopConfig) def getCollectionLink: String = { executeWithRetryOnCollectionRecreate( @@ -58,6 +62,11 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog CosmosDBConnectionCache.reinitializeClient(clientConfig) } + def flushLogWriter = { + val documentClient = CosmosDBConnectionCache.getOrCreateClient(clientConfig) + documentClient.flushLogWriter() + } + private def getAllPartitionsInternal: List[PartitionKeyRange] = { val documentClient = CosmosDBConnectionCache.getOrCreateClient(clientConfig) val ranges = documentClient.readPartitionKeyRanges(getCollectionLink, null.asInstanceOf[FeedOptions]) @@ -232,7 +241,8 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLog logDebug(s"CosmosDBConnection.getIteratorFromFeedResponse -- With continuation - returning query iterator") val responseIterator:Iterator[T] = response .getQueryIterator - responseIterator + + responseIterator } } diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala index 98501a23..9e4b17e2 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala @@ -23,14 +23,19 @@ package com.microsoft.azure.cosmosdb.spark import java.util.concurrent.ConcurrentHashMap -import java.util.{Timer, TimerTask} +import java.util.{Timer, TimerTask, UUID} import com.microsoft.azure.cosmosdb.spark.config.CosmosDBConfig +import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils import com.microsoft.azure.documentdb._ import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor import com.microsoft.azure.documentdb.internal.routing.PartitionKeyRangeCache +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.SparkSession +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration import scala.collection.JavaConversions._ +import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.language.implicitConversions @@ -68,7 +73,9 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait { private val rnd = scala.util.Random private val refreshDelay : Long = (10 * 60 * 
1000) + rnd.nextInt(5 * 60 * 1000) // in 10 - 15 minutes + //private val refreshDelay : Long = (1 * 60 * 1000) + rnd.nextInt(1 * 60 * 1000) // in 10 - 15 minutes private val refreshPeriod : Long = 15 * 60 * 1000 // every 15 minutes + //private val refreshPeriod : Long = 2 * 60 * 1000 // every 15 minutes // main purpose of the time is to allow bulk operations to consume // additional throughput when more RUs are getting provisioned private val timerName = "throughput-refresh-timer" @@ -110,7 +117,7 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait { val effectivelyAvailableThroughputForBulkOperations = getOrReadMaxAvailableThroughput(config) - val builder = DocumentBulkExecutor.builder + var builder = DocumentBulkExecutor.builder .from( client, config.database, @@ -121,6 +128,15 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait { .withInitializationRetryOptions(bulkExecutorInitializationRetryOptions) .withMaxUpdateMiniBatchCount(config.bulkConfig.maxMiniBatchUpdateCount) + if (config.bulkConfig.bulkLoggingPath.isDefined) { + val logWriter = new HdfsLogWriter( + config.bulkConfig.bulkLoggingCorrelationId.getOrElse(UUID.randomUUID().toString), + config.hadoopConfig.toMap, + config.bulkConfig.bulkLoggingPath.get) + + builder = builder.withLogWriter(logWriter) + } + // Instantiate DocumentBulkExecutor val bulkExecutor = builder.build() @@ -246,6 +262,11 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait { maxAvailableThroughput = None ) + oldClientCacheEntry.bulkExecutor match { + case Some(bulkExecutor) => bulkExecutor.flushLogs() + case None => + } + logInfo(s"$timerName: ClientConfiguration#${config.hashCode} has been reset - new " + s"${newClientCacheEntry.getLogMessage}, previously ${oldClientCacheEntry.getLogMessage}") @@ -418,12 +439,25 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait { val consistencyLevel = ConsistencyLevel.valueOf(config.consistencyLevel) lastConsistencyLevel = Some(consistencyLevel) - new DocumentClient( + val client = new DocumentClient( config.host, config.authConfig.authKey, lastConnectionPolicy, consistencyLevel ) + + config.getQueryLoggingPath() match { + case Some(path) => { + val logger = new HdfsLogWriter( + config.queryLoggingCorrelationId.getOrElse(""), + config.hadoopConfig.toMap, + path) + + client.setLogWriter(logger); + } + case None => client + } + } private def createConnectionPolicy(settings: ConnectionPolicySettings): ConnectionPolicy = { diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala index 30127fbb..ae78d707 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala @@ -24,18 +24,17 @@ package com.microsoft.azure.cosmosdb.spark import java.io.PrintWriter import java.io.StringWriter +import java.lang.management.ManagementFactory import java.nio.charset.Charset import java.util.UUID -import java.util.concurrent.TimeUnit import com.microsoft.azure.cosmosdb.spark.config._ import com.microsoft.azure.cosmosdb.spark.rdd.{CosmosDBRDD, _} import com.microsoft.azure.cosmosdb.spark.schema._ import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils -import rx.Observable import com.microsoft.azure.documentdb._ import com.microsoft.azure.documentdb.bulkexecutor.{BulkImportResponse, BulkUpdateResponse, DocumentBulkExecutor, UpdateItem} -import org.apache.spark.{Partition, SparkContext} +import 
org.apache.spark.SparkContext import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -144,6 +143,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait { */ def save[D: ClassTag](rdd: RDD[D], writeConfig: Config): Unit = { var numPartitions = 0 + val hadoopConfig = HdfsUtils.getConfigurationMap(rdd.sparkContext.hadoopConfiguration) try { numPartitions = rdd.getNumPartitions } catch { @@ -178,7 +178,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait { // In this case, users can set maxIngestionTaskParallelism to 32 and will help with the RU consumption based on writeThroughputBudget. if (maxIngestionTaskParallelism.exists(_ > 0)) numPartitions = maxIngestionTaskParallelism.get - val cosmosPartitionsCount = CosmosDBConnection(writeConfig).getAllPartitions.length + val cosmosPartitionsCount = CosmosDBConnection(writeConfig, hadoopConfig).getAllPartitions.length // writeThroughputBudget per cosmos db physical partition writeThroughputBudgetPerCosmosPartition = Some((writeThroughputBudget.get / cosmosPartitionsCount).ceil.toInt) val baseMiniBatchSizeAdjustmentFactor: Double = (baseMiniBatchRUConsumption.toDouble * numPartitions) / writeThroughputBudgetPerCosmosPartition.get @@ -192,9 +192,66 @@ object CosmosDBSpark extends CosmosDBLoggingTrait { } } - val mapRdd = rdd.coalesce(numPartitions).mapPartitions(savePartition(_, writeConfig, numPartitions, - baseMaxMiniBatchImportSizeKB * 1024, writeThroughputBudgetPerCosmosPartition), preservesPartitioning = true) + val connection: CosmosDBConnection = CosmosDBConnection(writeConfig, hadoopConfig) + val cosmosDBRowConverter = new CosmosDBRowConverter(SerializationConfig.fromConfig(connection.config)) + val iteratorLoggingPath = writeConfig.get[String](CosmosDBConfig.IteratorLoggingPath) + val iteratorLoggingCorrelationId = writeConfig.get[String](CosmosDBConfig.IteratorLoggingCorrelationId) + val rootPropertyToSave = writeConfig.get[String](CosmosDBConfig.RootPropertyToSave) + val applicationName: String = writeConfig.getOrElse[String](CosmosDBConfig.ApplicationName, "") + val pkDefinition = connection.getPartitionKeyDefinition + val userAgentString: String = if (applicationName.isEmpty) { + s"${Constants.userAgentSuffix} ${ManagementFactory.getRuntimeMXBean.getName}" + } else { + s"${Constants.userAgentSuffix} ${ManagementFactory.getRuntimeMXBean.getName} $applicationName" + } + var writer: Option[HdfsLogWriter] = None + var rddLogger: Option[IteratorLogger] = None + if (iteratorLoggingPath.isDefined) { + writer = Some(new HdfsLogWriter( + iteratorLoggingCorrelationId.getOrElse(""), + hadoopConfig.toMap, + iteratorLoggingPath.get)) + + rddLogger = Some(new IteratorLogger(writer.get, userAgentString, "n/a", "OriginalRDD")) + } + + val effectiveRdd = new LoggingRDD[D](rdd, rddLogger, pkDefinition, rootPropertyToSave, cosmosDBRowConverter) + + val mapRdd = effectiveRdd.coalesce(numPartitions).mapPartitions(partitionedIterator => { + val partitionedCosmosDBRowConverter = new CosmosDBRowConverter(SerializationConfig.fromConfig(connection.config)) + val partitionedWriter = Some(new HdfsLogWriter( + iteratorLoggingCorrelationId.getOrElse(""), + hadoopConfig.toMap, + iteratorLoggingPath.get)) + var iterationLogger : Option[IteratorLogger] = None + if (partitionedWriter.isDefined) { + iterationLogger = Some(new IteratorLogger(partitionedWriter.get, userAgentString, "n/a", "partitionedIterator")) + } + + val effectiveIterator = LoggingIterator.createLoggingAndConvertingIterator( + 
partitionedIterator, + iterationLogger, + pkDefinition, + rootPropertyToSave, + partitionedCosmosDBRowConverter + ) + + val returnValue = savePartition(effectiveIterator, writeConfig, hadoopConfig, numPartitions, + baseMaxMiniBatchImportSizeKB * 1024, writeThroughputBudgetPerCosmosPartition) + + if (partitionedWriter.isDefined) { + iterationLogger.get.flush() + partitionedWriter.get.flush() + } + + returnValue + }, true) mapRdd.collect() + + if (writer.isDefined) { + rddLogger.get.flush() + writer.get.flush() + } } private def bulkUpdate[D: ClassTag](iter: Iterator[D], @@ -381,6 +438,8 @@ object CosmosDBSpark extends CosmosDBLoggingTrait { throw toFailedImportException(bulkImportResponse, connection) } } + + importer.flushLogs() } private def toFailedImportException(response: BulkImportResponse, connection: CosmosDBConnection) : Exception = { @@ -441,10 +500,11 @@ object CosmosDBSpark extends CosmosDBLoggingTrait { private def savePartition[D: ClassTag](iter: Iterator[D], config: Config, + hadoopConfig: mutable.Map[String, String], partitionCount: Int, baseMaxMiniBatchImportSize: Int, writeThroughputBudgetPerCosmosPartition: Option[Int]): Iterator[D] = { - val connection: CosmosDBConnection = CosmosDBConnection(config) + val connection: CosmosDBConnection = CosmosDBConnection(config, hadoopConfig) val asyncConnection: AsyncCosmosDBConnection = new AsyncCosmosDBConnection(config) val isBulkImporting = config.get[String](CosmosDBConfig.BulkImport). @@ -745,7 +805,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait { /** * The CosmosDBSpark class * - * '''Note:''' Creation of the class should be via [[CosmosDBSpark$.Builder]]. + * '''Note:''' Creation of the class should be via [[CosmosDBSpark Builder]]. * * @since 1.0 */ @@ -757,7 +817,6 @@ case class CosmosDBSpark(sparkSession: SparkSession, readConfig: Config) { /** * Creates a `RDD` for the collection * - * @tparam D the datatype for the collection * @return a CosmosDBRDD[D] */ def toRDD: CosmosDBRDD = rdd diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala index b86b5c31..921d2341 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/DefaultSource.scala @@ -24,6 +24,7 @@ package com.microsoft.azure.cosmosdb.spark import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig} import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRelation +import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils import org.apache.spark.sql.SaveMode._ import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider, SchemaRelationProvider} import org.apache.spark.sql.types.StructType @@ -62,7 +63,7 @@ class DefaultSource extends RelationProvider data: DataFrame): BaseRelation = { val config: Config = Config(sqlContext.sparkContext.getConf, parameters) - val connection: CosmosDBConnection = CosmosDBConnection(config) + val connection: CosmosDBConnection = CosmosDBConnection(config, HdfsUtils.getConfigurationMap(sqlContext.sparkSession.sparkContext.hadoopConfiguration)) val isEmptyCollection: Boolean = connection.isDocumentCollectionEmpty mode match{ case Append => diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala new file mode 100644 index 00000000..3bda1d8e --- /dev/null +++ 
b/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala @@ -0,0 +1,93 @@ +/** + * The MIT License (MIT) + * Copyright (c) 2016 Microsoft Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package com.microsoft.azure.cosmosdb.spark + +import java.io.{BufferedOutputStream, Closeable} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils +import com.microsoft.azure.documentdb.CosmosLogWriter +import org.apache.spark.SparkEnv + +import scala.util.Properties + +private object HdfsLogWriter { + val targetedMemoryBufferSizeInBytes = 50000000 + + val lineSeparator = Properties.lineSeparator +} + +private case class HdfsLogWriter +( + correlationId: String, + configMap: Map[String, String], + loggingLocation: String +) extends CosmosLogWriter with Closeable with CosmosDBLoggingTrait { + + private[this] val inMemoryLock = "" + private[this] val executorId: String = SparkEnv.get.executorId + private[this] val fileId = new AtomicInteger(0) + private[this] val sb: StringBuilder = new StringBuilder() + private[this] lazy val hdfsUtils = new HdfsUtils(configMap, loggingLocation) + + logInfo("HdfsBulkLogWriter instantiated.") + + override def writeLine(line: String): Unit = { + if (line != null) { + val prettyLine = line.filter(_ >= ' ') + HdfsLogWriter.lineSeparator + logDebug(s"PrettyLine: $prettyLine") + this.inMemoryLock.synchronized { + if (sb.length + prettyLine.length >= HdfsLogWriter.targetedMemoryBufferSizeInBytes) { + this.flush() + } + + this.sb.append(prettyLine) + } + } + } + + override def flush(): Unit = { + logInfo(s"Flush: ${sb.size}") + var contentToFlush: Option[String] = None + this.inMemoryLock.synchronized { + if (this.sb.size > 0) { + contentToFlush = Some(this.sb.toString()) + this.sb.clear() + } + } + + contentToFlush match { + case Some(content) => { + val fileName = s"${correlationId}_${executorId}_${this.fileId.incrementAndGet()}.log" + logInfo(s"WriteLogFile: ${fileName} - ${content.length} bytes") + hdfsUtils.writeLogFile(this.loggingLocation, fileName, content) + } + case None => + } + } + + override def close(): Unit = { + logInfo("Close") + this.flush + } +} diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/LogginRDD.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/LogginRDD.scala new file mode 100644 index 00000000..bb598890 --- /dev/null +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/LogginRDD.scala @@ -0,0 +1,28 @@ 
+package com.microsoft.azure.cosmosdb.spark + +import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRowConverter +import com.microsoft.azure.documentdb.{Document, PartitionKeyDefinition} +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.rdd.RDD +import scala.reflect.ClassTag + +private[spark] class LoggingRDD[D: ClassTag] +( + innerRDD: RDD[D], + logger: Option[IteratorLogger], + partitionKey: PartitionKeyDefinition, + rootPropertyToSave: Option[String], + cosmosDBRowConverter: CosmosDBRowConverter +) extends RDD[Document](innerRDD) { + override def compute(split: Partition, context: TaskContext): Iterator[Document] = { + LoggingIterator.createLoggingAndConvertingIterator( + firstParent[D].iterator(split, context), + logger, + partitionKey, + rootPropertyToSave, + cosmosDBRowConverter + ) + } + + override protected def getPartitions: Array[Partition] = firstParent[D].partitions +} diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/LoggingIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/LoggingIterator.scala new file mode 100644 index 00000000..4e653c0a --- /dev/null +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/LoggingIterator.scala @@ -0,0 +1,103 @@ +/** + * The MIT License (MIT) + * Copyright (c) 2016 Microsoft Corporation + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ +package com.microsoft.azure.cosmosdb.spark + +import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRowConverter +import com.microsoft.azure.documentdb.{Document, PartitionKeyDefinition} +import org.apache.spark.sql.Row + +import scala.reflect.ClassTag + +private[spark] object LoggingIterator { + def createLoggingIterator[D: ClassTag] + ( + inner: Iterator[D], + logger: IteratorLogger, + partitionKeyDefinition: PartitionKeyDefinition, + rootPropertyToSave: Option[String], + cosmosDBRowConverter: CosmosDBRowConverter): Iterator[D] = { + + inner.map(input => { + + try { + val document: Document = input match { + case doc: Document => doc + case row: Row => + if (rootPropertyToSave.isDefined) { + new Document(row.getString(row.fieldIndex(rootPropertyToSave.get))) + } else { + new Document(cosmosDBRowConverter.rowToJSONObject(row).toString()) + } + case any => new Document(any.toString) + } + + logger.onIteratorNext(document, partitionKeyDefinition) + input + } catch { + case t: Throwable => { + logger.logError("Failure converting RDD item", t) + throw t + } + } + }) + } + + def createLoggingAndConvertingIterator[D: ClassTag] + ( + inner: Iterator[D], + logger: Option[IteratorLogger], + partitionKeyDefinition: PartitionKeyDefinition, + rootPropertyToSave: Option[String], + cosmosDBRowConverter: CosmosDBRowConverter): Iterator[Document] = { + + inner.map(input => { + + try { + val document: Document = input match { + case doc: Document => doc + case row: Row => + if (rootPropertyToSave.isDefined) { + new Document(row.getString(row.fieldIndex(rootPropertyToSave.get))) + } else { + new Document(cosmosDBRowConverter.rowToJSONObject(row).toString()) + } + case any => new Document(any.toString) + } + + if (logger.isDefined) { + logger.get.onIteratorNext(document, partitionKeyDefinition) + } + document + } catch { + case t: Throwable => { + if (logger.isDefined) { + logger.get.logError("Failure converting RDD item", t) + } + throw t + } + } + }) + } +} + + diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala index 8891815f..7935c043 100755 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala @@ -95,7 +95,14 @@ object CosmosDBConfig { val DefaultMaxTransientRetryDelayInMs = 100 // 0.1 second val DefaultPoisonMessageLocation = "" val DefaultTreatUnknownExceptionsAsTransient = true - + + val BulkLoggingPath = "bulkLoggingPath" + val BulkLoggingCorrelationId = "bulkLoggingCorrelationId" + val QueryLoggingPath = "queryLoggingPath" + val QueryLoggingCorrelationId = "queryLoggingCorrelationId" + val IteratorLoggingPath = "iteratorLoggingPath" + val IteratorLoggingCorrelationId = "iteratorLoggingCorrelationId" + // Not a config, constant val StreamingTimestampToken = "tsToken" diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala index dad76eff..bb5193a7 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/partitioner/CosmosDBPartitioner.scala @@ -26,19 +26,20 @@ import com.microsoft.azure.cosmosdb.spark.config._ import com.microsoft.azure.cosmosdb.spark.schema.FilterConverter import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils 
import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnection, CosmosDBLoggingTrait} +import org.apache.hadoop.conf.Configuration import org.apache.spark.Partition import org.apache.spark.sql.sources.Filter import scala.collection.mutable import scala.collection.mutable.ListBuffer -class CosmosDBPartitioner() extends Partitioner[Partition] with CosmosDBLoggingTrait { +class CosmosDBPartitioner(hadoopConfig: mutable.Map[String, String]) extends Partitioner[Partition] with CosmosDBLoggingTrait { /** * @param config Partition configuration */ override def computePartitions(config: Config): Array[Partition] = { - val connection: CosmosDBConnection = CosmosDBConnection(config) + val connection: CosmosDBConnection = CosmosDBConnection(config, hadoopConfig) val partitionKeyRanges = connection.getAllPartitions logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions") Array.tabulate(partitionKeyRanges.length){ @@ -47,7 +48,7 @@ class CosmosDBPartitioner() extends Partitioner[Partition] with CosmosDBLoggingT } def computePartitions(config: Config, requiredColumns: Array[String] = Array()): Array[Partition] = { - val connection: CosmosDBConnection = CosmosDBConnection(config) + val connection: CosmosDBConnection = CosmosDBConnection(config, hadoopConfig) connection.reinitializeClient() // CosmosDB source diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala index 63f0db55..09cf0a6a 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala @@ -25,7 +25,7 @@ package com.microsoft.azure.cosmosdb.spark.rdd import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig} import com.microsoft.azure.cosmosdb.spark.partitioner.{CosmosDBPartition, CosmosDBPartitioner} import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils -import com.microsoft.azure.cosmosdb.spark.CosmosDBSpark +import com.microsoft.azure.cosmosdb.spark.{CosmosDBConnectionCache, CosmosDBSpark} import com.microsoft.azure.documentdb._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.sources.Filter @@ -40,7 +40,7 @@ class CosmosDBRDD( spark: SparkSession, config: Config, maxItems: Option[Long] = None, - partitioner: CosmosDBPartitioner = new CosmosDBPartitioner(), + partitionerRaw: CosmosDBPartitioner = null, requiredColumns: Array[String] = Array(), filters: Array[Filter] = Array()) extends RDD[Document](spark.sparkContext, deps = Nil) { @@ -49,6 +49,7 @@ class CosmosDBRDD( // It's a Map because Configuration is not serializable private val hadoopConfig: mutable.Map[String, String] = HdfsUtils.getConfigurationMap(sparkContext.hadoopConfiguration) + private val effectivePartitioner: CosmosDBPartitioner = Option.apply(partitionerRaw).getOrElse(new CosmosDBPartitioner(hadoopConfig)) private def cosmosDBSpark = { CosmosDBSpark(spark, config) @@ -57,7 +58,7 @@ class CosmosDBRDD( override def toJavaRDD(): JavaCosmosDBRDD = JavaCosmosDBRDD(this) override def getPartitions: Array[Partition] = { - partitioner.computePartitions(config) + effectivePartitioner.computePartitions(config) } /** diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala index 7906ab71..c78edc8a 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala +++ 
b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala @@ -53,12 +53,14 @@ object CosmosDBRDDIterator { // For verification purpose var lastFeedOptions: FeedOptions = _ var hdfsUtils: HdfsUtils = _ + var hadoopConfig: mutable.Map[String, String] = _ def initializeHdfsUtils(hadoopConfig: Map[String, String], changeFeedCheckpointLocation: String): Any = { if (hdfsUtils == null) { this.synchronized { if (hdfsUtils == null) { hdfsUtils = HdfsUtils(hadoopConfig, changeFeedCheckpointLocation) + this.hadoopConfig = collection.mutable.Map(hadoopConfig.toSeq: _*) } } } @@ -79,7 +81,7 @@ object CosmosDBRDDIterator { * @return the corresponding global continuation token */ def getCollectionTokens(config: Config, shouldGetCurrentToken: Boolean = false): String = { - val connection = CosmosDBConnection(config) + val connection = CosmosDBConnection(config, this.hadoopConfig) val collectionLink = connection.getCollectionLink val queryName = config .get[String](CosmosDBConfig.ChangeFeedQueryName).get @@ -155,9 +157,15 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String], private val maxRetryCountOnServiceUnavailable: Int = 100 private val rnd = scala.util.Random + logInfo(s"CosmosDBRDDIterator initialized for PK range id ${partition.partitionKeyRangeId}") + lazy val reader: Iterator[Document] = { initialized = true - val connection: CosmosDBConnection = CosmosDBConnection(config) + val connection: CosmosDBConnection = CosmosDBConnection(config, this.hadoopConfig) + taskContext.addTaskCompletionListener((ctx: TaskContext) => { + logInfo(s"CosmosDBRDDIterator: Flushing LogWriter after completing task for partition key range id ${partition.partitionKeyRangeId}") + connection.flushLogWriter + }) val readingChangeFeed: Boolean = config .get[String](CosmosDBConfig.ReadChangeFeed) @@ -447,6 +455,7 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String], }) if (!readingChangeFeed) { + logInfo(s"--> query document for pk range id ${partition.partitionKeyRangeId}") queryDocuments } else { readChangeFeed @@ -486,6 +495,8 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String], } itemCount = itemCount + 1 reader.next() + + // TODO @fabianm AddLog } def closeIfNeeded(): Unit = { diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala index 9f713929..ee53c1ec 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/HdfsUtils.scala @@ -22,13 +22,13 @@ */ package com.microsoft.azure.cosmosdb.spark.util -import java.io.{FileNotFoundException, PrintWriter, StringWriter} +import java.io.{BufferedOutputStream, FileNotFoundException, OutputStream, PrintWriter, StringWriter} import java.util import com.microsoft.azure.cosmosdb.spark.CosmosDBLoggingTrait import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator} +import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, LocatedFileStatus, Path, RemoteIterator} import scala.collection.mutable import java.net.URI @@ -54,6 +54,16 @@ case class HdfsUtils(configMap: Map[String, String], changeFeedCheckpointLocatio } } + def writeLogFile(base: String, filePath: String, content: String): Unit = { + val path = new Path(base + "/" + filePath) + retry(maxRetryCount) { + val os = fs.create(path) + val bos = new 
BufferedOutputStream(os) + bos.write(content.getBytes("UTF-8")) + bos.close() + } + } + def read(base: String, filePath: String, alternateQueryName: String, collectionRid: String): String = { val path = new Path(base + "/" + filePath) read(path, base, alternateQueryName, collectionRid) From ae9edf1094d2eb9730e34bf326837f6e70d1b961 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Sat, 1 May 2021 04:28:28 +0200 Subject: [PATCH 2/3] Updating the code to log bulkoperations from Spark for the CRI investigation --- pom.xml | 6 +- .../cosmosdb/spark/BulkExecutorSettings.scala | 1 + .../cosmosdb/spark/ClientConfiguration.scala | 17 +++--- .../azure/cosmosdb/spark/Constants.scala | 2 +- .../spark/CosmosDBConnectionCache.scala | 27 ++++++++- .../azure/cosmosdb/spark/CosmosDBSpark.scala | 48 ++++++++------- .../azure/cosmosdb/spark/HdfsLogWriter.scala | 59 +++++++++++++++++-- .../azure/cosmosdb/spark/LogginRDD.scala | 28 --------- .../cosmosdb/spark/LoggingIterator.scala | 4 ++ .../spark/config/CosmosDBConfig.scala | 1 + .../CosmosDBWriteStreamRetryPolicy.scala | 2 +- 11 files changed, 127 insertions(+), 68 deletions(-) delete mode 100644 src/main/scala/com/microsoft/azure/cosmosdb/spark/LogginRDD.scala diff --git a/pom.xml b/pom.xml index d3ec76f6..45e549ca 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ limitations under the License. com.microsoft.azure azure-cosmosdb-spark_2.4.0_2.11 jar - 3.6.14-SNAPSHOT + 3.6.16-SNAPSHOT ${project.groupId}:${project.artifactId} Spark Connector for Microsoft Azure CosmosDB http://azure.microsoft.com/en-us/services/documentdb/ @@ -51,7 +51,7 @@ limitations under the License. com.microsoft.azure azure-documentdb - 2.7.2-SNAPSHOT + 2.7.5-SNAPSHOT org.scala-lang @@ -77,7 +77,7 @@ limitations under the License. com.microsoft.azure documentdb-bulkexecutor - 2.13.2-SNAPSHOT + 2.13.5-SNAPSHOT com.microsoft.azure diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/BulkExecutorSettings.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/BulkExecutorSettings.scala index 307a23e3..0d2d93fa 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/BulkExecutorSettings.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/BulkExecutorSettings.scala @@ -33,5 +33,6 @@ package com.microsoft.azure.cosmosdb.spark private[spark] case class BulkExecutorSettings( maxMiniBatchUpdateCount: Int, partitionKeyOption: Option[String], + countLoggingPath: Option[String], bulkLoggingPath: Option[String], bulkLoggingCorrelationId: Option[String]) \ No newline at end of file diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala index 8afeeb11..cf796c5f 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/ClientConfiguration.scala @@ -51,6 +51,7 @@ private[spark] case class ClientConfiguration( database: String, container: String, bulkConfig: BulkExecutorSettings, + countLoggingPath: Option[String], queryLoggingPath: Option[String], queryLoggingCorrelationId: Option[String], hadoopConfig: mutable.Map[String, String]) { @@ -64,13 +65,11 @@ private[spark] case class ClientConfiguration( } def getQueryLoggingPath(): Option[String] = { - queryLoggingPath match { - case Some(basePath) => queryLoggingCorrelationId match { - case Some(correlationId) => Some(basePath + queryLoggingCorrelationId.get + "/") - case None => Some(basePath) - } - case None => None - } 
+ queryLoggingPath + } + + def getCountLoggingPath(): Option[String] = { + countLoggingPath } } @@ -80,6 +79,7 @@ object ClientConfiguration extends CosmosDBLoggingTrait { val collection : String = config.get(CosmosDBConfig.Collection).get val authConfig : AuthConfig = validateAndCreateAuthConfig(config, database, collection) val connectionPolicySettings : ConnectionPolicySettings = createConnectionPolicySettings(config) + val countLoggingPath = config.get(CosmosDBConfig.CountLoggingPath) val queryLoggingPath = config.get(CosmosDBConfig.QueryLoggingPath) val queryLoggingCorrelationId = config.get(CosmosDBConfig.QueryLoggingCorrelationId) val bulkExecutorSettings : BulkExecutorSettings = createBulkExecutorSettings(config) @@ -97,6 +97,7 @@ object ClientConfiguration extends CosmosDBLoggingTrait { database, collection, bulkExecutorSettings, + countLoggingPath, queryLoggingPath, queryLoggingCorrelationId, hadoopConfig @@ -130,11 +131,13 @@ object ClientConfiguration extends CosmosDBLoggingTrait { .getOrElse(CosmosDBConfig.MaxMiniBatchUpdateCount, CosmosDBConfig.DefaultMaxMiniBatchUpdateCount) val bulkLoggingPath = config.get(CosmosDBConfig.BulkLoggingPath) + val countLoggingPath = config.get(CosmosDBConfig.CountLoggingPath) val bulkLoggingCorrelationId = config.get(CosmosDBConfig.BulkLoggingCorrelationId) BulkExecutorSettings( maxMiniBatchUpdateCount, pkDef, + countLoggingPath, bulkLoggingPath, bulkLoggingCorrelationId) } diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala index 51e4b076..9da90338 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala @@ -23,6 +23,6 @@ package com.microsoft.azure.cosmosdb.spark object Constants { - val currentVersion = "2.4.0_2.11-3.6.14-SNAPSHOT" + val currentVersion = "2.4.0_2.11-3.6.16-SNAPSHOT" val userAgentSuffix = s" SparkConnector/$currentVersion" } \ No newline at end of file diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala index 9e4b17e2..bdf59695 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBConnectionCache.scala @@ -137,6 +137,15 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait { builder = builder.withLogWriter(logWriter) } + if (config.bulkConfig.countLoggingPath.isDefined) { + val logWriter = new HdfsLogWriter( + config.bulkConfig.bulkLoggingCorrelationId.getOrElse(UUID.randomUUID().toString), + config.hadoopConfig.toMap, + config.bulkConfig.countLoggingPath.get) + + builder = builder.withCountLogWriter(logWriter) + } + // Instantiate DocumentBulkExecutor val bulkExecutor = builder.build() @@ -439,14 +448,14 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait { val consistencyLevel = ConsistencyLevel.valueOf(config.consistencyLevel) lastConsistencyLevel = Some(consistencyLevel) - val client = new DocumentClient( + var client = new DocumentClient( config.host, config.authConfig.authKey, lastConnectionPolicy, consistencyLevel ) - config.getQueryLoggingPath() match { + client = config.getQueryLoggingPath() match { case Some(path) => { val logger = new HdfsLogWriter( config.queryLoggingCorrelationId.getOrElse(""), @@ -458,6 +467,20 @@ object CosmosDBConnectionCache extends CosmosDBLoggingTrait { case None 
=> client } + client = config.getCountLoggingPath() match { + case Some(path) => { + val logger = new HdfsLogWriter( + config.queryLoggingCorrelationId.getOrElse(""), + config.hadoopConfig.toMap, + path) + + client.setCountLogWriter(logger); + } + case None => client + } + + client + } private def createConnectionPolicy(settings: ConnectionPolicySettings): ConnectionPolicy = { diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala index ae78d707..b7d90b06 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala @@ -27,6 +27,7 @@ import java.io.StringWriter import java.lang.management.ManagementFactory import java.nio.charset.Charset import java.util.UUID +import java.util.concurrent.atomic.AtomicLong import com.microsoft.azure.cosmosdb.spark.config._ import com.microsoft.azure.cosmosdb.spark.rdd.{CosmosDBRDD, _} @@ -195,6 +196,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait { val connection: CosmosDBConnection = CosmosDBConnection(writeConfig, hadoopConfig) val cosmosDBRowConverter = new CosmosDBRowConverter(SerializationConfig.fromConfig(connection.config)) val iteratorLoggingPath = writeConfig.get[String](CosmosDBConfig.IteratorLoggingPath) + val countLoggingPath = writeConfig.get[String](CosmosDBConfig.CountLoggingPath) val iteratorLoggingCorrelationId = writeConfig.get[String](CosmosDBConfig.IteratorLoggingCorrelationId) val rootPropertyToSave = writeConfig.get[String](CosmosDBConfig.RootPropertyToSave) val applicationName: String = writeConfig.getOrElse[String](CosmosDBConfig.ApplicationName, "") @@ -204,32 +206,33 @@ object CosmosDBSpark extends CosmosDBLoggingTrait { } else { s"${Constants.userAgentSuffix} ${ManagementFactory.getRuntimeMXBean.getName} $applicationName" } - var writer: Option[HdfsLogWriter] = None - var rddLogger: Option[IteratorLogger] = None - if (iteratorLoggingPath.isDefined) { - writer = Some(new HdfsLogWriter( - iteratorLoggingCorrelationId.getOrElse(""), - hadoopConfig.toMap, - iteratorLoggingPath.get)) - - rddLogger = Some(new IteratorLogger(writer.get, userAgentString, "n/a", "OriginalRDD")) - } + val mapRdd = rdd.coalesce(numPartitions).mapPartitions(partitionedIterator => { + val partitionedCosmosDBRowConverter = new CosmosDBRowConverter(SerializationConfig.fromConfig(connection.config)) + val partitionedWriter = iteratorLoggingPath match { + case Some(path) => Some(new HdfsLogWriter( + iteratorLoggingCorrelationId.getOrElse(""), + hadoopConfig.toMap, + path)) + case None => None + } - val effectiveRdd = new LoggingRDD[D](rdd, rddLogger, pkDefinition, rootPropertyToSave, cosmosDBRowConverter) + val partitionedCountWriter = countLoggingPath match { + case Some(path) => Some(new HdfsLogWriter( + iteratorLoggingCorrelationId.getOrElse(""), + hadoopConfig.toMap, + path)) + case None => None + } - val mapRdd = effectiveRdd.coalesce(numPartitions).mapPartitions(partitionedIterator => { - val partitionedCosmosDBRowConverter = new CosmosDBRowConverter(SerializationConfig.fromConfig(connection.config)) - val partitionedWriter = Some(new HdfsLogWriter( - iteratorLoggingCorrelationId.getOrElse(""), - hadoopConfig.toMap, - iteratorLoggingPath.get)) var iterationLogger : Option[IteratorLogger] = None if (partitionedWriter.isDefined) { iterationLogger = Some(new IteratorLogger(partitionedWriter.get, userAgentString, "n/a", "partitionedIterator")) } + val counter = 
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
index ae78d707..b7d90b06 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
@@ -27,6 +27,7 @@ import java.io.StringWriter
 import java.lang.management.ManagementFactory
 import java.nio.charset.Charset
 import java.util.UUID
+import java.util.concurrent.atomic.AtomicLong

 import com.microsoft.azure.cosmosdb.spark.config._
 import com.microsoft.azure.cosmosdb.spark.rdd.{CosmosDBRDD, _}
@@ -195,6 +196,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
     val connection: CosmosDBConnection = CosmosDBConnection(writeConfig, hadoopConfig)
     val cosmosDBRowConverter = new CosmosDBRowConverter(SerializationConfig.fromConfig(connection.config))
     val iteratorLoggingPath = writeConfig.get[String](CosmosDBConfig.IteratorLoggingPath)
+    val countLoggingPath = writeConfig.get[String](CosmosDBConfig.CountLoggingPath)
     val iteratorLoggingCorrelationId = writeConfig.get[String](CosmosDBConfig.IteratorLoggingCorrelationId)
     val rootPropertyToSave = writeConfig.get[String](CosmosDBConfig.RootPropertyToSave)
     val applicationName: String = writeConfig.getOrElse[String](CosmosDBConfig.ApplicationName, "")
@@ -204,32 +206,33 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
     } else {
       s"${Constants.userAgentSuffix} ${ManagementFactory.getRuntimeMXBean.getName} $applicationName"
     }
-    var writer: Option[HdfsLogWriter] = None
-    var rddLogger: Option[IteratorLogger] = None
-    if (iteratorLoggingPath.isDefined) {
-      writer = Some(new HdfsLogWriter(
-        iteratorLoggingCorrelationId.getOrElse(""),
-        hadoopConfig.toMap,
-        iteratorLoggingPath.get))
-
-      rddLogger = Some(new IteratorLogger(writer.get, userAgentString, "n/a", "OriginalRDD"))
-    }
+    val mapRdd = rdd.coalesce(numPartitions).mapPartitions(partitionedIterator => {
+      val partitionedCosmosDBRowConverter = new CosmosDBRowConverter(SerializationConfig.fromConfig(connection.config))
+      val partitionedWriter = iteratorLoggingPath match {
+        case Some(path) => Some(new HdfsLogWriter(
+          iteratorLoggingCorrelationId.getOrElse(""),
+          hadoopConfig.toMap,
+          path))
+        case None => None
+      }

-    val effectiveRdd = new LoggingRDD[D](rdd, rddLogger, pkDefinition, rootPropertyToSave, cosmosDBRowConverter)
+      val partitionedCountWriter = countLoggingPath match {
+        case Some(path) => Some(new HdfsLogWriter(
+          iteratorLoggingCorrelationId.getOrElse(""),
+          hadoopConfig.toMap,
+          path))
+        case None => None
+      }

-    val mapRdd = effectiveRdd.coalesce(numPartitions).mapPartitions(partitionedIterator => {
-      val partitionedCosmosDBRowConverter = new CosmosDBRowConverter(SerializationConfig.fromConfig(connection.config))
-      val partitionedWriter = Some(new HdfsLogWriter(
-        iteratorLoggingCorrelationId.getOrElse(""),
-        hadoopConfig.toMap,
-        iteratorLoggingPath.get))
       var iterationLogger : Option[IteratorLogger] = None
       if (partitionedWriter.isDefined) {
         iterationLogger = Some(new IteratorLogger(partitionedWriter.get, userAgentString, "n/a", "partitionedIterator"))
       }
+      val counter = new AtomicLong(0)
       val effectiveIterator = LoggingIterator.createLoggingAndConvertingIterator(
         partitionedIterator,
+        counter,
         iterationLogger,
         pkDefinition,
         rootPropertyToSave,
@@ -239,6 +242,12 @@
       val returnValue = savePartition(effectiveIterator, writeConfig, hadoopConfig, numPartitions,
         baseMaxMiniBatchImportSizeKB * 1024, writeThroughputBudgetPerCosmosPartition)

+      if (partitionedCountWriter.isDefined) {
+        val countLogger = new CosmosCountLogger(partitionedCountWriter.get)
+        countLogger.logCount("WithinMapPartitions", "I", counter.get(), "", "")
+        partitionedCountWriter.get.flush()
+      }
+
       if (partitionedWriter.isDefined) {
         iterationLogger.get.flush()
         partitionedWriter.get.flush()
@@ -247,11 +256,6 @@
       returnValue
     }, true)
     mapRdd.collect()
-
-    if (writer.isDefined) {
-      rddLogger.get.flush()
-      writer.get.flush()
-    }
   }

   private def bulkUpdate[D: ClassTag](iter: Iterator[D],
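The rewritten save path above follows the standard Spark practice of building non-serializable helpers (HDFS writers, iterator loggers, the AtomicLong counter) inside mapPartitions so that nothing has to be shipped from the driver to the executors. Reduced to its essentials, the per-partition counting pattern looks like the following sketch (countPerPartition and process are illustrative names, not connector APIs):

    import java.util.concurrent.atomic.AtomicLong
    import org.apache.spark.rdd.RDD

    // Each partition gets its own counter; the count is emitted once the
    // partition's iterator has been fully consumed.
    def countPerPartition[T](rdd: RDD[T])(process: T => Unit): RDD[Long] = {
      rdd.mapPartitions(iterator => {
        val counter = new AtomicLong(0)
        iterator.foreach { item =>
          counter.incrementAndGet()
          process(item)
        }
        Iterator.single(counter.get())
      }, preservesPartitioning = true)
    }
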
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala
index 3bda1d8e..857d07ce 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/HdfsLogWriter.scala
@@ -22,19 +22,61 @@
  */
 package com.microsoft.azure.cosmosdb.spark

-import java.io.{BufferedOutputStream, Closeable}
+import java.io.Closeable
+import java.util.{Timer, TimerTask, UUID}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

 import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
 import com.microsoft.azure.documentdb.CosmosLogWriter
 import org.apache.spark.SparkEnv
+import org.joda.time.Instant

+import scala.collection.concurrent.TrieMap
 import scala.util.Properties

-private object HdfsLogWriter {
+private object HdfsLogWriter extends CosmosDBLoggingTrait {
+  private val timerName = "hdfsLogWriter-cleanup-Timer"
+  private val timer: Timer = new Timer(timerName, true)
+  private val cleanupIntervalInMs = 60000
+  private val writerCount = new AtomicInteger(0)
   val targetedMemoryBufferSizeInBytes = 50000000
   val lineSeparator = Properties.lineSeparator
+  val logWriters = new TrieMap[String, HdfsLogWriter]
+
+  def registerWriter(writer: HdfsLogWriter): Unit = {
+    logWriters.put(writer.id, writer) match {
+      case Some(existingWriter) =>
+        throw new IllegalStateException(s"Already a writer '${writer.id}' registered.")
+      case None => if (writerCount.incrementAndGet() == 1) {
+        startCleanupTimer()
+      }
+    }
+  }
+
+  def deregisterWriter(writer: HdfsLogWriter): Unit = {
+    logWriters.remove(writer.id)
+  }
+
+  private def startCleanupTimer() : Unit = {
+    logInfo(s"$timerName: scheduling timer - delay: $cleanupIntervalInMs ms, period: $cleanupIntervalInMs ms")
+    timer.schedule(
+      new TimerTask { def run(): Unit = onCleanup() },
+      cleanupIntervalInMs,
+      cleanupIntervalInMs)
+  }
+
+  private def onCleanup() : Unit = {
+    logInfo(s"$timerName: onCleanup")
+    val snapshot = logWriters.readOnlySnapshot()
+    val threshold = Instant.now().getMillis - cleanupIntervalInMs
+    snapshot.foreach(writerHolder => {
+      val lastFlushed = writerHolder._2.lastFlushed.get()
+      if (lastFlushed > 0 && lastFlushed < threshold && writerHolder._2.hasData) {
+        writerHolder._2.flush()
+      }
+    })
+  }
 }

 private case class HdfsLogWriter
 (
     correlationId: String,
     configMap: Map[String, String],
     loggingLocation: String
 ) extends CosmosLogWriter with Closeable with CosmosDBLoggingTrait {
   private[this] val inMemoryLock = ""
-  private[this] val executorId: String = SparkEnv.get.executorId
+  val executorId: String = SparkEnv.get.executorId
   private[this] val fileId = new AtomicInteger(0)
   private[this] val sb: StringBuilder = new StringBuilder()
   private[this] lazy val hdfsUtils = new HdfsUtils(configMap, loggingLocation)
+  val lastFlushed = new AtomicLong(-1)
+  val id = s"${correlationId}_${executorId}_${loggingLocation}_${UUID.randomUUID()}"
+  HdfsLogWriter.registerWriter(this)

   logInfo("HdfsBulkLogWriter instantiated.")

   override def writeLine(line: String): Unit = {
@@ -78,16 +123,22 @@
     contentToFlush match {
       case Some(content) => {
-        val fileName = s"${correlationId}_${executorId}_${this.fileId.incrementAndGet()}.log"
+        val fileName = s"${correlationId}_${executorId}_${this.fileId.incrementAndGet()}_${UUID.randomUUID().toString()}.log"
         logInfo(s"WriteLogFile: ${fileName} - ${content.length} bytes")
         hdfsUtils.writeLogFile(this.loggingLocation, fileName, content)
+        lastFlushed.set(Instant.now().getMillis)
       }
       case None =>
     }
   }
+
+  def hasData = {
+    this.sb.length > 0
+  }
+
   override def close(): Unit = {
     logInfo("Close")
     this.flush
+    HdfsLogWriter.deregisterWriter(this)
   }
 }
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/LogginRDD.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/LogginRDD.scala
deleted file mode 100644
index bb598890..00000000
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/LogginRDD.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package com.microsoft.azure.cosmosdb.spark
-
-import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRowConverter
-import com.microsoft.azure.documentdb.{Document, PartitionKeyDefinition}
-import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.RDD
-import scala.reflect.ClassTag
-
-private[spark] class LoggingRDD[D: ClassTag]
-(
-  innerRDD: RDD[D],
-  logger: Option[IteratorLogger],
-  partitionKey: PartitionKeyDefinition,
-  rootPropertyToSave: Option[String],
-  cosmosDBRowConverter: CosmosDBRowConverter
-) extends RDD[Document](innerRDD) {
-  override def compute(split: Partition, context: TaskContext): Iterator[Document] = {
-    LoggingIterator.createLoggingAndConvertingIterator(
-      firstParent[D].iterator(split, context),
-      logger,
-      partitionKey,
-      rootPropertyToSave,
-      cosmosDBRowConverter
-    )
-  }
-
-  override protected def getPartitions: Array[Partition] = firstParent[D].partitions
-}
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/LoggingIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/LoggingIterator.scala
index 4e653c0a..afa13b6b 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/LoggingIterator.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/LoggingIterator.scala
@@ -22,6 +22,8 @@
  */
 package com.microsoft.azure.cosmosdb.spark

+import java.util.concurrent.atomic.AtomicLong
+
 import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRowConverter
 import com.microsoft.azure.documentdb.{Document, PartitionKeyDefinition}
 import org.apache.spark.sql.Row
@@ -65,6 +67,7 @@
   def createLoggingAndConvertingIterator[D: ClassTag]
   (
     inner: Iterator[D],
+    counter: AtomicLong,
     logger: Option[IteratorLogger],
     partitionKeyDefinition: PartitionKeyDefinition,
     rootPropertyToSave: Option[String],
@@ -87,6 +90,7 @@
           if (logger.isDefined) {
             logger.get.onIteratorNext(document, partitionKeyDefinition)
           }
+          counter.incrementAndGet()
           document
         } catch {
           case t: Throwable => {
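Replacing LoggingRDD with the counter-aware iterator keeps the bookkeeping lazy: a document is only counted when the bulk importer actually pulls it from the wrapped iterator, so the reported count reflects what was really consumed. Stripped of the logging and row conversion, the wrapper amounts to the following generic sketch (illustrative, not the connector's LoggingIterator itself):

    import java.util.concurrent.atomic.AtomicLong

    // Increments the shared counter as elements are consumed; the total is
    // only meaningful after the consumer has exhausted the iterator.
    def countingIterator[A](inner: Iterator[A], counter: AtomicLong): Iterator[A] =
      new Iterator[A] {
        override def hasNext: Boolean = inner.hasNext
        override def next(): A = {
          val element = inner.next()
          counter.incrementAndGet()
          element
        }
      }
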
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
index 7935c043..19f07e1c 100755
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/config/CosmosDBConfig.scala
@@ -101,6 +101,7 @@ object CosmosDBConfig {
   val QueryLoggingPath = "queryLoggingPath"
   val QueryLoggingCorrelationId = "queryLoggingCorrelationId"
   val IteratorLoggingPath = "iteratorLoggingPath"
+  val CountLoggingPath = "countLoggingPath"
   val IteratorLoggingCorrelationId = "iteratorLoggingCorrelationId"

   // Not a config, constant
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBWriteStreamRetryPolicy.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBWriteStreamRetryPolicy.scala
index 766f34aa..aa95d7fc 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBWriteStreamRetryPolicy.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/streaming/CosmosDBWriteStreamRetryPolicy.scala
@@ -111,7 +111,7 @@ class CosmosDBWriteStreamRetryPolicy(configMap: Map[String, String])
       requestOptions,
       task,
       this.config.isTransient,
-      loggingAction = (msg: String) => logDebug(msg),
+      (msg: String) => logDebug(msg),
       (throwable: Throwable, document: Document) => this.notificationHandler.onPoisonMessage(throwable, document),
       this.rnd,
       maxRetries,

From bfbc419a032c0f17de27974037230a8c2159d95d Mon Sep 17 00:00:00 2001
From: Fabian Meiswinkel
Date: Tue, 11 May 2021 20:11:59 +0200
Subject: [PATCH 3/3] Latest version with extensive logging

---
 pom.xml                                                      | 4 ++--
 .../scala/com/microsoft/azure/cosmosdb/spark/Constants.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index 45e549ca..3402329f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@ limitations under the License.
 com.microsoft.azure
 azure-cosmosdb-spark_2.4.0_2.11
 jar
- 3.6.16-SNAPSHOT
+ 3.6.17-SNAPSHOT
 ${project.groupId}:${project.artifactId}
 Spark Connector for Microsoft Azure CosmosDB
 http://azure.microsoft.com/en-us/services/documentdb/
@@ -77,7 +77,7 @@ limitations under the License.
 com.microsoft.azure
 documentdb-bulkexecutor
- 2.13.5-SNAPSHOT
+ 2.13.6-SNAPSHOT
 com.microsoft.azure
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
index 9da90338..c059221e 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
@@ -23,6 +23,6 @@ package com.microsoft.azure.cosmosdb.spark

 object Constants {
-  val currentVersion = "2.4.0_2.11-3.6.16-SNAPSHOT"
+  val currentVersion = "2.4.0_2.11-3.6.17-SNAPSHOT"
   val userAgentSuffix = s" SparkConnector/$currentVersion"
 }
\ No newline at end of file
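With this patch the extra diagnostics stay opt-in and are driven entirely by configuration. A hypothetical write configuration enabling them could look like the sketch below; only the logging keys shown ("queryLoggingPath", "queryLoggingCorrelationId", "iteratorLoggingPath", "iteratorLoggingCorrelationId", "countLoggingPath") come from CosmosDBConfig in this connector, while the paths and the elided connection settings are placeholders:

    // Hypothetical usage sketch - any Hadoop-compatible location should work,
    // since HdfsLogWriter writes through HdfsUtils with the job's Hadoop config.
    val writeConfig = Map(
      // ... endpoint, key, database and collection settings elided ...
      "queryLoggingPath"             -> "wasbs://logs@myaccount.blob.core.windows.net/query",
      "queryLoggingCorrelationId"    -> "run-2021-05-11",
      "iteratorLoggingPath"          -> "wasbs://logs@myaccount.blob.core.windows.net/iterator",
      "iteratorLoggingCorrelationId" -> "run-2021-05-11",
      "countLoggingPath"             -> "wasbs://logs@myaccount.blob.core.windows.net/count"
    )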