Skip to content

Commit b9da6dc

Browse files
author
Attila Tóth
committed
add: parameters to control number and the distribution of messages in a micro-batch
1 parent 29d01c4 commit b9da6dc

16 files changed

+1589
-13
lines changed

README.md

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,9 +325,77 @@ You can use `org.apache.spark.sql.pulsar.JsonUtils.topicOffsets(Map[String, Mess
325325
This may cause a false alarm. You can set it to `false` when it doesn't work as you expected. <br>
326326

327327
A batch query always fails if it fails to read any data from the provided offsets due to data loss.</td>
328+
</tr>
329+
<tr>
330+
<td>
331+
`maxEntriesPerTrigger`
332+
</td>
333+
<td>
334+
Number of entries to include in a single micro-batch during
335+
streaming.
336+
</td>
337+
<td>-1</td>
338+
<td>Streaming query</td>
339+
<td>This parameter controls how many Pulsar entries are read by
340+
the connector from the topic backlog at once. If the topic
341+
backlog is considerably high, users can use this parameter
342+
to limit the size of the micro-batch. If multiple topics are read,
343+
this parameter controls the complete number of entries fetched from
344+
all of them.
345+
346+
*Note:* Entries might contain multiple messages. The default value of `-1` means that the
347+
complete backlog is read at once.</td>
348+
</tr>
349+
350+
<tr>
351+
<td>
352+
`forwardStrategy`
353+
</td>
354+
<td>
355+
`simple`, `large-first` or `proportional`
356+
</td>
357+
<td>`simple`</td>
358+
<td>Streaming query</td>
359+
<td>If `maxEntriesPerTrigger` is set, this parameter controls
360+
which forwarding strategy is in use during the read of multiple
361+
topics.
362+
<li>
363+
`simple` just divides the allowed number of entries equally
364+
between all topics, regardless of their backlog size
365+
</li>
366+
<li>
367+
`large-first` will load the largest topic backlogs first,
368+
as the maximum number of allowed entries allows
369+
</li>
370+
<li>
371+
`proportional` will forward all topics proportional to the
372+
topic backlog/overall backlog ratio
373+
</li>
374+
</td>
375+
</tr>
328376

377+
<tr>
378+
<td>
379+
`ensureEntriesPerTopic`
380+
</td>
381+
<td>Number to forward each topic with during a micro-batch.</td>
382+
<td>0</td>
383+
<td>Streaming query</td>
384+
<td>If multiple topics are read, and the maximum number of
385+
entries is also specified, always forward all topics with the
386+
amount of entries specified here. Using this, users can ensure that topics
387+
with considerably smaller backlogs than others are also forwarded
388+
and read. Note that:
389+
<li>If this number is higher than the maximum allowed entries divided
390+
by the number of topics, then this value is taken into account, overriding
391+
the maximum number of entries per micro-batch.
392+
</li>
393+
<li>This parameter has an effect only for forwarding strategies
394+
`large-first` and `proportional`.</li>
395+
</td>
329396
</tr>
330397

398+
331399
</table>
332400

333401
#### Authentication

src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,20 @@ package org.apache.spark.sql.pulsar
1515

1616
import java.{util => ju}
1717
import java.io.Closeable
18-
import java.util.{Optional, UUID}
18+
import java.util.Optional
1919
import java.util.concurrent.TimeUnit
2020
import java.util.regex.Pattern
2121

2222
import org.apache.pulsar.client.admin.{PulsarAdmin, PulsarAdminException}
23-
import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient, SubscriptionInitialPosition, SubscriptionType}
23+
import org.apache.pulsar.client.api.{Message, MessageId, PulsarClient}
2424
import org.apache.pulsar.client.impl.schema.BytesSchema
25+
import org.apache.pulsar.client.internal.DefaultImplementation
2526
import org.apache.pulsar.common.naming.TopicName
2627
import org.apache.pulsar.common.schema.SchemaInfo
2728

2829
import org.apache.spark.internal.Logging
29-
import org.apache.spark.sql.pulsar.PulsarOptions.{AUTH_PARAMS, AUTH_PLUGIN_CLASS_NAME, TLS_ALLOW_INSECURE_CONNECTION, TLS_HOSTNAME_VERIFICATION_ENABLE, TLS_TRUST_CERTS_FILE_PATH, TOPIC_OPTION_KEYS}
30+
import org.apache.spark.sql.pulsar.PulsarOptions._
31+
import org.apache.spark.sql.pulsar.topicinternalstats.forward._
3032
import org.apache.spark.sql.types.StructType
3133

3234
/**
@@ -205,6 +207,82 @@ private[pulsar] case class PulsarMetadataReader(
205207
}.toMap)
206208
}
207209

210+
211+
def forwardOffset(actualOffset: Map[String, MessageId],
212+
strategy: String,
213+
numberOfEntriesToForward: Long,
214+
ensureEntriesPerTopic: Long): SpecificPulsarOffset = {
215+
getTopicPartitions()
216+
217+
// Collect internal stats for all topics
218+
val topicStats = topicPartitions.map( topic => {
219+
val internalStats = admin.topics().getInternalStats(topic)
220+
val topicActualMessageId = actualOffset.getOrElse(topic, MessageId.earliest)
221+
topic -> TopicState(internalStats,
222+
PulsarSourceUtils.getLedgerId(topicActualMessageId),
223+
PulsarSourceUtils.getEntryId(topicActualMessageId))
224+
} ).toMap
225+
226+
val forwarder = strategy match {
227+
case PulsarOptions.ProportionalForwardStrategy =>
228+
new ProportionalForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
229+
case PulsarOptions.LargeFirstForwardStrategy =>
230+
new LargeFirstForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic)
231+
case _ =>
232+
new LinearForwardStrategy(numberOfEntriesToForward)
233+
}
234+
235+
SpecificPulsarOffset(topicPartitions.map { topic =>
236+
topic -> PulsarSourceUtils.seekableLatestMid {
237+
// Fetch actual offset for topic
238+
val topicActualMessageId = actualOffset.getOrElse(topic, MessageId.earliest)
239+
try {
240+
// Get the actual ledger
241+
val actualLedgerId = PulsarSourceUtils.getLedgerId(topicActualMessageId)
242+
// Get the actual entry ID
243+
val actualEntryId = PulsarSourceUtils.getEntryId(topicActualMessageId)
244+
// Get the partition index
245+
val partitionIndex = PulsarSourceUtils.getPartitionIndex(topicActualMessageId)
246+
// Cache topic internal stats
247+
val internalStats = topicStats.get(topic).get.internalStat
248+
// Calculate the amount of messages we will pull in
249+
val numberOfEntriesPerTopic = forwarder.forward(topicStats)(topic)
250+
// Get a future message ID which corresponds
251+
// to the maximum number of messages
252+
val (nextLedgerId, nextEntryId) = TopicInternalStatsUtils.forwardMessageId(
253+
internalStats,
254+
actualLedgerId,
255+
actualEntryId,
256+
numberOfEntriesPerTopic)
257+
// Build a message id
258+
val forwardedMessageId =
259+
DefaultImplementation.newMessageId(nextLedgerId, nextEntryId, partitionIndex)
260+
// Log state
261+
val forwardedEntry = TopicInternalStatsUtils.numOfEntriesUntil(
262+
internalStats, nextLedgerId, nextEntryId)
263+
val entryCount = internalStats.numberOfEntries
264+
val progress = f"${forwardedEntry.toFloat / entryCount.toFloat}%1.3f"
265+
val logMessage = s"Pulsar Connector forward on topic. " +
266+
s"[$numberOfEntriesPerTopic/$numberOfEntriesToForward]" +
267+
s"${topic.reverse.take(30).reverse} $topicActualMessageId -> " +
268+
s"$forwardedMessageId ($forwardedEntry/$entryCount) [$progress]"
269+
log.debug(logMessage)
270+
// Return the message ID
271+
forwardedMessageId
272+
} catch {
273+
case e: PulsarAdminException if e.getStatusCode == 404 =>
274+
MessageId.earliest
275+
case e: Throwable =>
276+
throw new RuntimeException(
277+
s"Failed to get forwarded messageId for ${TopicName.get(topic).toString} " +
278+
s"(tried to forward ${forwarder.forward(topicStats)(topic)} messages " +
279+
s"starting from `$topicActualMessageId` using strategy $strategy)", e)
280+
}
281+
282+
}
283+
}.toMap)
284+
}
285+
208286
def fetchLatestOffsetForTopic(topic: String): MessageId = {
209287
PulsarSourceUtils.seekableLatestMid( try {
210288
admin.topics().getLastMessageId(topic)

src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ private[pulsar] object PulsarOptions {
3131
val TOPIC_MULTI = "topics"
3232
val TOPIC_PATTERN = "topicspattern"
3333

34+
val MaxEntriesPerTrigger = "maxentriespertrigger"
35+
val EnsureEntriesPerTopic = "ensureentriespertopic"
36+
val ForwardStrategy = "forwardstrategy"
37+
val ProportionalForwardStrategy = "proportional"
38+
val LargeFirstForwardStrategy = "large-first"
39+
3440
val PARTITION_SUFFIX = TopicName.PARTITIONED_TOPIC_SUFFIX
3541

3642
val TOPIC_OPTION_KEYS = Set(

src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,10 @@ private[pulsar] class PulsarProvider
110110
pollTimeoutMs(caseInsensitiveParams),
111111
failOnDataLoss(caseInsensitiveParams),
112112
subscriptionNamePrefix,
113-
jsonOptions
113+
jsonOptions,
114+
maxEntriesPerTrigger(caseInsensitiveParams),
115+
minEntriesPerTopic(caseInsensitiveParams),
116+
forwardStrategy(caseInsensitiveParams)
114117
)
115118
}
116119

@@ -365,6 +368,15 @@ private[pulsar] object PulsarProvider extends Logging {
365368
(SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000).toString)
366369
.toInt
367370

371+
private def maxEntriesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
372+
caseInsensitiveParams.getOrElse(MaxEntriesPerTrigger, "-1").toLong
373+
374+
private def minEntriesPerTopic(caseInsensitiveParams: Map[String, String]): Long =
375+
caseInsensitiveParams.getOrElse(EnsureEntriesPerTopic, "0").toLong
376+
377+
private def forwardStrategy(caseInsensitiveParams: Map[String, String]): String =
378+
caseInsensitiveParams.getOrElse(ForwardStrategy, "simple")
379+
368380
private def validateGeneralOptions(
369381
caseInsensitiveParams: Map[String, String]): Map[String, String] = {
370382
if (!caseInsensitiveParams.contains(SERVICE_URL_OPTION_KEY)) {

src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ private[pulsar] class PulsarSource(
3636
pollTimeoutMs: Int,
3737
failOnDataLoss: Boolean,
3838
subscriptionNamePrefix: String,
39-
jsonOptions: JSONOptionsInRead)
39+
jsonOptions: JSONOptionsInRead,
40+
maxEntriesPerTrigger: Long,
41+
ensureEntriesPerTopic: Long,
42+
forwardStrategy: String)
4043
extends Source
4144
with Logging {
4245

@@ -63,12 +66,21 @@ private[pulsar] class PulsarSource(
6366
override def schema(): StructType = SchemaUtils.pulsarSourceSchema(pulsarSchema)
6467

6568
override def getOffset: Option[Offset] = {
66-
// Make sure initialTopicOffsets is initialized
6769
initialTopicOffsets
68-
val latest = metadataReader.fetchLatestOffsets()
69-
currentTopicOffsets = Some(latest.topicOffsets)
70-
logDebug(s"GetOffset: ${latest.topicOffsets.toSeq.map(_.toString).sorted}")
71-
Some(latest.asInstanceOf[Offset])
70+
val nextOffsets = if (maxEntriesPerTrigger == -1) {
71+
metadataReader.fetchLatestOffsets()
72+
} else {
73+
currentTopicOffsets match {
74+
case Some(value) =>
75+
metadataReader.forwardOffset(value,
76+
forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
77+
case _ =>
78+
metadataReader.forwardOffset(initialTopicOffsets.topicOffsets,
79+
forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic)
80+
}
81+
}
82+
logDebug(s"GetOffset: ${nextOffsets.topicOffsets.toSeq.map(_.toString).sorted}")
83+
Some(nextOffsets.asInstanceOf[Offset])
7284
}
7385

7486
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
@@ -78,9 +90,7 @@ private[pulsar] class PulsarSource(
7890
logInfo(s"getBatch called with start = $start, end = $end")
7991
val endTopicOffsets = SpecificPulsarOffset.getTopicOffsets(end)
8092

81-
if (currentTopicOffsets.isEmpty) {
82-
currentTopicOffsets = Some(endTopicOffsets)
83-
}
93+
currentTopicOffsets = Some(endTopicOffsets)
8494

8595
if (start.isDefined && start.get == end) {
8696
return sqlContext.internalCreateDataFrame(

src/main/scala/org/apache/spark/sql/pulsar/PulsarSources.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,36 @@ private[pulsar] object PulsarSourceUtils extends Logging {
120120
}
121121
}
122122

123+
def getLedgerId(mid: MessageId): Long = {
124+
mid match {
125+
case bmid: BatchMessageIdImpl =>
126+
bmid.getLedgerId
127+
case midi: MessageIdImpl => midi.getLedgerId
128+
case t: TopicMessageIdImpl => getLedgerId(t.getInnerMessageId)
129+
case up: UserProvidedMessageId => up.getLedgerId
130+
}
131+
}
132+
133+
def getEntryId(mid: MessageId): Long = {
134+
mid match {
135+
case bmid: BatchMessageIdImpl =>
136+
bmid.getEntryId
137+
case midi: MessageIdImpl => midi.getEntryId
138+
case t: TopicMessageIdImpl => getEntryId(t.getInnerMessageId)
139+
case up: UserProvidedMessageId => up.getEntryId
140+
}
141+
}
142+
143+
def getPartitionIndex(mid: MessageId): Int = {
144+
mid match {
145+
case bmid: BatchMessageIdImpl =>
146+
bmid.getPartitionIndex
147+
case midi: MessageIdImpl => midi.getPartitionIndex
148+
case t: TopicMessageIdImpl => getPartitionIndex(t.getInnerMessageId)
149+
case up: UserProvidedMessageId => up.getPartitionIndex
150+
}
151+
}
152+
123153
def seekableLatestMid(mid: MessageId): MessageId = {
124154
if (messageExists(mid)) mid else MessageId.earliest
125155
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.apache.spark.sql.pulsar.topicinternalstats.forward
15+
16+
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats
17+
18+
trait ForwardStrategy {
19+
def forward(topics: Map[String, TopicState]): Map[String, Long]
20+
}
21+
22+
case class TopicState(internalStat: PersistentTopicInternalStats,
23+
actualLedgerId: Long,
24+
actualEntryId: Long)

0 commit comments

Comments
 (0)