[spark] branch master updated (7cbf7dd148d -> 24adac30539)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

  from 7cbf7dd148d [SPARK-40372][SQL] Migrate failures of array type checks onto error classes
   add 24adac30539 [SPARK-41093][BUILD] Remove netty-tcnative-classes from Spark dependencyList

No new revisions were added by this update.

Summary of changes:
 common/network-common/pom.xml         |  4 ----
 core/pom.xml                          |  4 ----
 dev/deps/spark-deps-hadoop-2-hive-2.3 |  1 -
 dev/deps/spark-deps-hadoop-3-hive-2.3 |  1 -
 pom.xml                               | 14 --------------
 5 files changed, 24 deletions(-)
[spark] branch master updated (4f614b3f699 -> 7cbf7dd148d)
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

  from 4f614b3f699 [SPARK-41005][CONNECT][FOLLOWUP] Collect should use `submitJob` instead of `runJob`
   add 7cbf7dd148d [SPARK-40372][SQL] Migrate failures of array type checks onto error classes

No new revisions were added by this update.

Summary of changes:
 core/src/main/resources/error/error-classes.json   |  15 +-
 .../expressions/collectionOperations.scala         | 203 +++---
 .../resources/sql-tests/results/ansi/array.sql.out |  20 +-
 .../resources/sql-tests/results/ansi/map.sql.out   |   4 +-
 .../test/resources/sql-tests/results/array.sql.out |  20 +-
 .../test/resources/sql-tests/results/map.sql.out   |   4 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 408 +++--
 7 files changed, 486 insertions(+), 188 deletions(-)
[spark] branch master updated: [SPARK-41005][CONNECT][FOLLOWUP] Collect should use `submitJob` instead of `runJob`
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 4f614b3f699 [SPARK-41005][CONNECT][FOLLOWUP] Collect should use `submitJob` instead of `runJob`
4f614b3f699 is described below

commit 4f614b3f699d4d3924d4411c98a20d2e58b2e2e6
Author: Ruifeng Zheng
AuthorDate: Fri Nov 11 21:18:09 2022 +0900

    [SPARK-41005][CONNECT][FOLLOWUP] Collect should use `submitJob` instead of `runJob`

    ### What changes were proposed in this pull request?
    Use `submitJob` instead of `runJob`.

    ### Why are the changes needed?
    `spark.sparkContext.runJob` blocks until all partitions have finished, so the driver cannot start sending the first partition to the client while later partitions are still being computed.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Existing tests.

    Closes #38614 from zhengruifeng/connect_collect_submitJob.

    Authored-by: Ruifeng Zheng
    Signed-off-by: Hyukjin Kwon
---
 .../sql/connect/service/SparkConnectStreamHandler.scala | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index ffac330cd6d..55e091bd8d0 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -161,17 +161,24 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
       ()
     }

-    spark.sparkContext.runJob(batches, processPartition, resultHandler)
+    spark.sparkContext.submitJob(
+      rdd = batches,
+      processPartition = processPartition,
+      partitions = Seq.range(0, numPartitions),
+      resultHandler = resultHandler,
+      resultFunc = () => ())

     // The main thread will wait until 0-th partition is available,
-    // then send it to client and wait for next partition.
+    // then send it to client and wait for the next partition.
     var currentPartitionId = 0
     while (currentPartitionId < numPartitions) {
       val partition = signal.synchronized {
-        while (!partitions.contains(currentPartitionId)) {
+        var result = partitions.remove(currentPartitionId)
+        while (result.isEmpty) {
           signal.wait()
+          result = partitions.remove(currentPartitionId)
         }
-        partitions.remove(currentPartitionId).get
+        result.get
       }

       partition.foreach { case (bytes, count) =>
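For reference, here is a minimal, hedged sketch of the pattern this commit adopts: `submitJob` returns immediately, so the calling thread can stream partition results to the client in order while later partitions are still running. The names `handlePartitions` and `send` are illustrative assumptions, not part of the PR.

import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch only: submit all partitions at once, then consume them in order.
// Unlike runJob, submitJob does not block until every partition finishes.
def handlePartitions(sc: SparkContext, rdd: RDD[Array[Byte]])(send: Array[Byte] => Unit): Unit = {
  val numPartitions = rdd.getNumPartitions
  val signal = new Object
  val results = mutable.Map.empty[Int, Array[Byte]]

  sc.submitJob[Array[Byte], Array[Byte], Unit](
    rdd = rdd,
    // Runs on executors; here it just concatenates each partition's byte arrays.
    processPartition = (it: Iterator[Array[Byte]]) => it.flatten.toArray,
    partitions = Seq.range(0, numPartitions),
    // Runs on the driver as each partition completes, possibly out of order.
    resultHandler = (partitionId: Int, bytes: Array[Byte]) =>
      signal.synchronized {
        results(partitionId) = bytes
        signal.notifyAll() // wake the consuming loop below
      },
    resultFunc = ())

  // Consume partitions strictly in order: partition i can be sent to the
  // client while partitions i+1..n-1 are still running on the cluster.
  var current = 0
  while (current < numPartitions) {
    val bytes = signal.synchronized {
      var r = results.remove(current)
      while (r.isEmpty) {
        signal.wait()
        r = results.remove(current)
      }
      r.get
    }
    send(bytes)
    current += 1
  }
}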
[spark] branch master updated: [SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 65530501bb2 [SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case
65530501bb2 is described below

commit 65530501bb23a1e94f5138d9be0ef962bf33de76
Author: Mridul
AuthorDate: Fri Nov 11 19:58:13 2022 +0900

    [SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case

    ### What changes were proposed in this pull request?
    Fix a flaky test failure.

    ### Why are the changes needed?
    There is a thread-safety (MT-safety) issue in the test.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Local tests; needs validation on CI.

    Closes #38617 from mridulm/fix-test-failure.

    Authored-by: Mridul
    Signed-off-by: Hyukjin Kwon
---
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f4e67eba40d..17abf3aef4e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,11 +17,12 @@
 package org.apache.spark.scheduler

-import java.util.Properties
+import java.util.{ArrayList => JArrayList, Collections => JCollections, Properties}
 import java.util.concurrent.{CountDownLatch, Delayed, ScheduledFuture, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}

 import scala.annotation.meta.param
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
@@ -4533,16 +4534,20 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     blockStoreClientField.setAccessible(true)
     blockStoreClientField.set(sc.env.blockManager, blockStoreClient)

-    val sentHosts = ArrayBuffer[String]()
+    val sentHosts = JCollections.synchronizedList(new JArrayList[String]())
     var hostAInterrupted = false
     doAnswer { (invoke: InvocationOnMock) =>
       val host = invoke.getArgument[String](0)
-      sendRequestsLatch.countDown()
       try {
         if (host == "hostA") {
-          canSendRequestLatch.await(timeoutSecs * 2, TimeUnit.SECONDS)
+          sendRequestsLatch.countDown()
+          canSendRequestLatch.await(timeoutSecs * 5, TimeUnit.SECONDS)
+          // should not reach here, will get interrupted by DAGScheduler
+          sentHosts.add(host)
+        } else {
+          sentHosts.add(host)
+          sendRequestsLatch.countDown()
         }
-        sentHosts += host
       } catch {
         case _: InterruptedException => hostAInterrupted = true
       } finally {
@@ -4559,8 +4564,8 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     sendRequestsLatch.await()
     verify(blockStoreClient, times(2))
       .finalizeShuffleMerge(any(), any(), any(), any(), any())
-    assert(sentHosts.nonEmpty)
-    assert(sentHosts.head === "hostB" && sentHosts.length == 1)
+    assert(1 == sentHosts.size())
+    assert(sentHosts.asScala.toSeq === Seq("hostB"))
     completeLatch.await()
     assert(hostAInterrupted)
   }
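The essence of the fix, as a small hedged sketch (the `record` and `check` helpers are illustrative, not from the suite): appending to a plain `ArrayBuffer` from a Mockito answer thread while the test thread reads it is a data race, so the commit switches to a synchronized Java list and converts back with `asScala` only for the final assertion.

import java.util.{ArrayList => JArrayList, Collections => JCollections}

import scala.collection.JavaConverters._

object SynchronizedListSketch {
  // Safe to mutate from any thread: every method of the wrapper is synchronized.
  val sentHosts = JCollections.synchronizedList(new JArrayList[String]())

  // Called concurrently from mock/answer threads.
  def record(host: String): Unit = sentHosts.add(host)

  // Called on the test thread after latches guarantee the writes happened;
  // asScala wraps the Java list so it can be compared against a Scala Seq.
  def check(): Unit = assert(sentHosts.asScala.toSeq == Seq("hostB"))

  def main(args: Array[String]): Unit = {
    record("hostB")
    check()
  }
}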
[spark] branch master updated: [SPARK-41108][SPARK-41005][CONNECT][FOLLOW-UP] Deduplicate ArrowConverters codes
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new a45b0811ef2 [SPARK-41108][SPARK-41005][CONNECT][FOLLOW-UP] Deduplicate ArrowConverters codes
a45b0811ef2 is described below

commit a45b0811ef22e2b66d52d066784e3fd2d9107f9d
Author: Hyukjin Kwon
AuthorDate: Fri Nov 11 19:12:09 2022 +0900

    [SPARK-41108][SPARK-41005][CONNECT][FOLLOW-UP] Deduplicate ArrowConverters codes

    ### What changes were proposed in this pull request?
    This PR is a followup of both https://github.com/apache/spark/pull/38468 and https://github.com/apache/spark/pull/38612 that proposes to deduplicate code in `ArrowConverters` by creating two classes, `ArrowBatchIterator` and `ArrowBatchWithSchemaIterator`. In addition, it reuses `ArrowBatchWithSchemaIterator` when creating an empty Arrow batch in `createEmptyArrowBatch`.

    While I am here:
    - I addressed my own comment at https://github.com/apache/spark/pull/38612#discussion_r1019838326
    - Kept the support of both the max-records and max-size limits; the max-records check had been removed in https://github.com/apache/spark/pull/38612

    ### Why are the changes needed?
    For better readability and maintenance.

    ### Does this PR introduce _any_ user-facing change?
    No; both changed code paths are internal.

    ### How was this patch tested?
    This is refactoring, so existing CI should cover it.

    Closes #38618 from HyukjinKwon/SPARK-41108-followup-1.

    Authored-by: Hyukjin Kwon
    Signed-off-by: Hyukjin Kwon
---
 .../service/SparkConnectStreamHandler.scala        |   8 ++++++--
 .../sql/execution/arrow/ArrowConverters.scala      | 230 +++++++++----------
 2 files changed, 115 insertions(+), 123 deletions(-)

diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 394d6477d73..ffac330cd6d 100644
--- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -126,19 +126,23 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
   def processAsArrowBatches(clientId: String, dataframe: DataFrame): Unit = {
     val spark = dataframe.sparkSession
     val schema = dataframe.schema
+    val maxRecordsPerBatch = spark.sessionState.conf.arrowMaxRecordsPerBatch
     val timeZoneId = spark.sessionState.conf.sessionLocalTimeZone

     SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {
       val rows = dataframe.queryExecution.executedPlan.execute()
       val numPartitions = rows.getNumPartitions
+      // Conservatively sets it 70% because the size is not accurate but estimated.
+      val maxBatchSize = (MAX_BATCH_SIZE * 0.7).toLong
       var numSent = 0

       if (numPartitions > 0) {
         type Batch = (Array[Byte], Long)

         val batches = rows.mapPartitionsInternal { iter =>
-          ArrowConverters
-            .toBatchWithSchemaIterator(iter, schema, MAX_BATCH_SIZE, timeZoneId)
+          val newIter = ArrowConverters
+            .toBatchWithSchemaIterator(iter, schema, maxRecordsPerBatch, maxBatchSize, timeZoneId)
+          newIter.map { batch: Array[Byte] => (batch, newIter.rowCountInLastBatch) }
         }

         val signal = new Object

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index c233ac32c12..a60f7b5970d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -71,63 +71,125 @@ private[sql] class ArrowBatchStreamWriter(
 }

 private[sql] object ArrowConverters extends Logging {
-
-  /**
-   * Maps Iterator from InternalRow to serialized ArrowRecordBatches. Limit ArrowRecordBatch size
-   * in a batch by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
-   */
-  private[sql] def toBatchIterator(
+  private[sql] class ArrowBatchIterator(
       rowIter: Iterator[InternalRow],
       schema: StructType,
-      maxRecordsPerBatch: Int,
+      maxRecordsPerBatch: Long,
       timeZoneId: String,
-      context: TaskContext): Iterator[Array[Byte]] = {
+      context: TaskContext) extends Iterator[Array[Byte]] {

-    val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
-    val allocator =
-      ArrowUtils.rootAllocator.newChildAllocator("toBatchIterator", 0, Long.MaxValue)
+    protected val arrowSchema =
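The batching policy behind the "70%" comment above, restated as a standalone hedged sketch. `SizeBoundedBatchIterator` and its parameters are illustrative assumptions, not the `ArrowConverters` API: a batch is closed once either the record cap or the estimated-size cap is reached, and because per-row sizes are only estimates, the caller shrinks the hard limit before passing it in.

// Illustrative sketch, not Spark code: group rows into batches bounded by a
// record count and an estimated byte size; a cap of 0 means "unlimited".
class SizeBoundedBatchIterator[T](
    rows: Iterator[T],
    maxRecordsPerBatch: Long,
    maxBatchSize: Long,
    estimatedSize: T => Long) extends Iterator[Seq[T]] {

  override def hasNext: Boolean = rows.hasNext

  override def next(): Seq[T] = {
    val batch = Seq.newBuilder[T]
    var rowCount = 0L
    var bytesSoFar = 0L
    while (rows.hasNext &&
        (maxRecordsPerBatch <= 0 || rowCount < maxRecordsPerBatch) &&
        (maxBatchSize <= 0 || bytesSoFar < maxBatchSize)) {
      val row = rows.next()
      batch += row
      rowCount += 1
      bytesSoFar += estimatedSize(row) // an estimate, hence the conservative cap
    }
    batch.result()
  }
}

// Example: cap at 10000 records or ~70% of a 4 MiB hard limit, whichever hits first.
// new SizeBoundedBatchIterator[Array[Byte]](rows, 10000L, ((4L << 20) * 0.7).toLong, _.length.toLong)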
[spark] branch master updated (c80de3b67ae -> ffb6a4628c1)
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

  from c80de3b67ae [SPARK-41044][SQL] Convert DATATYPE_MISMATCH.UNSPECIFIED_FRAME to INTERNAL_ERROR
   add ffb6a4628c1 [SPARK-41110][CONNECT][PYTHON] Implement `DataFrame.sparkSession` in Python client

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/dataframe.py                | 11 +++++++++++
 python/pyspark/sql/tests/connect/test_connect_basic.py |  3 +++
 2 files changed, 14 insertions(+)
[spark] branch master updated: [SPARK-41044][SQL] Convert DATATYPE_MISMATCH.UNSPECIFIED_FRAME to INTERNAL_ERROR
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c80de3b67ae [SPARK-41044][SQL] Convert DATATYPE_MISMATCH.UNSPECIFIED_FRAME to INTERNAL_ERROR
c80de3b67ae is described below

commit c80de3b67ae05d8c17d9afef9655ad2e76bfd05f
Author: panbingkun
AuthorDate: Fri Nov 11 12:27:28 2022 +0300

    [SPARK-41044][SQL] Convert DATATYPE_MISMATCH.UNSPECIFIED_FRAME to INTERNAL_ERROR

    ### What changes were proposed in this pull request?
    The PR aims to convert DATATYPE_MISMATCH.UNSPECIFIED_FRAME to INTERNAL_ERROR.

    ### Why are the changes needed?
    1. While working on https://issues.apache.org/jira/browse/SPARK-41021, I could not find any path that triggers this error from the user's perspective, so we should convert it to an internal error (https://github.com/apache/spark/pull/38520/files#r1015171962).
    2. The change improves the error framework.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    1. Updated the existing UT.
    2. Passed GA.

    Closes #38555 from panbingkun/convert_UNSPECIFIED_FRAME_to_INNER_ERROR.

    Authored-by: panbingkun
    Signed-off-by: Max Gekk
---
 core/src/main/resources/error/error-classes.json        |  5 -----
 .../sql/catalyst/expressions/windowExpressions.scala    |  8 +++++---
 .../catalyst/analysis/ExpressionTypeCheckingSuite.scala | 14 ++++++++------
 3 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 63978e6be66..5d1fdbbdc05 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -349,11 +349,6 @@
         "cannot find a static method <methodName> that matches the argument types in <className>"
       ]
     },
-    "UNSPECIFIED_FRAME" : {
-      "message" : [
-        "Cannot use an UnspecifiedFrame. This should have been converted during analysis."
-      ]
-    },
     "UNSUPPORTED_INPUT_TYPE" : {
       "message" : [
         "The input of <functionName> can't be <inputType> type data."

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 353ab22b5a5..c32bf4d4d45 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

 import java.util.Locale

+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedException}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -57,8 +58,8 @@ case class WindowSpecDefinition(
       frameSpecification = newChildren.last.asInstanceOf[WindowFrame])

   override lazy val resolved: Boolean =
-    childrenResolved && checkInputDataTypes().isSuccess &&
-      frameSpecification.isInstanceOf[SpecifiedWindowFrame]
+    childrenResolved && frameSpecification.isInstanceOf[SpecifiedWindowFrame] &&
+      checkInputDataTypes().isSuccess

   override def nullable: Boolean = true
   override def dataType: DataType = throw QueryExecutionErrors.dataTypeOperationUnsupportedError
@@ -66,7 +67,8 @@ case class WindowSpecDefinition(
   override def checkInputDataTypes(): TypeCheckResult = {
     frameSpecification match {
       case UnspecifiedFrame =>
-        DataTypeMismatch(errorSubClass = "UNSPECIFIED_FRAME")
+        throw SparkException.internalError("Cannot use an UnspecifiedFrame. " +
+          "This should have been converted during analysis.")
       case f: SpecifiedWindowFrame if f.frameType == RangeFrame &&
           !f.isUnbounded && orderSpec.isEmpty =>
         DataTypeMismatch(errorSubClass = "RANGE_FRAME_WITHOUT_ORDER")

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
index 256cf439b65..d406ec8f74a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.catalyst.analysis

-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.AnalysisException
 import
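The boundary this commit draws, restated as a hedged, self-contained sketch (the frame and result types are toy stand-ins; only `SparkException.internalError` and the messages come from the diff above): conditions a user query can actually reach stay structured error classes, while states the analyzer guarantees never to leave behind are raised as internal errors.

import org.apache.spark.SparkException

object UnspecifiedFrameSketch {
  sealed trait TypeCheckResult
  case object TypeCheckSuccess extends TypeCheckResult
  final case class DataTypeMismatch(errorSubClass: String) extends TypeCheckResult

  sealed trait WindowFrame
  case object UnspecifiedFrame extends WindowFrame
  final case class RangeFrame(isUnbounded: Boolean) extends WindowFrame

  def checkFrame(frame: WindowFrame, orderSpecIsEmpty: Boolean): TypeCheckResult =
    frame match {
      case UnspecifiedFrame =>
        // Unreachable from user SQL: the analyzer always replaces
        // UnspecifiedFrame, so reaching this branch is a Spark bug.
        throw SparkException.internalError("Cannot use an UnspecifiedFrame. " +
          "This should have been converted during analysis.")
      case RangeFrame(isUnbounded) if !isUnbounded && orderSpecIsEmpty =>
        // Reachable from user SQL: keep the structured error class.
        DataTypeMismatch(errorSubClass = "RANGE_FRAME_WITHOUT_ORDER")
      case _ =>
        TypeCheckSuccess
    }
}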