[GitHub] spark pull request #15527: [SPARK-17813][SQL][KAFKA] Maximum data per trigge...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/15527 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85265049

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -121,16 +124,61 @@ private[kafka010] case class KafkaSource(
     }.partitionToOffsets
   }
+  private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
+
   override def schema: StructType = KafkaSource.kafkaSchema
   /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
-    val offset = KafkaSourceOffset(fetchLatestOffsets())
-    logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
-    Some(offset)
+    val latest = fetchLatestOffsets()
+    val offsets = maxOffsetsPerTrigger match {
+      case None =>
+        latest
+      case Some(limit) if currentPartitionOffsets.isEmpty =>
+        rateLimit(limit, initialPartitionOffsets, latest)
+      case Some(limit) =>
+        rateLimit(limit, currentPartitionOffsets.get, latest)
+    }
+
+    currentPartitionOffsets = Some(offsets)
+    logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
+    Some(KafkaSourceOffset(offsets))
+  }
+
+  /** Proportionally distribute limit number of offsets among topicpartitions */
+  private def rateLimit(
+      limit: Long,
+      from: Map[TopicPartition, Long],
+      until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val sizes = until.flatMap { case (tp, end) =>
+        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
--- End diff --

nit: use 2 spaces
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85265053

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -121,16 +124,61 @@ private[kafka010] case class KafkaSource(
     }.partitionToOffsets
   }
+  private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
+
   override def schema: StructType = KafkaSource.kafkaSchema
   /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
-    val offset = KafkaSourceOffset(fetchLatestOffsets())
-    logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
-    Some(offset)
+    val latest = fetchLatestOffsets()
+    val offsets = maxOffsetsPerTrigger match {
+      case None =>
+        latest
+      case Some(limit) if currentPartitionOffsets.isEmpty =>
+        rateLimit(limit, initialPartitionOffsets, latest)
+      case Some(limit) =>
+        rateLimit(limit, currentPartitionOffsets.get, latest)
+    }
+
+    currentPartitionOffsets = Some(offsets)
+    logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
+    Some(KafkaSourceOffset(offsets))
+  }
+
+  /** Proportionally distribute limit number of offsets among topicpartitions */
+  private def rateLimit(
+      limit: Long,
+      from: Map[TopicPartition, Long],
+      until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
+    val fromNew = fetchNewPartitionEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val sizes = until.flatMap { case (tp, end) =>
+        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
+        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+          val size = end - begin
+          logDebug(s"rateLimit $tp size is $size")
+          if (size > 0) Some(tp -> size) else None
+        }
+    }
+    val total = sizes.values.sum.toDouble
+    if (total < 1) {
+      until
+    } else {
+      until.map { case (tp, end) =>
+          tp -> sizes.get(tp).map { size =>
--- End diff --

nit: use 2 spaces
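The quoted diff stops at `tp -> sizes.get(tp).map { size =>`, so the rest of `rateLimit` is not shown in this thread. As a hedged illustration of the proportional split, here is a minimal standalone sketch. It assumes a hypothetical `TP` case class in place of Kafka's `TopicPartition`, takes the earliest offsets of newly discovered partitions as a plain `fromNew` argument instead of calling `fetchNewPartitionEarliestOffsets`, and assumes a rounding rule (round a share below one offset up, otherwise round down). That rule is chosen because it reproduces the expected answers in the `maxOffsetsPerTrigger` test in this thread; it is not taken from the patch.

```Scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
// so the sketch compiles without Kafka on the classpath.
final case class TP(topic: String, partition: Int)

object RateLimitSketch {
  /** Sketch: split `limit` offsets across partitions in proportion to their backlog. */
  def rateLimit(
      limit: Long,
      from: Map[TP, Long],
      until: Map[TP, Long],
      fromNew: Map[TP, Long] = Map.empty): Map[TP, Long] = {
    // Pending offsets per partition; partitions with no known start
    // or with nothing pending are left out of the proportional split.
    val sizes: Map[TP, Long] = until.flatMap { case (tp, end) =>
      from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
        val size = end - begin
        if (size > 0) Some(tp -> size) else None
      }
    }
    val total = sizes.values.sum.toDouble
    if (total < 1) {
      // Nothing pending anywhere: just report the latest offsets.
      until
    } else {
      until.map { case (tp, end) =>
        tp -> sizes.get(tp).map { size =>
          val begin = from.getOrElse(tp, fromNew(tp))
          val prorate = limit * (size / total)
          // Assumed rounding: shares below 1 round up so small partitions still progress.
          val step = if (prorate < 1) math.ceil(prorate) else math.floor(prorate)
          // Never move past the latest available offset.
          math.min(end, begin + step.toLong)
        }.getOrElse(end)
      }
    }
  }
}
```

Rounding small shares up keeps tiny partitions from being starved. The trade-off is that a single trigger can slightly exceed `limit` when many partitions each receive a fractional share.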
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85258005

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -153,11 +201,7 @@ private[kafka010] case class KafkaSource(
--- End diff --

`currentPartitionOffsets` holds the last processed offsets, right? When recovering from a failure, `getBatch` is called first, then `getOffset`.
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85258039

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -133,6 +132,41 @@ class KafkaSourceSuite extends KafkaSourceTest {
   private val topicId = new AtomicInteger(0)
+  test("maxOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 10)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+    testStream(mapped)(
+      StartStream(ProcessingTime(100), clock),
+      AdvanceManualClock(100),
+      // 1 from smallest, 1 from middle, 8 from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
--- End diff --

FYI, #14553 got merged.
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85253453

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -153,11 +201,7 @@ private[kafka010] case class KafkaSource(
--- End diff --

Shouldn't it be set to the highest available offset in the streaming metadata log, not the highest available offset in Kafka?
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85246059

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -133,6 +132,41 @@ class KafkaSourceSuite extends KafkaSourceTest {
   private val topicId = new AtomicInteger(0)
+  test("maxOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 10)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+    testStream(mapped)(
+      StartStream(ProcessingTime(100), clock),
+      AdvanceManualClock(100),
+      // 1 from smallest, 1 from middle, 8 from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
--- End diff --

There is a race condition here: the batch may still be running. I finally worked out the following code, which covers recovery and fixes the race condition:
```Scala
test("maxOffsetsPerTrigger") {
  val topic = newTopic()
  testUtils.createTopic(topic, partitions = 3)
  testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
  testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
  testUtils.sendMessages(topic, Array("1"), Some(2))

  val reader = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    .option("kafka.metadata.max.age.ms", "1")
    .option("maxOffsetsPerTrigger", 10)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
  val kafka = reader.load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .as[(String, String)]
  val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)

  val clock = new StreamManualClock

  val waitUntilBatchProcessed = AssertOnQuery { q =>
    eventually(Timeout(streamingTimeout)) {
      if (!q.exception.isDefined) {
        assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
      }
    }
    if (q.exception.isDefined) {
      throw q.exception.get
    }
    true
  }

  testStream(mapped)(
    StartStream(ProcessingTime(100), clock),
    waitUntilBatchProcessed,
    // 1 from smallest, 1 from middle, 8 from biggest
    CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
    AdvanceManualClock(100),
    waitUntilBatchProcessed,
    // smallest now empty, 1 more from middle, 9 more from biggest
    CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
      11, 108, 109, 110, 111, 112, 113, 114, 115, 116
    ),
    StopStream,
    StartStream(ProcessingTime(100), clock),
    waitUntilBatchProcessed,
    AdvanceManualClock(100),
    waitUntilBatchProcessed,
    // smallest now empty, 1 more from middle, 9 more from biggest
    CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
      11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
      12, 117, 118, 119, 120, 121, 122, 123, 124, 125
    ),
    AdvanceManualClock(100),
    waitUntilBatchProcessed,
    // smallest now empty, 1 more from middle, 9 more from biggest
    CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
      11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
      12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
      13, 126, 127, 128, 129, 130, 131, 132, 133, 134
    )
  )
}
```

This test fails now because of an issue being fixed in #14553.
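The per-batch comments in the test ("1 from smallest, 1 from middle, 8 from biggest", then "1 more from middle, 9 more from biggest") can be checked by hand. The `rateLimit` diff in this thread is cut off before the per-partition rounding, so the rule below is an assumption: round a proportional share up when it is below one offset and down otherwise. It is the rule that reproduces those comments; plain flooring or rounding would give the smallest partition nothing in the first batch. Here $L$ is `maxOffsetsPerTrigger`, $b_p$ and $e_p$ are the start and latest offsets of partition $p$, and partitions 0, 1 and 2 hold 101, 11 and 1 messages.

```latex
\begin{aligned}
s_p &= e_p - b_p, \qquad S = \sum_{p \,:\, s_p > 0} s_p, \qquad x_p = L \cdot \frac{s_p}{S} \\
o_p &= \min\!\Bigl(e_p,\; b_p + \begin{cases} \lceil x_p \rceil & \text{if } x_p < 1 \\ \lfloor x_p \rfloor & \text{if } x_p \ge 1 \end{cases}\Bigr)
  \quad (\text{and } o_p = e_p \text{ when } s_p \le 0) \\[4pt]
\text{batch 1: } & L = 10,\; b = (0, 0, 0),\; e = (101, 11, 1),\; S = 113 \\
& x \approx (8.94,\, 0.97,\, 0.09) \;\Rightarrow\; o = (8, 1, 1) \\
\text{batch 2: } & b = (8, 1, 1),\; s = (93, 10, 0),\; S = 103 \\
& x_0 \approx 9.03,\; x_1 \approx 0.97 \;\Rightarrow\; o = (17, 2, 1)
\end{aligned}
```

Batch 1 therefore reads offsets 0 through 7 of partition 0 (values 100 through 107) plus one record each from partitions 1 and 2 (values 10 and 1), matching the first `CheckAnswer`. Batch 2 adds values 108 through 116 and 11.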
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/15527#discussion_r85245467

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -153,11 +201,7 @@ private[kafka010] case class KafkaSource(
--- End diff --

`currentPartitionOffsets` should be set to `untilPartitionOffsets` if it's empty, because an empty value means the source is recovering from a failure.
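To make the recovery point concrete, here is a minimal, Spark-free model of the bookkeeping proposed above. It assumes, as described in this thread, that after a restart `getBatch` runs before `getOffset`. `OffsetBookkeeping`, `rateLimitStart`, `onGetBatch` and `onGetOffset` are hypothetical names, and plain partition numbers stand in for `TopicPartition`. It sketches the idea and is not the patch itself.

```Scala
// Hypothetical model of the rate-limited source's offset state.
final class OffsetBookkeeping(initialPartitionOffsets: Map[Int, Long]) {
  // Where the next rate-limited batch starts; None right after a (re)start.
  private var currentPartitionOffsets: Option[Map[Int, Long]] = None

  /** The `from` map that getOffset would hand to rateLimit. */
  def rateLimitStart: Map[Int, Long] =
    currentPartitionOffsets.getOrElse(initialPartitionOffsets)

  /** Called with a batch's end offsets; on recovery this runs before getOffset. */
  def onGetBatch(untilPartitionOffsets: Map[Int, Long]): Unit = {
    // Empty state means the source was just restarted: seed it from the
    // batch being replayed instead of falling back to the initial offsets.
    if (currentPartitionOffsets.isEmpty) {
      currentPartitionOffsets = Some(untilPartitionOffsets)
    }
  }

  /** Called after getOffset chooses the next end offsets. */
  def onGetOffset(offsets: Map[Int, Long]): Unit = {
    currentPartitionOffsets = Some(offsets)
  }
}
```

Without the seeding in `onGetBatch`, a restarted source would take the `currentPartitionOffsets.isEmpty` branch in `getOffset` and rate-limit from `initialPartitionOffsets` rather than from where the last batch ended.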
GitHub user koeninger opened a pull request:

https://github.com/apache/spark/pull/15527

[SPARK-17813][SQL][KAFKA] Maximum data per trigger

## What changes were proposed in this pull request?

Adds a maxOffsetsPerTrigger option for rate limiting, which splits the limit among topicpartitions in proportion to their volume.

This assumes SPARK-17812 is merged first, because the two patches share changes to the test utils. If that ends up not being the case, I can clean this up as a separate patch.

## How was this patch tested?

Added unit test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/koeninger/spark-1 SPARK-17813

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15527.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #15527

commit c45ded7109474fcb40f03c772192eb38398f328a
Author: cody koeninger
Date: 2016-10-14T04:23:02Z
[SPARK-17812][SQL][KAFKA] parse json for topicpartitions and offsets

commit 12d3988c4fcef9bbbd88ce69295d2ff3e5baa5ba
Author: cody koeninger
Date: 2016-10-14T19:58:08Z
Merge branch 'master' into SPARK-17812

commit 3120fd8ade24140777c29fc1487aa3f6e76152fb
Author: cody koeninger
Date: 2016-10-14T21:37:35Z
[SPARK-17812][SQL][KAFKA] implement specified offsets and assign

commit 35bb8c3cfe77f2cb3d26f4afd3364caa6d0ec4cf
Author: cody koeninger
Date: 2016-10-16T03:00:20Z
[SPARK-17812][SQL][KAFKA] doc and test updates

commit 2e53e5a3904305cb1d1b0f2325e31c9c434d16ec
Author: cody koeninger
Date: 2016-10-16T03:16:11Z
[SPARK-17812][SQL][KAFKA] style fixes

commit 5e4511f0c7e84d15011a7eb8d208be13ed672b49
Author: cody koeninger
Date: 2016-10-16T03:52:39Z
[SPARK-17812][SQL][KAFKA] additional paranoia on reset of starting offsets

commit cae967cb88a7682b6794d5d2ef90a0d9a1d3ea60
Author: cody koeninger
Date: 2016-10-18T03:14:31Z
Merge branch 'SPARK-17812' into SPARK-17813
Testing maxOffsetsPerTrigger requires the per-partition sendMessages testing added in SPARK-17812

commit 6c8d459f9795c6ff32e8bf78f8796869ca722ee3
Author: cody koeninger
Date: 2016-10-18T05:20:53Z
[SPARK-17813][SQL][KAFKA] maxOffsetsPerTrigger proportional implementation