[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20572


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-26 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170799163
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with 
BeforeAndAfterAll {
 
   private val preferredHosts = LocationStrategies.PreferConsistent
 
+  private def compactLogs(topic: String, partition: Int, messages: 
Array[(String, String)]) {
+val mockTime = new MockTime()
+// LogCleaner in 0.10 version of Kafka is still expecting the old 
TopicAndPartition api
+val logs = new Pool[TopicAndPartition, Log]()
+val logDir = kafkaTestUtils.brokerLogDir
+val dir = new java.io.File(logDir, topic + "-" + partition)
+dir.mkdirs()
+val logProps = new ju.Properties()
+logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: 
java.lang.Float)
--- End diff --

Yeah, it's necessary, otherwise it gets treated as AnyRef. Changed to Float.valueOf FWIW.
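
For anyone reading along, a standalone sketch of the two boxing forms being discussed (the property key is just a stand-in string, not taken from the PR):

```
import java.util.Properties

object FloatBoxingSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Properties.put takes (Object, Object); both forms below hand it an explicit
    // java.lang.Float instead of leaving the boxing of the Scala Float literal implicit.
    props.put("min.cleanable.dirty.ratio", 0.1f: java.lang.Float)         // type ascription via Predef.float2Float
    props.put("min.cleanable.dirty.ratio", java.lang.Float.valueOf(0.1f)) // the Float.valueOf form mentioned above
    println(props.get("min.cleanable.dirty.ratio").getClass)              // class java.lang.Float
  }
}
```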


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170279504
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with 
BeforeAndAfterAll {
 
   private val preferredHosts = LocationStrategies.PreferConsistent
 
+  private def compactLogs(topic: String, partition: Int, messages: 
Array[(String, String)]) {
+val mockTime = new MockTime()
+// LogCleaner in 0.10 version of Kafka is still expecting the old 
TopicAndPartition api
+val logs = new Pool[TopicAndPartition, Log]()
+val logDir = kafkaTestUtils.brokerLogDir
+val dir = new java.io.File(logDir, topic + "-" + partition)
+dir.mkdirs()
+val logProps = new ju.Properties()
+logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: 
java.lang.Float)
+val log = new Log(
+  dir,
+  LogConfig(logProps),
+  0L,
+  mockTime.scheduler,
+  mockTime
+)
+messages.foreach { case (k, v) =>
+val msg = new ByteBufferMessageSet(
--- End diff --

Unindent one level?


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170278317
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 ---
@@ -162,17 +162,22 @@ private[kafka010] class KafkaTestUtils extends 
Logging {
   }
 
   /** Create a Kafka topic and wait until it is propagated to the whole 
cluster */
-  def createTopic(topic: String, partitions: Int): Unit = {
-AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+  def createTopic(topic: String, partitions: Int, config: Properties): 
Unit = {
+AdminUtils.createTopic(zkUtils, topic, partitions, 1, config)
 // wait until metadata is propagated
 (0 until partitions).foreach { p =>
   waitUntilMetadataIsPropagated(topic, p)
 }
   }
 
+  /** Create a Kafka topic and wait until it is propagated to the whole 
cluster */
+  def createTopic(topic: String, partitions: Int): Unit = {
+createTopic(topic, partitions, new Properties)
+  }
+
   /** Create a Kafka topic and wait until it is propagated to the whole 
cluster */
   def createTopic(topic: String): Unit = {
-createTopic(topic, 1)
+createTopic(topic, 1, new Properties)
--- End diff --

Nit: `new Properties()`


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170278931
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -71,25 +69,62 @@ class CachedKafkaConsumer[K, V] private(
 }
 
 if (!buffer.hasNext()) { poll(timeout) }
-assert(buffer.hasNext(),
+require(buffer.hasNext(),
   s"Failed to get records for $groupId $topic $partition $offset after 
polling for $timeout")
 var record = buffer.next()
 
 if (record.offset != offset) {
   logInfo(s"Buffer miss for $groupId $topic $partition $offset")
   seek(offset)
   poll(timeout)
-  assert(buffer.hasNext(),
+  require(buffer.hasNext(),
 s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
   record = buffer.next()
-  assert(record.offset == offset,
-s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+  require(record.offset == offset,
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset " +
+  s"got offset ${record.offset} instead. If this is a compacted 
topic, consider enabling " +
+  "spark.streaming.kafka.allowNonConsecutiveOffsets"
+  )
 }
 
 nextOffset = offset + 1
 record
   }
 
+  /**
+   * Start a batch on a compacted topic
+   */
+  def compactedStart(offset: Long, timeout: Long): Unit = {
+logDebug(s"compacted start $groupId $topic $partition starting 
$offset")
+// This seek may not be necessary, but it's hard to tell due to gaps 
in compacted topics
+if (offset != nextOffset) {
+  logInfo(s"Initial fetch for compacted $groupId $topic $partition 
$offset")
+  seek(offset)
+  poll(timeout)
+}
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   */
+  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+if (!buffer.hasNext()) { poll(timeout) }
--- End diff --

Nit: I'd expand this onto two lines


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170278685
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010.mocks
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.PriorityQueue
+
+import kafka.utils.{Scheduler, Time}
+
+/**
+ * A mock scheduler that executes tasks synchronously using a mock time 
instance.
+ * Tasks are executed synchronously when the time is advanced.
+ * This class is meant to be used in conjunction with MockTime.
+ *
+ * Example usage
+ * 
+ *   val time = new MockTime
+ *   time.scheduler.schedule("a task", println("hello world: " + 
time.milliseconds), delay = 1000)
+ *   time.sleep(1001) // this should cause our scheduled task to fire
+ * 
+ *
+ * Incrementing the time to the exact next execution time of a task will 
result in that task
+ * executing (it as if execution itself takes no time).
+ */
+private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
+
+  /* a priority queue of tasks ordered by next execution time */
+  var tasks = new PriorityQueue[MockTask]()
+
+  def isStarted: Boolean = true
+
+  def startup(): Unit = {}
+
+  def shutdown(): Unit = synchronized {
+tasks.foreach(_.fun())
+tasks.clear()
+  }
+
+  /**
+   * Check for any tasks that need to execute. Since this is a mock 
scheduler this check only occurs
+   * when this method is called and the execution happens synchronously in 
the calling thread.
+   * If you are using the scheduler associated with a MockTime instance 
this call
+   * will be triggered automatically.
+   */
+  def tick() {
+this synchronized {
+  val now = time.milliseconds
+  while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
+/* pop and execute the task with the lowest next execution time */
+val curr = tasks.dequeue
+curr.fun()
+/* if the task is periodic, reschedule it and re-enqueue */
+if(curr.periodic) {
+  curr.nextExecution += curr.period
+  this.tasks += curr
+}
+  }
+}
+  }
+
+  def schedule(
+  name: String,
+  fun: () => Unit,
+  delay: Long = 0,
+  period: Long = -1,
+  unit: TimeUnit = TimeUnit.MILLISECONDS) {
+this synchronized {
--- End diff --

I think I'd still write such methods as:

```
def foo(): T = synchronized {
   ...
}
```

This is how the rest of the code base does it (and other Scala code I've seen).


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170279950
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with 
BeforeAndAfterAll {
 
   private val preferredHosts = LocationStrategies.PreferConsistent
 
+  private def compactLogs(topic: String, partition: Int, messages: 
Array[(String, String)]) {
+val mockTime = new MockTime()
+// LogCleaner in 0.10 version of Kafka is still expecting the old 
TopicAndPartition api
+val logs = new Pool[TopicAndPartition, Log]()
+val logDir = kafkaTestUtils.brokerLogDir
+val dir = new java.io.File(logDir, topic + "-" + partition)
+dir.mkdirs()
+val logProps = new ju.Properties()
+logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: 
java.lang.Float)
--- End diff --

Do you have to 'cast' this to a Java Float object to get it to compile?
`java.lang.Float.valueOf(0.1f)` works too I guess, but equally weird. OK if it's required.


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170278078
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with 
BeforeAndAfterAll {
 
   private val preferredHosts = LocationStrategies.PreferConsistent
 
+  private def compactLogs(topic: String, partition: Int, messages: 
Array[(String, String)]) {
+val mockTime = new MockTime()
+// LogCleaner in 0.10 version of Kafka is still expecting the old 
TopicAndPartition api
+val logs = new Pool[TopicAndPartition, Log]()
+val logDir = kafkaTestUtils.brokerLogDir
+val dir = new java.io.File(logDir, topic + "-" + partition)
--- End diff --

Import `File` and other `java.*` classes? Maybe I'm missing a name conflict.


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170277915
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 ---
@@ -172,57 +187,138 @@ private[spark] class KafkaRDD[K, V](
 
   override def compute(thePart: Partition, context: TaskContext): 
Iterator[ConsumerRecord[K, V]] = {
 val part = thePart.asInstanceOf[KafkaRDDPartition]
-assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
 if (part.fromOffset == part.untilOffset) {
   logInfo(s"Beginning offset ${part.fromOffset} is the same as ending 
offset " +
 s"skipping ${part.topic} ${part.partition}")
   Iterator.empty
 } else {
-  new KafkaRDDIterator(part, context)
+  logInfo(s"Computing topic ${part.topic}, partition ${part.partition} 
" +
+s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+  if (compacted) {
+new CompactedKafkaRDDIterator[K, V](
+  part,
+  context,
+  kafkaParams,
+  useConsumerCache,
+  pollTimeout,
+  cacheInitialCapacity,
+  cacheMaxCapacity,
+  cacheLoadFactor
+)
+  } else {
+new KafkaRDDIterator[K, V](
+  part,
+  context,
+  kafkaParams,
+  useConsumerCache,
+  pollTimeout,
+  cacheInitialCapacity,
+  cacheMaxCapacity,
+  cacheLoadFactor
+)
+  }
 }
   }
+}
 
-  /**
-   * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
-   * Uses a cached consumer where possible to take advantage of prefetching
-   */
-  private class KafkaRDDIterator(
-  part: KafkaRDDPartition,
-  context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
-
-logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " 
+
-  s"offsets ${part.fromOffset} -> ${part.untilOffset}")
-
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching
+ */
+private class KafkaRDDIterator[K, V](
+  part: KafkaRDDPartition,
+  context: TaskContext,
+  kafkaParams: ju.Map[String, Object],
+  useConsumerCache: Boolean,
+  pollTimeout: Long,
+  cacheInitialCapacity: Int,
+  cacheMaxCapacity: Int,
+  cacheLoadFactor: Float
+) extends Iterator[ConsumerRecord[K, V]] {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  context.addTaskCompletionListener{ context => closeIfNeeded() }
--- End diff --

This could be `...(_ => closeIfNeeded())`


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170279150
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 ---
@@ -87,47 +89,60 @@ private[spark] class KafkaRDD[K, V](
 }.toArray
   }
 
-  override def count(): Long = offsetRanges.map(_.count).sum
+  override def count(): Long =
+if (compacted) {
+  super.count()
+} else {
+  offsetRanges.map(_.count).sum
+}
 
   override def countApprox(
   timeout: Long,
   confidence: Double = 0.95
-  ): PartialResult[BoundedDouble] = {
-val c = count
-new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
-  }
+  ): PartialResult[BoundedDouble] =
+if (compacted) {
+  super.countApprox(timeout, confidence)
+} else {
+  val c = count
+  new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+}
 
-  override def isEmpty(): Boolean = count == 0L
+  override def isEmpty(): Boolean =
+if (compacted) {
+  super.isEmpty()
+} else {
+  count == 0L
+}
 
-  override def take(num: Int): Array[ConsumerRecord[K, V]] = {
-val nonEmptyPartitions = this.partitions
-  .map(_.asInstanceOf[KafkaRDDPartition])
-  .filter(_.count > 0)
+  override def take(num: Int): Array[ConsumerRecord[K, V]] =
+if (compacted) {
+  super.take(num)
+} else {
+  val nonEmptyPartitions = this.partitions
+.map(_.asInstanceOf[KafkaRDDPartition])
+.filter(_.count > 0)
 
-if (num < 1 || nonEmptyPartitions.isEmpty) {
-  return new Array[ConsumerRecord[K, V]](0)
-}
+  if (num < 1 || nonEmptyPartitions.isEmpty) {
--- End diff --

I guess you could check `num < 1` before the map/filter, but it's trivial.
You could write `return Array.empty[ConsumerRecord[K,V]]` too; again trivial.
Since this is existing code I could see not touching it as well.


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-21 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169850605
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
 s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
   record = buffer.next()
   assert(record.offset == offset,
-s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset " +
+  s"got offset ${record.offset} instead. If this is a compacted 
topic, consider enabling " +
+  "spark.streaming.kafka.allowNonConsecutiveOffsets"
+  )
 }
 
 nextOffset = offset + 1
 record
   }
 
+  /**
+   * Start a batch on a compacted topic
+   */
+  def compactedStart(offset: Long, timeout: Long): Unit = {
+logDebug(s"compacted start $groupId $topic $partition starting 
$offset")
+// This seek may not be necessary, but it's hard to tell due to gaps 
in compacted topics
+if (offset != nextOffset) {
+  logInfo(s"Initial fetch for compacted $groupId $topic $partition 
$offset")
+  seek(offset)
+  poll(timeout)
+}
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   */
+  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
--- End diff --

Agreed that require is better, will fix it in a sec.

Pretty sure assert is just a function in Predef.scala that throws an AssertionError; it's not like a Java assert statement that can be enabled/disabled with java -ea / -da. Tested it out:

https://gist.github.com/koeninger/6155cd94a19d1a6373ba0b40039e97e3

Disabling Scala asserts can be done at compile time with -Xdisable-assertions.
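
A tiny standalone sketch (not Spark code) of the behaviour described above:

```
object AssertVsRequire {
  def main(args: Array[String]): Unit = {
    // Predef.assert throws AssertionError; it is a plain Scala method call,
    // so the JVM -ea / -da flags have no effect on it.
    try assert(1 == 2, "assert failed") catch {
      case e: AssertionError => println(s"assert threw ${e.getClass.getName}")
    }
    // Predef.require throws IllegalArgumentException, signalling a bad argument
    // or precondition rather than an internal invariant.
    try require(1 == 2, "require failed") catch {
      case e: IllegalArgumentException => println(s"require threw ${e.getClass.getName}")
    }
  }
}
```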


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-21 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169663225
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
 s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
   record = buffer.next()
   assert(record.offset == offset,
-s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset " +
+  s"got offset ${record.offset} instead. If this is a compacted 
topic, consider enabling " +
+  "spark.streaming.kafka.allowNonConsecutiveOffsets"
+  )
 }
 
 nextOffset = offset + 1
 record
   }
 
+  /**
+   * Start a batch on a compacted topic
+   */
+  def compactedStart(offset: Long, timeout: Long): Unit = {
+logDebug(s"compacted start $groupId $topic $partition starting 
$offset")
+// This seek may not be necessary, but it's hard to tell due to gaps 
in compacted topics
+if (offset != nextOffset) {
+  logInfo(s"Initial fetch for compacted $groupId $topic $partition 
$offset")
+  seek(offset)
+  poll(timeout)
+}
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   */
+  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
--- End diff --

`assert` turns into a JVM assert (I think?) and as such would be turned off if assertions are disabled, which is how it ought to run in production. If it's something that _could_ happen at all I think it should be `require`.


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169538019
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -53,7 +51,7 @@ class CachedKafkaConsumer[K, V] private(
 
   // TODO if the buffer was kept around as a random-access structure,
   // could possibly optimize re-calculating of an RDD in the same batch
-  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var buffer = 
ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
--- End diff --

Agreed, think it should be ok


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169537541
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
 s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
   record = buffer.next()
   assert(record.offset == offset,
-s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset " +
+  s"got offset ${record.offset} instead. If this is a compacted 
topic, consider enabling " +
+  "spark.streaming.kafka.allowNonConsecutiveOffsets"
+  )
 }
 
 nextOffset = offset + 1
 record
   }
 
+  /**
+   * Start a batch on a compacted topic
+   */
+  def compactedStart(offset: Long, timeout: Long): Unit = {
+logDebug(s"compacted start $groupId $topic $partition starting 
$offset")
+// This seek may not be necessary, but it's hard to tell due to gaps 
in compacted topics
+if (offset != nextOffset) {
+  logInfo(s"Initial fetch for compacted $groupId $topic $partition 
$offset")
+  seek(offset)
+  poll(timeout)
+}
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   */
+  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
--- End diff --

That's a "shouldn't happen unless the topicpartition or broker is gone" kind of thing. Semantically I could see that being more like require than assert, but I don't have a strong opinion.


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169536036
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -22,12 +22,17 @@ import java.{ util => ju }
 import scala.collection.JavaConverters._
 import scala.util.Random
 
+import kafka.common.TopicAndPartition
--- End diff --

Right, LogCleaner hadn't yet been moved to the new APIs; added a comment to that effect.
Think we're ok here because it's just being used to mock up a compacted topic, not in the actual DStream API.


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169489088
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
 s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
   record = buffer.next()
   assert(record.offset == offset,
-s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset " +
+  s"got offset ${record.offset} instead. If this is a compacted 
topic, consider enabling " +
+  "spark.streaming.kafka.allowNonConsecutiveOffsets"
+  )
 }
 
 nextOffset = offset + 1
 record
   }
 
+  /**
+   * Start a batch on a compacted topic
+   */
+  def compactedStart(offset: Long, timeout: Long): Unit = {
+logDebug(s"compacted start $groupId $topic $partition starting 
$offset")
+// This seek may not be necessary, but it's hard to tell due to gaps 
in compacted topics
+if (offset != nextOffset) {
+  logInfo(s"Initial fetch for compacted $groupId $topic $partition 
$offset")
+  seek(offset)
+  poll(timeout)
+}
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   */
+  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
+  s"Failed to get records for compacted $groupId $topic $partition 
after polling for $timeout")
+val record = buffer.next()
+nextOffset = record.offset + 1
+record
+  }
+
+  /**
+   * Rewind to previous record in the batch from a compacted topic.
+   * Will throw NoSuchElementException if no previous element
--- End diff --

Could be a `@throws` tag but no big deal.


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169489462
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -53,7 +51,7 @@ class CachedKafkaConsumer[K, V] private(
 
   // TODO if the buffer was kept around as a random-access structure,
   // could possibly optimize re-calculating of an RDD in the same batch
-  protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, 
V]]().iterator
+  protected var buffer = 
ju.Collections.emptyListIterator[ConsumerRecord[K, V]]()
--- End diff --

Was going to comment on the fact that it's `protected`, and what if a subclass depended on this, but since the whole class is package-private I guess that's not an issue?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169491574
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010.mocks
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.PriorityQueue
+
+import kafka.utils.{Scheduler, Time}
+
+/**
+ * A mock scheduler that executes tasks synchronously using a mock time 
instance.
+ * Tasks are executed synchronously when the time is advanced.
+ * This class is meant to be used in conjunction with MockTime.
+ *
+ * Example usage
+ * 
+ *   val time = new MockTime
+ *   time.scheduler.schedule("a task", println("hello world: " + 
time.milliseconds), delay = 1000)
+ *   time.sleep(1001) // this should cause our scheduled task to fire
+ * 
+ *
+ * Incrementing the time to the exact next execution time of a task will 
result in that task
+ * executing (it as if execution itself takes no time).
+ */
+private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
+
+  /* a priority queue of tasks ordered by next execution time */
+  var tasks = new PriorityQueue[MockTask]()
+
+  def isStarted: Boolean = true
+
+  def startup() {}
+
+  def shutdown() {
+this synchronized {
+  tasks.foreach(_.fun())
+  tasks.clear()
+}
+  }
+
+  /**
+   * Check for any tasks that need to execute. Since this is a mock 
scheduler this check only occurs
+   * when this method is called and the execution happens synchronously in 
the calling thread.
+   * If you are using the scheduler associated with a MockTime instance 
this call
+   * will be triggered automatically.
+   */
+  def tick() {
+this synchronized {
+  val now = time.milliseconds
+  while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
+/* pop and execute the task with the lowest next execution time */
+val curr = tasks.dequeue
+curr.fun()
+/* if the task is periodic, reschedule it and re-enqueue */
+if(curr.periodic) {
+  curr.nextExecution += curr.period
+  this.tasks += curr
+}
+  }
+}
+  }
+
+  def schedule(
+  name: String,
+  fun: () => Unit,
+  delay: Long = 0,
+  period: Long = -1,
+  unit: TimeUnit = TimeUnit.MILLISECONDS) {
+this synchronized {
+  tasks += MockTask(name, fun, time.milliseconds + delay, period = 
period)
+  tick()
+}
+  }
+
+}
+
+case class MockTask(
+val name: String,
+val fun: () => Unit,
+var nextExecution: Long,
+val period: Long) extends Ordered[MockTask] {
+  def periodic: Boolean = period >= 0
+  def compare(t: MockTask): Int = {
+if (t.nextExecution == nextExecution) {
--- End diff --

`java.lang.Long.compare(t.nextExecution, nextExecution)` does this too in one line, but whatever.
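
A standalone sketch of that one-liner, with a stripped-down stand-in for the task class (not the PR's MockTask):

```
import scala.collection.mutable.PriorityQueue

// Stripped-down stand-in for the mock task class, just to show the one-line compare.
case class Task(name: String, var nextExecution: Long, period: Long = -1) extends Ordered[Task] {
  def periodic: Boolean = period >= 0
  // Reversed arguments: a smaller nextExecution compares as "greater", so it sits at the
  // head of Scala's max-heap PriorityQueue, which the tick() loop above appears to assume.
  def compare(t: Task): Int = java.lang.Long.compare(t.nextExecution, nextExecution)
}

object CompareSketch {
  def main(args: Array[String]): Unit = {
    val tasks = PriorityQueue(Task("late", 200L), Task("early", 100L))
    println(tasks.dequeue().name) // prints "early"
  }
}
```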


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169491706
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockTime.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010.mocks
+
+import java.util.concurrent._
+
+import kafka.utils.Time
+
+/**
+ * A class used for unit testing things which depend on the Time interface.
+ *
+ * This class never manually advances the clock, it only does so when you 
call
+ *   sleep(ms)
+ *
+ * It also comes with an associated scheduler instance for managing 
background tasks in
+ * a deterministic way.
+ */
+private[kafka010] class MockTime(@volatile private var currentMs: Long) 
extends Time {
+
+  val scheduler = new MockScheduler(this)
+
+  def this() = this(System.currentTimeMillis)
+
+  def milliseconds: Long = currentMs
+
+  def nanoseconds: Long =
+TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS)
+
+  def sleep(ms: Long) {
+this.currentMs += ms
+scheduler.tick()
+  }
+
+  override def toString(): String = "MockTime(%d)".format(milliseconds)
--- End diff --

Use string interpolation to be consistent?
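
i.e. something like the following (a sketch with a stand-in class so it compiles on its own):

```
// Stand-in class so the interpolated toString compiles on its own.
class MockTimeSketch(@volatile private var currentMs: Long) {
  def milliseconds: Long = currentMs
  // String interpolation instead of "MockTime(%d)".format(...)
  override def toString(): String = s"MockTime($milliseconds)"
}
```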


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169490949
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 ---
@@ -178,51 +196,128 @@ private[spark] class KafkaRDD[K, V](
 s"skipping ${part.topic} ${part.partition}")
   Iterator.empty
 } else {
-  new KafkaRDDIterator(part, context)
+  logInfo(s"Computing topic ${part.topic}, partition ${part.partition} 
" +
+s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+  if (compacted) {
+new CompactedKafkaRDDIterator[K, V](
+  part,
+  context,
+  kafkaParams,
+  useConsumerCache,
+  pollTimeout,
+  cacheInitialCapacity,
+  cacheMaxCapacity,
+  cacheLoadFactor
+)
+  } else {
+new KafkaRDDIterator[K, V](
+  part,
+  context,
+  kafkaParams,
+  useConsumerCache,
+  pollTimeout,
+  cacheInitialCapacity,
+  cacheMaxCapacity,
+  cacheLoadFactor
+)
+  }
 }
   }
+}
 
-  /**
-   * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
-   * Uses a cached consumer where possible to take advantage of prefetching
-   */
-  private class KafkaRDDIterator(
-  part: KafkaRDDPartition,
-  context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
-
-logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " 
+
-  s"offsets ${part.fromOffset} -> ${part.untilOffset}")
-
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-
-context.addTaskCompletionListener{ context => closeIfNeeded() }
-
-val consumer = if (useConsumerCache) {
-  CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, 
cacheLoadFactor)
-  if (context.attemptNumber >= 1) {
-// just in case the prior attempt failures were cache related
-CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
-  }
-  CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, 
kafkaParams)
-} else {
-  CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, 
part.partition, kafkaParams)
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching
+ */
+private class KafkaRDDIterator[K, V](
+  part: KafkaRDDPartition,
+  context: TaskContext,
+  kafkaParams: ju.Map[String, Object],
+  useConsumerCache: Boolean,
+  pollTimeout: Long,
+  cacheInitialCapacity: Int,
+  cacheMaxCapacity: Int,
+  cacheLoadFactor: Float
+) extends Iterator[ConsumerRecord[K, V]] {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+  val consumer = if (useConsumerCache) {
+CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, 
cacheLoadFactor)
+if (context.attemptNumber >= 1) {
+  // just in case the prior attempt failures were cache related
+  CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
 }
+CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, 
kafkaParams)
+  } else {
+CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, 
part.partition, kafkaParams)
+  }
 
-var requestOffset = part.fromOffset
+  var requestOffset = part.fromOffset
 
-def closeIfNeeded(): Unit = {
-  if (!useConsumerCache && consumer != null) {
-consumer.close
-  }
+  def closeIfNeeded(): Unit = {
+if (!useConsumerCache && consumer != null) {
+  consumer.close
 }
+  }
+
+  override def hasNext(): Boolean = requestOffset < part.untilOffset
 
-override def hasNext(): Boolean = requestOffset < part.untilOffset
+  override def next(): ConsumerRecord[K, V] = {
+assert(hasNext(), "Can't call getNext() once untilOffset has been 
reached")
+val r = consumer.get(requestOffset, pollTimeout)
+requestOffset += 1
+r
+  }
+}
 
-override def next(): ConsumerRecord[K, V] = {
-  assert(hasNext(), "Can't call getNext() once untilOffset has been 
reached")
-  val r = consumer.get(requestOffset, pollTimeout)
-  requestOffset += 1
-  r
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
+ * Uses a cached consumer where possible to 

[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169490790
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 ---
@@ -87,47 +89,63 @@ private[spark] class KafkaRDD[K, V](
 }.toArray
   }
 
-  override def count(): Long = offsetRanges.map(_.count).sum
+  override def count(): Long =
+if (compacted) {
+  super.count()
+} else {
+  offsetRanges.map(_.count).sum
+}
 
   override def countApprox(
   timeout: Long,
   confidence: Double = 0.95
-  ): PartialResult[BoundedDouble] = {
-val c = count
-new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
-  }
+  ): PartialResult[BoundedDouble] =
+if (compacted) {
+  super.countApprox(timeout, confidence)
+} else {
+  val c = count
+  new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+}
 
-  override def isEmpty(): Boolean = count == 0L
+  override def isEmpty(): Boolean =
+if (compacted) {
+  super.isEmpty()
+} else {
+  count == 0L
+}
 
-  override def take(num: Int): Array[ConsumerRecord[K, V]] = {
-val nonEmptyPartitions = this.partitions
-  .map(_.asInstanceOf[KafkaRDDPartition])
-  .filter(_.count > 0)
+  override def take(num: Int): Array[ConsumerRecord[K, V]] =
+if (compacted) {
+  super.take(num)
+} else {
+  val nonEmptyPartitions = this.partitions
+.map(_.asInstanceOf[KafkaRDDPartition])
+.filter(_.count > 0)
 
-if (num < 1 || nonEmptyPartitions.isEmpty) {
-  return new Array[ConsumerRecord[K, V]](0)
-}
+  if (num < 1 || nonEmptyPartitions.isEmpty) {
+return new Array[ConsumerRecord[K, V]](0)
+  }
 
-// Determine in advance how many messages need to be taken from each 
partition
-val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, 
part) =>
-  val remain = num - result.values.sum
-  if (remain > 0) {
-val taken = Math.min(remain, part.count)
-result + (part.index -> taken.toInt)
-  } else {
-result
+  // Determine in advance how many messages need to be taken from each 
partition
+  val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, 
part) =>
+val remain = num - result.values.sum
+if (remain > 0) {
+  val taken = Math.min(remain, part.count)
+  result + (part.index -> taken.toInt)
+} else {
+  result
+}
   }
-}
 
-val buf = new ArrayBuffer[ConsumerRecord[K, V]]
-val res = context.runJob(
-  this,
-  (tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
-  it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
-)
-res.foreach(buf ++= _)
-buf.toArray
-  }
+  val buf = new ArrayBuffer[ConsumerRecord[K, V]]
+  val res = context.runJob(
+this,
+(tc: TaskContext, it: Iterator[ConsumerRecord[K, V]]) =>
+it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
+  )
+  res.foreach(buf ++= _)
+  buf.toArray
--- End diff --

I am not sure why this code doesn't just `.flatten` the result of `.runJob` to get an array of all of the results. Feel free to change it, or not. Maybe I'm missing something.
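
A toy standalone sketch of the equivalence being suggested, using plain strings in place of `ConsumerRecord`s:

```
object FlattenSketch {
  def main(args: Array[String]): Unit = {
    // A runJob-style result: one array of records per partition.
    val perPartition: Array[Array[String]] = Array(Array("a", "b"), Array("c"))

    // Style in the quoted code: append each partition's array into a buffer.
    val buf = scala.collection.mutable.ArrayBuffer[String]()
    perPartition.foreach(buf ++= _)

    // The suggestion: flatten does the same thing in one step.
    val flattened: Array[String] = perPartition.flatten

    println(buf.toArray.sameElements(flattened)) // true
  }
}
```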


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169491061
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -22,12 +22,17 @@ import java.{ util => ju }
 import scala.collection.JavaConverters._
 import scala.util.Random
 
+import kafka.common.TopicAndPartition
--- End diff --

These are the older Kafka APIs, right? This may all be correct; just making sure these are the classes that are needed in a Kafka 0.10 test.


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169490896
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 ---
@@ -178,51 +196,128 @@ private[spark] class KafkaRDD[K, V](
 s"skipping ${part.topic} ${part.partition}")
   Iterator.empty
 } else {
-  new KafkaRDDIterator(part, context)
+  logInfo(s"Computing topic ${part.topic}, partition ${part.partition} 
" +
+s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+  if (compacted) {
+new CompactedKafkaRDDIterator[K, V](
+  part,
+  context,
+  kafkaParams,
+  useConsumerCache,
+  pollTimeout,
+  cacheInitialCapacity,
+  cacheMaxCapacity,
+  cacheLoadFactor
+)
+  } else {
+new KafkaRDDIterator[K, V](
+  part,
+  context,
+  kafkaParams,
+  useConsumerCache,
+  pollTimeout,
+  cacheInitialCapacity,
+  cacheMaxCapacity,
+  cacheLoadFactor
+)
+  }
 }
   }
+}
 
-  /**
-   * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
-   * Uses a cached consumer where possible to take advantage of prefetching
-   */
-  private class KafkaRDDIterator(
-  part: KafkaRDDPartition,
-  context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
-
-logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " 
+
-  s"offsets ${part.fromOffset} -> ${part.untilOffset}")
-
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-
-context.addTaskCompletionListener{ context => closeIfNeeded() }
-
-val consumer = if (useConsumerCache) {
-  CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, 
cacheLoadFactor)
-  if (context.attemptNumber >= 1) {
-// just in case the prior attempt failures were cache related
-CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
-  }
-  CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, 
kafkaParams)
-} else {
-  CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, 
part.partition, kafkaParams)
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching
+ */
+private class KafkaRDDIterator[K, V](
+  part: KafkaRDDPartition,
+  context: TaskContext,
+  kafkaParams: ju.Map[String, Object],
+  useConsumerCache: Boolean,
+  pollTimeout: Long,
+  cacheInitialCapacity: Int,
+  cacheMaxCapacity: Int,
+  cacheLoadFactor: Float
+) extends Iterator[ConsumerRecord[K, V]] {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+  val consumer = if (useConsumerCache) {
+CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, 
cacheLoadFactor)
+if (context.attemptNumber >= 1) {
+  // just in case the prior attempt failures were cache related
+  CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
 }
+CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, 
kafkaParams)
+  } else {
+CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, 
part.partition, kafkaParams)
+  }
 
-var requestOffset = part.fromOffset
+  var requestOffset = part.fromOffset
 
-def closeIfNeeded(): Unit = {
-  if (!useConsumerCache && consumer != null) {
-consumer.close
-  }
+  def closeIfNeeded(): Unit = {
+if (!useConsumerCache && consumer != null) {
+  consumer.close
 }
+  }
+
+  override def hasNext(): Boolean = requestOffset < part.untilOffset
 
-override def hasNext(): Boolean = requestOffset < part.untilOffset
+  override def next(): ConsumerRecord[K, V] = {
+assert(hasNext(), "Can't call getNext() once untilOffset has been 
reached")
--- End diff --

This seems like something that shouldn't be an assert because a caller could call `next()` out of order. Should be an exception, to be sure. `NoSuchElementException`?
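
A minimal sketch of that alternative with a hand-rolled iterator (not the PR's class), where `next()` signals misuse with `NoSuchElementException` instead of `assert`:

```
class RangeIterator(from: Long, until: Long) extends Iterator[Long] {
  private var offset = from

  override def hasNext: Boolean = offset < until

  override def next(): Long = {
    // An out-of-order caller gets the exception the Iterator contract expects,
    // rather than an AssertionError.
    if (!hasNext) {
      throw new NoSuchElementException(s"Can't call next() once $until has been reached")
    }
    val r = offset
    offset += 1
    r
  }
}
```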


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169491286
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010.mocks
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.PriorityQueue
+
+import kafka.utils.{Scheduler, Time}
+
+/**
+ * A mock scheduler that executes tasks synchronously using a mock time 
instance.
+ * Tasks are executed synchronously when the time is advanced.
+ * This class is meant to be used in conjunction with MockTime.
+ *
+ * Example usage
+ * 
+ *   val time = new MockTime
+ *   time.scheduler.schedule("a task", println("hello world: " + 
time.milliseconds), delay = 1000)
+ *   time.sleep(1001) // this should cause our scheduled task to fire
+ * 
+ *
+ * Incrementing the time to the exact next execution time of a task will 
result in that task
+ * executing (it as if execution itself takes no time).
+ */
+private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
+
+  /* a priority queue of tasks ordered by next execution time */
+  var tasks = new PriorityQueue[MockTask]()
+
+  def isStarted: Boolean = true
+
+  def startup() {}
+
+  def shutdown() {
--- End diff --

Just a style question or nit, but what about: `def shutdown(): Unit = synchronized { ...`


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169490350
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 ---
@@ -178,51 +196,128 @@ private[spark] class KafkaRDD[K, V](
 s"skipping ${part.topic} ${part.partition}")
   Iterator.empty
 } else {
-  new KafkaRDDIterator(part, context)
+  logInfo(s"Computing topic ${part.topic}, partition ${part.partition} 
" +
+s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+  if (compacted) {
+new CompactedKafkaRDDIterator[K, V](
+  part,
+  context,
+  kafkaParams,
+  useConsumerCache,
+  pollTimeout,
+  cacheInitialCapacity,
+  cacheMaxCapacity,
+  cacheLoadFactor
+)
+  } else {
+new KafkaRDDIterator[K, V](
+  part,
+  context,
+  kafkaParams,
+  useConsumerCache,
+  pollTimeout,
+  cacheInitialCapacity,
+  cacheMaxCapacity,
+  cacheLoadFactor
+)
+  }
 }
   }
+}
 
-  /**
-   * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
-   * Uses a cached consumer where possible to take advantage of prefetching
-   */
-  private class KafkaRDDIterator(
-  part: KafkaRDDPartition,
-  context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
-
-logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " 
+
-  s"offsets ${part.fromOffset} -> ${part.untilOffset}")
-
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-
-context.addTaskCompletionListener{ context => closeIfNeeded() }
-
-val consumer = if (useConsumerCache) {
-  CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, 
cacheLoadFactor)
-  if (context.attemptNumber >= 1) {
-// just in case the prior attempt failures were cache related
-CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
-  }
-  CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, 
kafkaParams)
-} else {
-  CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, 
part.partition, kafkaParams)
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching
+ */
+private class KafkaRDDIterator[K, V](
+  part: KafkaRDDPartition,
+  context: TaskContext,
+  kafkaParams: ju.Map[String, Object],
+  useConsumerCache: Boolean,
+  pollTimeout: Long,
+  cacheInitialCapacity: Int,
+  cacheMaxCapacity: Int,
+  cacheLoadFactor: Float
+) extends Iterator[ConsumerRecord[K, V]] {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+  val consumer = if (useConsumerCache) {
+CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, 
cacheLoadFactor)
+if (context.attemptNumber >= 1) {
+  // just in case the prior attempt failures were cache related
+  CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
 }
+CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, 
kafkaParams)
+  } else {
+CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, 
part.partition, kafkaParams)
+  }
 
-var requestOffset = part.fromOffset
+  var requestOffset = part.fromOffset
 
-def closeIfNeeded(): Unit = {
-  if (!useConsumerCache && consumer != null) {
-consumer.close
-  }
+  def closeIfNeeded(): Unit = {
+if (!useConsumerCache && consumer != null) {
+  consumer.close
--- End diff --

Just nits here, but I'd write `close()` as it clearly has side effects


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-20 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r169489226
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
 s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
   record = buffer.next()
   assert(record.offset == offset,
-s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset " +
+  s"got offset ${record.offset} instead. If this is a compacted 
topic, consider enabling " +
+  "spark.streaming.kafka.allowNonConsecutiveOffsets"
+  )
 }
 
 nextOffset = offset + 1
 record
   }
 
+  /**
+   * Start a batch on a compacted topic
+   */
+  def compactedStart(offset: Long, timeout: Long): Unit = {
+logDebug(s"compacted start $groupId $topic $partition starting 
$offset")
+// This seek may not be necessary, but it's hard to tell due to gaps 
in compacted topics
+if (offset != nextOffset) {
+  logInfo(s"Initial fetch for compacted $groupId $topic $partition 
$offset")
+  seek(offset)
+  poll(timeout)
+}
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   */
+  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+if (!buffer.hasNext()) { poll(timeout) }
+assert(buffer.hasNext(),
--- End diff --

Should this really be an assert, as in something that should never happen regardless of input or external state? Or just a condition that should generate an exception if false?


---




[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-10 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/20572

[SPARK-17147][STREAMING][KAFKA] Allow non-consecutive offsets

## What changes were proposed in this pull request?

Add a configuration spark.streaming.kafka.allowNonConsecutiveOffsets to allow streaming jobs to proceed on compacted topics (or other situations involving gaps between offsets in the log).
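
For context, a hedged usage sketch of how a job might opt in; the key name comes from this PR's description, everything else is illustrative:

```
import org.apache.spark.SparkConf

// Illustrative only: enable the new behavior for a streaming job reading a compacted topic.
val conf = new SparkConf()
  .setAppName("compacted-topic-stream")
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
```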

## How was this patch tested?

Added new unit test

@justinrmiller has been testing this branch in production for a few weeks

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 SPARK-17147

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20572


commit 3082de7e43e8c381dc2227005d1e0fc5bd2c3d29
Author: cody koeninger 
Date:   2016-10-08T21:21:48Z

[SPARK-17147][STREAMING][KAFKA] failing test for compacted topics

commit e8ea89ea10527c6723df4af2685004ea67d872cd
Author: cody koeninger 
Date:   2016-10-09T04:59:39Z

[SPARK-17147][STREAMING][KAFKA] test passing for compacted topics

commit 182943e36f596d0cb5841a9c63471bea1dd9047b
Author: cody koeninger 
Date:   2018-02-11T04:09:38Z

spark.streaming.kafka.allowNonConsecutiveOffsets

commit 89f4bc5f4de78cdcc22b5c9b26a27ee9263048c8
Author: cody koeninger 
Date:   2018-02-11T04:13:49Z

[SPARK-17147][STREAMING][KAFKA] remove stray param doc

commit 12e65bedddbcd2407598e69fa3c6fcbcdfc67e5d
Author: cody koeninger 
Date:   2018-02-11T04:28:22Z

[SPARK-17147][STREAMING][KAFKA] prepare for merge of master

commit 2ed51f1f73ee75ffd08355265a72e68e83ef592d
Author: cody koeninger 
Date:   2018-02-11T05:19:31Z

Merge branch 'master' into SPARK-17147




---
