[spark] branch master updated (7cbf7dd148d -> 24adac30539)

2022-11-11 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


 from 7cbf7dd148d [SPARK-40372][SQL] Migrate failures of array type checks onto error classes
  add 24adac30539 [SPARK-41093][BUILD] Remove netty-tcnative-classes from Spark dependencyList

No new revisions were added by this update.

Summary of changes:
 common/network-common/pom.xml         |  4 ----
 core/pom.xml                          |  4 ----
 dev/deps/spark-deps-hadoop-2-hive-2.3 |  1 -
 dev/deps/spark-deps-hadoop-3-hive-2.3 |  1 -
 pom.xml                               | 14 --------------
 5 files changed, 24 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (4f614b3f699 -> 7cbf7dd148d)

2022-11-11 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


 from 4f614b3f699 [SPARK-41005][CONNECT][FOLLOWUP] Collect should use `submitJob` instead of `runJob`
  add 7cbf7dd148d [SPARK-40372][SQL] Migrate failures of array type checks onto error classes

No new revisions were added by this update.

Summary of changes:
 core/src/main/resources/error/error-classes.json   |  15 +-
 .../expressions/collectionOperations.scala | 203 +++---
 .../resources/sql-tests/results/ansi/array.sql.out |  20 +-
 .../resources/sql-tests/results/ansi/map.sql.out   |   4 +-
 .../test/resources/sql-tests/results/array.sql.out |  20 +-
 .../test/resources/sql-tests/results/map.sql.out   |   4 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 408 +++--
 7 files changed, 486 insertions(+), 188 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41005][CONNECT][FOLLOWUP] Collect should use `submitJob` instead of `runJob`

2022-11-11 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4f614b3f699 [SPARK-41005][CONNECT][FOLLOWUP] Collect should use `submitJob` instead of `runJob`
4f614b3f699 is described below

commit 4f614b3f699d4d3924d4411c98a20d2e58b2e2e6
Author: Ruifeng Zheng 
AuthorDate: Fri Nov 11 21:18:09 2022 +0900

[SPARK-41005][CONNECT][FOLLOWUP] Collect should use `submitJob` instead of `runJob`

### What changes were proposed in this pull request?
use `submitJob` instead of `runJob`

### Why are the changes needed?
`spark.sparkContext.runJob` blocks until all partitions have finished, so results cannot be streamed to the client as individual partitions complete.
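
A minimal, self-contained sketch of the difference (illustrative only; the local `SparkSession`, lambdas, and object name below are assumptions, not the Connect handler code):

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.spark.sql.SparkSession

object SubmitJobSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 8, numSlices = 4)

    // runJob blocks the calling thread until *every* partition has finished.
    sc.runJob(rdd, (it: Iterator[Int]) => it.sum, (idx: Int, sum: Int) => println(s"partition $idx -> $sum"))

    // submitJob returns a future immediately; resultHandler fires for each
    // partition as soon as that partition completes.
    val future = sc.submitJob(
      rdd,
      (it: Iterator[Int]) => it.sum,
      0 until rdd.getNumPartitions,
      (idx: Int, sum: Int) => println(s"partition $idx -> $sum"),
      resultFunc = ())

    // The caller is free to do other work (e.g. stream finished partitions to a
    // client) while the job runs; here we simply wait for completion.
    Await.ready(future, 1.minute)
    spark.stop()
  }
}
```

Because `submitJob` invokes `resultHandler` per partition and hands back a future, the handler thread can start forwarding partition 0 to the client before later partitions finish.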

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing Tests

Closes #38614 from zhengruifeng/connect_collect_submitJob.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/connect/service/SparkConnectStreamHandler.scala   | 15 +++
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index ffac330cd6d..55e091bd8d0 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -161,17 +161,24 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
   ()
 }
 
-spark.sparkContext.runJob(batches, processPartition, resultHandler)
+spark.sparkContext.submitJob(
+  rdd = batches,
+  processPartition = processPartition,
+  partitions = Seq.range(0, numPartitions),
+  resultHandler = resultHandler,
+  resultFunc = () => ())
 
 // The main thread will wait until 0-th partition is available,
-// then send it to client and wait for next partition.
+// then send it to client and wait for the next partition.
 var currentPartitionId = 0
 while (currentPartitionId < numPartitions) {
   val partition = signal.synchronized {
-while (!partitions.contains(currentPartitionId)) {
+var result = partitions.remove(currentPartitionId)
+while (result.isEmpty) {
   signal.wait()
+  result = partitions.remove(currentPartitionId)
 }
-partitions.remove(currentPartitionId).get
+result.get
   }
 
   partition.foreach { case (bytes, count) =>
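
The hunk above also relies on a monitor-based handshake: `resultHandler` (invoked as partitions complete, on a different thread than this loop) stores each finished partition and notifies, while the main thread waits for partition 0, 1, 2, ... in order. A self-contained sketch of that idiom, with hypothetical names and strings standing in for Arrow batches:

```scala
import scala.collection.mutable

object WaitNotifySketch extends App {
  val signal = new Object
  val results = mutable.Map.empty[Int, String]
  val numPartitions = 4

  // Producers: finish out of order and notify the consumer each time.
  (0 until numPartitions).reverse.foreach { id =>
    new Thread(() => signal.synchronized {
      results.put(id, s"partition-$id")
      signal.notifyAll()
    }).start()
  }

  // Consumer: block until the 0-th partition arrives, emit it, then wait for 1, 2, ...
  var current = 0
  while (current < numPartitions) {
    val value = signal.synchronized {
      var r = results.remove(current)
      while (r.isEmpty) { signal.wait(); r = results.remove(current) }
      r.get
    }
    println(s"sending $value")
    current += 1
  }
}
```

Re-checking the map after every `wait()` guards against spurious wakeups, which is what the `result.isEmpty` loop in the patch does as well.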


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case

2022-11-11 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 65530501bb2 [SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case
65530501bb2 is described below

commit 65530501bb23a1e94f5138d9be0ef962bf33de76
Author: Mridul 
AuthorDate: Fri Nov 11 19:58:13 2022 +0900

[SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case

### What changes were proposed in this pull request?
Fix flaky test failure

### Why are the changes needed?
There is an MT-safety (thread-safety) issue in the test: the mocked `finalizeShuffleMerge` answer mutates `sentHosts` from a different thread than the one running the assertions.
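
A standalone sketch of the hazard and the fix (illustrative names only; in the real test the collection is `sentHosts`, appended to from mocked RPC callbacks):

```scala
import java.util.{ArrayList => JArrayList, Collections => JCollections}
import java.util.concurrent.CountDownLatch

import scala.collection.JavaConverters._

object SynchronizedListSketch {
  def main(args: Array[String]): Unit = {
    // Two threads append concurrently: a plain mutable.ArrayBuffer can lose or
    // corrupt elements, while a synchronized Java list makes each add atomic.
    val sentHosts = JCollections.synchronizedList(new JArrayList[String]())
    val done = new CountDownLatch(2)

    Seq("hostA", "hostB").foreach { host =>
      new Thread(() => {       // stands in for the mocked RPC callback thread
        sentHosts.add(host)
        done.countDown()
      }).start()
    }

    done.await()               // make the writes happen-before the reads below
    assert(sentHosts.size() == 2)
    println(sentHosts.asScala.toSeq.sorted) // List(hostA, hostB)
  }
}
```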

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Local tests.
Need to validate on CI

Closes #38617 from mridulm/fix-test-failure.

Authored-by: Mridul 
Signed-off-by: Hyukjin Kwon 
---
 .../apache/spark/scheduler/DAGSchedulerSuite.scala| 19 ---
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f4e67eba40d..17abf3aef4e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.scheduler
 
-import java.util.Properties
+import java.util.{ArrayList => JArrayList, Collections => JCollections, Properties}
 import java.util.concurrent.{CountDownLatch, Delayed, ScheduledFuture, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
 
 import scala.annotation.meta.param
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
@@ -4533,16 +4534,20 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
   blockStoreClientField.setAccessible(true)
   blockStoreClientField.set(sc.env.blockManager, blockStoreClient)
 
-  val sentHosts = ArrayBuffer[String]()
+  val sentHosts = JCollections.synchronizedList(new JArrayList[String]())
   var hostAInterrupted = false
   doAnswer { (invoke: InvocationOnMock) =>
 val host = invoke.getArgument[String](0)
-sendRequestsLatch.countDown()
 try {
   if (host == "hostA") {
-canSendRequestLatch.await(timeoutSecs * 2, TimeUnit.SECONDS)
+sendRequestsLatch.countDown()
+canSendRequestLatch.await(timeoutSecs * 5, TimeUnit.SECONDS)
+// should not reach here, will get interrupted by DAGScheduler
+sentHosts.add(host)
+  } else {
+sentHosts.add(host)
+sendRequestsLatch.countDown()
   }
-  sentHosts += host
 } catch {
   case _: InterruptedException => hostAInterrupted = true
 } finally {
@@ -4559,8 +4564,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
   sendRequestsLatch.await()
   verify(blockStoreClient, times(2))
 .finalizeShuffleMerge(any(), any(), any(), any(), any())
-  assert(sentHosts.nonEmpty)
-  assert(sentHosts.head === "hostB" && sentHosts.length == 1)
+  assert(1 == sentHosts.size())
+  assert(sentHosts.asScala.toSeq === Seq("hostB"))
   completeLatch.await()
   assert(hostAInterrupted)
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41108][SPARK-41005][CONNECT][FOLLOW-UP] Deduplicate ArrowConverters codes

2022-11-11 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a45b0811ef2 [SPARK-41108][SPARK-41005][CONNECT][FOLLOW-UP] Deduplicate ArrowConverters codes
a45b0811ef2 is described below

commit a45b0811ef22e2b66d52d066784e3fd2d9107f9d
Author: Hyukjin Kwon 
AuthorDate: Fri Nov 11 19:12:09 2022 +0900

[SPARK-41108][SPARK-41005][CONNECT][FOLLOW-UP] Deduplicate ArrowConverters codes

### What changes were proposed in this pull request?

This PR is a follow-up of both https://github.com/apache/spark/pull/38468 and https://github.com/apache/spark/pull/38612. It deduplicates code in `ArrowConverters` by creating two classes, `ArrowBatchIterator` and `ArrowBatchWithSchemaIterator`. In addition, we reuse `ArrowBatchWithSchemaIterator` when creating an empty Arrow batch in `createEmptyArrowBatch`.

While I am here,
- I addressed my own comment at https://github.com/apache/spark/pull/38612#discussion_r1019838326
- Kept the support of both max rows and size. Max row size check was removed in https://github.com/apache/spark/pull/38612
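
A schematic sketch of that deduplication pattern (hypothetical classes writing plain strings; the real iterators live in `ArrowConverters` and serialize Arrow record batches): a base iterator owns the batching loop, and the "with schema" subclass overrides only the part that differs.

```scala
import java.io.ByteArrayOutputStream

// Base iterator: shared batching loop; subclasses customize only the per-batch header.
class BatchIterator[T](rows: Iterator[T], maxRecordsPerBatch: Long) extends Iterator[Array[Byte]] {
  protected def writeHeader(out: ByteArrayOutputStream): Unit = ()

  override def hasNext: Boolean = rows.hasNext

  override def next(): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    writeHeader(out)                                           // subclass hook
    var n = 0L
    // 0 (or negative) means "no record limit", mirroring the original contract.
    while (rows.hasNext && (maxRecordsPerBatch <= 0 || n < maxRecordsPerBatch)) {
      out.write(s"${rows.next()} ".getBytes("UTF-8"))
      n += 1
    }
    out.toByteArray
  }
}

// The "with schema" variant reuses the loop and only prepends a header per batch.
class BatchWithSchemaIterator[T](rows: Iterator[T], max: Long, schemaJson: String)
    extends BatchIterator[T](rows, max) {
  override protected def writeHeader(out: ByteArrayOutputStream): Unit =
    out.write(s"$schemaJson | ".getBytes("UTF-8"))
}

object DedupSketch extends App {
  val batches = new BatchWithSchemaIterator(Iterator(1, 2, 3, 4, 5), max = 2, schemaJson = """{"fields":[]}""")
  batches.foreach(b => println(new String(b, "UTF-8")))
}
```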

### Why are the changes needed?

For better readability and maintenance.

### Does this PR introduce _any_ user-facing change?

No; both code paths are internal and unrelated to user-facing behavior.

### How was this patch tested?

This is a refactoring, so the existing CI should cover it.

Closes #38618 from HyukjinKwon/SPARK-41108-followup-1.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 .../service/SparkConnectStreamHandler.scala|   8 +-
 .../sql/execution/arrow/ArrowConverters.scala  | 230 ++---
 2 files changed, 115 insertions(+), 123 deletions(-)

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 394d6477d73..ffac330cd6d 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -126,19 +126,23 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
   def processAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = {
 val spark = dataframe.sparkSession
 val schema = dataframe.schema
+val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
 val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone
 
 SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {
   val rows = dataframe.queryExecution.executedPlan.execute()
   val numPartitions = rows.getNumPartitions
+  // Conservatively sets it 70% because the size is not accurate but estimated.
+  val maxBatchSize = (MAX_BATCH_SIZE * 0.7).toLong
   var numSent = 0
 
   if (numPartitions > 0) {
 type Batch = (Array[Byte], Long)
 
 val batches = rows.mapPartitionsInternal { iter =>
-  ArrowConverters
-.toBatchWithSchemaIterator(iter, schema, MAX_BATCH_SIZE, timeZoneId)
+  val newIter = ArrowConverters
+.toBatchWithSchemaIterator(iter, schema, maxRecordsPerBatch, maxBatchSize, timeZoneId)
+  newIter.map { batch: Array[Byte] => (batch, newIter.rowCountInLastBatch) }
 }
 
 val signal = new Object
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index c233ac32c12..a60f7b5970d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -71,63 +71,125 @@ private[sql] class ArrowBatchStreamWriter(
 }
 
 private[sql] object ArrowConverters extends Logging {
-
-  /**
-   * Maps Iterator from InternalRow to serialized ArrowRecordBatches. Limit ArrowRecordBatch size
-   * in a batch by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
-   */
-  private[sql] def toBatchIterator(
+  private[sql] class ArrowBatchIterator(
   rowIter: Iterator[InternalRow],
   schema: StructType,
-  maxRecordsPerBatch: Int,
+  maxRecordsPerBatch: Long,
   timeZoneId: String,
-  context: TaskContext): Iterator[Array[Byte]] = {
+  context: TaskContext) extends Iterator[Array[Byte]] {
 
-val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
-val allocator =
-  ArrowUtils.rootAllocator.newChildAllocator("toBatchIterator", 0, Long.MaxValue)
+protected val arrowSchema = 

[spark] branch master updated (c80de3b67ae -> ffb6a4628c1)

2022-11-11 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


 from c80de3b67ae [SPARK-41044][SQL] Convert DATATYPE_MISMATCH.UNSPECIFIED_FRAME to INTERNAL_ERROR
  add ffb6a4628c1 [SPARK-41110][CONNECT][PYTHON] Implement `DataFrame.sparkSession` in Python client

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/dataframe.py| 11 +++
 python/pyspark/sql/tests/connect/test_connect_basic.py |  3 +++
 2 files changed, 14 insertions(+)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41044][SQL] Convert DATATYPE_MISMATCH.UNSPECIFIED_FRAME to INTERNAL_ERROR

2022-11-11 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c80de3b67ae [SPARK-41044][SQL] Convert DATATYPE_MISMATCH.UNSPECIFIED_FRAME to INTERNAL_ERROR
c80de3b67ae is described below

commit c80de3b67ae05d8c17d9afef9655ad2e76bfd05f
Author: panbingkun 
AuthorDate: Fri Nov 11 12:27:28 2022 +0300

[SPARK-41044][SQL] Convert DATATYPE_MISMATCH.UNSPECIFIED_FRAME to INTERNAL_ERROR

### What changes were proposed in this pull request?
The PR aims to convert DATATYPE_MISMATCH.UNSPECIFIED_FRAME to INTERNAL_ERROR.

### Why are the changes needed?
1. While working on https://issues.apache.org/jira/browse/SPARK-41021, I could not find a path that triggers this error from the user's perspective, so we should convert it to an internal error (https://github.com/apache/spark/pull/38520/files#r1015171962); see the sketch after this list.
2. The changes improve the error framework.
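
A simplified sketch of the conversion pattern (hypothetical wrapper method; the real change is in `WindowSpecDefinition.checkInputDataTypes`, shown in the diff below). Conditions a user can still hit keep returning a `DataTypeMismatch` error class, while the unreachable `UnspecifiedFrame` case now fails fast as an internal error:

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}

object InternalErrorSketch {
  // frameIsSpecified / hasOrderSpec are illustrative stand-ins for the real checks.
  def check(frameIsSpecified: Boolean, hasOrderSpec: Boolean): TypeCheckResult = {
    if (!frameIsSpecified) {
      // Unreachable from user queries after analysis => report it as a Spark bug.
      throw SparkException.internalError(
        "Cannot use an UnspecifiedFrame. This should have been converted during analysis.")
    } else if (!hasOrderSpec) {
      // Still reachable by users => keep the documented error class.
      DataTypeMismatch(errorSubClass = "RANGE_FRAME_WITHOUT_ORDER")
    } else {
      TypeCheckSuccess
    }
  }
}
```

`SparkException.internalError` raises the failure under the INTERNAL_ERROR error class, so if it ever does surface it reads as a Spark bug rather than a user mistake.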

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
1. Updated the existing UT.
2. Passed GA.

Closes #38555 from panbingkun/convert_UNSPECIFIED_FRAME_to_INNER_ERROR.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   |  5 -
 .../spark/sql/catalyst/expressions/windowExpressions.scala |  8 +---
 .../catalyst/analysis/ExpressionTypeCheckingSuite.scala| 14 --
 3 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 63978e6be66..5d1fdbbdc05 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -349,11 +349,6 @@
   "cannot find a static method  that matches the argument 
types in "
 ]
   },
-  "UNSPECIFIED_FRAME" : {
-"message" : [
-  "Cannot use an UnspecifiedFrame. This should have been converted 
during analysis."
-]
-  },
   "UNSUPPORTED_INPUT_TYPE" : {
 "message" : [
   "The input of  can't be  type data."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 353ab22b5a5..c32bf4d4d45 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.util.Locale
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedException}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -57,8 +58,8 @@ case class WindowSpecDefinition(
   frameSpecification = newChildren.last.asInstanceOf[WindowFrame])
 
   override lazy val resolved: Boolean =
-childrenResolved && checkInputDataTypes().isSuccess &&
-  frameSpecification.isInstanceOf[SpecifiedWindowFrame]
+childrenResolved && frameSpecification.isInstanceOf[SpecifiedWindowFrame] &&
+  checkInputDataTypes().isSuccess
 
   override def nullable: Boolean = true
   override def dataType: DataType = throw QueryExecutionErrors.dataTypeOperationUnsupportedError
@@ -66,7 +67,8 @@ case class WindowSpecDefinition(
   override def checkInputDataTypes(): TypeCheckResult = {
 frameSpecification match {
   case UnspecifiedFrame =>
-DataTypeMismatch(errorSubClass = "UNSPECIFIED_FRAME")
+throw SparkException.internalError("Cannot use an UnspecifiedFrame. " +
+  "This should have been converted during analysis.")
   case f: SpecifiedWindowFrame if f.frameType == RangeFrame && !f.isUnbounded &&
   orderSpec.isEmpty =>
 DataTypeMismatch(errorSubClass = "RANGE_FRAME_WITHOUT_ORDER")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
index 256cf439b65..d406ec8f74a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.AnalysisException
 import