[GitHub] [spark] viirya commented on a change in pull request #25965: [SPARK-26425][SS] Add more constraint checks to avoid checkpoint corruption
viirya commented on a change in pull request #25965:
URL: https://github.com/apache/spark/pull/25965#discussion_r489180882

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ##
@@ -585,7 +585,8 @@ class MicroBatchExecution(
       withProgressLocked {
         sinkCommitProgress = batchSinkProgress
         watermarkTracker.updateWatermark(lastExecution.executedPlan)
-        commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
+        assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)),
+          s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")

Review comment: update to the log -> update to the commit log?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
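The check under review relies on `add` reporting whether the entry was actually written. A minimal sketch of that pattern follows, assuming `add` returns `false` when an entry for the batch id already exists. `InMemoryBatchLog` and `CommitSketch` are hypothetical stand-ins written for illustration, not Spark classes, and the message uses the wording suggested in the review comment.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a batch metadata log (not Spark's commit log).
final class InMemoryBatchLog[T] {
  private val entries = mutable.Map.empty[Long, T]

  // Stores metadata for batchId only if nothing is stored yet.
  // Returns false when an entry already exists (assumed contract).
  def add(batchId: Long, metadata: T): Boolean = synchronized {
    if (entries.contains(batchId)) {
      false
    } else {
      entries.put(batchId, metadata)
      true
    }
  }

  def get(batchId: Long): Option[T] = synchronized { entries.get(batchId) }
}

object CommitSketch {
  // Fails loudly if another writer already committed this batch.
  def commit(log: InMemoryBatchLog[Long], batchId: Long, watermark: Long): Unit = {
    val added = log.add(batchId, watermark)
    assert(added,
      s"Concurrent update to the commit log. Multiple streaming jobs detected for $batchId")
  }
}
```

In this sketch the result of `add` is bound to a value before asserting, so the write still happens if assertions are elided at compile time; the diff above instead calls `add` inside `assert`.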
[GitHub] [spark] SparkQA commented on pull request #29762: [SPARK-32892][CORE][SQL] Fix hash functions on big-endian platforms.
SparkQA commented on pull request #29762: URL: https://github.com/apache/spark/pull/29762#issuecomment-693193153 **[Test build #128738 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128738/testReport)** for PR 29762 at commit [`df1a1c8`](https://github.com/apache/spark/commit/df1a1c8d826ec4d5cf64d8c04eb03e6dd5bb9d9d). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] cloud-fan commented on pull request #29734: [SPARK-32861][SQL] GenerateExec should require column ordering
cloud-fan commented on pull request #29734: URL: https://github.com/apache/spark/pull/29734#issuecomment-693193357 thanks, merging to master!
[GitHub] [spark] SparkQA removed a comment on pull request #29762: [SPARK-32892][CORE][SQL] Fix hash functions on big-endian platforms.
SparkQA removed a comment on pull request #29762: URL: https://github.com/apache/spark/pull/29762#issuecomment-693146552 **[Test build #128738 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128738/testReport)** for PR 29762 at commit [`df1a1c8`](https://github.com/apache/spark/commit/df1a1c8d826ec4d5cf64d8c04eb03e6dd5bb9d9d).
[GitHub] [spark] cloud-fan commented on a change in pull request #29722: [SPARK-32850][CORE] Simplify the RPC message flow of decommission
cloud-fan commented on a change in pull request #29722:
URL: https://github.com/apache/spark/pull/29722#discussion_r489185564

## File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ##
@@ -245,15 +245,24 @@ private[deploy] class Master(
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)
-    case WorkerDecommission(id, workerRef) =>
-      logInfo("Recording worker %s decommissioning".format(id))
+    case WorkerDecommissioning(id, workerRef) =>
       if (state == RecoveryState.STANDBY) {
         workerRef.send(MasterInStandby)
       } else {
         // We use foreach since get gives us an option and we can skip the failures.
-        idToWorker.get(id).foreach(decommissionWorker)
+        idToWorker.get(id).foreach(w => decommissionWorker(w))
       }
+    case DecommissionWorkers(ids) =>
+      ids.foreach ( id =>

Review comment: shall we skip it for `if (state == RecoveryState.STANDBY)` ?
[GitHub] [spark] AmplabJenkins removed a comment on pull request #29766: [SPARK-32704][SQL][TESTS][FOLLOW-UP] Check any physical rule instead of a specific rule in the test
AmplabJenkins removed a comment on pull request #29766: URL: https://github.com/apache/spark/pull/29766#issuecomment-693194803
[GitHub] [spark] cloud-fan commented on a change in pull request #29722: [SPARK-32850][CORE] Simplify the RPC message flow of decommission
cloud-fan commented on a change in pull request #29722:
URL: https://github.com/apache/spark/pull/29722#discussion_r489185786

## File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ##
@@ -245,15 +245,24 @@ private[deploy] class Master(
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)
-    case WorkerDecommission(id, workerRef) =>
-      logInfo("Recording worker %s decommissioning".format(id))
+    case WorkerDecommissioning(id, workerRef) =>
       if (state == RecoveryState.STANDBY) {
         workerRef.send(MasterInStandby)
       } else {
         // We use foreach since get gives us an option and we can skip the failures.
-        idToWorker.get(id).foreach(decommissionWorker)
+        idToWorker.get(id).foreach(w => decommissionWorker(w))
       }
+    case DecommissionWorkers(ids) =>
+      ids.foreach ( id =>

Review comment: or it will never happen because the request comes from master web UI?
[GitHub] [spark] SparkQA commented on pull request #29765: [SPARK-32888][DOCS] Add user document about header flag and RDD as path for reading CSV
SparkQA commented on pull request #29765: URL: https://github.com/apache/spark/pull/29765#issuecomment-693195603 **[Test build #128729 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128729/testReport)** for PR 29765 at commit [`89c2eda`](https://github.com/apache/spark/commit/89c2eda9787b3d904f536dd7c064a1b45324ed0a). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] SparkQA commented on pull request #25965: [SPARK-26425][SS] Add more constraint checks to avoid checkpoint corruption
SparkQA commented on pull request #25965: URL: https://github.com/apache/spark/pull/25965#issuecomment-693197739 **[Test build #128745 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128745/testReport)** for PR 25965 at commit [`a885212`](https://github.com/apache/spark/commit/a8852123ae978dddb5578e3411b4822fd4a1fdce).
[GitHub] [spark] viirya commented on a change in pull request #29756: [SPARK-32885][SS] Add DataStreamReader.table API
viirya commented on a change in pull request #29756:
URL: https://github.com/apache/spark/pull/29756#discussion_r489192414

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ##
@@ -846,9 +848,9 @@ class Analyzer(
    */
   object ResolveTempViews extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case u @ UnresolvedRelation(ident, _) =>
+      case u @ UnresolvedRelation(ident, _, _) =>
         lookupTempView(ident).getOrElse(u)
-      case i @ InsertIntoStatement(UnresolvedRelation(ident, _), _, _, _, _) =>
+      case i @ InsertIntoStatement(UnresolvedRelation(ident, _, _), _, _, _, _) =>

Review comment: We don't need to only match `isStreaming` = false here?
[GitHub] [spark] AmplabJenkins commented on pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source
AmplabJenkins commented on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-693200669
[GitHub] [spark] AmplabJenkins removed a comment on pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source
AmplabJenkins removed a comment on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-693200669
[GitHub] [spark] cloud-fan commented on a change in pull request #29739: [SPARK-32867][SQL] When explain, HiveTableRelation show limited message
cloud-fan commented on a change in pull request #29739:
URL: https://github.com/apache/spark/pull/29739#discussion_r489199893

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ##
@@ -693,4 +694,29 @@ case class HiveTableRelation(
   override def newInstance(): HiveTableRelation = copy(
     dataCols = dataCols.map(_.newInstance()),
     partitionCols = partitionCols.map(_.newInstance()))
+
+  override def simpleString(maxFields: Int): String = {
+    val catalogTable = tableMeta.storage.serde match {
+      case Some(serde) => tableMeta.identifier :: serde :: Nil
+      case _ => tableMeta.identifier :: Nil
+    }
+
+    val metadataEntries = Map(
+      "CatalogTable" -> catalogTable.mkString(", "),

Review comment: is it also how file source tables display table info?
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #29729: [SPARK-32032][SS] Avoid infinite wait in driver because of KafkaConsumer.poll(long) API
gaborgsomogyi commented on a change in pull request #29729:
URL: https://github.com/apache/spark/pull/29729#discussion_r489212011

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ##
@@ -105,34 +109,16 @@ private[kafka010] class KafkaOffsetReader(
     minPartitions.map(_ > numTopicPartitions).getOrElse(false)
   }
-  private def nextGroupId(): String = {
-    groupId = driverGroupIdPrefix + "-" + nextId
-    nextId += 1
-    groupId
-  }
-
   override def toString(): String = consumerStrategy.toString
   /**
    * Closes the connection to Kafka, and cleans up state.
    */
   def close(): Unit = {
-    if (_consumer != null) uninterruptibleThreadRunner.runUninterruptibly { stopConsumer() }
+    if (_admin != null) uninterruptibleThreadRunner.runUninterruptibly { stopAdmin() }
     uninterruptibleThreadRunner.shutdown()
   }
-  /**
-   * @return The Set of TopicPartitions for a given topic
-   */
-  def fetchTopicPartitions(): Set[TopicPartition] = uninterruptibleThreadRunner.runUninterruptibly {
-    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])

Review comment: I've just finished the testing of `UninterruptibleThread` removal from `KafkaOffsetReader`. We can put it here if we agree. Both side we can come up w/ pros and cons.
> get rid of using UninterruptibleThread altogether.
Not sure it's possible or at least not super easy. In executor side it's used because of multiple reasons. Please see: https://github.com/apache/spark/blob/2e3aa2f0232a539346da3df8a20cd8e7c2b7dd4f/core/src/main/scala/org/apache/spark/executor/Executor.scala#L101-L104
[GitHub] [spark] AmplabJenkins removed a comment on pull request #29316: [SPARK-32508][SQL] Disallow empty part col values in partition spec before static partition writing
AmplabJenkins removed a comment on pull request #29316: URL: https://github.com/apache/spark/pull/29316#issuecomment-693218342 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/128735/ Test FAILed.
[GitHub] [spark] AmplabJenkins removed a comment on pull request #29591: [SPARK-32714][PYTHON] Initial pyspark-stubs port.
AmplabJenkins removed a comment on pull request #29591: URL: https://github.com/apache/spark/pull/29591#issuecomment-693218513 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/128740/ Test FAILed.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #29729: [SPARK-32032][SS] Avoid infinite wait in driver because of KafkaConsumer.poll(long) API
HeartSaVioR commented on a change in pull request #29729:
URL: https://github.com/apache/spark/pull/29729#discussion_r489217921

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ##
@@ -105,34 +109,16 @@ private[kafka010] class KafkaOffsetReader(
     minPartitions.map(_ > numTopicPartitions).getOrElse(false)
   }
-  private def nextGroupId(): String = {
-    groupId = driverGroupIdPrefix + "-" + nextId
-    nextId += 1
-    groupId
-  }
-
   override def toString(): String = consumerStrategy.toString
   /**
    * Closes the connection to Kafka, and cleans up state.
    */
   def close(): Unit = {
-    if (_consumer != null) uninterruptibleThreadRunner.runUninterruptibly { stopConsumer() }
+    if (_admin != null) uninterruptibleThreadRunner.runUninterruptibly { stopAdmin() }
     uninterruptibleThreadRunner.shutdown()
   }
-  /**
-   * @return The Set of TopicPartitions for a given topic
-   */
-  def fetchTopicPartitions(): Set[TopicPartition] = uninterruptibleThreadRunner.runUninterruptibly {
-    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])

Review comment: Having a separate PR is OK for me - that would avoid unnecessary hold on this PR.
[GitHub] [spark] HeartSaVioR commented on pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source
HeartSaVioR commented on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-693225135 retest this, please
[GitHub] [spark] HeartSaVioR commented on pull request #29767: [SPARK-32896][SS] Add DataStreamWriter.table API
HeartSaVioR commented on pull request #29767: URL: https://github.com/apache/spark/pull/29767#issuecomment-693225169 retest this, please
[GitHub] [spark] AmplabJenkins removed a comment on pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source
AmplabJenkins removed a comment on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-693227460
[GitHub] [spark] AmplabJenkins commented on pull request #25965: [SPARK-26425][SS] Add more constraint checks to avoid checkpoint corruption
AmplabJenkins commented on pull request #25965: URL: https://github.com/apache/spark/pull/25965#issuecomment-693227427
[GitHub] [spark] AmplabJenkins commented on pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source
AmplabJenkins commented on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-693227460
[GitHub] [spark] AmplabJenkins commented on pull request #29767: [SPARK-32896][SS] Add DataStreamWriter.table API
AmplabJenkins commented on pull request #29767: URL: https://github.com/apache/spark/pull/29767#issuecomment-693227405
[GitHub] [spark] AmplabJenkins removed a comment on pull request #25965: [SPARK-26425][SS] Add more constraint checks to avoid checkpoint corruption
AmplabJenkins removed a comment on pull request #25965: URL: https://github.com/apache/spark/pull/25965#issuecomment-693227427
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #29729: [SPARK-32032][SS] Avoid infinite wait in driver because of KafkaConsumer.poll(long) API
gaborgsomogyi commented on a change in pull request #29729:
URL: https://github.com/apache/spark/pull/29729#discussion_r489221735

## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ##
@@ -213,64 +187,68 @@ private[kafka010] class KafkaOffsetReader(
       assert(partitions.asScala == partitionTimestamps.keySet,
         "If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
           s"topics. Specified: ${partitionTimestamps.keySet} Assigned: ${partitions.asScala}")
-      logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionTimestamps")
+      logDebug(s"Assigned partitions: $partitions. Seeking to $partitionTimestamps")
     }
     val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => {
-        val converted = partitionTimestamps.map { case (tp, timestamp) =>
-          tp -> java.lang.Long.valueOf(timestamp)
+        val listOffsetsParams = partitionTimestamps.map { case (tp, timestamp) =>
+          tp -> OffsetSpec.forTimestamp(timestamp)
         }.asJava
+        admin.listOffsets(listOffsetsParams, listOffsetsOptions).all().get().asScala.map {
+          case (tp, offsetSpec) =>
+            if (failsOnNoMatchingOffset) {
+              assert(offsetSpec.offset() != OffsetFetchResponse.INVALID_OFFSET, "No offset " +
+                s"matched from request of topic-partition $tp and timestamp " +
+                s"${partitionTimestamps(tp)}.")
+            }
-        val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
-          consumer.offsetsForTimes(converted)
-
-        offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
-          if (failsOnNoMatchingOffset) {
-            assert(offsetAndTimestamp != null, "No offset matched from request of " +
-              s"topic-partition $tp and timestamp ${partitionTimestamps(tp)}.")
-          }
-
-          if (offsetAndTimestamp == null) {
-            tp -> KafkaOffsetRangeLimit.LATEST
-          } else {
-            tp -> offsetAndTimestamp.offset()
-          }
+            if (offsetSpec.offset() == OffsetFetchResponse.INVALID_OFFSET) {
+              tp -> KafkaOffsetRangeLimit.LATEST
+            } else {
+              tp -> offsetSpec.offset()
+            }
         }.toMap
       }
     }
-    val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { _ => }
-
-    fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets,
-      fnAssertFetchedOffsets)
+    fetchSpecificOffsets0(fnAssertParametersWithPartitions, fnRetrievePartitionOffsets)
   }
   private def fetchSpecificOffsets0(
       fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit,
-      fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long],
-      fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit): KafkaSourceOffset = {
+      fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long]

Review comment: It mustn't be unless the code you've pinpointed can fire up somehow: https://github.com/apache/spark/pull/29729#discussion_r488831782
[GitHub] [spark] AmplabJenkins removed a comment on pull request #29767: [SPARK-32896][SS] Add DataStreamWriter.table API
AmplabJenkins removed a comment on pull request #29767: URL: https://github.com/apache/spark/pull/29767#issuecomment-693227405
[GitHub] [spark] cloud-fan commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
cloud-fan commented on a change in pull request #29604:
URL: https://github.com/apache/spark/pull/29604#discussion_r489227880

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ##
@@ -476,7 +476,7 @@ case class Lag(input: Expression, offset: Expression, default: Expression)
 abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowFunction {
   self: Product =>
-  override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
+  override def frame: WindowFrame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)

Review comment: is it a necessary change?
[GitHub] [spark] SparkQA commented on pull request #29585: [SPARK-32741][SQL] Check if the same ExprId refers to the unique attribute in logical plans
SparkQA commented on pull request #29585: URL: https://github.com/apache/spark/pull/29585#issuecomment-693233512 **[Test build #128754 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128754/testReport)** for PR 29585 at commit [`fb048c7`](https://github.com/apache/spark/commit/fb048c7afcc6e366f8e840a3e0a02849a129a04c).
[GitHub] [spark] AmplabJenkins commented on pull request #29585: [SPARK-32741][SQL] Check if the same ExprId refers to the unique attribute in logical plans
AmplabJenkins commented on pull request #29585: URL: https://github.com/apache/spark/pull/29585#issuecomment-693234148
[GitHub] [spark] cloud-fan commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
cloud-fan commented on a change in pull request #29604:
URL: https://github.com/apache/spark/pull/29604#discussion_r489231123

## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ##
@@ -549,6 +549,85 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If ignoreNulls=true, we will skip
+      nulls when finding the `offset`th row. Otherwise, every row counts for the `offset`. If
+      there is no such an offset row (e.g., when the offset is 10, size of the window frame less
+      than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - a positive int literal to indicate the offset in the window frame. It starts with 1.
+      * ignoreNulls - an optional specification that indicates the NthValue should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean)
+    extends AggregateWindowFunction with ImplicitCastInputTypes {
+
+  def this(child: Expression, offset: Expression) = this(child, offset, false)
+
+  override def children: Seq[Expression] = input :: Nil
+
+  override def frame: WindowFrame = UnspecifiedFrame
+
+  override def dataType: DataType = input.dataType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val check = super.checkInputDataTypes()
+    if (check.isFailure) {
+      check
+    } else if (!offsetExpr.foldable) {
+      TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.")
+    } else {
+      offsetExpr.dataType match {

Review comment: This will always be `IntegerType` when we reach here, because it extends `ImplicitCastInputTypes`. We can just do `offsetExpr.eval().asInstanceOf[Int] match ...`
[GitHub] [spark] cloud-fan commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
cloud-fan commented on a change in pull request #29604: URL: https://github.com/apache/spark/pull/29604#discussion_r489234526 ## File path: sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part3.sql.out ## @@ -385,6 +385,15 @@ org.apache.spark.sql.AnalysisException cannot resolve 'ntile(0)' due to data type mismatch: Buckets expression must be positive, but got: 0; line 1 pos 7 +-- !query +SELECT nth_value(four, 0) OVER (ORDER BY ten), ten, four FROM tenk1 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +cannot resolve 'nthvalue(spark_catalog.default.tenk1.`four`)' due to data type mismatch: The 'offset' argument of nth_value must be greater than zero but it is 0.; line 1 pos 7 Review comment: does pgsql also fail for this query? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
cloud-fan commented on a change in pull request #29604: URL: https://github.com/apache/spark/pull/29604#discussion_r48927 ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala ## @@ -233,6 +233,17 @@ class AnalysisErrorSuite extends AnalysisTest { SpecifiedWindowFrame(RangeFrame, Literal(1), Literal(2.as("window")), "window frame" :: "must match the required frame" :: Nil) + errorTest( +"nth_value window function", +testRelation2.select( + WindowExpression( +new NthValue(AttributeReference("b", IntegerType)(), Literal(0)), +WindowSpecDefinition( + UnresolvedAttribute("a") :: Nil, + SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil, + SpecifiedWindowFrame(RowFrame, Literal(0), Literal(0.as("window")), +"The 'offset' argument of nth_value must be greater than zero but it is 0." :: Nil) Review comment: can we test one more case that the offset parameter is not int? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AmplabJenkins commented on pull request #29770: [SPARK-32899][CORE] Support submit application with user-defined cluster manager
AmplabJenkins commented on pull request #29770: URL: https://github.com/apache/spark/pull/29770#issuecomment-693240551 Can one of the admins verify this patch?
[GitHub] [spark] AmplabJenkins removed a comment on pull request #29770: [SPARK-32899][CORE] Support submit application with user-defined cluster manager
AmplabJenkins removed a comment on pull request #29770: URL: https://github.com/apache/spark/pull/29770#issuecomment-693239885 Can one of the admins verify this patch?
[GitHub] [spark] beliefer commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
beliefer commented on a change in pull request #29604: URL: https://github.com/apache/spark/pull/29604#discussion_r489243512 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ## @@ -476,7 +476,7 @@ case class Lag(input: Expression, offset: Expression, default: Expression) abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowFunction { self: Product => - override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow) + override def frame: WindowFrame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow) Review comment: OK. Let's revert it. `NthValue` can then override it as `override val frame: WindowFrame = UnspecifiedFrame`.
[GitHub] [spark] beliefer commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
beliefer commented on a change in pull request #29604: URL: https://github.com/apache/spark/pull/29604#discussion_r489245255 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ## @@ -549,6 +549,85 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction { override def prettyName: String = "cume_dist" } +@ExpressionDescription( + usage = """ +_FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row + from beginning of the window frame. Offsets start at 1. If ignoreNulls=true, we will skip + nulls when finding the `offset`th row. Otherwise, every row counts for the `offset`. If + there is no such an offset row (e.g., when the offset is 10, size of the window frame less + than 10), null is returned. + """, + arguments = """ +Arguments: + * input - the target column or expression that the function operates on. + * offset - a positive int literal to indicate the offset in the window frame. It starts with 1. + * ignoreNulls - an optional specification that indicates the NthValue should skip null + values in the determination of which row to use. + """, + since = "3.1.0", + group = "window_funcs") +case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean) +extends AggregateWindowFunction with ImplicitCastInputTypes { + + def this(child: Expression, offset: Expression) = this(child, offset, false) + + override def children: Seq[Expression] = input :: Nil + + override def frame: WindowFrame = UnspecifiedFrame + + override def dataType: DataType = input.dataType + + override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, BooleanType) Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [spark] SparkQA commented on pull request #29316: [SPARK-32508][SQL] Disallow empty part col values in partition spec before static partition writing
SparkQA commented on pull request #29316: URL: https://github.com/apache/spark/pull/29316#issuecomment-693247282 **[Test build #128755 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128755/testReport)** for PR 29316 at commit [`76acc07`](https://github.com/apache/spark/commit/76acc0751b08fe2112b3538de0e25b57e67dc050).
[GitHub] [spark] beliefer commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
beliefer commented on a change in pull request #29604: URL: https://github.com/apache/spark/pull/29604#discussion_r489252543 ## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala ## @@ -233,6 +233,17 @@ class AnalysisErrorSuite extends AnalysisTest { SpecifiedWindowFrame(RangeFrame, Literal(1), Literal(2.as("window")), "window frame" :: "must match the required frame" :: Nil) + errorTest( +"nth_value window function", +testRelation2.select( + WindowExpression( +new NthValue(AttributeReference("b", IntegerType)(), Literal(0)), +WindowSpecDefinition( + UnresolvedAttribute("a") :: Nil, + SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil, + SpecifiedWindowFrame(RowFrame, Literal(0), Literal(0.as("window")), +"The 'offset' argument of nth_value must be greater than zero but it is 0." :: Nil) Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] beliefer commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
beliefer commented on a change in pull request #29604: URL: https://github.com/apache/spark/pull/29604#discussion_r489256112 ## File path: sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part3.sql.out ## @@ -385,6 +385,15 @@ org.apache.spark.sql.AnalysisException cannot resolve 'ntile(0)' due to data type mismatch: Buckets expression must be positive, but got: 0; line 1 pos 7 +-- !query +SELECT nth_value(four, 0) OVER (ORDER BY ten), ten, four FROM tenk1 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +cannot resolve 'nthvalue(spark_catalog.default.tenk1.`four`)' due to data type mismatch: The 'offset' argument of nth_value must be greater than zero but it is 0.; line 1 pos 7 Review comment: Yes.
```
SELECT nth_value(four, 0) OVER (ORDER BY ten), ten, four FROM tenk1;
ERROR: argument of nth_value must be greater than zero
```
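For readers following the thread, here is a minimal Java sketch of the `nth_value` semantics described in the expression's usage text: a 1-based offset, an optional skip of nulls, null when the frame is shorter than the offset, and a rejected non-positive offset. It models the documented behavior on a plain list and is not Spark's implementation; the class name `NthValueSketch` and the method `nthValue` are hypothetical, and the exception type is an assumption chosen for the sketch.

```java
import java.util.Arrays;
import java.util.List;

final class NthValueSketch {
  // Returns the offset-th (1-based) value of the frame, or null when no such row exists.
  // When ignoreNulls is true, null rows do not count toward the offset.
  static Integer nthValue(List<Integer> frame, int offset, boolean ignoreNulls) {
    if (offset <= 0) {
      // Mirrors the error message quoted in the review; the exception type is an assumption.
      throw new IllegalArgumentException(
          "The 'offset' argument of nth_value must be greater than zero but it is " + offset + ".");
    }
    int seen = 0;
    for (Integer value : frame) {
      if (ignoreNulls && value == null) {
        continue; // skipped rows do not advance the counter
      }
      seen++;
      if (seen == offset) {
        return value;
      }
    }
    return null; // frame has fewer than `offset` qualifying rows
  }

  public static void main(String[] args) {
    List<Integer> frame = Arrays.asList(null, 7, null, 9);
    System.out.println(nthValue(frame, 2, false)); // second row, nulls counted
    System.out.println(nthValue(frame, 2, true));  // second non-null row
    System.out.println(nthValue(frame, 10, false)); // offset beyond the frame
  }
}
```

Rejecting the offset before scanning the frame matches the point made in the review: both Spark's analyzer and PostgreSQL reject `nth_value(four, 0)` rather than returning null.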
[GitHub] [spark] AmplabJenkins commented on pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
AmplabJenkins commented on pull request #29604: URL: https://github.com/apache/spark/pull/29604#issuecomment-693258818
[GitHub] [spark] SparkQA commented on pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
SparkQA commented on pull request #29604: URL: https://github.com/apache/spark/pull/29604#issuecomment-693258202 **[Test build #128756 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128756/testReport)** for PR 29604 at commit [`db2b1d4`](https://github.com/apache/spark/commit/db2b1d4bc9272a31af47365f4526b14f54815b3d).
[GitHub] [spark] mundaym commented on pull request #29762: [SPARK-32892][CORE][SQL] Fix hash functions on big-endian platforms.
mundaym commented on pull request #29762: URL: https://github.com/apache/spark/pull/29762#issuecomment-693262630 In an ideal world we'd probably consolidate platform endianness handling into the Platform class by adding methods that perform the necessary conversions. I'm not sure whether that would be a reasonable addition to the Platform class API. We could, for example, have overloads of the getters and setters, for at least the primitive types, that take a byte order argument, so we could make calls like `Platform.getInt(base, offset, ByteOrder.LITTLE_ENDIAN)`. The implementation would be trivial:
```java
public static int getInt(Object object, long offset, ByteOrder order) {
  int x = _UNSAFE.getInt(object, offset);
  return ByteOrder.nativeOrder().equals(order) ? x : Integer.reverseBytes(x);
}
```
That would make changes like this one far cleaner. It might, though, be slightly more work for the JIT compiler to optimize well (and therefore slightly riskier performance-wise) than the inline conversions proposed in this PR.
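To make the proposed overload concrete without relying on `sun.misc.Unsafe`, here is a self-contained Java sketch of the same conversion applied to a plain byte array. It is only an illustration under stated assumptions: the class `ByteOrderReads` and its `getInt(byte[], int, ByteOrder)` signature are hypothetical and not part of Spark's `Platform` class; the native-order read through `ByteBuffer` stands in for `_UNSAFE.getInt`.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class ByteOrderReads {
  // Reads 4 bytes starting at `offset` and interprets them in the requested byte order.
  // The first step imitates a raw native-order load; the second applies the same
  // conditional byte swap as the snippet in the comment above.
  static int getInt(byte[] base, int offset, ByteOrder order) {
    int x = ByteBuffer.wrap(base, offset, 4).order(ByteOrder.nativeOrder()).getInt();
    return ByteOrder.nativeOrder().equals(order) ? x : Integer.reverseBytes(x);
  }

  public static void main(String[] args) {
    byte[] bytes = {0x01, 0x02, 0x03, 0x04};
    // The results depend only on the requested order, not on the host's native order.
    System.out.println(Integer.toHexString(getInt(bytes, 0, ByteOrder.LITTLE_ENDIAN)));
    System.out.println(Integer.toHexString(getInt(bytes, 0, ByteOrder.BIG_ENDIAN)));
  }
}
```

The point of the design is that callers state the byte order of the data once, at the call site, and get the same value on little-endian and big-endian hosts.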
[GitHub] [spark] maropu commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
maropu commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r489265261 ## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala ## @@ -507,33 +507,156 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { } test("SPARK-29358: Make unionByName optionally fill missing columns with nulls") { -var df1 = Seq(1, 2, 3).toDF("a") -var df2 = Seq(3, 1, 2).toDF("b") -val df3 = Seq(2, 3, 1).toDF("c") -val unionDf = df1.unionByName(df2.unionByName(df3, true), true) -checkAnswer(unionDf, - Row(1, null, null) :: Row(2, null, null) :: Row(3, null, null) :: // df1 -Row(null, 3, null) :: Row(null, 1, null) :: Row(null, 2, null) :: // df2 -Row(null, null, 2) :: Row(null, null, 3) :: Row(null, null, 1) :: Nil // df3 -) +Seq("true", "false").foreach { config => + withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> config) { +var df1 = Seq(1, 2, 3).toDF("a") +var df2 = Seq(3, 1, 2).toDF("b") +val df3 = Seq(2, 3, 1).toDF("c") +val unionDf = df1.unionByName(df2.unionByName(df3, true), true) +checkAnswer(unionDf, + Row(1, null, null) :: Row(2, null, null) :: Row(3, null, null) :: // df1 +Row(null, 3, null) :: Row(null, 1, null) :: Row(null, 2, null) :: // df2 +Row(null, null, 2) :: Row(null, null, 3) :: Row(null, null, 1) :: Nil // df3 +) + +df1 = Seq((1, 2)).toDF("a", "c") +df2 = Seq((3, 4, 5)).toDF("a", "b", "c") +checkAnswer(df1.unionByName(df2, true), + Row(1, 2, null) :: Row(3, 5, 4) :: Nil) +checkAnswer(df2.unionByName(df1, true), + Row(3, 4, 5) :: Row(1, null, 2) :: Nil) + +withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + df2 = Seq((3, 4, 5)).toDF("a", "B", "C") + val union1 = df1.unionByName(df2, true) + val union2 = df2.unionByName(df1, true) + + checkAnswer(union1, Row(1, 2, null, null) :: Row(3, null, 4, 5) :: Nil) + checkAnswer(union2, Row(3, 4, 5, null) :: Row(1, null, null, 2) :: Nil) + + assert(union1.schema.fieldNames === Array("a", "c", "B", 
"C")) + assert(union2.schema.fieldNames === Array("a", "B", "C", "c")) +} + } +} + } -df1 = Seq((1, 2)).toDF("a", "c") -df2 = Seq((3, 4, 5)).toDF("a", "b", "c") -checkAnswer(df1.unionByName(df2, true), - Row(1, 2, null) :: Row(3, 5, 4) :: Nil) -checkAnswer(df2.unionByName(df1, true), - Row(3, 4, 5) :: Row(1, null, 2) :: Nil) + test("SPARK-32376: Make unionByName null-filling behavior work with struct columns - simple") { +withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> "true") { + val df1 = Seq(((1, 2, 3), 0), ((2, 3, 4), 1), ((3, 4, 5), 2)).toDF("a", "idx") + val df2 = Seq(((3, 4), 0), ((1, 2), 1), ((2, 3), 2)).toDF("a", "idx") + val df3 = Seq(((100, 101, 102, 103), 0), ((110, 111, 112, 113), 1), ((120, 121, 122, 123), 2)) +.toDF("a", "idx") + + var unionDf = df1.unionByName(df2, true) + + checkAnswer(unionDf, +Row(Row(1, 2, 3), 0) :: Row(Row(2, 3, 4), 1) :: Row(Row(3, 4, 5), 2) :: + Row(Row(3, 4, null), 0) :: Row(Row(1, 2, null), 1) :: Row(Row(2, 3, null), 2) :: Nil + ) + + assert(unionDf.schema.toDDL == "`a` STRUCT<`_1`: INT, `_2`: INT, `_3`: INT>,`idx` INT") + + unionDf = df1.unionByName(df2, true).unionByName(df3, true) + + checkAnswer(unionDf, +Row(Row(1, 2, 3, null), 0) :: + Row(Row(2, 3, 4, null), 1) :: + Row(Row(3, 4, 5, null), 2) :: // df1 + Row(Row(3, 4, null, null), 0) :: + Row(Row(1, 2, null, null), 1) :: + Row(Row(2, 3, null, null), 2) :: // df2 + Row(Row(100, 101, 102, 103), 0) :: + Row(Row(110, 111, 112, 113), 1) :: + Row(Row(120, 121, 122, 123), 2) :: Nil // df3 + ) + assert(unionDf.schema.toDDL == +"`a` STRUCT<`_1`: INT, `_2`: INT, `_3`: INT, `_4`: INT>,`idx` INT") +} + } -withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - df2 = Seq((3, 4, 5)).toDF("a", "B", "C") - val union1 = df1.unionByName(df2, true) - val union2 = df2.unionByName(df1, true) + test("SPARK-32376: Make unionByName null-filling behavior work with struct columns - nested") { +withSQLConf(SQLConf.UNION_BYNAME_STRUCT_SUPPORT_ENABLED.key -> "true") { + val df1 = 
Seq((0, UnionClass1a(0, 1L, UnionClass2(1, "2".toDF("id", "a") + val df2 = Seq((1, UnionClass1b(1, 2L, UnionClass3(2, 3L.toDF("id", "a") + + val expectedSchema = "`id` INT,`a` STRUCT<`a`: INT, `b`: BIGINT, " + +"`nested`: STRUCT<`a`: INT, `b`: BIGINT, `c`: STRING>>" + + var unionDf = df1.unionByName(df2, true) + checkAnswer(unionDf, +Row(0, Row(0, 1, Row(1, null, "2"))) :: + Row(1, Row(1, 2,
[GitHub] [spark] maropu commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
maropu commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r489259568 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,168 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts columns in a struct expression based on column names. + */ + private def sortStructFields(expr: Expression): Expression = { +assert(expr.dataType.isInstanceOf[StructType]) Review comment: We need this `assert` here? It seems this check's already been done in https://github.com/apache/spark/pull/29587/files#diff-4d656d696512d6bcb03a48f7e0af6251R90 ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #29729: [SPARK-32032][SS] Avoid infinite wait in driver because of KafkaConsumer.poll(long) API
HeartSaVioR commented on a change in pull request #29729: URL: https://github.com/apache/spark/pull/29729#discussion_r489215803 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -105,34 +109,16 @@ private[kafka010] class KafkaOffsetReader( minPartitions.map(_ > numTopicPartitions).getOrElse(false) } - private def nextGroupId(): String = { -groupId = driverGroupIdPrefix + "-" + nextId -nextId += 1 -groupId - } - override def toString(): String = consumerStrategy.toString /** * Closes the connection to Kafka, and cleans up state. */ def close(): Unit = { -if (_consumer != null) uninterruptibleThreadRunner.runUninterruptibly { stopConsumer() } +if (_admin != null) uninterruptibleThreadRunner.runUninterruptibly { stopAdmin() } uninterruptibleThreadRunner.shutdown() } - /** - * @return The Set of TopicPartitions for a given topic - */ - def fetchTopicPartitions(): Set[TopicPartition] = uninterruptibleThreadRunner.runUninterruptibly { -assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) Review comment: Sorry I meant Kafka related, driver side only. (In executors we still leverage KafkaConsumer so not 100% sure we can get rid of it. Though still better to get rid if possible.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #29729: [SPARK-32032][SS] Avoid infinite wait in driver because of KafkaConsumer.poll(long) API
gaborgsomogyi commented on a change in pull request #29729: URL: https://github.com/apache/spark/pull/29729#discussion_r489218487 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -105,34 +109,16 @@ private[kafka010] class KafkaOffsetReader( minPartitions.map(_ > numTopicPartitions).getOrElse(false) } - private def nextGroupId(): String = { -groupId = driverGroupIdPrefix + "-" + nextId -nextId += 1 -groupId - } - override def toString(): String = consumerStrategy.toString /** * Closes the connection to Kafka, and cleans up state. */ def close(): Unit = { -if (_consumer != null) uninterruptibleThreadRunner.runUninterruptibly { stopConsumer() } +if (_admin != null) uninterruptibleThreadRunner.runUninterruptibly { stopAdmin() } uninterruptibleThreadRunner.shutdown() } - /** - * @return The Set of TopicPartitions for a given topic - */ - def fetchTopicPartitions(): Set[TopicPartition] = uninterruptibleThreadRunner.runUninterruptibly { -assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) Review comment: I agree w/ this way. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on pull request #25965: [SPARK-26425][SS] Add more constraint checks to avoid checkpoint corruption
HeartSaVioR commented on pull request #25965: URL: https://github.com/apache/spark/pull/25965#issuecomment-693225068 retest this, please
[GitHub] [spark] SparkQA commented on pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source
SparkQA commented on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-693226793 **[Test build #128752 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128752/testReport)** for PR 28841 at commit [`1c8384c`](https://github.com/apache/spark/commit/1c8384cebe05fbc1ebb3c86660e57508b031c6f9).
[GitHub] [spark] SparkQA commented on pull request #29767: [SPARK-32896][SS] Add DataStreamWriter.table API
SparkQA commented on pull request #29767: URL: https://github.com/apache/spark/pull/29767#issuecomment-693226706 **[Test build #128751 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128751/testReport)** for PR 29767 at commit [`f557696`](https://github.com/apache/spark/commit/f5576969ee9afeaa35c3bb74822aad43cf9c11c2).
[GitHub] [spark] SparkQA commented on pull request #25965: [SPARK-26425][SS] Add more constraint checks to avoid checkpoint corruption
SparkQA commented on pull request #25965: URL: https://github.com/apache/spark/pull/25965#issuecomment-693226799 **[Test build #128753 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128753/testReport)** for PR 25965 at commit [`a885212`](https://github.com/apache/spark/commit/a8852123ae978dddb5578e3411b4822fd4a1fdce).
[GitHub] [spark] gaborgsomogyi commented on a change in pull request #29729: [SPARK-32032][SS] Avoid infinite wait in driver because of KafkaConsumer.poll(long) API
gaborgsomogyi commented on a change in pull request #29729: URL: https://github.com/apache/spark/pull/29729#discussion_r489222628 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -182,28 +170,14 @@ private[kafka010] class KafkaOffsetReader( "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + "Use -1 for latest, -2 for earliest, if you don't care.\n" + s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions.asScala}") - logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets") + logDebug(s"Assigned partitions: $partitions. Seeking to $partitionOffsets") } val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => partitionOffsets } -val fnAssertFetchedOffsets: Map[TopicPartition, Long] => Unit = { fetched => - partitionOffsets.foreach { -case (tp, off) if off != KafkaOffsetRangeLimit.LATEST && - off != KafkaOffsetRangeLimit.EARLIEST => - if (fetched(tp) != off) { -reportDataLoss( Review comment: As mentioned in the doc already but putting it here as well this condition was never covered w/ testcases. Maybe never worked?! Not sure... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] maropu commented on pull request #29585: [SPARK-32741][SQL] Check if the same ExprId refers to the unique attribute in logical plans
maropu commented on pull request #29585: URL: https://github.com/apache/spark/pull/29585#issuecomment-693232942 retest this please
[GitHub] [spark] cloud-fan commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
cloud-fan commented on a change in pull request #29604: URL: https://github.com/apache/spark/pull/29604#discussion_r489229928 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ## @@ -549,6 +549,85 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction { override def prettyName: String = "cume_dist" } +@ExpressionDescription( + usage = """ +_FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row + from beginning of the window frame. Offsets start at 1. If ignoreNulls=true, we will skip + nulls when finding the `offset`th row. Otherwise, every row counts for the `offset`. If + there is no such an offset row (e.g., when the offset is 10, size of the window frame less + than 10), null is returned. + """, + arguments = """ +Arguments: + * input - the target column or expression that the function operates on. + * offset - a positive int literal to indicate the offset in the window frame. It starts with 1. + * ignoreNulls - an optional specification that indicates the NthValue should skip null + values in the determination of which row to use. + """, + since = "3.1.0", + group = "window_funcs") +case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean) +extends AggregateWindowFunction with ImplicitCastInputTypes { + + def this(child: Expression, offset: Expression) = this(child, offset, false) + + override def children: Seq[Expression] = input :: Nil + + override def frame: WindowFrame = UnspecifiedFrame + + override def dataType: DataType = input.dataType + + override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, BooleanType) Review comment: `ignoreNulls` is not a child (it's Boolean), so this should be `Seq(AnyDataType, IntegerType)`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
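For readers following the `NthValue` thread, the semantics described in the usage string above can be sketched in plain Scala without Spark. This is only an illustrative model: `nthValue` and the `Option`-based frame are hypothetical stand-ins (with `None` playing the role of SQL null), not the Catalyst implementation under review.

```scala
// Illustrative model of the documented nth_value semantics.
// `nthValue` is a hypothetical helper, not part of Spark; None stands in for SQL null.
object NthValueSketch {
  def nthValue[T](frame: Seq[Option[T]], offset: Int, ignoreNulls: Boolean): Option[T] = {
    require(offset > 0, s"offset must be greater than zero but it is $offset")
    // With ignoreNulls, null rows do not count toward the offset.
    val candidates = if (ignoreNulls) frame.filter(_.isDefined) else frame
    // Offsets start at 1; a missing row yields null (None).
    candidates.lift(offset - 1).flatten
  }

  def main(args: Array[String]): Unit = {
    val frame = Seq(Some(10), None, Some(30))
    println(nthValue(frame, 2, ignoreNulls = false))  // None: the 2nd row is null
    println(nthValue(frame, 2, ignoreNulls = true))   // Some(30): nulls are skipped
    println(nthValue(frame, 10, ignoreNulls = false)) // None: no such row
  }
}
```

Note that `ignoreNulls` is a plain `Boolean` parameter here, just as in the case class, which is why the review comment argues it should not appear in `inputTypes`.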
[GitHub] [spark] AmplabJenkins removed a comment on pull request #29585: [SPARK-32741][SQL] Check if the same ExprId refers to the unique attribute in logical plans
AmplabJenkins removed a comment on pull request #29585: URL: https://github.com/apache/spark/pull/29585#issuecomment-693234148
[GitHub] [spark] ConeyLiu opened a new pull request #29770: [SPARK-32899][CORE] Support submit application with user-defined cluster manager
ConeyLiu opened a new pull request #29770: URL: https://github.com/apache/spark/pull/29770 ### What changes were proposed in this pull request? Add support for submitting applications with a user-defined cluster manager. ### Why are the changes needed? Users can already define a custom cluster manager with the `ExternalClusterManager` trait. However, such an application cannot be submitted with `SparkSubmit`. Also, we can set the user-defined master with pyspark. The reason is that `SparkSubmit` checks whether the master is one of the natively supported ones, whereas the custom cluster manager is only checked in `SparkContext`. This patch fixes the problem. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New UT.
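The mismatch described in this PR can be illustrated with a toy model. The sketch below is an assumption-laden illustration, not Spark's `SparkSubmit` code: `CustomClusterManager`, `isNative`, `isSupported`, and the list of native prefixes are all hypothetical names chosen for the example; only the `canCreate(masterURL)` shape mirrors the `ExternalClusterManager` trait.

```scala
// Toy model of the two-stage master check described above.
// All names are hypothetical; this is not Spark's SparkSubmit implementation.
object MasterCheckSketch {
  // Stand-in for a user-defined cluster manager (cf. ExternalClusterManager.canCreate).
  trait CustomClusterManager {
    def canCreate(masterURL: String): Boolean
  }

  // Assumed set of natively supported master prefixes, for illustration only.
  private val nativePrefixes = Seq("yarn", "spark://", "mesos://", "k8s://", "local")

  def isNative(master: String): Boolean =
    nativePrefixes.exists(p => master.startsWith(p))

  // Accept a master if it is native or if some registered custom manager claims it.
  def isSupported(master: String, custom: Seq[CustomClusterManager]): Boolean =
    isNative(master) || custom.exists(_.canCreate(master))

  def main(args: Array[String]): Unit = {
    val myManager = new CustomClusterManager {
      def canCreate(masterURL: String): Boolean = masterURL.startsWith("mycluster://")
    }
    println(isSupported("mycluster://host:1234", Nil))            // false
    println(isSupported("mycluster://host:1234", Seq(myManager))) // true
  }
}
```

A submit-time check that consults only `isNative` would reject the custom master even though a later stage could handle it, which is the gap the PR describes.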
[GitHub] [spark] AmplabJenkins commented on pull request #29770: [SPARK-32899][CORE] Support submit application with user-defined cluster manager
AmplabJenkins commented on pull request #29770: URL: https://github.com/apache/spark/pull/29770#issuecomment-693239885 Can one of the admins verify this patch?
[GitHub] [spark] beliefer commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
beliefer commented on a change in pull request #29604: URL: https://github.com/apache/spark/pull/29604#discussion_r489247134 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ## @@ -549,6 +549,85 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction { override def prettyName: String = "cume_dist" } +@ExpressionDescription( + usage = """ +_FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row + from beginning of the window frame. Offsets start at 1. If ignoreNulls=true, we will skip + nulls when finding the `offset`th row. Otherwise, every row counts for the `offset`. If + there is no such an offset row (e.g., when the offset is 10, size of the window frame less + than 10), null is returned. + """, + arguments = """ +Arguments: + * input - the target column or expression that the function operates on. + * offset - a positive int literal to indicate the offset in the window frame. It starts with 1. + * ignoreNulls - an optional specification that indicates the NthValue should skip null + values in the determination of which row to use. 
+ """, + since = "3.1.0", + group = "window_funcs") +case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean) +extends AggregateWindowFunction with ImplicitCastInputTypes { + + def this(child: Expression, offset: Expression) = this(child, offset, false) + + override def children: Seq[Expression] = input :: Nil + + override def frame: WindowFrame = UnspecifiedFrame + + override def dataType: DataType = input.dataType + + override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, BooleanType) + + override def checkInputDataTypes(): TypeCheckResult = { +val check = super.checkInputDataTypes() +if (check.isFailure) { + check +} else if (!offsetExpr.foldable) { + TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.") +} else { + offsetExpr.dataType match { Review comment: Yeah! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] AmplabJenkins removed a comment on pull request #29316: [SPARK-32508][SQL] Disallow empty part col values in partition spec before static partition writing
AmplabJenkins removed a comment on pull request #29316: URL: https://github.com/apache/spark/pull/29316#issuecomment-693247910
[GitHub] [spark] AmplabJenkins commented on pull request #29316: [SPARK-32508][SQL] Disallow empty part col values in partition spec before static partition writing
AmplabJenkins commented on pull request #29316: URL: https://github.com/apache/spark/pull/29316#issuecomment-693247910
[GitHub] [spark] AmplabJenkins removed a comment on pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
AmplabJenkins removed a comment on pull request #29604: URL: https://github.com/apache/spark/pull/29604#issuecomment-693258818
[GitHub] [spark] maropu commented on a change in pull request #29587: [SPARK-32376][SQL] Make unionByName null-filling behavior work with struct columns
maropu commented on a change in pull request #29587: URL: https://github.com/apache/spark/pull/29587#discussion_r489257352 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,168 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, Literal, NamedExpression, WithFields} Review comment: nit: Why don't you fold it here? `.expressions._`? ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,168 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts columns in a struct expression based on column names. 
+ */ + private def sortStructFields(expr: Expression): Expression = { +assert(expr.dataType.isInstanceOf[StructType]) + +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => (name, GetStructField(KnownNotNull(expr), i).asInstanceOf[Expression]) Review comment: We need the cast `.asInstanceOf[Expression]`? ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveUnion.scala ## @@ -17,29 +17,168 @@ package org.apache.spark.sql.catalyst.analysis +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, KnownNotNull, Literal, NamedExpression, WithFields} import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils +import org.apache.spark.unsafe.types.UTF8String /** * Resolves different children of Union to a common set of columns. */ object ResolveUnion extends Rule[LogicalPlan] { - private def unionTwoSides( + /** + * This method sorts columns in a struct expression based on column names. 
+ */ + private def sortStructFields(expr: Expression): Expression = { +assert(expr.dataType.isInstanceOf[StructType]) + +val existingExprs = expr.dataType.asInstanceOf[StructType].fieldNames.zipWithIndex.map { + case (name, i) => (name, GetStructField(KnownNotNull(expr), i).asInstanceOf[Expression]) +}.sortBy(_._1).flatMap(pair => Seq(Literal(pair._1), pair._2)) + +val newExpr = CreateNamedStruct(existingExprs) +if (expr.nullable) { + If(IsNull(expr), Literal(null, newExpr.dataType), newExpr) +} else { + newExpr +} + } + + private def sortStructFields(fieldExprs: Seq[Expression]): Seq[Expression] = { +fieldExprs.grouped(2).map { e => + Seq(e.head, e.last) +}.toSeq.sortBy { pair => + assert(pair(0).isInstanceOf[Literal]) + pair(0).eval().asInstanceOf[UTF8String].toString +}.flatten + } + + /** + * This helper method sorts fields in a `WithFields` expression by field name. + */ + private def sortStructFieldsInWithFields(expr: Expression): Expression = expr transformUp { +case w: WithFields if w.resolved => + w.evalExpr match { +case i @ If(IsNull(_), _, CreateNamedStruct(fieldExprs)) => + val sorted = sortStructFields(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + i.copy(trueValue = Literal(null, newStruct.dataType), falseValue = newStruct) +case CreateNamedStruct(fieldExprs) => + val sorted = sortStructFields(fieldExprs) + val newStruct = CreateNamedStruct(sorted) + newStruct +case other =>
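The `grouped(2)` trick used by `sortStructFields(fieldExprs)` above, where a flattened name/value sequence is regrouped into pairs, sorted by name, and flattened again, can be tried in isolation. The sketch below is a simplified model that uses plain strings instead of Catalyst `Expression`s; `sortFlattened` is a hypothetical name.

```scala
// Simplified model of sorting a flattened Seq(name1, value1, name2, value2, ...) by name.
// Uses plain strings rather than Catalyst expressions; `sortFlattened` is hypothetical.
object SortFieldsSketch {
  def sortFlattened(fields: Seq[String]): Seq[String] = {
    require(fields.length % 2 == 0, "expected alternating name/value entries")
    fields
      .grouped(2)        // regroup into (name, value) pairs
      .toList
      .sortBy(_.head)    // order pairs by field name
      .flatten           // back to the flattened layout
  }

  def main(args: Array[String]): Unit = {
    println(sortFlattened(Seq("b", "2", "a", "1"))) // List(a, 1, b, 2)
  }
}
```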
[GitHub] [spark] AngersZhuuuu commented on pull request #29739: [SPARK-32867][SQL] When explain, HiveTableRelation show limited message
AngersZhuuuu commented on pull request #29739: URL: https://github.com/apache/spark/pull/29739#issuecomment-693264163 retest this please
[GitHub] [spark] AmplabJenkins commented on pull request #29766: [SPARK-32704][SQL][TESTS][FOLLOW-UP] Check any physical rule instead of a specific rule in the test
AmplabJenkins commented on pull request #29766: URL: https://github.com/apache/spark/pull/29766#issuecomment-693194803
[GitHub] [spark] AmplabJenkins removed a comment on pull request #29762: [SPARK-32892][CORE][SQL] Fix hash functions on big-endian platforms.
AmplabJenkins removed a comment on pull request #29762: URL: https://github.com/apache/spark/pull/29762#issuecomment-693193773 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/128738/ Test FAILed.
[GitHub] [spark] viirya commented on pull request #29765: [SPARK-32888][DOCS] Add user document about header flag and RDD as path for reading CSV
viirya commented on pull request #29765: URL: https://github.com/apache/spark/pull/29765#issuecomment-693199014 Not sure why GitHub Actions failed at the Python linter.
[GitHub] [spark] HeartSaVioR commented on pull request #29729: [SPARK-32032][SS] Avoid infinite wait in driver because of KafkaConsumer.poll(long) API
HeartSaVioR commented on pull request #29729: URL: https://github.com/apache/spark/pull/29729#issuecomment-693198878 Looks OK except the cleanup of unnecessary workarounds for `consumer.poll` as a new commit. cc. @zsxwing
[GitHub] [spark] AmplabJenkins removed a comment on pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
AmplabJenkins removed a comment on pull request #29604: URL: https://github.com/apache/spark/pull/29604#issuecomment-693214552
[GitHub] [spark] SparkQA commented on pull request #29731: [SPARK-32706][SQL] Improve cast string to decimal type
SparkQA commented on pull request #29731: URL: https://github.com/apache/spark/pull/29731#issuecomment-693220302 **[Test build #128750 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128750/testReport)** for PR 29731 at commit [`abf2f7e`](https://github.com/apache/spark/commit/abf2f7e45b60f15742bf04c47ad1ff6d0ab130dc).
[GitHub] [spark] cloud-fan commented on a change in pull request #29756: [SPARK-32885][SS] Add DataStreamReader.table API
cloud-fan commented on a change in pull request #29756: URL: https://github.com/apache/spark/pull/29756#discussion_r489182003 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -923,6 +925,19 @@ class Analyzer( case u: UnresolvedV2Relation => CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u) + + case u @ UnresolvedRelation(_, extraOptions, true) => +val r = expandRelationName(u.multipartIdentifier) match { + case NonSessionCatalogAndIdentifier(catalog, ident) => +CatalogV2Util.loadTable(catalog, ident) match { + case Some(table) => +Some(StreamingRelationV2( Review comment: Shall we just add an `isStreaming` flag in `lookupV2Relation` to unify the code a bit more?
[GitHub] [spark] cloud-fan closed pull request #29734: [SPARK-32861][SQL] GenerateExec should require column ordering
cloud-fan closed pull request #29734: URL: https://github.com/apache/spark/pull/29734
[GitHub] [spark] AmplabJenkins commented on pull request #29762: [SPARK-32892][CORE][SQL] Fix hash functions on big-endian platforms.
AmplabJenkins commented on pull request #29762: URL: https://github.com/apache/spark/pull/29762#issuecomment-693193767
[GitHub] [spark] SparkQA removed a comment on pull request #29766: [SPARK-32704][SQL][TESTS][FOLLOW-UP] Check any physical rule instead of a specific rule in the test
SparkQA removed a comment on pull request #29766: URL: https://github.com/apache/spark/pull/29766#issuecomment-693112608 **[Test build #128730 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128730/testReport)** for PR 29766 at commit [`80ac7d2`](https://github.com/apache/spark/commit/80ac7d25edd7a8570e9b666791aa7215256fb6d0).
[GitHub] [spark] cloud-fan commented on a change in pull request #29761: [SPARK-32889][SQL] orc table column name supports special characters.
cloud-fan commented on a change in pull request #29761: URL: https://github.com/apache/spark/pull/29761#discussion_r489184669 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ## @@ -38,14 +38,19 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.{SerializableConfiguration, Utils} private[sql] object OrcFileFormat { private def checkFieldName(name: String): Unit = { try { - TypeDescription.fromString(s"struct<$name:int>") + if (SQLConf.get.getConfString("spark.sql.orc.column.allowSpecialChar", "false").toBoolean) { Review comment: +1, I don't think this needs a config.
[GitHub] [spark] AmplabJenkins removed a comment on pull request #29762: [SPARK-32892][CORE][SQL] Fix hash functions on big-endian platforms.
AmplabJenkins removed a comment on pull request #29762: URL: https://github.com/apache/spark/pull/29762#issuecomment-693193767 Merged build finished. Test FAILed.
[GitHub] [spark] SparkQA commented on pull request #29766: [SPARK-32704][SQL][TESTS][FOLLOW-UP] Check any physical rule instead of a specific rule in the test
SparkQA commented on pull request #29766: URL: https://github.com/apache/spark/pull/29766#issuecomment-693193823 **[Test build #128730 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128730/testReport)** for PR 29766 at commit [`80ac7d2`](https://github.com/apache/spark/commit/80ac7d25edd7a8570e9b666791aa7215256fb6d0). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] SparkQA removed a comment on pull request #29765: [SPARK-32888][DOCS] Add user document about header flag and RDD as path for reading CSV
SparkQA removed a comment on pull request #29765: URL: https://github.com/apache/spark/pull/29765#issuecomment-693110435 **[Test build #128729 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128729/testReport)** for PR 29765 at commit [`89c2eda`](https://github.com/apache/spark/commit/89c2eda9787b3d904f536dd7c064a1b45324ed0a).
[GitHub] [spark] Ngone51 commented on a change in pull request #29722: [SPARK-32850][CORE] Simplify the RPC message flow of decommission
Ngone51 commented on a change in pull request #29722: URL: https://github.com/apache/spark/pull/29722#discussion_r489186319 ## File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ## @@ -245,15 +245,24 @@ private[deploy] class Master( logError("Leadership has been revoked -- master shutting down.") System.exit(0) -case WorkerDecommission(id, workerRef) => - logInfo("Recording worker %s decommissioning".format(id)) +case WorkerDecommissioning(id, workerRef) => if (state == RecoveryState.STANDBY) { workerRef.send(MasterInStandby) } else { // We use foreach since get gives us an option and we can skip the failures. -idToWorker.get(id).foreach(decommissionWorker) +idToWorker.get(id).foreach(w => decommissionWorker(w)) } +case DecommissionWorkers(ids) => + ids.foreach ( id => Review comment: It has been checked by the caller when handling `DecommissionWorkersOnHosts`.
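The code comment in the hunk above ("We use foreach since get gives us an option and we can skip the failures") refers to a common Scala idiom. A minimal standalone sketch, assuming a hypothetical `WorkerInfo` class and `decommissionAll` helper rather than the real `Master` internals:

```scala
// Minimal illustration of the Map.get(...).foreach idiom: unknown ids are silently skipped.
// `WorkerInfo` and `decommissionAll` are hypothetical, not Spark's Master code.
object DecommissionSketch {
  final case class WorkerInfo(id: String, var decommissioned: Boolean = false)

  // Marks every known worker in `ids` as decommissioned and returns how many were found.
  def decommissionAll(idToWorker: Map[String, WorkerInfo], ids: Seq[String]): Int = {
    var found = 0
    ids.foreach { id =>
      // get returns an Option; foreach runs the body only when the worker exists.
      idToWorker.get(id).foreach { w =>
        w.decommissioned = true
        found += 1
      }
    }
    found
  }

  def main(args: Array[String]): Unit = {
    val workers = Map("w1" -> WorkerInfo("w1"), "w2" -> WorkerInfo("w2"))
    println(decommissionAll(workers, Seq("w1", "missing"))) // 1
  }
}
```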
[GitHub] [spark] beliefer commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
beliefer commented on a change in pull request #29604: URL: https://github.com/apache/spark/pull/29604#discussion_r489187346 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ## @@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction { override def prettyName: String = "cume_dist" } +/** + * The NthValue function returns the value of `input` at the `offset`th row from beginning of the + * window frame. Offset starts at 1. When the value of `input` is null at the `offset`th row or + * there is no such an `offset`th row, null is returned. + */ +@ExpressionDescription( + usage = """ +_FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row + from beginning of the window frame. Offsets start at 1. If the value of `input` at the + `offset`th row is null, null is returned. If there is no such an offset row (e.g., when the + offset is 10, size of the window frame less than 10), null is returned. + """, + arguments = """ +Arguments: + * input - the target column or expression that the function operates on. + * offset - an int expression which determines the row number relative to the first row in + the window for which to return the expression. The offset can be a constant or an + expression and must be a positive integer that is greater than 0. + * ignoreNulls - an optional specification that indicates the NthValue should skip null + values in the determination of which row to use. 
+ """, + since = "3.1.0", + group = "window_funcs") +case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean) +extends AggregateWindowFunction with ImplicitCastInputTypes { + + def this(child: Expression, offset: Expression) = this(child, offset, false) + + override def children: Seq[Expression] = input :: Nil + + override def frame: WindowFrame = UnspecifiedFrame + + override def dataType: DataType = input.dataType + + override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, BooleanType) + + override def checkInputDataTypes(): TypeCheckResult = { +val check = super.checkInputDataTypes() +if (check.isFailure) { + check +} else if (!offsetExpr.foldable) { + TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.") +} else { + offsetExpr.dataType match { +case IntegerType | ShortType | ByteType => + offsetExpr.eval().asInstanceOf[Int] match { +case i: Int if i <= 0 => TypeCheckFailure( + s"The 'offset' argument of nth_value must be greater than zero but it is $i.") +case _ => TypeCheckSuccess + } +case _ => TypeCheckFailure( + s"The 'offset' parameter must be a int literal but it is ${offsetExpr.dataType}.") + } +} + } + + private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong + private lazy val result = AttributeReference("result", input.dataType)() + private lazy val count = AttributeReference("count", LongType)() + private lazy val valueSet = AttributeReference("valueSet", BooleanType)() + override lazy val aggBufferAttributes: Seq[AttributeReference] = +result :: count :: valueSet :: Nil + + override lazy val initialValues: Seq[Literal] = Seq( +/* result = */ Literal.create(null, input.dataType), +/* count = */ Literal(1L), +/* valueSet = */ Literal.create(false, BooleanType) Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
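The offset validation in `checkInputDataTypes` above boils down to two rules: the offset must be an integer literal, and it must be greater than zero. A rough standalone model of those rules, using `Either` in place of `TypeCheckResult` (the `checkOffset` helper is hypothetical and deliberately ignores the foldability check and Catalyst's implicit casts):

```scala
// Rough model of the offset rules: an Int greater than zero is accepted, anything else rejected.
// `checkOffset` is a hypothetical helper; Either stands in for Catalyst's TypeCheckResult.
object OffsetCheckSketch {
  def checkOffset(offset: Any): Either[String, Int] = offset match {
    case i: Int if i <= 0 =>
      Left(s"The 'offset' argument of nth_value must be greater than zero but it is $i.")
    case i: Int =>
      Right(i)
    case other =>
      Left(s"The 'offset' parameter must be an int literal but it is ${other.getClass.getSimpleName}.")
  }

  def main(args: Array[String]): Unit = {
    println(checkOffset(3))            // Right(3)
    println(checkOffset(0).isLeft)     // true
    println(checkOffset("two").isLeft) // true
  }
}
```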
[GitHub] [spark] cloud-fan commented on a change in pull request #29722: [SPARK-32850][CORE] Simplify the RPC message flow of decommission
cloud-fan commented on a change in pull request #29722: URL: https://github.com/apache/spark/pull/29722#discussion_r489187271 ## File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ## @@ -245,15 +245,24 @@ private[deploy] class Master( logError("Leadership has been revoked -- master shutting down.") System.exit(0) -case WorkerDecommission(id, workerRef) => - logInfo("Recording worker %s decommissioning".format(id)) +case WorkerDecommissioning(id, workerRef) => if (state == RecoveryState.STANDBY) { workerRef.send(MasterInStandby) } else { // We use foreach since get gives us an option and we can skip the failures. -idToWorker.get(id).foreach(decommissionWorker) +idToWorker.get(id).foreach(w => decommissionWorker(w)) } +case DecommissionWorkers(ids) => + ids.foreach ( id => Review comment: ah i see
[GitHub] [spark] AmplabJenkins removed a comment on pull request #29765: [SPARK-32888][DOCS] Add user document about header flag and RDD as path for reading CSV
AmplabJenkins removed a comment on pull request #29765: URL: https://github.com/apache/spark/pull/29765#issuecomment-693196447
[GitHub] [spark] beliefer commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function
beliefer commented on a change in pull request #29604: URL: https://github.com/apache/spark/pull/29604#discussion_r489187186 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ## @@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction { override def prettyName: String = "cume_dist" } +/** + * The NthValue function returns the value of `input` at the `offset`th row from beginning of the + * window frame. Offset starts at 1. When the value of `input` is null at the `offset`th row or + * there is no such an `offset`th row, null is returned. + */ +@ExpressionDescription( + usage = """ +_FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row + from beginning of the window frame. Offsets start at 1. If the value of `input` at the + `offset`th row is null, null is returned. If there is no such an offset row (e.g., when the + offset is 10, size of the window frame less than 10), null is returned. + """, + arguments = """ +Arguments: + * input - the target column or expression that the function operates on. + * offset - an int expression which determines the row number relative to the first row in + the window for which to return the expression. The offset can be a constant or an + expression and must be a positive integer that is greater than 0. + * ignoreNulls - an optional specification that indicates the NthValue should skip null + values in the determination of which row to use. 
+ """, + since = "3.1.0", + group = "window_funcs") +case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean) +extends AggregateWindowFunction with ImplicitCastInputTypes { + + def this(child: Expression, offset: Expression) = this(child, offset, false) + + override def children: Seq[Expression] = input :: Nil + + override def frame: WindowFrame = UnspecifiedFrame + + override def dataType: DataType = input.dataType + + override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, BooleanType) + + override def checkInputDataTypes(): TypeCheckResult = { +val check = super.checkInputDataTypes() +if (check.isFailure) { + check +} else if (!offsetExpr.foldable) { + TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.") +} else { + offsetExpr.dataType match { +case IntegerType | ShortType | ByteType => + offsetExpr.eval().asInstanceOf[Int] match { +case i: Int if i <= 0 => TypeCheckFailure( + s"The 'offset' argument of nth_value must be greater than zero but it is $i.") +case _ => TypeCheckSuccess + } +case _ => TypeCheckFailure( + s"The 'offset' parameter must be a int literal but it is ${offsetExpr.dataType}.") + } +} + } + + private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong + private lazy val result = AttributeReference("result", input.dataType)() + private lazy val count = AttributeReference("count", LongType)() + private lazy val valueSet = AttributeReference("valueSet", BooleanType)() + override lazy val aggBufferAttributes: Seq[AttributeReference] = +result :: count :: valueSet :: Nil + + override lazy val initialValues: Seq[Literal] = Seq( +/* result = */ Literal.create(null, input.dataType), +/* count = */ Literal(1L), +/* valueSet = */ Literal.create(false, BooleanType) + ) + + override lazy val updateExpressions: Seq[Expression] = { +if (ignoreNulls) { + Seq( +/* result = */ If(valueSet || input.isNull || count < offset, result, input), +/* count = */ If(input.isNull, count, 
count + 1L), +/* valueSet = */ valueSet || (input.isNotNull && count >= offset) + ) +} else { + Seq( +/* result = */ If(valueSet || count < offset, result, input), +/* count = */ count + 1L, +/* valueSet = */ valueSet || count >= offset Review comment: We use ``` /* result = */ If(count === offset, input, result), /* count = */ count + 1L ``` now. ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ## @@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction { override def prettyName: String = "cume_dist" } +/** + * The NthValue function returns the value of `input` at the `offset`th row from beginning of the + * window frame. Offset starts at 1. When the value of `input` is null at the `offset`th row or + * there is no such an `offset`th row, null is returned. + */ +@ExpressionDescription( + usage = """ +_FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row + from beginning of the window frame. Offsets start at 1. If the value of
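For readers following the thread, the simplified update quoted in the review comment (`If(count === offset, input, result)` with `count + 1L`) can be modelled in plain Scala. This is only a sketch under stated assumptions: `NthValueSketch` and `nthValue` are hypothetical names rather than Spark APIs, a null `input` is modelled as `None`, and the `ignoreNulls` branch follows the counting rule shown in the diff (null rows do not advance `count`).

```scala
// Plain-Scala model of the aggregation buffer update discussed above.
// NthValueSketch and nthValue are hypothetical names, not part of Spark.
object NthValueSketch {
  def nthValue[A](
      frame: Seq[Option[A]],
      offset: Long,
      ignoreNulls: Boolean = false): Option[A] = {
    require(offset > 0, "offset must be greater than zero")
    // The buffer is (result, count); count starts at 1, as in initialValues in the diff.
    val start: (Option[A], Long) = (None, 1L)
    val (picked, _) = frame.foldLeft(start) { case ((acc, count), input) =>
      if (ignoreNulls && input.isEmpty) {
        // Null rows are skipped and do not advance the counter.
        (acc, count)
      } else {
        // result = If(count === offset, input, result); count = count + 1L
        (if (count == offset) input else acc, count + 1L)
      }
    }
    picked
  }
}
```

With the frame `Seq(Some(10), None, Some(30))` and offset 2, this model returns `None` by default (the second row is null) and `Some(30)` when `ignoreNulls = true`; an offset larger than the frame size also yields `None`.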
[GitHub] [spark] cloud-fan commented on a change in pull request #29722: [SPARK-32850][CORE] Simplify the RPC message flow of decommission
cloud-fan commented on a change in pull request #29722: URL: https://github.com/apache/spark/pull/29722#discussion_r489186794 ## File path: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ## @@ -245,15 +245,24 @@ private[deploy] class Master( logError("Leadership has been revoked -- master shutting down.") System.exit(0) -case WorkerDecommission(id, workerRef) => - logInfo("Recording worker %s decommissioning".format(id)) +case WorkerDecommissioning(id, workerRef) => if (state == RecoveryState.STANDBY) { workerRef.send(MasterInStandby) } else { // We use foreach since get gives us an option and we can skip the failures. -idToWorker.get(id).foreach(decommissionWorker) +idToWorker.get(id).foreach(w => decommissionWorker(w)) } +case DecommissionWorkers(ids) => + ids.foreach ( id => Review comment: then probably it's better to add an `assert(state != RecoveryState.STANDBY)` here
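The suggestion above could look roughly like the following. This is a self-contained sketch, not the actual `Master` code: `MasterSketch`, its simplified `RecoveryState` values, and the string-based worker map are all hypothetical stand-ins, and it assumes (as the review comment implies) that a batch decommission request is only expected to reach an active master.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified stand-in for the Master message handling in the diff.
object MasterSketch {
  sealed trait RecoveryState
  case object Alive extends RecoveryState
  case object Standby extends RecoveryState

  final class Master(var state: RecoveryState, idToWorker: Map[String, String]) {
    val decommissioned: ListBuffer[String] = ListBuffer.empty[String]

    def decommissionWorkers(ids: Seq[String]): Unit = {
      // Fail fast if the invariant from the review comment is violated.
      assert(state != Standby, "DecommissionWorkers is not expected on a standby master")
      // Option.foreach skips ids that are not registered, as in the original handler.
      ids.foreach(id => idToWorker.get(id).foreach(w => decommissioned += w))
    }
  }
}
```

Unknown worker ids are silently skipped here, mirroring the "we can skip the failures" comment in the diff, while a standby master fails the assertion instead of quietly doing nothing.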
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25965: [SPARK-26425][SS] Add more constraint checks to avoid checkpoint corruption
HeartSaVioR commented on a change in pull request #25965: URL: https://github.com/apache/spark/pull/25965#discussion_r489186932 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ## @@ -585,7 +585,8 @@ class MicroBatchExecution( withProgressLocked { sinkCommitProgress = batchSinkProgress watermarkTracker.updateWatermark(lastExecution.executedPlan) - commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)) + assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)), +s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") Review comment: Good point. Addressed.
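The pattern in this diff relies on `add` reporting whether the batch id was newly written. A minimal sketch of that idea, assuming an in-memory log: `InMemoryCommitLog` and `CommitLogSketch` are hypothetical names used only for illustration and do not reproduce Spark's `CommitLog`, and the watermark is modelled as a bare `Long`.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical in-memory stand-in for a commit log keyed by batch id.
final class InMemoryCommitLog[T] {
  private val entries = TrieMap.empty[Long, T]

  // Returns true only for the first writer of a given batch id.
  def add(batchId: Long, metadata: T): Boolean =
    entries.putIfAbsent(batchId, metadata).isEmpty
}

object CommitLogSketch {
  // Mirrors the assert in the diff: a second writer for the same batch fails loudly.
  def commit(log: InMemoryCommitLog[Long], batchId: Long, watermark: Long): Unit =
    assert(
      log.add(batchId, watermark),
      s"Concurrent update to the commit log. Multiple streaming jobs detected for $batchId")
}
```

Committing the same batch id twice makes the second call fail with an `AssertionError` rather than silently overwriting or ignoring the entry, which is the behaviour the added constraint check is after.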
[GitHub] [spark] AmplabJenkins commented on pull request #29765: [SPARK-32888][DOCS] Add user document about header flag and RDD as path for reading CSV
AmplabJenkins commented on pull request #29765: URL: https://github.com/apache/spark/pull/29765#issuecomment-693196447
[GitHub] [spark] HyukjinKwon commented on pull request #29765: [SPARK-32888][DOCS] Add user document about header flag and RDD as path for reading CSV
HyukjinKwon commented on pull request #29765: URL: https://github.com/apache/spark/pull/29765#issuecomment-693204275 I think Jenkins currently doesn't build the PySpark documentation due to missing dependencies. I opened some JIRAs, but it looks like Shane is busy at the moment. GitHub Actions is the only build that builds the PySpark docs, so that might be the reason.
[GitHub] [spark] viirya commented on a change in pull request #29756: [SPARK-32885][SS] Add DataStreamReader.table API
viirya commented on a change in pull request #29756: URL: https://github.com/apache/spark/pull/29756#discussion_r489195938 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ## @@ -942,6 +957,47 @@ class Analyzer( } } + /** + * Replace [[UnresolvedRelation]] with concrete streaming logical plans. + */ + object ResolveStreamingRelation extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case u: UnresolvedRelation if u.isStreaming => +val res = lookupStreamingRelation(u.multipartIdentifier, u.options) +res.getOrElse(u) +} + +// Look up a relation from the session catalog with the following logic: +// 1) If the resolved catalog is not session catalog, return None. +// 2) If a relation is not found in the catalog, return None. +// 3) If a v1 table is found, create a v1 relation. Otherwise, pass the table to +//UnresolvedStreamingRelation. Review comment: hmm, I don't see `UnresolvedStreamingRelation`, is it missing?
[GitHub] [spark] viirya commented on pull request #29765: [SPARK-32888][DOCS] Add user document about header flag and RDD as path for reading CSV
viirya commented on pull request #29765: URL: https://github.com/apache/spark/pull/29765#issuecomment-693209330 Oh, got it. I'm fixing it.
[GitHub] [spark] SparkQA removed a comment on pull request #29765: [SPARK-32888][DOCS] Add user document about header flag and RDD as path for reading CSV
SparkQA removed a comment on pull request #29765: URL: https://github.com/apache/spark/pull/29765#issuecomment-693213988 **[Test build #128747 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128747/testReport)** for PR 29765 at commit [`19ee398`](https://github.com/apache/spark/commit/19ee3987d7a88af943dcb39c7ba22e83573521b8).
[GitHub] [spark] AmplabJenkins removed a comment on pull request #28841: [SPARK-31962][SQL] Provide modifiedAfter and modifiedBefore options when filtering from a batch-based file data source
AmplabJenkins removed a comment on pull request #28841: URL: https://github.com/apache/spark/pull/28841#issuecomment-693216812 Merged build finished. Test FAILed.
[GitHub] [spark] SparkQA removed a comment on pull request #29591: [SPARK-32714][PYTHON] Initial pyspark-stubs port.
SparkQA removed a comment on pull request #29591: URL: https://github.com/apache/spark/pull/29591#issuecomment-693148274 **[Test build #128740 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128740/testReport)** for PR 29591 at commit [`172f99a`](https://github.com/apache/spark/commit/172f99a3de38294c5f7a41340a587ca25cd29d4f).
[GitHub] [spark] SparkQA removed a comment on pull request #29767: [SPARK-32896][SS] Add DataStreamWriter.table API
SparkQA removed a comment on pull request #29767: URL: https://github.com/apache/spark/pull/29767#issuecomment-693148256 **[Test build #128739 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128739/testReport)** for PR 29767 at commit [`f557696`](https://github.com/apache/spark/commit/f5576969ee9afeaa35c3bb74822aad43cf9c11c2).
[GitHub] [spark] SparkQA removed a comment on pull request #29739: [SPARK-32867][SQL] When explain, HiveTableRelation show limited message
SparkQA removed a comment on pull request #29739: URL: https://github.com/apache/spark/pull/29739#issuecomment-693179376 **[Test build #128743 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128743/testReport)** for PR 29739 at commit [`883f547`](https://github.com/apache/spark/commit/883f5471ec653cd848f18c9180cb7fca6b90eb51).
[GitHub] [spark] SparkQA removed a comment on pull request #29316: [SPARK-32508][SQL] Disallow empty part col values in partition spec before static partition writing
SparkQA removed a comment on pull request #29316: URL: https://github.com/apache/spark/pull/29316#issuecomment-693135193 **[Test build #128735 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128735/testReport)** for PR 29316 at commit [`c8ac369`](https://github.com/apache/spark/commit/c8ac36917caf7fa67a0bca3c30a10380fd7c7306).
[GitHub] [spark] AmplabJenkins removed a comment on pull request #25965: [SPARK-26425][SS] Add more constraint checks to avoid checkpoint corruption
AmplabJenkins removed a comment on pull request #25965: URL: https://github.com/apache/spark/pull/25965#issuecomment-693216851 Merged build finished. Test FAILed.
[GitHub] [spark] SparkQA removed a comment on pull request #29586: [SPARK-32743][SQL] Add distinct info at UnresolvedFunction toString
SparkQA removed a comment on pull request #29586: URL: https://github.com/apache/spark/pull/29586#issuecomment-693176982 **[Test build #128741 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128741/testReport)** for PR 29586 at commit [`95a3e02`](https://github.com/apache/spark/commit/95a3e028b2d1c83eaeab4baf15c07870df4a66ad).
[GitHub] [spark] SparkQA commented on pull request #25965: [SPARK-26425][SS] Add more constraint checks to avoid checkpoint corruption
SparkQA commented on pull request #25965: URL: https://github.com/apache/spark/pull/25965#issuecomment-693216733 **[Test build #128745 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128745/testReport)** for PR 25965 at commit [`a885212`](https://github.com/apache/spark/commit/a8852123ae978dddb5578e3411b4822fd4a1fdce). * This patch **fails due to an unknown error code, -9**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] SparkQA commented on pull request #29316: [SPARK-32508][SQL] Disallow empty part col values in partition spec before static partition writing
SparkQA commented on pull request #29316: URL: https://github.com/apache/spark/pull/29316#issuecomment-693216736 **[Test build #128735 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128735/testReport)** for PR 29316 at commit [`c8ac369`](https://github.com/apache/spark/commit/c8ac36917caf7fa67a0bca3c30a10380fd7c7306). * This patch **fails due to an unknown error code, -9**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] SparkQA commented on pull request #29739: [SPARK-32867][SQL] When explain, HiveTableRelation show limited message
SparkQA commented on pull request #29739: URL: https://github.com/apache/spark/pull/29739#issuecomment-693216722 **[Test build #128743 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128743/testReport)** for PR 29739 at commit [`883f547`](https://github.com/apache/spark/commit/883f5471ec653cd848f18c9180cb7fca6b90eb51). * This patch **fails due to an unknown error code, -9**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] SparkQA commented on pull request #29585: [SPARK-32741][SQL] Check if the same ExprId refers to the unique attribute in logical plans
SparkQA commented on pull request #29585: URL: https://github.com/apache/spark/pull/29585#issuecomment-693216740 **[Test build #128734 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128734/testReport)** for PR 29585 at commit [`fb048c7`](https://github.com/apache/spark/commit/fb048c7afcc6e366f8e840a3e0a02849a129a04c). * This patch **fails due to an unknown error code, -9**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] [spark] SparkQA commented on pull request #29591: [SPARK-32714][PYTHON] Initial pyspark-stubs port.
SparkQA commented on pull request #29591: URL: https://github.com/apache/spark/pull/29591#issuecomment-693216732 **[Test build #128740 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128740/testReport)** for PR 29591 at commit [`172f99a`](https://github.com/apache/spark/commit/172f99a3de38294c5f7a41340a587ca25cd29d4f). * This patch **fails due to an unknown error code, -9**. * This patch merges cleanly. * This patch adds no public classes.