sadikovi commented on code in PR #38064:
URL: https://github.com/apache/spark/pull/38064#discussion_r1001233230


##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -54,9 +55,16 @@ private[spark] class ChunkedByteBuffer(var chunks: 
Array[ByteBuffer]) {
   private[this] var disposed: Boolean = false
 
   /**
-   * This size of this buffer, in bytes.
+   * This size of this buffer, in bytes. Using var here for serialization 
purpose(need to set a

Review Comment:
   nit: space before paren.



##########
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala:
##########
@@ -2181,6 +2184,29 @@ class DatasetSuite extends QueryTest
   }
 }
 
+class DatasetLargeResultCollectingSuite extends QueryTest
+  with SharedSparkSession {
+
+  override protected def sparkConf: SparkConf = 
super.sparkConf.set(MAX_RESULT_SIZE.key, "4g")
+  test("collect data with single partition larger than 2GB bytes array limit") 
{
+    import org.apache.spark.sql.functions.udf
+
+    val genData = udf((id: Long, bytesSize: Int) => {
+      val rand = new Random(id)
+      val arr = new Array[Byte](bytesSize)
+      rand.nextBytes(arr)
+      arr
+    })
+
+    spark.udf.register("genData", genData.asNondeterministic())
+    // create data of size ~3GB in single partition, which exceeds the byte 
array limit
+    // random gen to make sure it's poorly compressed
+    val df = spark.range(0, 3000, 1, 1).selectExpr("id", s"genData(id, 
1000000) as data")
+    val res = df.queryExecution.executedPlan.executeCollect()
+  }
+

Review Comment:
   nit: no new line.



##########
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala:
##########
@@ -2181,6 +2184,29 @@ class DatasetSuite extends QueryTest
   }
 }
 
+class DatasetLargeResultCollectingSuite extends QueryTest
+  with SharedSparkSession {
+
+  override protected def sparkConf: SparkConf = 
super.sparkConf.set(MAX_RESULT_SIZE.key, "4g")
+  test("collect data with single partition larger than 2GB bytes array limit") 
{

Review Comment:
   nit: new line.



##########
core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
+
+private[spark] object SerializerHelper extends Logging {
+
+  /**
+   * @param estimatedSize estimated size of `t`, used as a hint to choose 
proper chunk size

Review Comment:
   Can you extend the Scaladoc? There are 3 arguments, but only `estimatedSize` is documented.



##########
core/src/main/scala/org/apache/spark/serializer/SerializerHelper.scala:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
+
+private[spark] object SerializerHelper extends Logging {
+
+  /**
+   * @param estimatedSize estimated size of `t`, used as a hint to choose 
proper chunk size
+   */
+  def serializeToChunkedBuffer[T: ClassTag](
+      serializerInstance: SerializerInstance,
+      t: T,

Review Comment:
   This is not an API override, so it would be good to update.



##########
core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala:
##########
@@ -207,6 +267,18 @@ private[spark] object ChunkedByteBuffer {
     }
     out.toChunkedByteBuffer
   }
+
+  /**
+   * Try to estimate appropriate chunk size so that it's not too large(waste 
memory) or too
+   * small(too many segments)
+   */
+  def estimateBufferChunkSize(estimatedSize: Long = -1): Int = {
+    if (estimatedSize < 0) {
+      CHUNK_BUFFER_SIZE
+    } else {
+      Math.max(Math.min(estimatedSize >> 3, CHUNK_BUFFER_SIZE).toInt, 
MINIMUM_CHUNK_BUFFER_SIZE)

Review Comment:
   I don't think it is even worth it. You will only select a size different from 
CHUNK_BUFFER_SIZE when it is between MINIMUM_CHUNK_BUFFER_SIZE and 
CHUNK_BUFFER_SIZE, and in that case you can choose either anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to