[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...

2016-10-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15527





[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...

2016-10-26 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85265049
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -121,16 +124,61 @@ private[kafka010] case class KafkaSource(
     }.partitionToOffsets
   }
 
+  private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
+
   override def schema: StructType = KafkaSource.kafkaSchema
 
   /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val offset = KafkaSourceOffset(fetchLatestOffsets())
-    logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
-    Some(offset)
+    val latest = fetchLatestOffsets()
+    val offsets = maxOffsetsPerTrigger match {
+      case None =>
+        latest
+      case Some(limit) if currentPartitionOffsets.isEmpty =>
+        rateLimit(limit, initialPartitionOffsets, latest)
+      case Some(limit) =>
+        rateLimit(limit, currentPartitionOffsets.get, latest)
+    }
+
+    currentPartitionOffsets = Some(offsets)
+    logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
+    Some(KafkaSourceOffset(offsets))
+  }
+
+  /** Proportionally distribute limit number of offsets among topicpartitions */
+  private def rateLimit(
+      limit: Long,
+      from: Map[TopicPartition, Long],
+      until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val sizes = until.flatMap { case (tp, end) =>
+        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
--- End diff --

nit: use 2 spaces





[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...

2016-10-26 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85265053
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -121,16 +124,61 @@ private[kafka010] case class KafkaSource(
     }.partitionToOffsets
   }
 
+  private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
+
   override def schema: StructType = KafkaSource.kafkaSchema
 
   /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
 
-    val offset = KafkaSourceOffset(fetchLatestOffsets())
-    logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
-    Some(offset)
+    val latest = fetchLatestOffsets()
+    val offsets = maxOffsetsPerTrigger match {
+      case None =>
+        latest
+      case Some(limit) if currentPartitionOffsets.isEmpty =>
+        rateLimit(limit, initialPartitionOffsets, latest)
+      case Some(limit) =>
+        rateLimit(limit, currentPartitionOffsets.get, latest)
+    }
+
+    currentPartitionOffsets = Some(offsets)
+    logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
+    Some(KafkaSourceOffset(offsets))
+  }
+
+  /** Proportionally distribute limit number of offsets among topicpartitions */
+  private def rateLimit(
+      limit: Long,
+      from: Map[TopicPartition, Long],
+      until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val sizes = until.flatMap { case (tp, end) =>
+        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+          val size = end - begin
+          logDebug(s"rateLimit $tp size is $size")
+          if (size > 0) Some(tp -> size) else None
+        }
+    }
+    val total = sizes.values.sum.toDouble
+    if (total < 1) {
+      until
+    } else {
+      until.map { case (tp, end) =>
+          tp -> sizes.get(tp).map { size =>
--- End diff --

nit: use 2 spaces
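
For readers of this archive: the hunk above cuts off before the end of `rateLimit`, so here is a minimal self-contained sketch of the proportional distribution it implements. It assumes `from` already covers every partition in `until` (the real code also calls `fetchNewPartitionEarliestOffsets` for newly discovered partitions), and the rounding policy at the end (round sub-1 shares up so small partitions are not starved, round larger shares down) is an assumption, since that part of the diff is not shown.

```Scala
import org.apache.kafka.common.TopicPartition

def rateLimitSketch(
    limit: Long,
    from: Map[TopicPartition, Long],
    until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  // Per-partition backlog: offsets available but not yet read.
  val sizes = until.flatMap { case (tp, end) =>
    from.get(tp).flatMap { begin =>
      val size = end - begin
      if (size > 0) Some(tp -> size) else None
    }
  }
  val total = sizes.values.sum.toDouble
  if (total < 1) {
    until // no backlog anywhere: just take the latest offsets
  } else {
    until.map { case (tp, end) =>
      tp -> sizes.get(tp).map { size =>
        // Share of `limit` proportional to this partition's backlog.
        val prorated = limit * (size / total)
        val share = (if (prorated < 1) math.ceil(prorated) else math.floor(prorated)).toLong
        math.min(from(tp) + share, end) // never run past the latest offset
      }.getOrElse(end)
    }
  }
}
```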





[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...

2016-10-26 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85258005
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -153,11 +201,7 @@ private[kafka010] case class KafkaSource(
 
--- End diff --

`currentPartitionOffsets` is the last processed offsets, right? When recovering from a failure, `getBatch` will be called first, then `getOffset`.





[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...

2016-10-26 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85258039
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -133,6 +132,41 @@ class KafkaSourceSuite extends KafkaSourceTest {
 
   private val topicId = new AtomicInteger(0)
 
+  test("maxOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 10)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+    testStream(mapped)(
+      StartStream(ProcessingTime(100), clock),
+      AdvanceManualClock(100),
+      // 1 from smallest, 1 from middle, 8 from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
--- End diff --

FYI, #14553 got merged.
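
As background for the expected counts in this test: with `maxOffsetsPerTrigger = 10` and per-partition backlogs of 101, 11, and 1 messages, a proportional split yields 8, 1, and 1. A rough arithmetic sketch (the round-sub-1-shares-up policy is an assumption, since the diff is truncated before that logic):

```Scala
val limit = 10L
val backlogs = Seq(101L, 11L, 1L)        // partitions 0, 1, 2
val total = backlogs.sum.toDouble        // 113.0
val shares = backlogs.map { size =>
  val prorated = limit * (size / total)  // 8.94, 0.97, 0.09
  (if (prorated < 1) math.ceil(prorated) else math.floor(prorated)).toLong
}                                        // List(8, 1, 1): 8 + 1 + 1 = 10
```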





[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...

2016-10-26 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85253453
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -153,11 +201,7 @@ private[kafka010] case class KafkaSource(
 
--- End diff --

Shouldn't it be set to the highest available offset in the streaming metadata log, not the highest available offset in Kafka?





[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...

2016-10-26 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85246059
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -133,6 +132,41 @@ class KafkaSourceSuite extends KafkaSourceTest {
 
   private val topicId = new AtomicInteger(0)
 
+  test("maxOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 10)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+    testStream(mapped)(
+      StartStream(ProcessingTime(100), clock),
+      AdvanceManualClock(100),
+      // 1 from smallest, 1 from middle, 8 from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
--- End diff --

There is a race condition here: the batch may still be running. I eventually worked out the following code, which covers recovery and fixes the race condition.

```Scala
  test("maxOffsetsPerTrigger") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, 
Some(0))
testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, 
Some(1))
testUtils.sendMessages(topic, Array("1"), Some(2))

val reader = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
  .option("kafka.metadata.max.age.ms", "1")
  .option("maxOffsetsPerTrigger", 10)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
val kafka = reader.load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => 
kv._2.toInt)

val clock = new StreamManualClock

val waitUntilBatchProcessed = AssertOnQuery { q =>
  eventually(Timeout(streamingTimeout)) {
if (!q.exception.isDefined) {
  assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
}
  }
  if (q.exception.isDefined) {
throw q.exception.get
  }
  true
}

testStream(mapped)(
  StartStream(ProcessingTime(100), clock),
  waitUntilBatchProcessed,
  // 1 from smallest, 1 from middle, 8 from biggest
  CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
  AdvanceManualClock(100),
  waitUntilBatchProcessed,
  // smallest now empty, 1 more from middle, 9 more from biggest
  CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
11, 108, 109, 110, 111, 112, 113, 114, 115, 116
  ),
  StopStream,
  StartStream(ProcessingTime(100), clock),
  waitUntilBatchProcessed,
  AdvanceManualClock(100),
  waitUntilBatchProcessed,
  // smallest now empty, 1 more from middle, 9 more from biggest
  CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
12, 117, 118, 119, 120, 121, 122, 123, 124, 125
  ),
  AdvanceManualClock(100),
  waitUntilBatchProcessed,
  // smallest now empty, 1 more from middle, 9 more from biggest
  CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
13, 126, 127, 128, 129, 130, 131, 132, 133, 134
  )
)
  }
```

This test currently fails because of an issue that #14553 fixes.




[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...

2016-10-26 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85245467
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -153,11 +201,7 @@ private[kafka010] case class KafkaSource(
 
--- End diff --

`currentPartitionOffsets` should be set to `untilPartitionOffsets` if it's empty; an empty value means we are recovering from a failure.
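
A minimal sketch of this suggestion, assuming `getBatch` has already derived `untilPartitionOffsets` from the `end` offset it receives (on recovery that offset comes from the streaming metadata log; the surrounding `getBatch` body is not shown in the diff):

```Scala
// Sketch only: placed inside getBatch after untilPartitionOffsets is computed.
// On recovery getBatch runs before getOffset, so seed the rate-limit baseline
// from the recovered offsets instead of leaving it empty.
if (currentPartitionOffsets.isEmpty) {
  currentPartitionOffsets = Some(untilPartitionOffsets)
}
```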






[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...

2016-10-17 Thread koeninger
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/15527

[SPARK-17813][SQL][KAFKA] Maximum data per trigger

## What changes were proposed in this pull request?

Adds a maxOffsetsPerTrigger option for rate limiting; the per-trigger cap is distributed proportionally, based on the volume of the different topicpartitions.
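
For illustration, a minimal way to enable the option (a sketch based on the test in this PR; the broker address and topic name are placeholders):

```Scala
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092") // placeholder
  .option("subscribe", "events")                  // placeholder
  .option("startingOffsets", "earliest")
  // Cap each trigger at 10000 offsets total, split across topicpartitions
  // in proportion to their available volume.
  .option("maxOffsetsPerTrigger", 10000)
  .load()
```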

This assumes SPARK-17812 is merged first, due to common changes in test utils; if that ends up not being the case, I can clean this up as a separate patch.

## How was this patch tested?

Added a unit test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 SPARK-17813

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15527.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15527


commit c45ded7109474fcb40f03c772192eb38398f328a
Author: cody koeninger 
Date:   2016-10-14T04:23:02Z

[SPARK-17812][SQL][KAFKA] parse json for topicpartitions and offsets

commit 12d3988c4fcef9bbbd88ce69295d2ff3e5baa5ba
Author: cody koeninger 
Date:   2016-10-14T19:58:08Z

Merge branch 'master' into SPARK-17812

commit 3120fd8ade24140777c29fc1487aa3f6e76152fb
Author: cody koeninger 
Date:   2016-10-14T21:37:35Z

[SPARK-17812][SQL][KAFKA] implement specified offsets and assign

commit 35bb8c3cfe77f2cb3d26f4afd3364caa6d0ec4cf
Author: cody koeninger 
Date:   2016-10-16T03:00:20Z

[SPARK-17812][SQL][KAFKA] doc and test updates

commit 2e53e5a3904305cb1d1b0f2325e31c9c434d16ec
Author: cody koeninger 
Date:   2016-10-16T03:16:11Z

[SPARK-17812][SQL][KAFKA] style fixes

commit 5e4511f0c7e84d15011a7eb8d208be13ed672b49
Author: cody koeninger 
Date:   2016-10-16T03:52:39Z

[SPARK-17812][SQL][KAFKA] additional paranoia on reset of starting offsets

commit cae967cb88a7682b6794d5d2ef90a0d9a1d3ea60
Author: cody koeninger 
Date:   2016-10-18T03:14:31Z

Merge branch 'SPARK-17812' into SPARK-17813

Testing maxOffsetsPerTrigger requires the per-partition sendMessages 
testing added in SPARK-17812

commit 6c8d459f9795c6ff32e8bf78f8796869ca722ee3
Author: cody koeninger 
Date:   2016-10-18T05:20:53Z

[SPARK-17813][SQL][KAFKA] maxOffsetsPerTrigger proportional implementation



