This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 6e7e50eaaf7 [SPARK-44657][CONNECT] Fix incorrect limit handling in ArrowBatchWithSchemaIterator and config parsing of CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
6e7e50eaaf7 is described below

commit 6e7e50eaaf7770d46474b5299ccf8724018f526a
Author: vicennial <venkata.gud...@databricks.com>
AuthorDate: Tue Aug 8 17:30:16 2023 +0900

    [SPARK-44657][CONNECT] Fix incorrect limit handling in ArrowBatchWithSchemaIterator and config parsing of CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
    
    ### What changes were proposed in this pull request?
    Fixes the limit checking of `maxEstimatedBatchSize` and `maxRecordsPerBatch` to respect the more restrictive of the two limits, and fixes the config parsing of `CONNECT_GRPC_ARROW_MAX_BATCH_SIZE` by converting the value to bytes.
    
    ### Why are the changes needed?
    Bugfix.
    In the arrow writer [code](https://github.com/apache/spark/blob/6161bf44f40f8146ea4c115c788fd4eaeb128769/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala#L154-L163), the conditions do not match what the documentation says regarding "maxBatchSize and maxRecordsPerBatch, respect whatever smaller": because the checks are chained with the `||` operator, the loop actually respects whichever limit is "larger" (i.e. less restrictive).
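    
    A minimal standalone sketch (hypothetical limits and row sizes, not the actual ArrowConverters code) of the difference: the `||`-chained condition keeps writing until both limits are exceeded, while requiring both limits to be "not exceeded" stops at whichever limit is hit first.
    
    object LimitCheckSketch {
      def main(args: Array[String]): Unit = {
        val maxBatchBytes = 16L // hypothetical size limit
        val maxRecords = 2L     // hypothetical record limit
        var bytes = 0L
        var rows = 0L
    
        // Buggy shape: "size has room || count has room" keeps looping while EITHER
        // limit still has room, so the record limit of 2 is blown past.
        while (bytes < maxBatchBytes || rows < maxRecords) {
          bytes += 4; rows += 1
        }
        println(s"|| condition wrote $rows rows") // 4 rows, ignores maxRecords
    
        bytes = 0L; rows = 0L
        // Fixed shape: stop as soon as either limit is exceeded (non-positive = unlimited).
        def sizeExceeded = maxBatchBytes > 0 && bytes >= maxBatchBytes
        def recordsExceeded = maxRecords > 0 && rows >= maxRecords
        while (!sizeExceeded && !recordsExceeded) {
          bytes += 4; rows += 1
        }
        println(s"fixed condition wrote $rows rows") // 2 rows, respects the smaller limit
      }
    }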
    
    Further, when the `CONNECT_GRPC_ARROW_MAX_BATCH_SIZE` conf is read, the value is not converted from MiB to bytes ([example](https://github.com/apache/spark/blob/3e5203c64c06cc8a8560dfa0fb6f52e74589b583/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L103)).
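    
    As a hedged illustration (plain arithmetic, not the Spark config API): a conf declared with a MiB unit hands its value back in MiB, so using that number directly where a byte count is expected silently shrinks the limit by a factor of 1024 * 1024.
    
    object MiBToBytesSketch {
      def main(args: Array[String]): Unit = {
        val configuredMiB = 4L                                 // what "4m" parses to under a MiB-based conf
        val wrongLimitBytes = configuredMiB                    // bug: value treated as if it were bytes
        val correctLimitBytes = configuredMiB * 1024L * 1024L  // 4194304 bytes
        println(s"wrong: $wrongLimitBytes bytes, correct: $correctLimitBytes bytes")
      }
    }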
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #42321 from vicennial/SPARK-44657.
    
    Authored-by: vicennial <venkata.gud...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit f9d417fc17a82ddf02d6bbab82abc8e1aa264356)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../apache/spark/sql/connect/config/Connect.scala   | 10 +++++-----
 .../spark/sql/execution/arrow/ArrowConverters.scala | 21 +++++++++++++--------
 2 files changed, 18 insertions(+), 13 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 19fdad97b5f..2a5290d3469 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -40,12 +40,12 @@ object Connect {
   val CONNECT_GRPC_ARROW_MAX_BATCH_SIZE =
     ConfigBuilder("spark.connect.grpc.arrow.maxBatchSize")
       .doc(
-        "When using Apache Arrow, limit the maximum size of one arrow batch that " +
-          "can be sent from server side to client side. Currently, we conservatively use 70% " +
-          "of it because the size is not accurate but estimated.")
+        "When using Apache Arrow, limit the maximum size of one arrow batch, in bytes unless " +
+          "otherwise specified, that can be sent from server side to client side. Currently, we " +
+          "conservatively use 70% of it because the size is not accurate but estimated.")
       .version("3.4.0")
-      .bytesConf(ByteUnit.MiB)
-      .createWithDefaultString("4m")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(4 * 1024 * 1024)
 
   val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE =
     ConfigBuilder("spark.connect.grpc.maxInboundMessageSize")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index b22c80d17e8..df26a06c86d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -145,17 +145,22 @@ private[sql] object ArrowConverters extends Logging {
         // Always write the schema.
         MessageSerializer.serialize(writeChannel, arrowSchema)
 
+        def isBatchSizeLimitExceeded: Boolean = {
+          // If `maxEstimatedBatchSize` is zero or negative, it implies unlimited.
+          maxEstimatedBatchSize > 0 && estimatedBatchSize >= maxEstimatedBatchSize
+        }
+        def isRecordLimitExceeded: Boolean = {
+          // If `maxRecordsPerBatch` is zero or negative, it implies unlimited.
+          maxRecordsPerBatch > 0 && rowCountInLastBatch >= maxRecordsPerBatch
+        }
         // Always write the first row.
         while (rowIter.hasNext && (
-          // For maxBatchSize and maxRecordsPerBatch, respect whatever smaller.
           // If the size in bytes is positive (set properly), always write the first row.
-          rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0 ||
-            // If the size in bytes of rows are 0 or negative, unlimit it.
-            estimatedBatchSize <= 0 ||
-            estimatedBatchSize < maxEstimatedBatchSize ||
-            // If the size of rows are 0 or negative, unlimit it.
-            maxRecordsPerBatch <= 0 ||
-            rowCountInLastBatch < maxRecordsPerBatch)) {
+          (rowCountInLastBatch == 0 && maxEstimatedBatchSize > 0) ||
+            // If either limit is hit, create a batch. This implies that the limit that is hit first
+            // triggers the creation of a batch even if the other limit is not yet hit, hence
+            // preferring the more restrictive limit.
+            (!isBatchSizeLimitExceeded && !isRecordLimitExceeded))) {
           val row = rowIter.next()
           arrowWriter.write(row)
           estimatedBatchSize += (row match {
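
For reference, a simplified standalone model of the patched batching loop (assumed row sizes and limits, not the actual ArrowBatchWithSchemaIterator): a batch is closed as soon as either the estimated byte limit or the record limit is reached, with a non-positive limit meaning unlimited, so the more restrictive of the two limits wins.

object BatchingModelSketch {
  // Returns the number of rows placed in each batch.
  def batchSizes(rowSizes: Seq[Long], maxBatchBytes: Long, maxRecords: Long): List[Int] = {
    val batches = scala.collection.mutable.ListBuffer.empty[Int]
    val it = rowSizes.iterator
    while (it.hasNext) {
      var estimatedBatchSize = 0L
      var rowCountInLastBatch = 0L
      def sizeLimitExceeded = maxBatchBytes > 0 && estimatedBatchSize >= maxBatchBytes
      def recordLimitExceeded = maxRecords > 0 && rowCountInLastBatch >= maxRecords
      // Always write the first row, then stop once either limit is hit.
      while (it.hasNext && ((rowCountInLastBatch == 0 && maxBatchBytes > 0) ||
          (!sizeLimitExceeded && !recordLimitExceeded))) {
        estimatedBatchSize += it.next()
        rowCountInLastBatch += 1
      }
      batches += rowCountInLastBatch.toInt
    }
    batches.toList
  }

  def main(args: Array[String]): Unit = {
    // Ten 3-byte rows with a 10-byte size limit and a 2-record limit: the record
    // limit is the more restrictive one, so every batch holds 2 rows.
    println(batchSizes(Seq.fill(10)(3L), maxBatchBytes = 10L, maxRecords = 2L)) // List(2, 2, 2, 2, 2)
  }
}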


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
