[GitHub] [spark] HeartSaVioR commented on a change in pull request #32653: [SPARK-35312][SS] Introduce new Option in Kafka source to specify minimum number of records to read per trigger

2021-06-08 Thread GitBox


HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r647159789



##
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##
@@ -95,15 +114,65 @@ private[kafka010] class KafkaMicroBatchStream(
   override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
-    endPartitionOffsets = KafkaSourceOffset(readLimit match {
-      case rows: ReadMaxRows =>
-        rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets)
-      case _: ReadAllAvailable =>
-        latestPartitionOffsets
-    })
+
+    val limits: Seq[ReadLimit] = readLimit match {
+      case rows: CompositeReadLimit => rows.getReadLimits
+      case rows => Seq(rows)
+    }
+
+    val offsets = if (limits.exists(_.isInstanceOf[ReadAllAvailable])) {
+      // ReadAllAvailable has the highest priority
+      latestPartitionOffsets
+    } else {
+      val lowerLimit = limits.find(_.isInstanceOf[ReadMinRows]).map(_.asInstanceOf[ReadMinRows])
+      val upperLimit = limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows])
+
+      lowerLimit.flatMap { limit =>
+        // checking if we need to skip batch based on minOffsetPerTrigger criteria
+        val skipBatch = delayBatch(
+          limit.minRows, latestPartitionOffsets, startPartitionOffsets, limit.maxTriggerDelayMs)
+        if (skipBatch) {
+          logDebug(
+            s"Delaying batch as number of records available is less than minOffsetsPerTrigger")
+          Some(startPartitionOffsets)
+        } else {
+          None
+        }
+      }.orElse {
+        // checking if we need to adjust a range of offsets based on maxOffsetPerTrigger criteria
+        upperLimit.map { limit =>
+          rateLimit(limit.maxRows(), startPartitionOffsets, latestPartitionOffsets)
+        }
+      }.getOrElse(latestPartitionOffsets)
+    }
+
+    endPartitionOffsets = KafkaSourceOffset(offsets)
     endPartitionOffsets
   }
 
+  /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & maxTriggerDelay */
+  private def delayBatch(
+      minLimit: Long,
+      latestOffsets: Map[TopicPartition, Long],
+      currentOffsets: Map[TopicPartition, Long],
+      maxTriggerDelayMs: Long): Boolean = {
+    // Checking first if the maxbatchDelay time has passed

Review comment:
   nit: It won't hurt if we only call `System.currentTimeMillis()` once and reuse it.
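
   For illustration, a minimal sketch of the suggestion, assuming the `delayBatch` signature quoted above; the enclosing `lastTriggerMillis` state and the record-counting logic are hypothetical stand-ins, not the PR's actual implementation:

   ```
   import org.apache.kafka.common.TopicPartition

   // Hypothetical mutable state on the enclosing stream: when a batch last triggered.
   private var lastTriggerMillis = 0L

   /** Sketch: decide whether to delay the trigger, calling System.currentTimeMillis() only once. */
   private def delayBatch(
       minLimit: Long,
       latestOffsets: Map[TopicPartition, Long],
       currentOffsets: Map[TopicPartition, Long],
       maxTriggerDelayMs: Long): Boolean = {
     val now = System.currentTimeMillis()  // single call, reused below
     if (lastTriggerMillis == 0L || now - lastTriggerMillis >= maxTriggerDelayMs) {
       // maxTriggerDelay expired (or first trigger): never skip, and reset the timer.
       lastTriggerMillis = now
       false
     } else {
       // Count newly available records; skip only while below minOffsetsPerTrigger.
       val newRecords = latestOffsets.map { case (tp, latest) =>
         latest - currentOffsets.getOrElse(tp, latest)
       }.sum
       if (newRecords >= minLimit) {
         lastTriggerMillis = now
         false
       } else {
         true
       }
     }
   }
   ```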




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #32653: [SPARK-35312][SS] Introduce new Option in Kafka source to specify minimum number of records to read per trigger

2021-06-01 Thread GitBox


HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r642793811



##
File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/CompositeReadLimit.java
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Arrays;
+
+/**
+ /**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately
+ * given maximum number of rows with at least the given minimum number of rows.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.1.2

Review comment:
   The target version will be 3.2.0 in the best case. We don't add new functionality in bugfix versions.

##
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+    testStream(mapped)(
+      StartStream(Trigger.ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // First Batch is always processed
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      // Adding more data but less than minOffsetsPerTrigger
+      Assert {
+        testUtils.sendMessages(topic, (15 to 20).map(_.toString).toArray, Some(1))
+        true
+      },
+      // No data is processed for next batch as data is less than minOffsetsPerTrigger
+      // and maxTriggerDelay is not expired
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,

Review comment:
   Please use CheckNewAnswer here and below to assert only new output if that works for the test.
   
   If CheckNewAnswer works without waitUntilBatchProcessed, even better.
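
   For example, the assertions quoted above could become something like the following (a sketch only; whether `waitUntilBatchProcessed` can also be dropped would need to be confirmed against the manual-clock setup):

   ```
   testStream(mapped)(
     StartStream(Trigger.ProcessingTime(100), clock),
     waitUntilBatchProcessed,
     // First batch is always processed.
     CheckNewAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, 11, 108, 109, 12, 13, 14),
     // Add more data, but fewer records than minOffsetsPerTrigger.
     Assert {
       testUtils.sendMessages(topic, (15 to 20).map(_.toString).toArray, Some(1))
       true
     },
     AdvanceManualClock(100),
     waitUntilBatchProcessed,
     // No new rows: available data is below minOffsetsPerTrigger and maxTriggerDelay has not expired.
     CheckNewAnswer()
   )
   ```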

##
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = 

[GitHub] [spark] HeartSaVioR commented on a change in pull request #32653: [SPARK-35312][SS] Introduce new Option in Kafka source to specify minimum number of records to read per trigger

2021-05-31 Thread GitBox


HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r642792749



##
File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/CompositeReadLimit.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Objects;
+
+/**
+ /**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately
+ * given maximum number of rows with at least the given minimum number of rows.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.1.2
+ */
+@Evolving
+public final class CompositeReadLimit implements ReadLimit {

Review comment:
   Looks to be addressed. Thanks :) 






[GitHub] [spark] HeartSaVioR commented on a change in pull request #32653: [SPARK-35312][SS] Introduce new Option in Kafka source to specify minimum number of records to read per trigger

2021-05-31 Thread GitBox


HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r642765673



##
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
##
@@ -139,26 +156,78 @@ private[kafka010] class KafkaSource(
   override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
-
-    val latest = kafkaReader.fetchLatestOffsets(
-      currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
+    val currentOffsets = currentPartitionOffsets.orElse(Some(initialPartitionOffsets))
+    val latest = kafkaReader.fetchLatestOffsets(currentOffsets)
+    var skipBatch = false

Review comment:
   Same here as well.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #32653: [SPARK-35312][SS] Introduce new Option in Kafka source to specify minimum number of records to read per trigger

2021-05-31 Thread GitBox


HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r642765440



##
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##
@@ -95,15 +114,62 @@ private[kafka010] class KafkaMicroBatchStream(
   override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+    var skipBatch = false

Review comment:
   Now I see duplicated code across the branches handling each type, including CompositeReadLimit, which handles both the lower and the upper limit and hence repeats the same code.
   
   How about changing it like below:
   
   ```
   val limits: Seq[ReadLimit] = readLimit match {
     case rows: CompositeReadLimit => rows.getReadLimits
     case rows => Seq(rows)
   }
   
   val offsets = if (limits.exists(_.isInstanceOf[ReadAllAvailable])) {
     // ReadAllAvailable has the highest priority
     latestPartitionOffsets
   } else {
     val lowerLimit = limits.find(_.isInstanceOf[ReadMinRows]).map(_.asInstanceOf[ReadMinRows])
     val upperLimit = limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows])
   
     lowerLimit.flatMap { limit =>
       // checking if we need to skip batch based on minOffsetPerTrigger criteria
       val skipBatch = delayBatch(
         limit.minRows, latestPartitionOffsets, startPartitionOffsets, limit.maxTriggerDelayMs)
       if (skipBatch) {
         logDebug(
           s"Delaying batch as number of records available is less than minOffsetsPerTrigger")
         Some(startPartitionOffsets)
       } else {
         None
       }
     }.orElse {
       // checking if we need to adjust a range of offsets based on maxOffsetPerTrigger criteria
       upperLimit.map { limit =>
         rateLimit(limit.maxRows(), startPartitionOffsets, latestPartitionOffsets)
       }
     }.getOrElse(latestPartitionOffsets)
   }
   
   endPartitionOffsets = KafkaSourceOffset(offsets)
   endPartitionOffsets
   ```
   
   This would require fewer changes when we want to add more read limits in the future.
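
   A possible companion on the option-parsing side, sketched under the assumption that factory helpers such as `ReadLimit.minRows(rows, maxTriggerDelayMs)` and `ReadLimit.compositeLimit(...)` are added along with `CompositeReadLimit` (they are not part of the quoted diff), so `getDefaultReadLimit` only has to compose whatever limits are configured:

   ```
   override def getDefaultReadLimit: ReadLimit = {
     // Sketch (assumed helpers): translate the configured options into ReadLimits; when both
     // a minimum and a maximum are set, hand back a composite so latestOffset() can unpack it.
     val minLimit = minOffsetPerTrigger.map(ReadLimit.minRows(_, maxTriggerDelayMs))
     val maxLimit = maxOffsetsPerTrigger.map(ReadLimit.maxRows(_))
     (minLimit, maxLimit) match {
       case (Some(min), Some(max)) => ReadLimit.compositeLimit(Array(min, max))
       case (Some(min), None) => min
       case (None, Some(max)) => max
       case (None, None) => ReadLimit.allAvailable()
     }
   }
   ```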






[GitHub] [spark] HeartSaVioR commented on a change in pull request #32653: [SPARK-35312][SS] Introduce new Option in Kafka source to specify minimum number of records to read per trigger

2021-05-24 Thread GitBox


HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r638448022



##
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##
@@ -64,6 +66,15 @@ private[kafka010] class KafkaMicroBatchStream(
   private[kafka010] val maxOffsetsPerTrigger = Option(options.get(
     KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER)).map(_.toLong)
 
+  private[kafka010] val minOffsetPerTrigger = Option(options.get(
+    KafkaSourceProvider.MIN_OFFSET_PER_TRIGGER)).map(_.toLong)
+
+  private[kafka010] val maxTriggerDelayMs =

Review comment:
   I'd like to see this as part of `ReadMinRows`, so that other data source implementations leveraging it also respect the maximum wait time. But let's hear others' voices as well.
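
   To make the suggestion concrete: the wait time would then travel with the limit itself rather than staying a Kafka-only option. A rough Scala-flavored sketch of that shape (the real connector interfaces live in Java, and the names here are purely illustrative):

   ```
   import org.apache.spark.sql.connector.read.streaming.ReadLimit

   // Sketch: the minimum-rows limit carries its own maximum trigger delay,
   // so any source honoring it applies the same timeout semantics.
   final case class SketchReadMinRows(minRows: Long, maxTriggerDelayMs: Long) extends ReadLimit

   // A source would then skip a batch only while both conditions hold:
   def shouldDelay(limit: SketchReadMinRows, newRecords: Long, waitedMs: Long): Boolean =
     newRecords < limit.minRows && waitedMs < limit.maxTriggerDelayMs
   ```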

##
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
##
@@ -524,6 +528,9 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
   private[kafka010] val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
   private[kafka010] val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger"
+  private[kafka010] val MIN_OFFSET_PER_TRIGGER = "minOffsetsPerTrigger"

Review comment:
   We use all lowercase here to reflect the fact that the option name is case-insensitive.
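
   i.e., mirroring the existing constants just above, the new keys would presumably read (a sketch, not the final diff; the second constant name is a guess):

   ```
   private[kafka010] val MIN_OFFSET_PER_TRIGGER = "minoffsetspertrigger"
   private[kafka010] val MAX_TRIGGER_DELAY = "maxtriggerdelay"
   ```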

##
File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/CompositeReadLimit.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Objects;
+
+/**
+ /**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately
+ * given maximum number of rows with at least the given minimum number of rows.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.1.2
+ */
+@Evolving
+public final class CompositeReadLimit implements ReadLimit {

Review comment:
   I'd like to see this be a general container of ReadLimits holding a `ReadLimit[]` or `List<ReadLimit>`, instead of sticking with minRows and maxRows. For example, once we want to do the same for ReadMinFiles, we would have to add yet another class.
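
   Something along these lines, sketched in Scala for brevity (the actual class is Java, and names/signatures are illustrative): a composite that simply aggregates an array of `ReadLimit`s, so a future limit such as a hypothetical ReadMinFiles slots in without another composite class:

   ```
   import org.apache.spark.sql.connector.read.streaming.ReadLimit

   // Sketch: a general-purpose composite holding arbitrary ReadLimits,
   // instead of hard-coding minRows/maxRows fields.
   final class SketchCompositeReadLimit(limits: Array[ReadLimit]) extends ReadLimit {
     def getReadLimits: Array[ReadLimit] = limits
     override def toString: String = s"CompositeReadLimit(${limits.mkString(", ")})"
   }
   ```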

##
File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
##
@@ -87,9 +88,17 @@ private[kafka010] class KafkaSource(
   private val maxOffsetsPerTrigger =
     sourceOptions.get(MAX_OFFSET_PER_TRIGGER).map(_.toLong)
 
+  private[kafka010] val minOffsetPerTrigger =
+    sourceOptions.get(MIN_OFFSET_PER_TRIGGER).map(_.toLong)
+
+  private[kafka010] val maxTriggerDelayMs =

Review comment:
   Same here.

##
File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##
@@ -95,15 +112,51 @@ private[kafka010] class KafkaMicroBatchStream(
   override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+    var skipBatch = false
     endPartitionOffsets = KafkaSourceOffset(readLimit match {
+      case rows: ReadMinRows =>
+        // checking if we need to skip batch based on minOffsetPerTrigger criteria
+        skipBatch = delayBatch(rows.minRows(), latestPartitionOffsets, startPartitionOffsets)
+        if (skipBatch) {
+          logDebug(
+            s"Delaying batch as number of records available is less than minOffsetsPerTrigger")
+          startPartitionOffsets
+        } else latestPartitionOffsets
       case rows: ReadMaxRows =>
         rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets)
+      case rows: CompositeReadLimit =>
+        skipBatch = delayBatch(rows.minRows(), latestPartitionOffsets, startPartitionOffsets)
+        if (skipBatch) {
+          logDebug(
+            s"Delaying batch as number of records available is less than