[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20572 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170799163 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala --- @@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private val preferredHosts = LocationStrategies.PreferConsistent + private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) { +val mockTime = new MockTime() +// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api +val logs = new Pool[TopicAndPartition, Log]() +val logDir = kafkaTestUtils.brokerLogDir +val dir = new java.io.File(logDir, topic + "-" + partition) +dir.mkdirs() +val logProps = new ju.Properties() +logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) +logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float) --- End diff -- Yeah, it's necessary, otherwise it gets treated as AnyRef. Changed to Float.valueOf FWIW --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170279504 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala --- @@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private val preferredHosts = LocationStrategies.PreferConsistent + private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) { +val mockTime = new MockTime() +// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api +val logs = new Pool[TopicAndPartition, Log]() +val logDir = kafkaTestUtils.brokerLogDir +val dir = new java.io.File(logDir, topic + "-" + partition) +dir.mkdirs() +val logProps = new ju.Properties() +logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) +logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float) +val log = new Log( + dir, + LogConfig(logProps), + 0L, + mockTime.scheduler, + mockTime +) +messages.foreach { case (k, v) => +val msg = new ByteBufferMessageSet( --- End diff -- Unindent one level? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170278317 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala --- @@ -162,17 +162,22 @@ private[kafka010] class KafkaTestUtils extends Logging { } /** Create a Kafka topic and wait until it is propagated to the whole cluster */ - def createTopic(topic: String, partitions: Int): Unit = { -AdminUtils.createTopic(zkUtils, topic, partitions, 1) + def createTopic(topic: String, partitions: Int, config: Properties): Unit = { +AdminUtils.createTopic(zkUtils, topic, partitions, 1, config) // wait until metadata is propagated (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) } } + /** Create a Kafka topic and wait until it is propagated to the whole cluster */ + def createTopic(topic: String, partitions: Int): Unit = { +createTopic(topic, partitions, new Properties) + } + /** Create a Kafka topic and wait until it is propagated to the whole cluster */ def createTopic(topic: String): Unit = { -createTopic(topic, 1) +createTopic(topic, 1, new Properties) --- End diff -- Nit: `new Properties()` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170278931 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala --- @@ -71,25 +69,62 @@ class CachedKafkaConsumer[K, V] private( } if (!buffer.hasNext()) { poll(timeout) } -assert(buffer.hasNext(), +require(buffer.hasNext(), s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") var record = buffer.next() if (record.offset != offset) { logInfo(s"Buffer miss for $groupId $topic $partition $offset") seek(offset) poll(timeout) - assert(buffer.hasNext(), + require(buffer.hasNext(), s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") record = buffer.next() - assert(record.offset == offset, -s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") + require(record.offset == offset, +s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " + + s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " + + "spark.streaming.kafka.allowNonConsecutiveOffsets" + ) } nextOffset = offset + 1 record } + /** + * Start a batch on a compacted topic + */ + def compactedStart(offset: Long, timeout: Long): Unit = { +logDebug(s"compacted start $groupId $topic $partition starting $offset") +// This seek may not be necessary, but it's hard to tell due to gaps in compacted topics +if (offset != nextOffset) { + logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset") + seek(offset) + poll(timeout) +} + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. 
+ */ + def compactedNext(timeout: Long): ConsumerRecord[K, V] = { +if (!buffer.hasNext()) { poll(timeout) } --- End diff -- Nit: I'd expand this onto two lines --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170278685 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010.mocks + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.PriorityQueue + +import kafka.utils.{Scheduler, Time} + +/** + * A mock scheduler that executes tasks synchronously using a mock time instance. + * Tasks are executed synchronously when the time is advanced. + * This class is meant to be used in conjunction with MockTime. + * + * Example usage + * + * val time = new MockTime + * time.scheduler.schedule("a task", println("hello world: " + time.milliseconds), delay = 1000) + * time.sleep(1001) // this should cause our scheduled task to fire + * + * + * Incrementing the time to the exact next execution time of a task will result in that task + * executing (it as if execution itself takes no time). 
+ */ +private[kafka010] class MockScheduler(val time: Time) extends Scheduler { + + /* a priority queue of tasks ordered by next execution time */ + var tasks = new PriorityQueue[MockTask]() + + def isStarted: Boolean = true + + def startup(): Unit = {} + + def shutdown(): Unit = synchronized { +tasks.foreach(_.fun()) +tasks.clear() + } + + /** + * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs + * when this method is called and the execution happens synchronously in the calling thread. + * If you are using the scheduler associated with a MockTime instance this call + * will be triggered automatically. + */ + def tick() { +this synchronized { + val now = time.milliseconds + while(!tasks.isEmpty && tasks.head.nextExecution <= now) { +/* pop and execute the task with the lowest next execution time */ +val curr = tasks.dequeue +curr.fun() +/* if the task is periodic, reschedule it and re-enqueue */ +if(curr.periodic) { + curr.nextExecution += curr.period + this.tasks += curr +} + } +} + } + + def schedule( + name: String, + fun: () => Unit, + delay: Long = 0, + period: Long = -1, + unit: TimeUnit = TimeUnit.MILLISECONDS) { +this synchronized { --- End diff -- I think I'd still write such methods as: ``` def foo(): T = synchronized { ... } ``` This is how the rest of the code base does it (and other Scala code I've seen). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170279950 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala --- @@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private val preferredHosts = LocationStrategies.PreferConsistent + private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) { +val mockTime = new MockTime() +// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api +val logs = new Pool[TopicAndPartition, Log]() +val logDir = kafkaTestUtils.brokerLogDir +val dir = new java.io.File(logDir, topic + "-" + partition) +dir.mkdirs() +val logProps = new ju.Properties() +logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) +logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float) --- End diff -- Do you have to 'cast' this to a Java Float object to get it to compile? `java.lang.Float.valueOf(0.1f)` works too I guess, but equally weird. OK if it's required. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170278078 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala --- @@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private val preferredHosts = LocationStrategies.PreferConsistent + private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) { +val mockTime = new MockTime() +// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api +val logs = new Pool[TopicAndPartition, Log]() +val logDir = kafkaTestUtils.brokerLogDir +val dir = new java.io.File(logDir, topic + "-" + partition) --- End diff -- Import `File`, other `java.*` classes? maybe I'm missing a name conflict. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170277915 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala --- @@ -172,57 +187,138 @@ private[spark] class KafkaRDD[K, V]( override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = { val part = thePart.asInstanceOf[KafkaRDDPartition] -assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) +require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) if (part.fromOffset == part.untilOffset) { logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " + s"skipping ${part.topic} ${part.partition}") Iterator.empty } else { - new KafkaRDDIterator(part, context) + logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + +s"offsets ${part.fromOffset} -> ${part.untilOffset}") + if (compacted) { +new CompactedKafkaRDDIterator[K, V]( + part, + context, + kafkaParams, + useConsumerCache, + pollTimeout, + cacheInitialCapacity, + cacheMaxCapacity, + cacheLoadFactor +) + } else { +new KafkaRDDIterator[K, V]( + part, + context, + kafkaParams, + useConsumerCache, + pollTimeout, + cacheInitialCapacity, + cacheMaxCapacity, + cacheLoadFactor +) + } } } +} - /** - * An iterator that fetches messages directly from Kafka for the offsets in partition. - * Uses a cached consumer where possible to take advantage of prefetching - */ - private class KafkaRDDIterator( - part: KafkaRDDPartition, - context: TaskContext) extends Iterator[ConsumerRecord[K, V]] { - -logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + - s"offsets ${part.fromOffset} -> ${part.untilOffset}") - -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] +/** + * An iterator that fetches messages directly from Kafka for the offsets in partition. 
+ * Uses a cached consumer where possible to take advantage of prefetching + */ +private class KafkaRDDIterator[K, V]( + part: KafkaRDDPartition, + context: TaskContext, + kafkaParams: ju.Map[String, Object], + useConsumerCache: Boolean, + pollTimeout: Long, + cacheInitialCapacity: Int, + cacheMaxCapacity: Int, + cacheLoadFactor: Float +) extends Iterator[ConsumerRecord[K, V]] { + + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] + + context.addTaskCompletionListener{ context => closeIfNeeded() } --- End diff -- This could be `...(_ => closeIfNeeded())` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170279150 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala --- @@ -87,47 +89,60 @@ private[spark] class KafkaRDD[K, V]( }.toArray } - override def count(): Long = offsetRanges.map(_.count).sum + override def count(): Long = +if (compacted) { + super.count() +} else { + offsetRanges.map(_.count).sum +} override def countApprox( timeout: Long, confidence: Double = 0.95 - ): PartialResult[BoundedDouble] = { -val c = count -new PartialResult(new BoundedDouble(c, 1.0, c, c), true) - } + ): PartialResult[BoundedDouble] = +if (compacted) { + super.countApprox(timeout, confidence) +} else { + val c = count + new PartialResult(new BoundedDouble(c, 1.0, c, c), true) +} - override def isEmpty(): Boolean = count == 0L + override def isEmpty(): Boolean = +if (compacted) { + super.isEmpty() +} else { + count == 0L +} - override def take(num: Int): Array[ConsumerRecord[K, V]] = { -val nonEmptyPartitions = this.partitions - .map(_.asInstanceOf[KafkaRDDPartition]) - .filter(_.count > 0) + override def take(num: Int): Array[ConsumerRecord[K, V]] = +if (compacted) { + super.take(num) +} else { + val nonEmptyPartitions = this.partitions +.map(_.asInstanceOf[KafkaRDDPartition]) +.filter(_.count > 0) -if (num < 1 || nonEmptyPartitions.isEmpty) { - return new Array[ConsumerRecord[K, V]](0) -} + if (num < 1 || nonEmptyPartitions.isEmpty) { --- End diff -- I guess you could check `num < 1` before the map/filter, but it's trivial. You could write `return Array.empty[ConsumerRecord[K,V]]` too; again trivial. Since this is existing code I could see not touching it as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169850605 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala --- @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private( s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") record = buffer.next() assert(record.offset == offset, -s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") +s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " + + s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " + + "spark.streaming.kafka.allowNonConsecutiveOffsets" + ) } nextOffset = offset + 1 record } + /** + * Start a batch on a compacted topic + */ + def compactedStart(offset: Long, timeout: Long): Unit = { +logDebug(s"compacted start $groupId $topic $partition starting $offset") +// This seek may not be necessary, but it's hard to tell due to gaps in compacted topics +if (offset != nextOffset) { + logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset") + seek(offset) + poll(timeout) +} + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + */ + def compactedNext(timeout: Long): ConsumerRecord[K, V] = { +if (!buffer.hasNext()) { poll(timeout) } +assert(buffer.hasNext(), --- End diff -- Agreed that require is better, will fix it in a sec Pretty sure assert is just a function in Predef.scala that throws an AssertionError, it's not like a java assert statement that can be en / disabled with java -ea / -da. 
Tested it out: https://gist.github.com/koeninger/6155cd94a19d1a6373ba0b40039e97e3 — disabling Scala asserts can be done at compile time with -Xdisable-assertions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169663225 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala --- @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private( s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") record = buffer.next() assert(record.offset == offset, -s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") +s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " + + s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " + + "spark.streaming.kafka.allowNonConsecutiveOffsets" + ) } nextOffset = offset + 1 record } + /** + * Start a batch on a compacted topic + */ + def compactedStart(offset: Long, timeout: Long): Unit = { +logDebug(s"compacted start $groupId $topic $partition starting $offset") +// This seek may not be necessary, but it's hard to tell due to gaps in compacted topics +if (offset != nextOffset) { + logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset") + seek(offset) + poll(timeout) +} + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + */ + def compactedNext(timeout: Long): ConsumerRecord[K, V] = { +if (!buffer.hasNext()) { poll(timeout) } +assert(buffer.hasNext(), --- End diff -- `assert` turns into a JVM assert (I think?) and as such would be turned off if assertions are disabled, which is how it ought to run in production. If it's something that _could_ happen at all I think it should be `require` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169538019 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala --- @@ -53,7 +51,7 @@ class CachedKafkaConsumer[K, V] private( // TODO if the buffer was kept around as a random-access structure, // could possibly optimize re-calculating of an RDD in the same batch - protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator + protected var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]() --- End diff -- Agreed, think it should be ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169537541 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala --- @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private( s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") record = buffer.next() assert(record.offset == offset, -s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") +s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " + + s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " + + "spark.streaming.kafka.allowNonConsecutiveOffsets" + ) } nextOffset = offset + 1 record } + /** + * Start a batch on a compacted topic + */ + def compactedStart(offset: Long, timeout: Long): Unit = { +logDebug(s"compacted start $groupId $topic $partition starting $offset") +// This seek may not be necessary, but it's hard to tell due to gaps in compacted topics +if (offset != nextOffset) { + logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset") + seek(offset) + poll(timeout) +} + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + */ + def compactedNext(timeout: Long): ConsumerRecord[K, V] = { +if (!buffer.hasNext()) { poll(timeout) } +assert(buffer.hasNext(), --- End diff -- That's a "shouldn't happen unless the topicpartition or broker is gone" kind of thing. Semantically I could see that being more like require than assert, but don't have a strong opinion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user koeninger commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169536036 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala --- @@ -22,12 +22,17 @@ import java.{ util => ju } import scala.collection.JavaConverters._ import scala.util.Random +import kafka.common.TopicAndPartition --- End diff -- Right, LogCleaner hadn't yet been moved to the new apis, added a comment to that effect. Think we're ok here because it's just being used to mock up a compacted topic, not in the actual dstream api. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169489088 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala --- @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private( s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") record = buffer.next() assert(record.offset == offset, -s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") +s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " + + s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " + + "spark.streaming.kafka.allowNonConsecutiveOffsets" + ) } nextOffset = offset + 1 record } + /** + * Start a batch on a compacted topic + */ + def compactedStart(offset: Long, timeout: Long): Unit = { +logDebug(s"compacted start $groupId $topic $partition starting $offset") +// This seek may not be necessary, but it's hard to tell due to gaps in compacted topics +if (offset != nextOffset) { + logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset") + seek(offset) + poll(timeout) +} + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + */ + def compactedNext(timeout: Long): ConsumerRecord[K, V] = { +if (!buffer.hasNext()) { poll(timeout) } +assert(buffer.hasNext(), + s"Failed to get records for compacted $groupId $topic $partition after polling for $timeout") +val record = buffer.next() +nextOffset = record.offset + 1 +record + } + + /** + * Rewind to previous record in the batch from a compacted topic. + * Will throw NoSuchElementException if no previous element --- End diff -- Could be a `@throws` tag but no big deal. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169489462 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala --- @@ -53,7 +51,7 @@ class CachedKafkaConsumer[K, V] private( // TODO if the buffer was kept around as a random-access structure, // could possibly optimize re-calculating of an RDD in the same batch - protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator + protected var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]() --- End diff -- Was going to comment on the fact that it's `protected`, and what if a subclass depends on this, but, as the whole class is package-private I guess that's not an issue? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169491574 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010.mocks + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.PriorityQueue + +import kafka.utils.{Scheduler, Time} + +/** + * A mock scheduler that executes tasks synchronously using a mock time instance. + * Tasks are executed synchronously when the time is advanced. + * This class is meant to be used in conjunction with MockTime. + * + * Example usage + * + * val time = new MockTime + * time.scheduler.schedule("a task", println("hello world: " + time.milliseconds), delay = 1000) + * time.sleep(1001) // this should cause our scheduled task to fire + * + * + * Incrementing the time to the exact next execution time of a task will result in that task + * executing (it as if execution itself takes no time). 
+ */ +private[kafka010] class MockScheduler(val time: Time) extends Scheduler { + + /* a priority queue of tasks ordered by next execution time */ + var tasks = new PriorityQueue[MockTask]() + + def isStarted: Boolean = true + + def startup() {} + + def shutdown() { +this synchronized { + tasks.foreach(_.fun()) + tasks.clear() +} + } + + /** + * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs + * when this method is called and the execution happens synchronously in the calling thread. + * If you are using the scheduler associated with a MockTime instance this call + * will be triggered automatically. + */ + def tick() { +this synchronized { + val now = time.milliseconds + while(!tasks.isEmpty && tasks.head.nextExecution <= now) { +/* pop and execute the task with the lowest next execution time */ +val curr = tasks.dequeue +curr.fun() +/* if the task is periodic, reschedule it and re-enqueue */ +if(curr.periodic) { + curr.nextExecution += curr.period + this.tasks += curr +} + } +} + } + + def schedule( + name: String, + fun: () => Unit, + delay: Long = 0, + period: Long = -1, + unit: TimeUnit = TimeUnit.MILLISECONDS) { +this synchronized { + tasks += MockTask(name, fun, time.milliseconds + delay, period = period) + tick() +} + } + +} + +case class MockTask( +val name: String, +val fun: () => Unit, +var nextExecution: Long, +val period: Long) extends Ordered[MockTask] { + def periodic: Boolean = period >= 0 + def compare(t: MockTask): Int = { +if (t.nextExecution == nextExecution) { --- End diff -- `java.lang.Long.compare(t.nextExecution, nextExecution)` does this too in a line, but whatever --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169491706 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010.mocks + +import java.util.concurrent._ + +import kafka.utils.Time + +/** + * A class used for unit testing things which depend on the Time interface. + * + * This class never manually advances the clock, it only does so when you call + * sleep(ms) + * + * It also comes with an associated scheduler instance for managing background tasks in + * a deterministic way. + */ +private[kafka010] class MockTime(@volatile private var currentMs: Long) extends Time { + + val scheduler = new MockScheduler(this) + + def this() = this(System.currentTimeMillis) + + def milliseconds: Long = currentMs + + def nanoseconds: Long = +TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS) + + def sleep(ms: Long) { +this.currentMs += ms +scheduler.tick() + } + + override def toString(): String = "MockTime(%d)".format(milliseconds) --- End diff -- Use string interpolation to be consistent? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169490949 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala --- @@ -178,51 +196,128 @@ private[spark] class KafkaRDD[K, V]( s"skipping ${part.topic} ${part.partition}") Iterator.empty } else { - new KafkaRDDIterator(part, context) + logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + +s"offsets ${part.fromOffset} -> ${part.untilOffset}") + if (compacted) { +new CompactedKafkaRDDIterator[K, V]( + part, + context, + kafkaParams, + useConsumerCache, + pollTimeout, + cacheInitialCapacity, + cacheMaxCapacity, + cacheLoadFactor +) + } else { +new KafkaRDDIterator[K, V]( + part, + context, + kafkaParams, + useConsumerCache, + pollTimeout, + cacheInitialCapacity, + cacheMaxCapacity, + cacheLoadFactor +) + } } } +} - /** - * An iterator that fetches messages directly from Kafka for the offsets in partition. - * Uses a cached consumer where possible to take advantage of prefetching - */ - private class KafkaRDDIterator( - part: KafkaRDDPartition, - context: TaskContext) extends Iterator[ConsumerRecord[K, V]] { - -logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + - s"offsets ${part.fromOffset} -> ${part.untilOffset}") - -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] - -context.addTaskCompletionListener{ context => closeIfNeeded() } - -val consumer = if (useConsumerCache) { - CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) - if (context.attemptNumber >= 1) { -// just in case the prior attempt failures were cache related -CachedKafkaConsumer.remove(groupId, part.topic, part.partition) - } - CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams) -} else { - CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams) +/** + * An iterator that fetches 
messages directly from Kafka for the offsets in partition. + * Uses a cached consumer where possible to take advantage of prefetching + */ +private class KafkaRDDIterator[K, V]( + part: KafkaRDDPartition, + context: TaskContext, + kafkaParams: ju.Map[String, Object], + useConsumerCache: Boolean, + pollTimeout: Long, + cacheInitialCapacity: Int, + cacheMaxCapacity: Int, + cacheLoadFactor: Float +) extends Iterator[ConsumerRecord[K, V]] { + + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] + + context.addTaskCompletionListener{ context => closeIfNeeded() } + + val consumer = if (useConsumerCache) { +CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) +if (context.attemptNumber >= 1) { + // just in case the prior attempt failures were cache related + CachedKafkaConsumer.remove(groupId, part.topic, part.partition) } +CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams) + } else { +CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams) + } -var requestOffset = part.fromOffset + var requestOffset = part.fromOffset -def closeIfNeeded(): Unit = { - if (!useConsumerCache && consumer != null) { -consumer.close - } + def closeIfNeeded(): Unit = { +if (!useConsumerCache && consumer != null) { + consumer.close } + } + + override def hasNext(): Boolean = requestOffset < part.untilOffset -override def hasNext(): Boolean = requestOffset < part.untilOffset + override def next(): ConsumerRecord[K, V] = { +assert(hasNext(), "Can't call getNext() once untilOffset has been reached") +val r = consumer.get(requestOffset, pollTimeout) +requestOffset += 1 +r + } +} -override def next(): ConsumerRecord[K, V] = { - assert(hasNext(), "Can't call getNext() once untilOffset has been reached") - val r = consumer.get(requestOffset, pollTimeout) - requestOffset += 1 - r +/** + * An iterator that fetches messages directly from Kafka for the offsets in partition. 
+ * Uses a cached consumer where possible to
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169490790 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala --- @@ -87,47 +89,63 @@ private[spark] class KafkaRDD[K, V]( }.toArray } - override def count(): Long = offsetRanges.map(_.count).sum + override def count(): Long = +if (compacted) { + super.count() +} else { + offsetRanges.map(_.count).sum +} override def countApprox( timeout: Long, confidence: Double = 0.95 - ): PartialResult[BoundedDouble] = { -val c = count -new PartialResult(new BoundedDouble(c, 1.0, c, c), true) - } + ): PartialResult[BoundedDouble] = +if (compacted) { + super.countApprox(timeout, confidence) +} else { + val c = count + new PartialResult(new BoundedDouble(c, 1.0, c, c), true) +} - override def isEmpty(): Boolean = count == 0L + override def isEmpty(): Boolean = +if (compacted) { + super.isEmpty() +} else { + count == 0L +} - override def take(num: Int): Array[ConsumerRecord[K, V]] = { -val nonEmptyPartitions = this.partitions - .map(_.asInstanceOf[KafkaRDDPartition]) - .filter(_.count > 0) + override def take(num: Int): Array[ConsumerRecord[K, V]] = +if (compacted) { + super.take(num) +} else { + val nonEmptyPartitions = this.partitions +.map(_.asInstanceOf[KafkaRDDPartition]) +.filter(_.count > 0) -if (num < 1 || nonEmptyPartitions.isEmpty) { - return new Array[ConsumerRecord[K, V]](0) -} + if (num < 1 || nonEmptyPartitions.isEmpty) { +return new Array[ConsumerRecord[K, V]](0) + } -// Determine in advance how many messages need to be taken from each partition -val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => - val remain = num - result.values.sum - if (remain > 0) { -val taken = Math.min(remain, part.count) -result + (part.index -> taken.toInt) - } else { -result + // Determine in advance how many messages need to be taken from each partition + val parts = nonEmptyPartitions.foldLeft(Map[Int, 
Int]()) { (result, part) => +val remain = num - result.values.sum +if (remain > 0) { + val taken = Math.min(remain, part.count) + result + (part.index -> taken.toInt) +} else { + result +} } -} -val buf = new ArrayBuffer[ConsumerRecord[K, V]] -val res = context.runJob( - this, - (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) => - it.take(parts(tc.partitionId)).toArray, parts.keys.toArray -) -res.foreach(buf ++= _) -buf.toArray - } + val buf = new ArrayBuffer[ConsumerRecord[K, V]] + val res = context.runJob( +this, +(tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) => +it.take(parts(tc.partitionId)).toArray, parts.keys.toArray + ) + res.foreach(buf ++= _) + buf.toArray --- End diff -- I am not sure why this code doesn't just `.flatten` the result of `.runJob` to get an array of all of the results. Feel free to change it, or not. Maybe I'm missing something --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169491061 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala --- @@ -22,12 +22,17 @@ import java.{ util => ju } import scala.collection.JavaConverters._ import scala.util.Random +import kafka.common.TopicAndPartition --- End diff -- These are the older Kafka APIs, right? This may all be correct; I'm just making sure these are the classes that are needed in a Kafka 0.10 test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169490896 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala --- @@ -178,51 +196,128 @@ private[spark] class KafkaRDD[K, V]( s"skipping ${part.topic} ${part.partition}") Iterator.empty } else { - new KafkaRDDIterator(part, context) + logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + +s"offsets ${part.fromOffset} -> ${part.untilOffset}") + if (compacted) { +new CompactedKafkaRDDIterator[K, V]( + part, + context, + kafkaParams, + useConsumerCache, + pollTimeout, + cacheInitialCapacity, + cacheMaxCapacity, + cacheLoadFactor +) + } else { +new KafkaRDDIterator[K, V]( + part, + context, + kafkaParams, + useConsumerCache, + pollTimeout, + cacheInitialCapacity, + cacheMaxCapacity, + cacheLoadFactor +) + } } } +} - /** - * An iterator that fetches messages directly from Kafka for the offsets in partition. - * Uses a cached consumer where possible to take advantage of prefetching - */ - private class KafkaRDDIterator( - part: KafkaRDDPartition, - context: TaskContext) extends Iterator[ConsumerRecord[K, V]] { - -logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + - s"offsets ${part.fromOffset} -> ${part.untilOffset}") - -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] - -context.addTaskCompletionListener{ context => closeIfNeeded() } - -val consumer = if (useConsumerCache) { - CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) - if (context.attemptNumber >= 1) { -// just in case the prior attempt failures were cache related -CachedKafkaConsumer.remove(groupId, part.topic, part.partition) - } - CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams) -} else { - CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams) +/** + * An iterator that fetches 
messages directly from Kafka for the offsets in partition. + * Uses a cached consumer where possible to take advantage of prefetching + */ +private class KafkaRDDIterator[K, V]( + part: KafkaRDDPartition, + context: TaskContext, + kafkaParams: ju.Map[String, Object], + useConsumerCache: Boolean, + pollTimeout: Long, + cacheInitialCapacity: Int, + cacheMaxCapacity: Int, + cacheLoadFactor: Float +) extends Iterator[ConsumerRecord[K, V]] { + + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] + + context.addTaskCompletionListener{ context => closeIfNeeded() } + + val consumer = if (useConsumerCache) { +CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) +if (context.attemptNumber >= 1) { + // just in case the prior attempt failures were cache related + CachedKafkaConsumer.remove(groupId, part.topic, part.partition) } +CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams) + } else { +CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams) + } -var requestOffset = part.fromOffset + var requestOffset = part.fromOffset -def closeIfNeeded(): Unit = { - if (!useConsumerCache && consumer != null) { -consumer.close - } + def closeIfNeeded(): Unit = { +if (!useConsumerCache && consumer != null) { + consumer.close } + } + + override def hasNext(): Boolean = requestOffset < part.untilOffset -override def hasNext(): Boolean = requestOffset < part.untilOffset + override def next(): ConsumerRecord[K, V] = { +assert(hasNext(), "Can't call getNext() once untilOffset has been reached") --- End diff -- This seems like something that shouldn't be an assert because a caller could call `next()` out of order. Should be an exception, to be sure. `NoSuchElementException`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169491286 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010.mocks + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.PriorityQueue + +import kafka.utils.{Scheduler, Time} + +/** + * A mock scheduler that executes tasks synchronously using a mock time instance. + * Tasks are executed synchronously when the time is advanced. + * This class is meant to be used in conjunction with MockTime. + * + * Example usage + * + * val time = new MockTime + * time.scheduler.schedule("a task", println("hello world: " + time.milliseconds), delay = 1000) + * time.sleep(1001) // this should cause our scheduled task to fire + * + * + * Incrementing the time to the exact next execution time of a task will result in that task + * executing (it as if execution itself takes no time). 
+ */ +private[kafka010] class MockScheduler(val time: Time) extends Scheduler { + + /* a priority queue of tasks ordered by next execution time */ + var tasks = new PriorityQueue[MockTask]() + + def isStarted: Boolean = true + + def startup() {} + + def shutdown() { --- End diff -- Just a style question or nit, but, what about: `def shutdown(): Unit = synchronized { ...` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169490350 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala --- @@ -178,51 +196,128 @@ private[spark] class KafkaRDD[K, V]( s"skipping ${part.topic} ${part.partition}") Iterator.empty } else { - new KafkaRDDIterator(part, context) + logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + +s"offsets ${part.fromOffset} -> ${part.untilOffset}") + if (compacted) { +new CompactedKafkaRDDIterator[K, V]( + part, + context, + kafkaParams, + useConsumerCache, + pollTimeout, + cacheInitialCapacity, + cacheMaxCapacity, + cacheLoadFactor +) + } else { +new KafkaRDDIterator[K, V]( + part, + context, + kafkaParams, + useConsumerCache, + pollTimeout, + cacheInitialCapacity, + cacheMaxCapacity, + cacheLoadFactor +) + } } } +} - /** - * An iterator that fetches messages directly from Kafka for the offsets in partition. - * Uses a cached consumer where possible to take advantage of prefetching - */ - private class KafkaRDDIterator( - part: KafkaRDDPartition, - context: TaskContext) extends Iterator[ConsumerRecord[K, V]] { - -logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + - s"offsets ${part.fromOffset} -> ${part.untilOffset}") - -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] - -context.addTaskCompletionListener{ context => closeIfNeeded() } - -val consumer = if (useConsumerCache) { - CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) - if (context.attemptNumber >= 1) { -// just in case the prior attempt failures were cache related -CachedKafkaConsumer.remove(groupId, part.topic, part.partition) - } - CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams) -} else { - CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams) +/** + * An iterator that fetches 
messages directly from Kafka for the offsets in partition. + * Uses a cached consumer where possible to take advantage of prefetching + */ +private class KafkaRDDIterator[K, V]( + part: KafkaRDDPartition, + context: TaskContext, + kafkaParams: ju.Map[String, Object], + useConsumerCache: Boolean, + pollTimeout: Long, + cacheInitialCapacity: Int, + cacheMaxCapacity: Int, + cacheLoadFactor: Float +) extends Iterator[ConsumerRecord[K, V]] { + + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] + + context.addTaskCompletionListener{ context => closeIfNeeded() } + + val consumer = if (useConsumerCache) { +CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) +if (context.attemptNumber >= 1) { + // just in case the prior attempt failures were cache related + CachedKafkaConsumer.remove(groupId, part.topic, part.partition) } +CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams) + } else { +CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams) + } -var requestOffset = part.fromOffset + var requestOffset = part.fromOffset -def closeIfNeeded(): Unit = { - if (!useConsumerCache && consumer != null) { -consumer.close - } + def closeIfNeeded(): Unit = { +if (!useConsumerCache && consumer != null) { + consumer.close --- End diff -- Just nits here, but I'd write `close()` as it clearly has side effects --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r169489226 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala --- @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private( s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") record = buffer.next() assert(record.offset == offset, -s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") +s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " + + s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " + + "spark.streaming.kafka.allowNonConsecutiveOffsets" + ) } nextOffset = offset + 1 record } + /** + * Start a batch on a compacted topic + */ + def compactedStart(offset: Long, timeout: Long): Unit = { +logDebug(s"compacted start $groupId $topic $partition starting $offset") +// This seek may not be necessary, but it's hard to tell due to gaps in compacted topics +if (offset != nextOffset) { + logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset") + seek(offset) + poll(timeout) +} + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + */ + def compactedNext(timeout: Long): ConsumerRecord[K, V] = { +if (!buffer.hasNext()) { poll(timeout) } +assert(buffer.hasNext(), --- End diff -- Should this really be an assert, i.e. something that should never happen regardless of input or external state? Or is it just a condition that should generate an exception if false? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
GitHub user koeninger opened a pull request: https://github.com/apache/spark/pull/20572 [SPARK-17147][STREAMING][KAFKA] Allow non-consecutive offsets ## What changes were proposed in this pull request? Add a configuration spark.streaming.kafka.allowNonConsecutiveOffsets to allow streaming jobs to proceed on compacted topics (or other situations involving gaps between offsets in the log). ## How was this patch tested? Added new unit test. @justinrmiller has been testing this branch in production for a few weeks. You can merge this pull request into a Git repository by running: $ git pull https://github.com/koeninger/spark-1 SPARK-17147 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20572.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20572 commit 3082de7e43e8c381dc2227005d1e0fc5bd2c3d29 Author: cody koeninger Date: 2016-10-08T21:21:48Z [SPARK-17147][STREAMING][KAFKA] failing test for compacted topics commit e8ea89ea10527c6723df4af2685004ea67d872cd Author: cody koeninger Date: 2016-10-09T04:59:39Z [SPARK-17147][STREAMING][KAFKA] test passing for compacted topics commit 182943e36f596d0cb5841a9c63471bea1dd9047b Author: cody koeninger Date: 2018-02-11T04:09:38Z spark.streaming.kafka.allowNonConsecutiveOffsets commit 89f4bc5f4de78cdcc22b5c9b26a27ee9263048c8 Author: cody koeninger Date: 2018-02-11T04:13:49Z [SPARK-17147][STREAMING][KAFKA] remove stray param doc commit 12e65bedddbcd2407598e69fa3c6fcbcdfc67e5d Author: cody koeninger Date: 2018-02-11T04:28:22Z [SPARK-17147][STREAMING][KAFKA] prepare for merge of master commit 2ed51f1f73ee75ffd08355265a72e68e83ef592d Author: cody koeninger Date: 2018-02-11T05:19:31Z Merge branch 'master' into SPARK-17147 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org