[GitHub] [spark] aminebag commented on pull request #41423: [SPARK-43523][CORE] Fix Spark UI LiveTask memory leak
aminebag commented on PR #41423: URL: https://github.com/apache/spark/pull/41423#issuecomment-1575691248 @srowen
> Are you saying that these objects are not actually usable and will never be collected because events are dropped?

Yes, that's exactly what I'm saying. For example, many LiveTask objects (corresponding to tasks that have already finished) can remain in memory forever while serving no purpose.

> it would be to clean up this state even when something is dropped

How can we clean up the state if the onTaskEnd event is lost? Is there any way to determine whether a task has finished if we have missed the onTaskEnd event? If there is, that would be the most appropriate way to clean up.

> It's just a hack that happens to avoid this in your setup by potentially reporting incorrect info.

Yes, with this fix we could report incorrect info, but that is already the case without the fix: when we miss an onTaskEnd event, we report that the task is still running when it actually is not. The purpose of the fix is not to report monitoring information correctly; it is only to contain the leak. It's better to have incorrect monitoring data (which, again, is already the case) than to make the application unstable.

> That's why I'm also wondering if there are simply ways to make the hot path in this listener faster, to keep up and not drop stuff in the first place.

Sure, I think it's a good idea to make the listener faster, and I'm open to any ideas to achieve that. Still, no matter how much faster we make the listener, we can never guarantee that it will always keep up with the rate of incoming events. I don't think it's sufficient to improve the listener and hope that it will never be swamped by events. In my opinion, the listener should be able to survive any flood of events without leaving a leak behind.
-- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
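To make the argument concrete, here is a minimal Scala model (not Spark's code) of a bounded event queue that drops events when it is full; Spark's listener bus similarly drops events once `spark.scheduler.listenerbus.eventqueue.capacity` is exceeded. The event classes, `DroppingQueue`, and the `liveTasks` set are hypothetical stand-ins. The model shows how a single dropped task-end event leaves a finished task tracked with nothing left to remove it.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical listener events; Spark's real task start/end events carry far more data.
sealed trait Event
final case class TaskStart(taskId: Long) extends Event
final case class TaskEnd(taskId: Long) extends Event

/** Minimal model of a bounded async event queue: events that do not fit are dropped. */
final class DroppingQueue(capacity: Int) {
  private val queue = new LinkedBlockingQueue[Event](capacity)
  private var droppedCount = 0L

  // Non-blocking post: offer() returns false when the queue is full, and the event is lost.
  def post(event: Event): Unit =
    if (!queue.offer(event)) droppedCount += 1

  def dropped: Long = droppedCount

  /** Delivers queued events to a listener that tracks live (started, not yet ended) tasks. */
  def drainInto(liveTasks: mutable.Set[Long]): Unit = {
    var event = queue.poll()
    while (event != null) {
      event match {
        case TaskStart(id) => liveTasks += id
        case TaskEnd(id)   => liveTasks -= id
      }
      event = queue.poll()
    }
  }
}
```

If `TaskEnd(1)` arrives while the queue is full, it is dropped, and nothing in this model ever removes task 1 afterwards, which is the situation the comment describes.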
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41440: [SPARK-43952][CORE][CONNECT] Add SparkContext APIs for query cancellation by tag
HyukjinKwon commented on code in PR #41440: URL: https://github.com/apache/spark/pull/41440#discussion_r1217271474 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -2851,6 +2907,14 @@ object SparkContext extends Logging { */ private[spark] val DRIVER_IDENTIFIER = "driver" + /** Separator of tags in SPARK_JOB_TAGS property */ + private[spark] val SPARK_JOB_TAGS_SEP = "," + + private[spark] def throwIfInvalidTagName(tagName: String) = { +if (tagName.contains(SPARK_JOB_TAGS_SEP)) { + throw new IllegalArgumentException("Spark job tag cannot contain '$SPARK_JOB_TAG_NAME_SEP'.") Review Comment: ```suggestion throw new IllegalArgumentException(s"Spark job tag cannot contain '$SPARK_JOB_TAG_NAME_SEP'.") ```
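The suggestion adds Scala's `s` string interpolator; without it, `$NAME` stays in the message as literal text. Also note that the hunk defines `SPARK_JOB_TAGS_SEP`, while the message refers to `SPARK_JOB_TAG_NAME_SEP`, which does not appear in the hunk. Once interpolation is enabled, that reference must resolve to a real identifier or the code will not compile. The standalone sketch below uses the constant the hunk does define; the `JobTags` object is a hypothetical stand-in for `object SparkContext`.

```scala
object JobTags {
  // Separator of tags in the job-tags property (same value as SPARK_JOB_TAGS_SEP in the hunk).
  val SPARK_JOB_TAGS_SEP = ","

  // Rejects tag names containing the separator. The `s` prefix makes the
  // constant's value appear in the message instead of its name.
  def throwIfInvalidTagName(tagName: String): Unit = {
    if (tagName.contains(SPARK_JOB_TAGS_SEP)) {
      throw new IllegalArgumentException(s"Spark job tag cannot contain '$SPARK_JOB_TAGS_SEP'.")
    }
  }
}
```

Calling `JobTags.throwIfInvalidTagName("a,b")` raises an exception whose message is `Spark job tag cannot contain ','.` rather than one containing the literal placeholder.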
[GitHub] [spark] siying commented on pull request #41409: [SPARK-43901][SQL] Avro to Support custom decimal type backed by Long
siying commented on PR #41409: URL: https://github.com/apache/spark/pull/41409#issuecomment-1575861442 @dongjoon-hyun done.
[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources
aokolnychyi commented on code in PR #41448: URL: https://github.com/apache/spark/pull/41448#discussion_r1216255732 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala: ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.roaringbitmap.longlong.Roaring64Bitmap + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.expressions.AttributeSet +import org.apache.spark.sql.catalyst.expressions.BasePredicate +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.Projection +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection +import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate +import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction, Keep, ROW_ID, Split} +import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.UnaryExecNode + +case class MergeRowsExec( +isSourceRowPresent: Expression, +isTargetRowPresent: Expression, +matchedInstructions: Seq[Instruction], +notMatchedInstructions: Seq[Instruction], +notMatchedBySourceInstructions: Seq[Instruction], +checkCardinality: Boolean, +output: Seq[Attribute], +child: SparkPlan) extends UnaryExecNode { + + @transient override lazy val producedAttributes: AttributeSet = { +AttributeSet(output.filterNot(attr => inputSet.contains(attr))) + } + + @transient override lazy val references: AttributeSet = child.outputSet + + override def simpleString(maxFields: Int): String = { +s"MergeRowsExec${truncatedString(output, "[", ", ", "]", maxFields)}" + } + + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = { +copy(child = newChild) + } + + protected override def doExecute(): RDD[InternalRow] = { +child.execute().mapPartitions(processPartition) + } + + private def processPartition(rowIterator: Iterator[InternalRow]): 
Iterator[InternalRow] = { +val isSourceRowPresentPred = createPredicate(isSourceRowPresent) +val isTargetRowPresentPred = createPredicate(isTargetRowPresent) + +val matchedInstructionExecs = planInstructions(matchedInstructions) +val notMatchedInstructionExecs = planInstructions(notMatchedInstructions) +val notMatchedBySourceInstructionExecs = planInstructions(notMatchedBySourceInstructions) + +val cardinalityValidator = if (checkCardinality) { + val rowIdOrdinal = child.output.indexWhere(attr => conf.resolver(attr.name, ROW_ID)) + assert(rowIdOrdinal != -1, "Cannot find row ID attr") + BitmapCardinalityValidator(rowIdOrdinal) +} else { + NoopCardinalityValidator +} + +val mergeIterator = new MergeRowIterator( + rowIterator, cardinalityValidator, isTargetRowPresentPred, isSourceRowPresentPred, + matchedInstructionExecs, notMatchedInstructionExecs, notMatchedBySourceInstructionExecs) + +// null indicates a record must be discarded +mergeIterator.filter(_ != null) + } + + private def createProjection(exprs: Seq[Expression]): UnsafeProjection = { +UnsafeProjection.create(exprs, child.output) + } + + private def createPredicate(expr: Expression): BasePredicate = { +GeneratePredicate.generate(expr, child.output) + } + + private def planInstructions(instructions: Seq[Instruction]): Seq[InstructionExec] = { +instructions.map { + case Keep(cond, output) => +KeepExec(createPredicate(cond), createProjection(output)) + case Split(cond, output, otherOutput) => +SplitExec(createPredicate(cond), createProjection(output), createProjection(otherOutput)) + case other => +throw new AnalysisException(s"Unexpected instruction: $other") +} + } + + sealed trait InstructionExec { +def condition: BasePredicate + } + + case class
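The diff chooses between `BitmapCardinalityValidator` (keyed by the `ROW_ID` ordinal) and `NoopCardinalityValidator`, but their bodies fall outside this excerpt. The following minimal sketch assumes the validator fails when the same target row ID is seen twice within a partition, meaning one target row matched more than one source row. A `mutable.HashSet[Long]` replaces `Roaring64Bitmap` so the example has no dependencies, and all names are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical interface mirroring the two validators chosen in processPartition.
sealed trait CardinalityValidator {
  /** Called once per matched (source, target) pair with the target's row ID. */
  def validate(targetRowId: Long): Unit
}

// Used when checkCardinality is false: accepts everything.
object NoopValidator extends CardinalityValidator {
  override def validate(targetRowId: Long): Unit = ()
}

// HashSet[Long] stands in for Roaring64Bitmap, which stores a set of longs more compactly.
final class SetCardinalityValidator extends CardinalityValidator {
  private val matchedRowIds = mutable.HashSet.empty[Long]

  override def validate(targetRowId: Long): Unit = {
    // add() returns false if the ID was already present: one target row matched twice.
    if (!matchedRowIds.add(targetRowId)) {
      throw new IllegalStateException(
        s"Target row $targetRowId was matched by more than one source row")
    }
  }
}
```

A bitmap is a natural fit here because row IDs are longs, and membership plus insertion is the only operation the check needs.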
[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources
aokolnychyi commented on code in PR #41448: URL: https://github.com/apache/spark/pull/41448#discussion_r1216253601 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala: ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Unevaluable} +import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction +import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.types.DataType + +case class MergeRows( +isSourceRowPresent: Expression, +isTargetRowPresent: Expression, +matchedInstructions: Seq[Instruction], +notMatchedInstructions: Seq[Instruction], +notMatchedBySourceInstructions: Seq[Instruction], +checkCardinality: Boolean, +output: Seq[Attribute], +child: LogicalPlan) extends UnaryNode { + + override lazy val producedAttributes: AttributeSet = { +AttributeSet(output.filterNot(attr => inputSet.contains(attr))) + } + + override lazy val references: AttributeSet = child.outputSet + + override def simpleString(maxFields: Int): String = { +s"MergeRows${truncatedString(output, "[", ", ", "]", maxFields)}" + } + + override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = { +copy(child = newChild) + } +} + +object MergeRows { + final val ROW_ID = "__row_id" + + sealed trait Instruction extends Expression with Unevaluable { Review Comment: Each `MergeAction` gets converted into a particular instance of `Instruction`. Each such instruction is a valid expression, so the analyzer and optimizer will transform it properly.
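`MergeRowsExec.planInstructions` turns each `Keep` into a `KeepExec` (a predicate plus one projection) and each `Split` into a `SplitExec` (a predicate plus two projections). Below is a simplified, hypothetical model of applying such planned instructions to one row, using plain functions in place of `BasePredicate` and `UnsafeProjection`. It assumes that the first instruction whose condition holds wins, as with ordered `WHEN` clauses in SQL `MERGE`, and that when nothing matches the row is discarded (mirroring the `null` filter in `processPartition`). The real `MergeRowIterator` is not shown in this excerpt.

```scala
object InstructionModel {
  type Row = Seq[Any]

  // Planned instructions: a condition plus one (Keep) or two (Split) projections.
  sealed trait InstructionExec { def condition: Row => Boolean }
  final case class KeepExec(condition: Row => Boolean, project: Row => Row)
    extends InstructionExec
  final case class SplitExec(
      condition: Row => Boolean,
      project: Row => Row,
      projectOther: Row => Row) extends InstructionExec

  /** Applies the first instruction whose condition holds; an empty result means "discard". */
  def applyFirst(row: Row, instructions: Seq[InstructionExec]): Seq[Row] =
    instructions.find(_.condition(row)) match {
      case Some(KeepExec(_, project))                => Seq(project(row))
      case Some(SplitExec(_, project, projectOther)) => Seq(project(row), projectOther(row))
      case None                                      => Seq.empty
    }
}
```

Because the real `Instruction` nodes are Catalyst expressions, their conditions and outputs are resolved and optimized like any other expression before being compiled into predicates and projections in this way.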
[GitHub] [spark] LorenzoMartini commented on pull request #40018: [SPARK-42439][SQL] In v2 writes, make createJobDescription in FileWrite.toBatch not lazy
LorenzoMartini commented on PR #40018: URL: https://github.com/apache/spark/pull/40018#issuecomment-1575674518 Maybe @MaxGekk?
[GitHub] [spark] HyukjinKwon commented on pull request #41450: [SPARK-43960][PS][TESTS] DataFrameConversionTestsMixin is not tested properly
HyukjinKwon commented on PR #41450: URL: https://github.com/apache/spark/pull/41450#issuecomment-1575790826 Merged to master.
[GitHub] [spark] HyukjinKwon closed pull request #41450: [SPARK-43960][PS][TESTS] DataFrameConversionTestsMixin is not tested properly
HyukjinKwon closed pull request #41450: [SPARK-43960][PS][TESTS] DataFrameConversionTestsMixin is not tested properly URL: https://github.com/apache/spark/pull/41450
[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources
aokolnychyi commented on code in PR #41448: URL: https://github.com/apache/spark/pull/41448#discussion_r1217358049 ## sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala: ## @@ -92,6 +93,67 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan operators.head } + test("NO_BROADCAST_AND_REPLICATION hint is respected in cross joins") { Review Comment: These 3 tests cover scenarios in which it is not safe to broadcast or replicate the target table to perform the cardinality check. The newly added internal hint handles this. There are MERGE tests for this too.
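One way to see why broadcasting or replicating the target breaks the check: `processPartition` in `MergeRowsExec` builds a fresh cardinality validator for every partition, so a duplicate match is caught only if both matches of the same target row land in the same partition. The hypothetical model below treats each partition as the list of target row IDs it matched. This is our reading of the diff, not code from the PR.

```scala
import scala.collection.mutable

object PerPartitionCheck {
  /**
   * Each inner Seq is the list of target row IDs matched within one partition.
   * A fresh set per partition mirrors processPartition creating a new validator per partition.
   */
  def detectsDuplicate(partitions: Seq[Seq[Long]]): Boolean =
    partitions.exists { matchedRowIds =>
      val seen = mutable.HashSet.empty[Long]
      matchedRowIds.exists(id => !seen.add(id))
    }
}
```

When the target side is broadcast, two source rows matching the same target row can be processed in different partitions, and neither partition's validator sees the ID twice.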
[GitHub] [spark] itholic opened a new pull request, #41450: [SPARK-43960][PS][TESTS] DataFrameConversionTestsMixin is not tested properly
itholic opened a new pull request, #41450: URL: https://github.com/apache/spark/pull/41450
### What changes were proposed in this pull request?
This PR proposes to fix `DataFrameConversionTestsMixin`, which is not tested properly.
### Why are the changes needed?
So that these tests actually run and are verified properly.
### Does this PR introduce _any_ user-facing change?
No, it's test-only.
### How was this patch tested?
The existing CI should pass.
[GitHub] [spark] aminebag commented on pull request #41423: [SPARK-43523][CORE] Fix Spark UI LiveTask memory leak
aminebag commented on PR #41423: URL: https://github.com/apache/spark/pull/41423#issuecomment-1575506059 @srowen I have applied and tested the modification you suggested (replacing toList with toArray), and it had no observable impact on the issue. I can still include it in the change if you want. @sarutak Increasing the value of `spark.scheduler.listenerbus.eventqueue.capacity` would reduce the severity of the leak, but I don't think it would entirely prevent it, especially during peaks of activity. I suppose we would have fewer event drops with a larger queue, but tasks would still leak and keep adding up in memory until eventually there is no more space for the driver to operate. What I have noticed in the heap dumps, both from our production application and from the tests, is that the most significant leak is the liveTasks map of the AppStatusListener object. As I have illustrated with the tests, the leak was contained by limiting the size of the liveTasks map. I think this fix addresses the problem directly: no matter how many events are dropped, the number of leaked tasks will never exceed the value of `spark.ui.refreshedTasks`. Thus, the application can run for days without the liveTasks map taking over all memory. This is especially reasonable because the role of AppStatusListener is only to provide monitoring information about the state of the application, and once events start being dropped, AppStatusListener's state is already incoherent. The new property `spark.ui.refreshedTasks` is optional and disabled by default; we would only need to enable it when we observe that AppStatusListener is leaking.
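The proposed containment caps the number of tracked tasks at `spark.ui.refreshedTasks`. The PR's eviction policy is not described in this comment, so the following is only a sketch that assumes oldest-first eviction. It uses a `java.util.LinkedHashMap` whose `removeEldestEntry` drops the oldest tracked task once the cap is exceeded, even if that task's `onTaskEnd` never arrived. `LiveTaskInfo` and `BoundedLiveTasks` are hypothetical stand-ins for `LiveTask` and the `liveTasks` map.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical stand-in for Spark's LiveTask, which holds much more state.
final case class LiveTaskInfo(taskId: Long, stageId: Int)

/** Tracks at most `maxTasks` live tasks, evicting the oldest inserted entry beyond the cap. */
final class BoundedLiveTasks(maxTasks: Int) {
  require(maxTasks > 0, "maxTasks must be positive")

  // accessOrder = false keeps insertion order, so the "eldest" entry is the oldest task started.
  private val tasks = new JLinkedHashMap[Long, LiveTaskInfo](16, 0.75f, false) {
    override protected def removeEldestEntry(eldest: JMap.Entry[Long, LiveTaskInfo]): Boolean =
      this.size() > maxTasks
  }

  def onTaskStart(task: LiveTaskInfo): Unit = tasks.put(task.taskId, task)

  // This call may never happen if the listener bus drops the event; the cap still bounds memory.
  def onTaskEnd(taskId: Long): Unit = tasks.remove(taskId)

  def count: Int = tasks.size()

  def contains(taskId: Long): Boolean = tasks.containsKey(taskId)
}
```

The trade-off matches the one discussed in the thread: a still-running task may be evicted and disappear from the UI, but the memory held by leaked entries can no longer grow without bound.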
[GitHub] [spark] bjornjorgensen commented on pull request #40420: [SPARK-42617][PS] Support `isocalendar` from the pandas 2.0.0
bjornjorgensen commented on PR #40420: URL: https://github.com/apache/spark/pull/40420#issuecomment-1575528446 @dzhigimont we have now upgraded the main branch to pandas 2.0.2.
[GitHub] [spark] HyukjinKwon closed pull request #41421: [SPARK-43881][SQL][PYTHON][CONNECT] Add optional pattern for Catalog.listDatabases
HyukjinKwon closed pull request #41421: [SPARK-43881][SQL][PYTHON][CONNECT] Add optional pattern for Catalog.listDatabases URL: https://github.com/apache/spark/pull/41421
[GitHub] [spark] HyukjinKwon commented on pull request #41421: [SPARK-43881][SQL][PYTHON][CONNECT] Add optional pattern for Catalog.listDatabases
HyukjinKwon commented on PR #41421: URL: https://github.com/apache/spark/pull/41421#issuecomment-1575788667 Merged to master.
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41415: [SPARK-43906][PYTHON][CONNECT] Implement the file support in SparkSession.addArtifacts
HyukjinKwon commented on code in PR #41415: URL: https://github.com/apache/spark/pull/41415#discussion_r1217286686 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala: ## @@ -154,6 +154,8 @@ class SparkConnectArtifactManager private[connect] { val canonicalUri = fragment.map(UriBuilder.fromUri(target.toUri).fragment).getOrElse(target.toUri) sessionHolder.session.sparkContext.addArchive(canonicalUri.toString) + } else if (remoteRelativePath.startsWith(s"files${File.separator}")) { +sessionHolder.session.sparkContext.addFile(target.toString) Review Comment: For regular files and archives, I don't intend to expose `org.apache.spark.SparkFiles` for now. Since files and archives are always stored in the current working directory of executors in production, I was simply thinking of creating a session-dedicated directory and changing the current working directory to it. This means end users would continue accessing their files with `./myfile.txt` or `./myarchive`.
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41415: [SPARK-43906][PYTHON][CONNECT] Implement the file support in SparkSession.addArtifacts
HyukjinKwon commented on code in PR #41415: URL: https://github.com/apache/spark/pull/41415#discussion_r1217286896 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala: ## @@ -154,6 +154,8 @@ class SparkConnectArtifactManager private[connect] { val canonicalUri = fragment.map(UriBuilder.fromUri(target.toUri).fragment).getOrElse(target.toUri) sessionHolder.session.sparkContext.addArchive(canonicalUri.toString) + } else if (remoteRelativePath.startsWith(s"files${File.separator}")) { +sessionHolder.session.sparkContext.addFile(target.toString) Review Comment: (`SparkFiles` is used in the test case here, but that's a sort of hack to ensure cleanup, etc.)
[GitHub] [spark] beliefer commented on pull request #41421: [SPARK-43881][SQL][PYTHON][CONNECT] Add optional pattern for Catalog.listDatabases
beliefer commented on PR #41421: URL: https://github.com/apache/spark/pull/41421#issuecomment-1575891405 @HyukjinKwon Thank you.
[GitHub] [spark] beliefer commented on a diff in pull request #41444: [SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API
beliefer commented on code in PR #41444: URL: https://github.com/apache/spark/pull/41444#discussion_r1217334172 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala: ## @@ -812,6 +812,69 @@ object functions { */ def min_by(e: Column, ord: Column): Column = Column.fn("min_by", e, ord) + /** + * Aggregate function: returns the exact percentile(s) of numeric column `expr` at the given + * percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile(e: Column, percentage: Column): Column = Column.fn("percentile", e, percentage) + + /** + * Aggregate function: returns the exact percentile(s) of numeric column `expr` at the given + * percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile(e: Column, percentage: Column, frequency: Column): Column = +Column.fn("percentile", e, percentage, frequency) + + /** + * Aggregate function: returns a percentile value based on a continuous distribution of numeric + * or ANSI interval column at the given percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_cont(e: Column, percentage: Column): Column = +Column.fn("percentile_cont", e, percentage) + + /** + * Aggregate function: returns a percentile value based on a continuous distribution of numeric + * or ANSI interval column at the given percentage(s) with value range in [0.0, 1.0]. + * + * Note: reverse used to specify whether to reverse calculate percentile value. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_cont(e: Column, percentage: Column, reverse: Boolean): Column = +Column.fn("percentile_cont", e, percentage, lit(reverse)) + + /** + * Aggregate function: returns the percentile(s) based on a discrete distribution of numeric + * column `expr` at the given percentage(s) with value range in [0.0, 1.0]. 
+ * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_disc(e: Column, percentage: Column): Column = +Column.fn("percentile_disc", e, percentage) + + /** + * Aggregate function: returns the percentile(s) based on a discrete distribution of numeric + * column `expr` at the given percentage(s) with value range in [0.0, 1.0]. + * + * Note: reverse used to specify whether to reverse calculate percentile value. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_disc(e: Column, percentage: Column, reverse: Boolean): Column = Review Comment: I know that. But there are already many similar cases, such as: https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L473 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L489 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L506 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L621 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L633 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L686 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L702 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L1093 
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L1164 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L1179 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L2745 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L2759 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L3498
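For reference, the usual semantics behind these functions can be sketched over an in-memory `Seq[Double]`. `percentile_cont` interpolates linearly between the two nearest ranks, and `percentile_disc` returns the first value whose cumulative distribution reaches the percentage. This follows the SQL-standard definitions as we understand them; Spark's implementations may differ in edge cases (nulls, the `frequency` argument of `percentile`, ANSI intervals). Modeling `reverse = true` as descending order is our assumption about what the doc comment's "reverse" means, and `PercentileSketch` is hypothetical.

```scala
object PercentileSketch {
  private def checkPercentage(p: Double): Unit =
    require(p >= 0.0 && p <= 1.0, s"percentage must be in [0.0, 1.0], got $p")

  // Sorts a copy ascending, or descending when reverse is true.
  private def ordered(values: Seq[Double], reverse: Boolean): Array[Double] = {
    require(values.nonEmpty, "values must not be empty")
    val arr = values.toArray
    java.util.Arrays.sort(arr)
    if (reverse) arr.reverse else arr
  }

  /** Continuous percentile: linear interpolation between the two closest ranks. */
  def percentileCont(values: Seq[Double], p: Double, reverse: Boolean = false): Double = {
    checkPercentage(p)
    val sorted = ordered(values, reverse)
    val rank = p * (sorted.length - 1)
    val lower = math.floor(rank).toInt
    val upper = math.ceil(rank).toInt
    sorted(lower) + (rank - lower) * (sorted(upper) - sorted(lower))
  }

  /** Discrete percentile: first value whose cumulative distribution is >= p. */
  def percentileDisc(values: Seq[Double], p: Double, reverse: Boolean = false): Double = {
    checkPercentage(p)
    val sorted = ordered(values, reverse)
    val index = math.max(0, math.ceil(p * sorted.length).toInt - 1)
    sorted(index)
  }
}
```

For the values 1.0 through 4.0, the continuous median is 2.5 (halfway between 2.0 and 3.0), while the discrete median is 2.0, an actual member of the input.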
[GitHub] [spark] wForget commented on pull request #41332: [SPARK-43801][SQL] Support unwrap date type to string type in UnwrapCastInBinaryComparison
wForget commented on PR #41332: URL: https://github.com/apache/spark/pull/41332#issuecomment-1575952608 I found the same problem in the Hive data source: the partition filter cannot be pushed down because of the cast expression. Can we push down date type filters in `Shim_v0_13#getPartitionsByFilter`? In addition, is it possible to use an unquoted date string to pass a date filter? The Hive metastore parses an unquoted date string into a date type. Refer to https://github.com/apache/hive/blob/7cd3107a76d633ef5fae2ffb8ec16953ac968092/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/parser/Filter.g#L546
[GitHub] [spark] dongjoon-hyun commented on pull request #41136: [SPARK-43356][K8S] Migrate deprecated createOrReplace to serverSideApply
dongjoon-hyun commented on PR #41136: URL: https://github.com/apache/spark/pull/41136#issuecomment-1575607354 We are now using `6.7.0`. Could you rebase it onto the master branch?
[GitHub] [spark] gengliangwang commented on a diff in pull request #41191: [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values
gengliangwang commented on code in PR #41191: URL: https://github.com/apache/spark/pull/41191#discussion_r1217103535 ## sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala: ## @@ -158,30 +158,37 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the // session catalog and the table provider is not v2. -case c @ CreateTable(ResolvedV1Identifier(ident), _, _, _, _) => +case c @ CreateTable(ResolvedV1Identifier(ident), _, _, u: UnresolvedTableSpec, _, Review Comment: Nit: shall we have a single rule for resolving the table spec? This rule is for resolving session catalogs.
[GitHub] [spark] HyukjinKwon commented on pull request #41440: [SPARK-43952][CORE][CONNECT] Add SparkContext APIs for query cancellation by tag
HyukjinKwon commented on PR #41440: URL: https://github.com/apache/spark/pull/41440#issuecomment-1575805298 > If we don't want to add public APIs like that, I'm also fine with having all that as private[spark]; my planned use of it is inside Spark in Spark Connect. I am fine with adding them as a public API
[GitHub] [spark] gengliangwang commented on a diff in pull request #41191: [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values
gengliangwang commented on code in PR #41191: URL: https://github.com/apache/spark/pull/41191#discussion_r1217279661 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala: ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.SparkThrowable +import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal} +import org.apache.spark.sql.catalyst.optimizer.ConstantFolding +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.errors.QueryCompilationErrors + +/** + * This object is responsible for processing unresolved table specifications in commands with + * OPTIONS lists. The parser produces such lists as maps from strings to unresolved expressions. + * After otherwise resolving such expressions in the analyzer, here we convert them to resolved + * table specifications wherein these OPTIONS list values are represented as strings instead, for + * convenience. 
+ */ +object ResolveTableSpec { + def apply(u: UnresolvedTableSpec, opts: UnresolvedOptionsList): ResolvedTableSpec = { +val newOptions: Seq[(String, String)] = opts.options.map { + case (key: String, null) => +(key, null) + case (key: String, value: Expression) => +val newValue: String = try { + constantFold(value) match { Review Comment: Shall we just use `value.eval()` since it is foldable?
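The reviewer's suggestion works because a foldable expression depends only on constants. Calling `eval()` without an input row therefore yields its value directly, with no need to run a constant-folding pass first. The toy model below is a sketch of that idea and is not Catalyst's real `Expression` API. `Expr`, `Lit`, `Concat`, `ColumnRef` and `optionValue` are all hypothetical names.

```scala
// Toy expression tree (hypothetical, NOT Catalyst's Expression classes) illustrating why a
// foldable OPTIONS value can simply be evaluated instead of being constant-folded first.
object FoldableEvalSketch {
  sealed trait Expr {
    def foldable: Boolean
    def eval(): Any
  }
  final case class Lit(value: Any) extends Expr {
    val foldable: Boolean = true
    def eval(): Any = value
  }
  final case class Concat(left: Expr, right: Expr) extends Expr {
    // Foldable only if every child is foldable.
    def foldable: Boolean = left.foldable && right.foldable
    def eval(): Any = s"${left.eval()}${right.eval()}"
  }
  final case class ColumnRef(name: String) extends Expr {
    val foldable: Boolean = false
    def eval(): Any = throw new UnsupportedOperationException(s"cannot evaluate column $name")
  }

  // Hypothetical helper: turn an OPTIONS value into a string, rejecting non-constant values.
  def optionValue(key: String, value: Expr): String =
    if (!value.foldable) throw new IllegalArgumentException(s"option '$key' is not a constant")
    else Option(value.eval()).map(_.toString).orNull
}
```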
[GitHub] [spark] HyukjinKwon opened a new pull request, #41452: [DO-NOT-MERGE] Testing revert 1
HyukjinKwon opened a new pull request, #41452: URL: https://github.com/apache/spark/pull/41452 ### What changes were proposed in this pull request? TBD ### Why are the changes needed? TBD ### Does this PR introduce _any_ user-facing change? TBD ### How was this patch tested? TBD
[GitHub] [spark] HyukjinKwon opened a new pull request, #41453: [DO-NOT-MERGE] Testing revert 2
HyukjinKwon opened a new pull request, #41453: URL: https://github.com/apache/spark/pull/41453 ### What changes were proposed in this pull request? TBD ### Why are the changes needed? TBD ### Does this PR introduce _any_ user-facing change? TBD ### How was this patch tested? TBD
[GitHub] [spark] gengliangwang commented on a diff in pull request #41385: [SPARK-43205][SQL][FOLLOWUP] add ExpressionWithUnresolvedIdentifier to simplify code
gengliangwang commented on code in PR #41385: URL: https://github.com/apache/spark/pull/41385#discussion_r1217279071 ## sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-disabled.sql.out: ## @@ -218,8 +218,8 @@ org.apache.spark.sql.AnalysisException "objectType" : "", "objectName" : "", "startIndex" : 15, -"stopIndex" : 37, -"fragment" : "not_exist AS X(`hello`)" +"stopIndex" : 23, Review Comment: Awesome, the context is accurate now.
[GitHub] [spark] wangyum commented on a diff in pull request #41370: [SPARK-43866] Partition filter condition should pushed down to metastore query if it is equivalence Predicate
wangyum commented on code in PR #41370: URL: https://github.com/apache/spark/pull/41370#discussion_r1217318011 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala: ## @@ -1041,6 +1053,9 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { ExtractableLiteral(value), ExtractAttribute(SupportedAttribute(name))) => Some(s"$value ${op.symbol} $name") + case EqualTo(ExtractStringAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) => +Some(s"$name = $value") + Review Comment: A test like this:

```scala
test("SPARK-43866:xxx") {
  withTable("test_tb") {
    sql("CREATE TABLE test_tb (id int) PARTITIONED BY (dt string)")
    sql("insert into test_tb partition(dt = '20230505.0') select 1")
    val df = sql("select * from test_tb where dt=20230505")
    checkAnswer(df, Row(1, "20230505.0"))
  }
}
```
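The reviewer's test targets a mismatch between two comparisons. If the predicate `dt = 20230505` on a string partition column were pushed to the metastore as a plain string equality, it would not match the stored value `'20230505.0'`. The test, however, expects Spark to return that row. The in-memory sketch below reproduces the difference. Comparing through `toDouble` is an illustrative stand-in for a numeric comparison and is not the exact cast Spark applies.

```scala
// Hypothetical illustration of the test above: string equality vs. numeric comparison
// of a string partition value against an integer literal.
object PartitionPushdownSketch {
  val partitionValues: Seq[String] = Seq("20230505.0", "20230506.0")

  // A naive metastore-side filter: compare the literal's text with the stored string.
  def metastoreStringEquals(literal: Int): Seq[String] =
    partitionValues.filter(_ == literal.toString)

  // A numeric comparison: parse the stored string and compare numbers.
  def numericEquals(literal: Int): Seq[String] =
    partitionValues.filter(v => scala.util.Try(v.toDouble).toOption.contains(literal.toDouble))
}
```

Under these assumptions the string filter finds nothing, while the numeric comparison keeps `20230505.0`. The test is designed to catch exactly this gap.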
[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources
aokolnychyi commented on code in PR #41448: URL: https://github.com/apache/spark/pull/41448#discussion_r1216251279 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala: ## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, Expression, IsNotNull, MetadataAttribute, MonotonicallyIncreasingID, OuterReference, PredicateHelper} +import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} +import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, JoinType, LeftAnti, LeftOuter, RightOuter} +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteAction, Filter, HintInfo, InsertAction, Join, JoinHint, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, NO_BROADCAST_AND_REPLICATION, Project, UpdateAction, WriteDelta} +import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction, Keep, ROW_ID, Split} +import org.apache.spark.sql.catalyst.util.RowDeltaUtils.OPERATION_COLUMN +import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations +import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta} +import org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.types.IntegerType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +/** + * A rule that rewrites MERGE operations using plans that operate on individual or groups of rows. + * + * This rule assumes the commands have been fully resolved and all assignments have been aligned. 
+ */ +object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper { + + private final val ROW_FROM_SOURCE = "__row_from_source" + private final val ROW_FROM_TARGET = "__row_from_target" + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { +case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions, +notMatchedBySourceActions) if m.resolved && m.rewritable && m.aligned && +matchedActions.isEmpty && notMatchedActions.size == 1 && +notMatchedBySourceActions.isEmpty => + + EliminateSubqueryAliases(aliasedTable) match { +case r: DataSourceV2Relation => + // NOT MATCHED conditions may only refer to columns in source so they can be pushed down + val insertAction = notMatchedActions.head.asInstanceOf[InsertAction] + val filteredSource = insertAction.condition match { +case Some(insertCond) => Filter(insertCond, source) +case None => source + } + + // there is only one NOT MATCHED action, use a left anti join to remove any matching rows + // and switch to using a regular append instead of a row-level MERGE operation + // only unmatched source rows that match the condition are appended to the table + val joinPlan = Join(filteredSource, r, LeftAnti, Some(cond), JoinHint.NONE) + + val output = insertAction.assignments.map(_.value) + val outputColNames = r.output.map(_.name) + val projectList = output.zip(outputColNames).map { case (expr, name) => +Alias(expr, name)() + } + val project = Project(projectList, joinPlan) + + AppendData.byPosition(r, project) + +case _ => + m + } + +case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions, Review Comment: This is a special case when there are only NOT MATCHED actions (having just 1 such action is handled above).
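For the single-`NOT MATCHED` case in the diff, the rule filters the source by the insert condition, removes rows that match the target via a left anti join, and appends the rest. The in-memory sketch below mirrors those steps. It uses plain Scala collections, not Spark plans, and `Row` and `mergeInsertOnly` are hypothetical names.

```scala
// Toy, in-memory version (not Spark code) of rewriting
//   MERGE ... WHEN NOT MATCHED [AND insertCond] THEN INSERT
// as "filter source, left anti join against target, append".
object MergeAsAntiJoinSketch {
  final case class Row(id: Int, value: String)

  def mergeInsertOnly(
      target: Seq[Row],
      source: Seq[Row],
      mergeCond: (Row, Row) => Boolean, // ON clause, applied as (sourceRow, targetRow)
      insertCond: Row => Boolean        // NOT MATCHED condition; refers to source columns only
  ): Seq[Row] = {
    // The insert condition only touches source columns, so it can be applied before the join.
    val filteredSource = source.filter(insertCond)
    // Left anti join: keep source rows that have no matching target row.
    val unmatched = filteredSource.filterNot(s => target.exists(t => mergeCond(s, t)))
    // Plain append: existing target rows are left untouched.
    target ++ unmatched
  }
}
```

No target row is modified here, which is why an append can replace a row-level MERGE in this case.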
[GitHub] [spark] aminebag commented on pull request #41423: [SPARK-43523][CORE] Fix Spark UI LiveTask memory leak
aminebag commented on PR #41423: URL: https://github.com/apache/spark/pull/41423#issuecomment-1575726835 I don't think the answer is more resources. If we had more memory or CPU power, we would only delay the issue; we wouldn't prevent it. Also, with more CPU power the listener would become faster at consuming events, but the producer could also become faster at producing them. Besides, increasing resources for the driver is vertical scaling, which I believe is against the spirit of Spark and distributed computing. One solution would be to make the event queue blocking and never drop events (block until there's space in the queue to put the event). But this would mean that the scheduler (the event producer) would be impacted by any slowness of the listener. I don't think it's a good idea to have a secondary feature (application monitoring) impact the performance of a main feature (task scheduling).
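The trade-off in this comment is between two queue policies. A dropping queue lets the producer continue immediately but loses events. A blocking queue keeps every event but makes the producer wait. The sketch below shows both policies with `java.util.concurrent.ArrayBlockingQueue` (`offer` vs `put`). It also shows how one dropped end event leaves stale per-task state in a listener. It is not Spark's `AsyncEventQueue`, and all names are hypothetical.

```scala
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical sketch (not Spark's listener bus) of dropping vs. blocking event queues.
object EventQueueSketch {
  sealed trait Event
  final case class TaskStart(taskId: Int) extends Event
  final case class TaskEnd(taskId: Int) extends Event

  // Bounded queue between a producer (scheduler) and a consumer (listener).
  final class BoundedEventQueue(capacity: Int, blockWhenFull: Boolean) {
    private val queue = new ArrayBlockingQueue[Event](capacity)
    val dropped = new AtomicLong(0)

    def post(event: Event): Unit =
      if (blockWhenFull) queue.put(event)                     // producer waits for free space
      else if (!queue.offer(event)) dropped.incrementAndGet() // producer never waits; event is lost

    // Remove and return everything currently queued, in FIFO order.
    def drain(): List[Event] =
      Iterator.continually(queue.poll()).takeWhile(_ != null).toList
  }

  // A listener that keeps per-task state until it sees the matching end event.
  def liveTasksAfter(events: Seq[Event]): Set[Int] = {
    val live = mutable.Set.empty[Int]
    events.foreach {
      case TaskStart(id) => live += id
      case TaskEnd(id)   => live -= id
    }
    live.toSet
  }
}
```

With a capacity of 3 and dropping enabled, posting `TaskStart(1)`, `TaskStart(2)`, `TaskEnd(1)`, `TaskEnd(2)` loses the last event. Task 2 then stays "live" indefinitely, which is the kind of leftover state the PR is concerned with.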
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41444: [WIP][SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API
HyukjinKwon commented on code in PR #41444: URL: https://github.com/apache/spark/pull/41444#discussion_r1217278535 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala: ## @@ -812,6 +812,69 @@ object functions { */ def min_by(e: Column, ord: Column): Column = Column.fn("min_by", e, ord) + /** + * Aggregate function: returns the exact percentile(s) of numeric column `expr` at the given + * percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile(e: Column, percentage: Column): Column = Column.fn("percentile", e, percentage) + + /** + * Aggregate function: returns the exact percentile(s) of numeric column `expr` at the given + * percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile(e: Column, percentage: Column, frequency: Column): Column = +Column.fn("percentile", e, percentage, frequency) + + /** + * Aggregate function: returns a percentile value based on a continuous distribution of numeric + * or ANSI interval column at the given percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_cont(e: Column, percentage: Column): Column = +Column.fn("percentile_cont", e, percentage) + + /** + * Aggregate function: returns a percentile value based on a continuous distribution of numeric + * or ANSI interval column at the given percentage(s) with value range in [0.0, 1.0]. + * + * Note: reverse used to specify whether to reverse calculate percentile value. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_cont(e: Column, percentage: Column, reverse: Boolean): Column = +Column.fn("percentile_cont", e, percentage, lit(reverse)) + + /** + * Aggregate function: returns the percentile(s) based on a discrete distribution of numeric + * column `expr` at the given percentage(s) with value range in [0.0, 1.0]. 
+ * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_disc(e: Column, percentage: Column): Column = +Column.fn("percentile_disc", e, percentage) + + /** + * Aggregate function: returns the percentile(s) based on a discrete distribution of numeric + * column `expr` at the given percentage(s) with value range in [0.0, 1.0]. + * + * Note: reverse used to specify whether to reverse calculate percentile value. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_disc(e: Column, percentage: Column, reverse: Boolean): Column = Review Comment: Can we make them all take `Column` signatures? See also the comments at the top of this file.
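The doc comments distinguish a "continuous" percentile from a "discrete" one. The sketch below follows the textbook SQL definitions, without weights or the `reverse` flag, and is not Spark's implementation. `percentile_cont` interpolates linearly at position `p * (n - 1)` in the sorted values. `percentile_disc` returns the first sorted value whose cumulative distribution is at least `p`.

```scala
// Minimal sketch of the textbook definitions behind percentile_cont / percentile_disc
// (no frequency weights, no `reverse` flag); not Spark's implementation.
object PercentileSketch {
  // Continuous: linear interpolation at position p * (n - 1) in the sorted values.
  def percentileCont(values: Seq[Double], p: Double): Double = {
    require(values.nonEmpty && p >= 0.0 && p <= 1.0)
    val sorted = values.sorted
    val pos = p * (sorted.length - 1)
    val lower = math.floor(pos).toInt
    val upper = math.ceil(pos).toInt
    sorted(lower) + (pos - lower) * (sorted(upper) - sorted(lower))
  }

  // Discrete: the first sorted value whose cumulative distribution (i + 1) / n is >= p.
  def percentileDisc(values: Seq[Double], p: Double): Double = {
    require(values.nonEmpty && p >= 0.0 && p <= 1.0)
    val sorted = values.sorted
    val n = sorted.length
    val idx = sorted.indices.find(i => (i + 1).toDouble / n >= p).get
    sorted(idx)
  }
}
```

For the values 1, 2, 3, 4 at `p = 0.5`, the continuous version gives 2.5 and the discrete version gives 2.0, an actual element of the input.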
[GitHub] [spark] gengliangwang commented on a diff in pull request #41385: [SPARK-43205][SQL][FOLLOWUP] add ExpressionWithUnresolvedIdentifier to simplify code
gengliangwang commented on code in PR #41385: URL: https://github.com/apache/spark/pull/41385#discussion_r1217278997 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala: ## @@ -18,39 +18,51 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_IDENTIFIER import org.apache.spark.sql.types.StringType /** - * Resolves the catalog of the name parts for table/view/function/namespace. + * Resolves the identifier expressions and builds the original plans/expressions. */ -object IdentifierClauseUtil { - private def getNotNullFoldableString(clauseName: String, expr: Expression): String = { +object ResolveIdentifierClause extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( +_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) { +case p: PlanWithUnresolvedIdentifier if p.identifierExpr.resolved => + p.planBuilder.apply(evalIdentifierExpr(p.identifierExpr)) +case other => + other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) { +case e: ExpressionWithUnresolvedIdentifier if e.identifierExpr.resolved => + e.exprBuilder.apply(evalIdentifierExpr(e.identifierExpr)) + } + } + + private def evalIdentifierExpr(expr: Expression): Seq[String] = { expr match { case e if !e.foldable => expr.failAnalysis( errorClass = "NOT_A_CONSTANT_STRING.NOT_CONSTANT", messageParameters = Map( - "name" -> clauseName, + "name" -> "IDENTIFIER", "expr" -> expr.sql)) case e if e.dataType != StringType => expr.failAnalysis( errorClass = "NOT_A_CONSTANT_STRING.WRONG_TYPE", messageParameters = Map( - "name" -> clauseName, + "name" -> "IDENTIFIER", "expr" -> expr.sql, "dataType" -> e.dataType.catalogString)) case e => e.eval() match { case 
null => expr.failAnalysis( errorClass = "NOT_A_CONSTANT_STRING.NULL", messageParameters = Map( - "name" -> clauseName, + "name" -> "IDENTIFIER", "expr" -> expr.sql)) - case other => other.toString // OK + case other => +// Parse the identifier string to name parts. +UnresolvedAttribute(other.toString).nameParts Review Comment: This one is a bit confusing. Shall we just use

```
CatalystSqlParser.parseMultipartIdentifier(name)
```
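Turning an `IDENTIFIER` string into name parts needs real parsing, not a split on `.`. A back-quoted part can itself contain dots, and a doubled backtick inside back-quotes stands for a literal backtick. The hand-written parser below is a hypothetical sketch of those two rules. It is not `CatalystSqlParser`, which also handles whitespace and validation.

```scala
// Hypothetical multipart-identifier parser (not CatalystSqlParser) showing why "a.`b.c`"
// must become Seq("a", "b.c") rather than Seq("a", "`b", "c`").
object MultipartIdentifierSketch {
  def parse(name: String): Seq[String] = {
    val parts = scala.collection.mutable.ArrayBuffer.empty[String]
    val current = new StringBuilder
    var quoted = false
    var i = 0
    while (i < name.length) {
      val c = name.charAt(i)
      if (quoted) {
        if (c == '`') {
          // A doubled backtick inside back-quotes is an escaped literal backtick.
          if (i + 1 < name.length && name.charAt(i + 1) == '`') { current.append('`'); i += 1 }
          else quoted = false
        } else current.append(c)
      } else c match {
        case '`' => quoted = true
        case '.' => parts += current.toString; current.clear() // unquoted dot ends a part
        case _   => current.append(c)
      }
      i += 1
    }
    require(!quoted, s"unclosed back-quote in: $name")
    parts += current.toString
    parts.toSeq
  }
}
```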
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41415: [SPARK-43906][PYTHON][CONNECT] Implement the file support in SparkSession.addArtifacts
HyukjinKwon commented on code in PR #41415: URL: https://github.com/apache/spark/pull/41415#discussion_r1217286686 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala: ## @@ -154,6 +154,8 @@ class SparkConnectArtifactManager private[connect] { val canonicalUri = fragment.map(UriBuilder.fromUri(target.toUri).fragment).getOrElse(target.toUri) sessionHolder.session.sparkContext.addArchive(canonicalUri.toString) + } else if (remoteRelativePath.startsWith(s"files${File.separator}")) { +sessionHolder.session.sparkContext.addFile(target.toString) Review Comment: For regular files and archives, I don't intend to expose `org.apache.spark.SparkFiles` for now. Since files and archives are always stored in the current working directory of executors in production, I was thinking of simply creating a session-dedicated directory and changing the current working directory to it (during Python UDF execution). This means end users would continue to access their files with `./myfile.txt` or `./myarchive`.
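A JVM cannot change its own working directory. It can, however, start a child process (such as a Python worker) with a chosen working directory. The sketch below assumes that approach and uses `java.lang.ProcessBuilder.directory`. It creates a per-session directory and runs a command inside it, so that `./myfile.txt` resolves within the session's directory. The session-directory naming and the use of `cat` as a stand-in for the worker are hypothetical, and this is not Spark Connect's actual mechanism.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical sketch: one directory per session, and a child process started with that
// directory as its working directory so relative paths resolve inside it.
object SessionWorkingDirSketch {
  def createSessionDir(sessionId: String): Path =
    Files.createTempDirectory(s"session-$sessionId-")

  // Run `command` with `dir` as its current working directory and return its output.
  def runIn(dir: Path, command: String*): String = {
    val process = new ProcessBuilder(command: _*)
      .directory(dir.toFile)
      .redirectErrorStream(true)
      .start()
    val output = new String(process.getInputStream.readAllBytes(), StandardCharsets.UTF_8)
    process.waitFor()
    output
  }
}
```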
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41444: [SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API
HyukjinKwon commented on code in PR #41444: URL: https://github.com/apache/spark/pull/41444#discussion_r1217343932 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala: ## @@ -812,6 +812,69 @@ object functions { */ def min_by(e: Column, ord: Column): Column = Column.fn("min_by", e, ord) + /** + * Aggregate function: returns the exact percentile(s) of numeric column `expr` at the given + * percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile(e: Column, percentage: Column): Column = Column.fn("percentile", e, percentage) + + /** + * Aggregate function: returns the exact percentile(s) of numeric column `expr` at the given + * percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile(e: Column, percentage: Column, frequency: Column): Column = +Column.fn("percentile", e, percentage, frequency) + + /** + * Aggregate function: returns a percentile value based on a continuous distribution of numeric + * or ANSI interval column at the given percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_cont(e: Column, percentage: Column): Column = +Column.fn("percentile_cont", e, percentage) + + /** + * Aggregate function: returns a percentile value based on a continuous distribution of numeric + * or ANSI interval column at the given percentage(s) with value range in [0.0, 1.0]. + * + * Note: reverse used to specify whether to reverse calculate percentile value. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_cont(e: Column, percentage: Column, reverse: Boolean): Column = +Column.fn("percentile_cont", e, percentage, lit(reverse)) + + /** + * Aggregate function: returns the percentile(s) based on a discrete distribution of numeric + * column `expr` at the given percentage(s) with value range in [0.0, 1.0]. 
+ * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_disc(e: Column, percentage: Column): Column = +Column.fn("percentile_disc", e, percentage) + + /** + * Aggregate function: returns the percentile(s) based on a discrete distribution of numeric + * column `expr` at the given percentage(s) with value range in [0.0, 1.0]. + * + * Note: reverse used to specify whether to reverse calculate percentile value. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_disc(e: Column, percentage: Column, reverse: Boolean): Column = Review Comment: They are mostly there for legacy reasons.
[GitHub] [spark] ivoson commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult
ivoson commented on code in PR #40610: URL: https://github.com/apache/spark/pull/40610#discussion_r1216559940 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala: ## @@ -46,7 +46,14 @@ private[sql] class SparkResult[T]( private[this] var numRecords: Int = 0 private[this] var structType: StructType = _ private[this] var boundEncoder: ExpressionEncoder[T] = _ - private[this] val batches = mutable.Buffer.empty[ColumnarBatch] + private[this] var nextBatchIndex: Int = 0 + private[this] val idxToBatches = mutable.Map.empty[Int, ColumnarBatch] + + // Exposed for UT. + private[sql] def existingBatches(): Seq[ColumnarBatch] = { Review Comment: thanks, done.
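The diff replaces a plain buffer of batches with an index-to-batch map. This suggests that a "destructive" iterator can remove each batch as it is handed out, so consumed batches become eligible for garbage collection. The generic sketch below assumes that design. It uses any batch type `B` in place of `ColumnarBatch`. `BatchBuffer`, `nextAppendIndex` and `destructiveIterator` are hypothetical names, and only `idxToBatches` and `existingBatches` echo the diff.

```scala
import scala.collection.mutable

// Hypothetical sketch of a destructive iterator: each batch is removed from the map when it
// is returned, so the buffer no longer holds a reference to consumed batches.
final class BatchBuffer[B] {
  private val idxToBatches = mutable.Map.empty[Int, B]
  private var nextAppendIndex = 0 // index assigned to the next appended batch

  def append(batch: B): Unit = {
    idxToBatches(nextAppendIndex) = batch
    nextAppendIndex += 1
  }

  // Batches not yet consumed, in arrival order (handy for unit tests).
  def existingBatches(): Seq[B] = idxToBatches.toSeq.sortBy(_._1).map(_._2)

  // Iterates from index 0, removing each batch as it is returned.
  def destructiveIterator: Iterator[B] = new Iterator[B] {
    private var idx = 0
    def hasNext: Boolean = idxToBatches.contains(idx)
    def next(): B = {
      val batch = idxToBatches.remove(idx).getOrElse(throw new NoSuchElementException)
      idx += 1
      batch
    }
  }
}
```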
[GitHub] [spark] beliefer commented on a diff in pull request #41424: [SPARK-43913][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2426-2432]
beliefer commented on code in PR #41424: URL: https://github.com/apache/spark/pull/41424#discussion_r1216524958 ## core/src/main/resources/error/error-classes.json: ## @@ -1834,6 +1849,11 @@ ], "sqlState" : "42K05" }, + "RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT" : { +"message" : [ + "" Review Comment: Got it. But it doesn't seem to be about `UNRESOLVED_COLUMN`. How about `MISSING_ATTRIBUTES`?
[GitHub] [spark] panbingkun commented on pull request #41451: [SPARK-43948][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[0050|0057|0058|0059]
panbingkun commented on PR #41451: URL: https://github.com/apache/spark/pull/41451#issuecomment-1575582851 cc @MaxGekk
[GitHub] [spark] beliefer commented on a diff in pull request #41444: [SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API
beliefer commented on code in PR #41444: URL: https://github.com/apache/spark/pull/41444#discussion_r1217345052 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala: ## @@ -812,6 +812,69 @@ object functions { */ def min_by(e: Column, ord: Column): Column = Column.fn("min_by", e, ord) + /** + * Aggregate function: returns the exact percentile(s) of numeric column `expr` at the given + * percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile(e: Column, percentage: Column): Column = Column.fn("percentile", e, percentage) + + /** + * Aggregate function: returns the exact percentile(s) of numeric column `expr` at the given + * percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile(e: Column, percentage: Column, frequency: Column): Column = +Column.fn("percentile", e, percentage, frequency) + + /** + * Aggregate function: returns a percentile value based on a continuous distribution of numeric + * or ANSI interval column at the given percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_cont(e: Column, percentage: Column): Column = +Column.fn("percentile_cont", e, percentage) + + /** + * Aggregate function: returns a percentile value based on a continuous distribution of numeric + * or ANSI interval column at the given percentage(s) with value range in [0.0, 1.0]. + * + * Note: reverse used to specify whether to reverse calculate percentile value. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_cont(e: Column, percentage: Column, reverse: Boolean): Column = +Column.fn("percentile_cont", e, percentage, lit(reverse)) + + /** + * Aggregate function: returns the percentile(s) based on a discrete distribution of numeric + * column `expr` at the given percentage(s) with value range in [0.0, 1.0]. 
+ * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_disc(e: Column, percentage: Column): Column = +Column.fn("percentile_disc", e, percentage) + + /** + * Aggregate function: returns the percentile(s) based on a discrete distribution of numeric + * column `expr` at the given percentage(s) with value range in [0.0, 1.0]. + * + * Note: reverse used to specify whether to reverse calculate percentile value. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_disc(e: Column, percentage: Column, reverse: Boolean): Column = Review Comment: Got it. Let me update it.
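The two new families differ in how they treat the sorted values. `percentile_cont` reads them as a continuous distribution and interpolates between neighbours. `percentile_disc` always returns one of the actual values. The sketch below runs on a plain in-memory sequence without Spark. It follows the usual SQL definitions: linear interpolation for the continuous form, and the first value whose cumulative distribution reaches `p` for the discrete form. Spark's own implementation may handle edge cases differently, and `PercentileSketch` is a hypothetical name.

```scala
// Sketch of SQL-style PERCENTILE_CONT / PERCENTILE_DISC semantics (not Spark code).
object PercentileSketch {
  private def sortedChecked(values: Seq[Double], p: Double): Vector[Double] = {
    require(values.nonEmpty, "values must be non-empty")
    require(p >= 0.0 && p <= 1.0, "percentage must be in [0.0, 1.0]")
    values.sorted.toVector
  }

  /** Continuous: linear interpolation between the two nearest ranks. */
  def percentileCont(values: Seq[Double], p: Double): Double = {
    val v = sortedChecked(values, p)
    val pos = p * (v.length - 1) // 0-based fractional position
    val lo = math.floor(pos).toInt
    val hi = math.ceil(pos).toInt
    v(lo) + (pos - lo) * (v(hi) - v(lo))
  }

  /** Discrete: smallest value whose cumulative distribution is >= p. */
  def percentileDisc(values: Seq[Double], p: Double): Double = {
    val v = sortedChecked(values, p)
    val rank = math.max(1, math.ceil(p * v.length).toInt) // 1-based rank
    v(rank - 1)
  }
}
```

For the values 1, 2, 3 and 4, the median is 2.5 under `percentileCont` and 2.0 under `percentileDisc`. That is the same contrast you would expect from `percentile_cont(0.5) WITHIN GROUP (ORDER BY v)` and `percentile_disc(0.5) WITHIN GROUP (ORDER BY v)`.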
[GitHub] [spark] beliefer commented on a diff in pull request #41424: [SPARK-43913][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2426-2432]
beliefer commented on code in PR #41424: URL: https://github.com/apache/spark/pull/41424#discussion_r1216524958 ## core/src/main/resources/error/error-classes.json: ## @@ -1834,6 +1849,11 @@ ], "sqlState" : "42K05" }, + "RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT" : { +"message" : [ + "" Review Comment: Got it.
[GitHub] [spark] gengliangwang commented on a diff in pull request #41191: [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values
gengliangwang commented on code in PR #41191: URL: https://github.com/apache/spark/pull/41191#discussion_r1217099067 ## sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala: ## @@ -158,30 +158,37 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the // session catalog and the table provider is not v2. -case c @ CreateTable(ResolvedV1Identifier(ident), _, _, _, _) => +case c @ CreateTable(ResolvedV1Identifier(ident), _, _, u: UnresolvedTableSpec, _, Review Comment: QQ: why separate UnresolvedOptionsList from UnresolvedTableSpec? Now the rule has to match both UnresolvedTableSpec and UnresolvedOptionsList.
[GitHub] [spark] beliefer commented on pull request #41444: [WIP][SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API
beliefer commented on PR #41444: URL: https://github.com/apache/spark/pull/41444#issuecomment-1575920159 ping @cloud-fan @HyukjinKwon @zhengruifeng The GA failure is unrelated to this PR.
[GitHub] [spark] panbingkun opened a new pull request, #41451: [SPARK-43948][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[0050|0057|0058|0059]
panbingkun opened a new pull request, #41451: URL: https://github.com/apache/spark/pull/41451 ### What changes were proposed in this pull request? This PR aims to assign names to the error classes `_LEGACY_ERROR_TEMP_[0050|0057|0058|0059]`, as follows: - _LEGACY_ERROR_TEMP_0050 => LOCAL_MUST_WITH_SCHEMA_FILE - _LEGACY_ERROR_TEMP_0057 => UNSUPPORTED_DEFAULT_VALUE.WITHOUT_SUGGESTION - _LEGACY_ERROR_TEMP_0058 => UNSUPPORTED_DEFAULT_VALUE.WITH_SUGGESTION - _LEGACY_ERROR_TEMP_0059 => REF_DEFAULT_VALUE_IS_NOT_ALLOWED_IN_PARTITION ### Why are the changes needed? The changes improve the error framework. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Manual test. - Pass GA.
[GitHub] [spark] srowen commented on pull request #41423: [SPARK-43523][CORE] Fix Spark UI LiveTask memory leak
srowen commented on PR #41423: URL: https://github.com/apache/spark/pull/41423#issuecomment-1575682910 I'm still not understanding why you analyze this as a leak. Are you saying that these objects are not actually usable and will never be collected because events are dropped? That would be a leak, but then this isn't the fix - it would be to clean up this state even when something is dropped. Is that what you're arguing though? This can't itself be the fix, to just arbitrarily drop state. It's just a hack that happens to avoid this in your setup by potentially reporting incorrect info. That's why I'm also wondering if there are simply ways to make the hot path in this listener faster, to keep up and not drop stuff in the first place.
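The trade-off debated here is bounded memory versus occasionally reporting the wrong state for a task. It can be illustrated with a capped map of in-flight tasks. In the minimal sketch below, an entry is added on task start and removed on task end. If an end event is dropped, the entry would otherwise stay forever, so the oldest entries are evicted once a cap is exceeded. This bounds memory, but it may evict a task that is in fact still running. `BoundedLiveTasks`, the cap and the eviction policy are all hypothetical. They are not necessarily how PR #41423 implements its fix.

```scala
import scala.collection.mutable

// Hypothetical bounded tracker for in-flight tasks (not Spark's AppStatusListener).
final class BoundedLiveTasks(maxTasks: Int) {
  require(maxTasks > 0, "maxTasks must be positive")

  // Insertion-ordered, so `head` is the oldest tracked task.
  private val live = mutable.LinkedHashMap.empty[Long, String]
  private var evicted = 0L

  def onTaskStart(taskId: Long, info: String): Unit = {
    live(taskId) = info
    while (live.size > maxTasks) {
      // Evict the oldest entry: memory stays bounded even if its end event was lost,
      // at the cost of possibly forgetting a task that is still running.
      live.remove(live.head._1)
      evicted += 1
    }
  }

  def onTaskEnd(taskId: Long): Unit = live.remove(taskId)

  def size: Int = live.size
  def evictedCount: Long = evicted
  def contains(taskId: Long): Boolean = live.contains(taskId)
}
```

The capacity check replaces "clean up exactly when the end event arrives" with "clean up eventually". This is the correctness trade-off srowen points out.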
[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources
aokolnychyi commented on code in PR #41448: URL: https://github.com/apache/spark/pull/41448#discussion_r1216250498 ## core/src/main/resources/error/error-classes.json: ## @@ -1513,6 +1513,13 @@ "Parse Mode: . To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'." ] }, + "MERGE_CARDINALITY_VIOLATION" : { +"message" : [ + "The ON search condition of the MERGE statement matched a single row from the target table with multiple rows of the source table.", + "This could result in the target row being operated on more than once with an update or delete operation and is not allowed." +], +"sqlState" : "23000" Review Comment: I used the `23` class as it is a constraint violation, but I wasn't sure about the subclass. It is not defined in the SQL standard, so I used `000`, meaning no subclass. I am not sure how Spark assigns subclasses in these cases. Here is an example of this error in SAP docs: https://dcx.sap.com/sqla170/en/html/80ca9fd06ce21014bc30ac05c444ee4d.html Here is the original JIRA for this error in Hive: https://issues.apache.org/jira/browse/HIVE-14949
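The condition behind `MERGE_CARDINALITY_VIOLATION` can be restated as: no target row may be matched by more than one source row. The minimal sketch below assumes the ON-condition matches are already available as `(targetRowId, sourceRowId)` pairs. The object and case-class names are hypothetical. A real engine would only need to fail when a matched UPDATE or DELETE would actually run against the target row.

```scala
// Hypothetical cardinality check over ON-condition matches (not Spark's implementation).
object MergeCardinality {
  final case class CardinalityViolation(targetRowId: Long, sourceRowIds: Seq[Long])

  /** Returns the violation with the smallest target row id, if any target row matched twice. */
  def firstViolation(matches: Seq[(Long, Long)]): Option[CardinalityViolation] =
    matches
      .groupBy { case (target, _) => target }
      .toSeq
      .sortBy { case (target, _) => target } // deterministic order for reporting
      .collectFirst {
        case (target, ms) if ms.size > 1 => CardinalityViolation(target, ms.map(_._2))
      }
}
```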
[GitHub] [spark] ulysses-you opened a new pull request, #41454: [SPARK-43376][SQL][FOLLOWUP] lazy construct subquery to improve reuse subquery
ulysses-you opened a new pull request, #41454: URL: https://github.com/apache/spark/pull/41454 ### What changes were proposed in this pull request? https://github.com/apache/spark/pull/41046 made `ReuseAdaptiveSubquery` non-idempotent. This PR reverts the change in `ReuseAdaptiveSubquery`. To solve the same-instance issue when planning and reusing subqueries in AQE, this PR makes `subqueryMap` hold a function that builds the subquery. Then, in `PlanAdaptiveSubqueries`, each logical subquery plan can build its own instance of the physical subquery plan. ### Why are the changes needed? To improve subquery reuse in AQE. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass CI
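Storing a builder function instead of a built plan changes object identity. With a stored instance, every lookup shares one object. With a stored function, every lookup constructs a fresh object. The toy model below shows only that difference. `PhysicalSubquery` and `SubqueryMapSketch` are hypothetical stand-ins, not Spark classes, and reuse decisions are not modelled.

```scala
// Hypothetical stand-in for a physical subquery plan; only its identity matters here.
final class PhysicalSubquery(val exprId: Long)

object SubqueryMapSketch {
  // Eager: one shared instance per expression id.
  def eager(ids: Seq[Long]): Map[Long, PhysicalSubquery] =
    ids.map(id => id -> new PhysicalSubquery(id)).toMap

  // Lazy: store a builder; each caller constructs its own instance on demand.
  def lazyBuilders(ids: Seq[Long]): Map[Long, () => PhysicalSubquery] =
    ids.map(id => id -> (() => new PhysicalSubquery(id))).toMap
}
```

Under this model, two logical subqueries with the same id share one object in the eager map. With the builder map, each gets its own object that describes the same subquery.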
[GitHub] [spark] beliefer commented on pull request #41446: [SPARK-43956][SQL][3.3] Fix the bug doesn't display column's sql for Percentile[Cont|Disc]
beliefer commented on PR #41446: URL: https://github.com/apache/spark/pull/41446#issuecomment-1575508970 @dongjoon-hyun @MaxGekk Thank you!
[GitHub] [spark] beliefer commented on pull request #41445: [SPARK-43956][SQL][3.4] Fix the bug doesn't display column's sql for Percentile[Cont|Disc]
beliefer commented on PR #41445: URL: https://github.com/apache/spark/pull/41445#issuecomment-1575508866 @MaxGekk Thank you.
[GitHub] [spark-connect-go] HyukjinKwon closed pull request #8: [SPARK-43958] Adding support for Channel Builder
HyukjinKwon closed pull request #8: [SPARK-43958] Adding support for Channel Builder URL: https://github.com/apache/spark-connect-go/pull/8
[GitHub] [spark-connect-go] HyukjinKwon commented on pull request #8: [SPARK-43958] Adding support for Channel Builder
HyukjinKwon commented on PR #8: URL: https://github.com/apache/spark-connect-go/pull/8#issuecomment-1575537525 Merged to master.
[GitHub] [spark] zzzzming95 commented on a diff in pull request #41370: [SPARK-43866] Partition filter condition should pushed down to metastore query if it is equivalence Predicate
zzzzming95 commented on code in PR #41370: URL: https://github.com/apache/spark/pull/41370#discussion_r1216785699 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala: ## @@ -1041,6 +1053,9 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { ExtractableLiteral(value), ExtractAttribute(SupportedAttribute(name))) => Some(s"$value ${op.symbol} $name") + case EqualTo(ExtractStringAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) => +Some(s"$name = $value") + Review Comment: This problem seems to occur only when a database such as MySQL is used as the metastore database. Spark's unit tests seem to use Derby (this case does not occur in `PruneHiveTablePartitionsSuite`), which cannot reproduce the problem. So it is difficult to implement an end-to-end unit test on the Spark side.
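The added `EqualTo` case turns a pushable equality into a metastore filter string such as `name = value`. Anything it cannot express yields `None`, which means no pushdown. The simplified model below makes three assumptions. The `Pred` types are hypothetical stand-ins for Catalyst expressions. Literals arrive already rendered, quotes included. In this sketch, a conjunction is pushed only when both sides convert.

```scala
// Hypothetical, simplified predicate model (not Catalyst).
sealed trait Pred
final case class PEq(attr: String, literal: String) extends Pred // literal pre-rendered, e.g. "\"2023\""
final case class PAnd(left: Pred, right: Pred) extends Pred
final case class PUnsupported(desc: String) extends Pred

object MetastoreFilterSketch {
  /** Renders a metastore filter string, or None if the predicate cannot be pushed. */
  def convert(p: Pred): Option[String] = p match {
    case PEq(attr, literal) => Some(s"$attr = $literal")
    case PAnd(l, r) =>
      // Both sides must convert; otherwise nothing is pushed in this sketch.
      for { a <- convert(l); b <- convert(r) } yield s"($a and $b)"
    case PUnsupported(_) => None
  }
}
```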
[GitHub] [spark] srowen commented on pull request #41442: [SPARK-43955][BUILD] Upgrade `scalafmt` from 3.7.3 to 3.7.4
srowen commented on PR #41442: URL: https://github.com/apache/spark/pull/41442#issuecomment-1575681053 Merged to master
[GitHub] [spark] srowen closed pull request #41442: [SPARK-43955][BUILD] Upgrade `scalafmt` from 3.7.3 to 3.7.4
srowen closed pull request #41442: [SPARK-43955][BUILD] Upgrade `scalafmt` from 3.7.3 to 3.7.4 URL: https://github.com/apache/spark/pull/41442
[GitHub] [spark] srowen commented on pull request #41423: [SPARK-43523][CORE] Fix Spark UI LiveTask memory leak
srowen commented on PR #41423: URL: https://github.com/apache/spark/pull/41423#issuecomment-1575695964 I get it, but this trades one incorrectness for another. I don't know of another good way here. Is this resolvable with simply more resources, like more cores or memory? Is part of the problem GC thrashing or swap? If so, I'd categorize this as just a symptom of that problem, and really the answer is more resources.
[GitHub] [spark] beliefer commented on a diff in pull request #41444: [WIP][SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API
beliefer commented on code in PR #41444: URL: https://github.com/apache/spark/pull/41444#discussion_r1217334172 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala: ## @@ -812,6 +812,69 @@ object functions { */ def min_by(e: Column, ord: Column): Column = Column.fn("min_by", e, ord) + /** + * Aggregate function: returns the exact percentile(s) of numeric column `expr` at the given + * percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile(e: Column, percentage: Column): Column = Column.fn("percentile", e, percentage) + + /** + * Aggregate function: returns the exact percentile(s) of numeric column `expr` at the given + * percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile(e: Column, percentage: Column, frequency: Column): Column = +Column.fn("percentile", e, percentage, frequency) + + /** + * Aggregate function: returns a percentile value based on a continuous distribution of numeric + * or ANSI interval column at the given percentage(s) with value range in [0.0, 1.0]. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_cont(e: Column, percentage: Column): Column = +Column.fn("percentile_cont", e, percentage) + + /** + * Aggregate function: returns a percentile value based on a continuous distribution of numeric + * or ANSI interval column at the given percentage(s) with value range in [0.0, 1.0]. + * + * Note: reverse used to specify whether to reverse calculate percentile value. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_cont(e: Column, percentage: Column, reverse: Boolean): Column = +Column.fn("percentile_cont", e, percentage, lit(reverse)) + + /** + * Aggregate function: returns the percentile(s) based on a discrete distribution of numeric + * column `expr` at the given percentage(s) with value range in [0.0, 1.0]. 
+ * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_disc(e: Column, percentage: Column): Column = +Column.fn("percentile_disc", e, percentage) + + /** + * Aggregate function: returns the percentile(s) based on a discrete distribution of numeric + * column `expr` at the given percentage(s) with value range in [0.0, 1.0]. + * + * Note: reverse used to specify whether to reverse calculate percentile value. + * + * @group agg_funcs + * @since 3.5.0 + */ + def percentile_disc(e: Column, percentage: Column, reverse: Boolean): Column = Review Comment: I know that. But there are many existing cases like this, such as: https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L473 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L489 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L506 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L621 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L633 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L686 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L702 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L1093
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L1164 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L1179 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L2745 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L2759 https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L3498
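The three-argument `percentile(e, percentage, frequency)` in this diff takes a frequency column. The sketch below assumes a row with frequency `f` counts as `f` copies of its value, which is the usual reading of a frequency argument. Spark's exact handling is not shown in this thread. Under that assumption, an exact percentile with linear interpolation can be computed from running counts without materializing the copies. `WeightedPercentileSketch` is a hypothetical name.

```scala
// Hypothetical frequency-weighted exact percentile (not Spark's Percentile class).
object WeightedPercentileSketch {
  /** `data` holds (value, frequency) pairs; each value counts `frequency` times. */
  def percentile(data: Seq[(Double, Long)], p: Double): Double = {
    require(data.nonEmpty && data.forall(_._2 > 0), "frequencies must be positive")
    require(p >= 0.0 && p <= 1.0, "percentage must be in [0.0, 1.0]")
    val sorted = data.sortBy(_._1).toVector
    val cumulative = sorted.scanLeft(0L)(_ + _._2).tail // running counts per distinct value
    val total = cumulative.last

    // Value at 0-based position k of the (virtually) expanded, sorted sequence.
    def valueAt(k: Long): Double = sorted(cumulative.indexWhere(_ > k))._1

    val pos = p * (total - 1)
    val lo = math.floor(pos).toLong
    val hi = math.ceil(pos).toLong
    valueAt(lo) + (pos - lo) * (valueAt(hi) - valueAt(lo))
  }
}
```

With the pairs (1.0, 1) and (2.0, 3), the expanded data is 1, 2, 2, 2. The 0.25 percentile is then 1.75, the same as interpolating over the expanded list directly.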
[GitHub] [spark] yaooqinn commented on pull request #41181: [SPARK-43504][K8S] Mounts the hadoop config map on the executor pod
yaooqinn commented on PR #41181: URL: https://github.com/apache/spark/pull/41181#issuecomment-1575994314 thanks, @dongjoon-hyun and @turboFei. Late +1 from my side.
[GitHub] [spark] degant commented on a diff in pull request #41428: [SPARK-41958][CORE][3.3] Disallow arbitrary custom classpath with proxy user in cluster mode
degant commented on code in PR #41428: URL: https://github.com/apache/spark/pull/41428#discussion_r1217444147 ## docs/core-migration-guide.md: ## @@ -25,6 +25,7 @@ license: | ## Upgrading from Core 3.2 to 3.3 - Since Spark 3.3, Spark migrates its log4j dependency from 1.x to 2.x because log4j 1.x has reached end of life and is no longer supported by the community. Vulnerabilities reported after August 2015 against log4j 1.x were not checked and will not be fixed. Users should rewrite original log4j properties files using log4j2 syntax (XML, JSON, YAML, or properties format). Spark rewrites the `conf/log4j.properties.template` which is included in Spark distribution, to `conf/log4j2.properties.template` with log4j2 properties format. +- Since Spark 3.3.3, `spark.submit.proxyUser.allowCustomClasspathInClusterMode` allows users to disable custom class path in cluster mode by proxy users. It still defaults to `true` to maintain backward compatibility. Review Comment: Fixed
[GitHub] [spark] beliefer commented on pull request #41444: [SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API
beliefer commented on PR #41444: URL: https://github.com/apache/spark/pull/41444#issuecomment-1576020831 @zhengruifeng The two functions are used with SQL syntax like `percentile_cont(0.5) WITHIN GROUP (ORDER BY v)`.
[GitHub] [spark] itholic commented on pull request #41455: [SPARK-43962][SQL] Improve error messages: `CANNOT_DECODE_URL`, `CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE`, `CANNOT_PARSE_DECIMAL`, `CANNOT_READ_F
itholic commented on PR #41455: URL: https://github.com/apache/spark/pull/41455#issuecomment-1576034146 cc @MaxGekk @srielau @cloud-fan Please review this when you find some time
[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
cloud-fan commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217472430 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala: ## @@ -397,7 +397,8 @@ object PreprocessTableInsertion extends ResolveInsertionBase { } val newQuery = try { TableOutputResolver.resolveOutputColumns( -tblName, expectedColumns, query, byName = isByName, conf, supportColDefaultValue = true) +tblName, expectedColumns, query, byName = isByName || insert.byName, conf, Review Comment: let's rename the `isByName` variable to `hasColumnList`
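With the suggested rename, the flag reads `byName = hasColumnList || insert.byName`. The toy sketch below shows what by-name versus by-position matching means for an insert. `OutputColumnSketch` is hypothetical. It uses exact, case-sensitive names and ignores types and default values, which Spark's `TableOutputResolver` also deals with.

```scala
// Hypothetical column matcher for INSERT (not Spark's TableOutputResolver).
object OutputColumnSketch {
  /** Mirrors the flag from the diff, using the suggested variable name. */
  def useByName(hasColumnList: Boolean, insertByName: Boolean): Boolean =
    hasColumnList || insertByName

  /** For each table column, returns the query column that feeds it. */
  def resolve(tableCols: Seq[String], queryCols: Seq[String], byName: Boolean): Seq[String] = {
    require(tableCols.size == queryCols.size, "column count mismatch")
    if (byName) {
      val available = queryCols.toSet
      tableCols.map { c =>
        if (available.contains(c)) c
        else throw new IllegalArgumentException(s"query has no column named $c")
      }
    } else {
      queryCols // by position: the i-th query column feeds the i-th table column
    }
  }
}
```

If a query produces `(b, a)` for a table declared `(a, b)`, by-name matching routes each value to the column of the same name. By-position matching writes the query's `b` into the table's `a`.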
[GitHub] [spark] vinodkc commented on a diff in pull request #41144: [SPARK-43470][CORE] Add OS, Java, Python version information to application log
vinodkc commented on code in PR #41144: URL: https://github.com/apache/spark/pull/41144#discussion_r1217472619 ## core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala: ## @@ -106,6 +106,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( protected val envVars: java.util.Map[String, String] = funcs.head.funcs.head.envVars protected val pythonExec: String = funcs.head.funcs.head.pythonExec protected val pythonVer: String = funcs.head.funcs.head.pythonVer + logInfo(s"Python version info: $pythonExec($pythonVer)") Review Comment: Updated the code to print version and package details only once per executor. Also added a bit more detail: the list of installed packages is logged too. A new property `spark.executor.python.worker.log.details` has been added to enable this Python info logging.
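Logging "only once per executor" is commonly done with a process-wide guard, because an executor runs in a single JVM. The minimal sketch below places an `AtomicBoolean` in a singleton object. The `enabled` parameter stands in for the `spark.executor.python.worker.log.details` setting, and the message buffer stands in for `logInfo`. Both are assumptions, and this is not the PR's code.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical once-per-JVM logger (an executor is one JVM, so at most once per executor).
object PythonInfoLogger {
  private val logged = new AtomicBoolean(false)
  val messages: ArrayBuffer[String] = ArrayBuffer.empty[String] // stand-in for logInfo

  /** Logs the Python version the first time it is called with enabled = true; returns whether it logged. */
  def maybeLog(enabled: Boolean, pythonExec: String, pythonVer: String): Boolean =
    if (enabled && logged.compareAndSet(false, true)) {
      messages += s"Python version info: $pythonExec($pythonVer)"
      true
    } else {
      false
    }
}
```

`compareAndSet` ensures that only one of several concurrently starting Python runners wins the right to log.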
[GitHub] [spark] itholic opened a new pull request, #41455: [SPARK-43962][SQL] Improve error messages: `CANNOT_DECODE_URL`, `CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE`, `CANNOT_PARSE_DECIMAL`, `CANNOT_READ_
itholic opened a new pull request, #41455: URL: https://github.com/apache/spark/pull/41455 ### What changes were proposed in this pull request? This PR proposes to improve error messages for `CANNOT_DECODE_URL`, `CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE`, `CANNOT_PARSE_DECIMAL`, `CANNOT_READ_FILE_FOOTER`, `CANNOT_RECOGNIZE_HIVE_TYPE`. **NOTE:** This PR is experimental work that uses an LLM to enhance error messages. The script was created using the `openai` Python library from OpenAI, and the author conducted only a minimal review after executing the script. ### Why are the changes needed? To make errors more actionable and usable. ### Does this PR introduce _any_ user-facing change? No API changes, only error message improvements. ### How was this patch tested? The existing CI should pass.
[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
cloud-fan commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217471748 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala: ## @@ -274,15 +274,16 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { -case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), -_, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => +case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), +_, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => i.copy(table = readDataSourceTable(tableMeta, options)) -case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _, _) => +case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), +_, _, _, _, _, _) => i.copy(table = DDLUtils.readHiveTable(tableMeta)) case UnresolvedCatalogRelation(tableMeta, options, false) -if DDLUtils.isDatasourceTable(tableMeta) => + if DDLUtils.isDatasourceTable(tableMeta) => Review Comment: ```suggestion if DDLUtils.isDatasourceTable(tableMeta) => ```
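cloud-fan's suggestions here only restore the `i @ InsertIntoStatement(...)` spacing. The patterns changed in the first place because the new `byName` field increases the node's arity. The sketch below shows that mechanic with a hypothetical `Insert` case class rather than Spark's `InsertIntoStatement`. Its field names are borrowed from the patterns in this thread.

```scala
// Simplified stand-in for a plan node; not Spark's InsertIntoStatement.
final case class Insert(
    table: String,
    partition: Map[String, Option[String]],
    cols: Seq[String],
    query: String,
    overwrite: Boolean,
    ifPartitionNotExists: Boolean,
    byName: Boolean)

object InsertRules {
  // A positional pattern must name every constructor field, so adding `byName`
  // means each `Insert(...)` pattern gains one more `_`.
  // `i @ Insert(...)` binds the whole matched value to `i` so it can be copied.
  def resolveTable(plan: Insert): Insert = plan match {
    case i @ Insert(t, _, _, _, _, _, _) if t.startsWith("unresolved:") =>
      i.copy(table = t.stripPrefix("unresolved:"))
    case other => other
  }
}
```

Patterns are positional, so adding a field to a widely matched node touches every match site. That is why the diff adds a trailing `_` in `DataSourceStrategy`, `rules.scala`, and `HiveStrategies`.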
[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
cloud-fan commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217472203 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala: ## @@ -397,7 +397,8 @@ object PreprocessTableInsertion extends ResolveInsertionBase { } val newQuery = try { TableOutputResolver.resolveOutputColumns( -tblName, expectedColumns, query, byName = isByName, conf, supportColDefaultValue = true) +tblName, expectedColumns, query, byName = isByName || insert.byName, conf, Review Comment: let's rename the `isByName` variable to `hasColumnList`
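With the suggested rename, the condition reads as "match by name if the user gave a column list or requested an insert by name". A toy model of that decision follows. `resolveOutputColumns` here is a hypothetical helper, not Spark's `TableOutputResolver`, and the case-insensitive matching is an assumption.

```scala
object OutputResolution {
  // Toy model (not Spark's TableOutputResolver): decides which query column feeds
  // each target column. `hasColumnList` mirrors the suggested rename of `isByName`;
  // `insertByName` stands for the new by-name flag on the insert statement.
  def resolveOutputColumns(
      targetCols: Seq[String],
      queryCols: Seq[String],
      hasColumnList: Boolean,
      insertByName: Boolean): Either[String, Seq[String]] = {
    val byName = hasColumnList || insertByName
    if (queryCols.length != targetCols.length) {
      Left(s"expected ${targetCols.length} columns, got ${queryCols.length}")
    } else if (byName) {
      // Case-insensitive lookup is an assumption made for this sketch.
      val lookup = queryCols.map(c => c.toLowerCase -> c).toMap
      val missing = targetCols.filterNot(c => lookup.contains(c.toLowerCase))
      if (missing.nonEmpty) Left(s"cannot find: ${missing.mkString(", ")}")
      else Right(targetCols.map(c => lookup(c.toLowerCase)))
    } else {
      // By position: the i-th query column feeds the i-th target column.
      Right(queryCols)
    }
  }
}
```

With `insertByName = true`, a query producing `(B, A)` is reordered to feed target columns `(a, b)`. With both flags false, the same query is inserted positionally.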
[GitHub] [spark] pan3793 commented on pull request #41136: [SPARK-43356][K8S] Migrate deprecated createOrReplace to serverSideApply
pan3793 commented on PR #41136: URL: https://github.com/apache/spark/pull/41136#issuecomment-1576003455 rebased on the latest master branch
[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
cloud-fan commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217470812 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala: ## @@ -151,8 +151,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] { if query.resolved && DDLUtils.isDatasourceTable(tableDesc) => CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name)) -case InsertIntoStatement(l @ LogicalRelation(_: InsertableRelation, _, _, _), -parts, _, query, overwrite, false) if parts.isEmpty => +case InsertIntoStatement(l@LogicalRelation(_: InsertableRelation, _, _, _), +parts, _, query, overwrite, false, _) if parts.isEmpty => Review Comment: ```suggestion parts, _, query, overwrite, false, _) if parts.isEmpty => ```
[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
cloud-fan commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217471514 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala: ## @@ -274,15 +274,16 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { -case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), -_, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => +case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), +_, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => i.copy(table = readDataSourceTable(tableMeta, options)) -case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _, _) => +case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), Review Comment: ```suggestion case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), ``` ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala: ## @@ -274,15 +274,16 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { -case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), -_, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => +case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), +_, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => i.copy(table = readDataSourceTable(tableMeta, options)) -case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _, _) => +case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), +_, _, _, _, _, _) => Review Comment: ```suggestion _, _, _, _, _, _) => ```
[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
cloud-fan commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217471285 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala: ## @@ -274,15 +274,16 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { -case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), -_, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => +case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), Review Comment: ```suggestion case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), ``` ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala: ## @@ -274,15 +274,16 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { -case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), -_, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => +case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), +_, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => Review Comment: ```suggestion _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => ```
[GitHub] [spark] cloud-fan commented on a diff in pull request #41370: [SPARK-43866] Partition filter condition should pushed down to metastore query if it is equivalence Predicate
cloud-fan commented on code in PR #41370: URL: https://github.com/apache/spark/pull/41370#discussion_r1217481906 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala: ## @@ -994,6 +994,18 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { } } +object ExtractStringAttribute { + @scala.annotation.tailrec + def unapply(expr: Expression): Option[Attribute] = { +expr match { + case attr: Attribute => Some(attr) + case Cast(child, dt: IntegralType, _, _) +if child.dataType.isInstanceOf[StringType] => unapply(child) Review Comment: This assumes Spark and Hive have the same behavior for implicit casts. Is that always true? For example, casting a string to int may fail in Spark with ANSI mode on; does Hive have the same behavior?
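cloud-fan's question matters for two reasons. A string-to-int cast is not one-to-one, and non-numeric strings are handled differently depending on the mode. The toy model below mirrors the extractor's shape with hypothetical `Expr`/`CastTo` classes. It models lenient and ANSI-style casts with Scala's `toIntOption` and `toInt`. These are simplifications, not Spark's or Hive's exact cast rules.

```scala
import scala.annotation.tailrec

// Minimal stand-in expression tree; not Catalyst's classes.
sealed trait DType
case object StringT extends DType
case object IntT extends DType

sealed trait Expr { def dataType: DType }
final case class Attr(name: String, dataType: DType) extends Expr
final case class CastTo(child: Expr, dataType: DType) extends Expr

// Same shape as the extractor in the diff: peel integral casts off a string column.
object ExtractStringAttr {
  @tailrec
  def unapply(e: Expr): Option[Attr] = e match {
    case a: Attr => Some(a)
    case CastTo(child, IntT) if child.dataType == StringT => unapply(child)
    case _ => None
  }
}

object CastSemantics {
  // Lenient cast: an unparseable string becomes null (None), roughly like non-ANSI mode.
  def castLenient(s: String): Option[Int] = s.toIntOption
  // Strict cast: an unparseable string throws, roughly like ANSI mode.
  def castStrict(s: String): Int = s.toInt

  // Does partition value `p` satisfy CAST(p AS INT) = v?
  def matchesCast(p: String, v: Int): Boolean = castLenient(p).contains(v)
  // Does it satisfy the plain string comparison p = 'v'?
  def matchesString(p: String, v: Int): Boolean = p == v.toString
}
```

For example, partition values `10` and `010` both satisfy `CAST(p AS INT) = 10`, but only `10` satisfies the string comparison `p = '10'`. If the metastore evaluated such a predicate with different semantics, the set of selected partitions could change.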
[GitHub] [spark] ulysses-you commented on a diff in pull request #41407: [SPARK-43900][SQL] Support optimize skewed partitions even if introduce extra shuffle
ulysses-you commented on code in PR #41407: URL: https://github.com/apache/spark/pull/41407#discussion_r1217406788 ## sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala: ## @@ -104,7 +104,10 @@ case class AdaptiveSparkPlanExec( @transient private val costEvaluator = conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match { case Some(className) => CostEvaluator.instantiate(className, session.sparkContext.getConf) - case _ => SimpleCostEvaluator(conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)) + case _ => +val forceOptimizeSkewed = conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN) || + conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_IN_REBALANCE_PARTITIONS) Review Comment: It seems fragile: what happens if a user enables one and disables the other? The resulting behavior is uncontrolled.
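A two-flag truth table makes ulysses-you's concern concrete. The sketch assumes that the single `SimpleCostEvaluator` setting built in the diff affects both optimizations. `SkewFlagsSketch` and its methods are hypothetical.

```scala
object SkewFlagsSketch {
  // Two independent configs, modeled as plain booleans.
  final case class Flags(forceSkewedJoin: Boolean, forceInRebalance: Boolean)

  // As in the diff: both configs fold into one evaluator setting, so each
  // optimization that consults the evaluator sees the OR of the two.
  // Result: (effect on skewed join, effect on rebalance).
  def shared(f: Flags): (Boolean, Boolean) = {
    val force = f.forceSkewedJoin || f.forceInRebalance
    (force, force)
  }

  // A hypothetical alternative: each optimization consults only its own config.
  def perRule(f: Flags): (Boolean, Boolean) = (f.forceSkewedJoin, f.forceInRebalance)
}
```

Suppose only the rebalance flag is enabled. The shared setting then also forces the skewed-join behavior, even though that flag is off.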
[GitHub] [spark] pan3793 commented on a diff in pull request #41428: [SPARK-41958][CORE][3.3] Disallow arbitrary custom classpath with proxy user in cluster mode
pan3793 commented on code in PR #41428: URL: https://github.com/apache/spark/pull/41428#discussion_r1217437344 ## docs/core-migration-guide.md: ## @@ -25,6 +25,7 @@ license: | ## Upgrading from Core 3.2 to 3.3 - Since Spark 3.3, Spark migrates its log4j dependency from 1.x to 2.x because log4j 1.x has reached end of life and is no longer supported by the community. Vulnerabilities reported after August 2015 against log4j 1.x were not checked and will not be fixed. Users should rewrite original log4j properties files using log4j2 syntax (XML, JSON, YAML, or properties format). Spark rewrites the `conf/log4j.properties.template` which is included in Spark distribution, to `conf/log4j2.properties.template` with log4j2 properties format. +- Since Spark 3.3.3, `spark.submit.proxyUser.allowCustomClasspathInClusterMode` allows users to disable custom class path in cluster mode by proxy users. It still defaults to `true` to maintain backward compatibility. Review Comment: nit: an empty line between items
[GitHub] [spark] aokolnychyi commented on pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources
aokolnychyi commented on PR #41448: URL: https://github.com/apache/spark/pull/41448#issuecomment-1576080545 The test failures don't seem related. I'll need to take a closer look at what happened in `sql - other tests`, though.
[GitHub] [spark] Hisoka-X commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
Hisoka-X commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217529014 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala: ## @@ -397,7 +397,8 @@ object PreprocessTableInsertion extends ResolveInsertionBase { } val newQuery = try { TableOutputResolver.resolveOutputColumns( -tblName, expectedColumns, query, byName = isByName, conf, supportColDefaultValue = true) +tblName, expectedColumns, query, byName = isByName || insert.byName, conf, Review Comment: Done
[GitHub] [spark] allisonwang-db commented on a diff in pull request #41316: [SPARK-43798][SQL][PYTHON] Support Python user-defined table functions
allisonwang-db commented on code in PR #41316: URL: https://github.com/apache/spark/pull/41316#discussion_r1217498437 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala: ## @@ -171,6 +186,18 @@ case class ArrowEvalPython( copy(child = newChild) } +/** + * A logical plan that evaluates a [[PythonUDTF]]. + */ +case class BatchEvalPythonUDTF( Review Comment: Yes, batch eval is the baseline implementation, and we should enable Arrow by default for UDTFs. I am going to work on it next, given that we are supporting complex types for Arrow-based UDFs. Created a follow-up ticket: https://issues.apache.org/jira/browse/SPARK-43964. > It's not worthwhile refactoring the batch eval code path @HyukjinKwon that's a good point. Should I consider duplicating the code instead of refactoring it?
[GitHub] [spark] aokolnychyi commented on a diff in pull request #41449: [SPARK-43959][SQL] Make RowLevelOperationSuiteBase and AlignAssignmentsSuite abstract
aokolnychyi commented on code in PR #41449: URL: https://github.com/apache/spark/pull/41449#discussion_r1217400261 ## sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuite.scala: ## @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BooleanType, IntegerType, StructType} -class AlignAssignmentsSuite extends AnalysisTest { +abstract class AlignAssignmentsSuite extends AnalysisTest { Review Comment: Agree, updated.
[GitHub] [spark] vinodkc commented on pull request #41144: [SPARK-43470][CORE] Add OS, Java, Python version information to application log
vinodkc commented on PR #41144: URL: https://github.com/apache/spark/pull/41144#issuecomment-1576034499 > For the Python, the information is printed at every task execution. Could you find a proper place to print that info once, @vinodkc ? > > ``` > 23/06/03 18:29:46 INFO PythonRunner: Python version info: python3(3.11) > 23/06/03 18:29:46 INFO PythonRunner: Python version info: python3(3.11) > 23/06/03 18:29:46 INFO PythonRunner: Times: total = 38, boot = -8568, init = 8606, finish = 0 > 23/06/03 18:29:46 INFO PythonRunner: Times: total = 38, boot = -8571, init = 8609, finish = 0 > 23/06/03 18:29:46 INFO Executor: Finished task 0.0 in stage 18.0 (TID 26). 1393 bytes result sent to driver > 23/06/03 18:29:46 INFO Executor: Finished task 1.0 in stage 18.0 (TID 27). 1396 bytes result sent to driver > 23/06/03 18:29:50 INFO CoarseGrainedExecutorBackend: Got assigned task 28 > 23/06/03 18:29:50 INFO CoarseGrainedExecutorBackend: Got assigned task 29 > 23/06/03 18:29:50 INFO Executor: Running task 0.0 in stage 19.0 (TID 28) > 23/06/03 18:29:50 INFO Executor: Running task 1.0 in stage 19.0 (TID 29) > 23/06/03 18:29:50 INFO TorrentBroadcast: Started reading broadcast variable 14 with 1 pieces (estimated total size 4.0 MiB) > 23/06/03 18:29:50 INFO MemoryStore: Block broadcast_14_piece0 stored as bytes in memory (estimated size 3.7 KiB, free 366.2 MiB) > 23/06/03 18:29:50 INFO TorrentBroadcast: Reading broadcast variable 14 took 20 ms > 23/06/03 18:29:50 INFO MemoryStore: Block broadcast_14 stored as values in memory (estimated size 5.7 KiB, free 366.2 MiB) > 23/06/03 18:29:50 INFO PythonRunner: Python version info: python3(3.11) > 23/06/03 18:29:50 INFO PythonRunner: Python version info: python3(3.11) > ``` Done
[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
cloud-fan commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217473051 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala: ## @@ -506,7 +507,8 @@ object PreWriteCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { plan.foreach { - case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), partition, _, query, _, _) => + case InsertIntoStatement(l@LogicalRelation(relation, _, _, _), partition, + _, query, _, _, _) => Review Comment: ```suggestion _, query, _, _, _) => ```
[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
cloud-fan commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217472863 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala: ## @@ -425,7 +426,7 @@ object PreprocessTableInsertion extends ResolveInsertionBase { } def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { -case i @ InsertIntoStatement(table, _, _, query, _, _) if table.resolved && query.resolved => +case i@InsertIntoStatement(table, _, _, query, _, _, _) if table.resolved && query.resolved => Review Comment: ```suggestion case i @ InsertIntoStatement(table, _, _, query, _, _, _) if table.resolved && query.resolved => ``` ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala: ## @@ -506,7 +507,8 @@ object PreWriteCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { plan.foreach { - case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), partition, _, query, _, _) => + case InsertIntoStatement(l@LogicalRelation(relation, _, _, _), partition, Review Comment: ```suggestion case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), partition, ```
[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
cloud-fan commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217473411 ## sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala: ## @@ -46,21 +47,24 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { } protected def processInsert( - tableName: String, - input: DataFrame, - cols: Seq[String] = Nil, - partitionExprs: Seq[String] = Nil, - overwrite: Boolean): Unit = { + tableName: String, Review Comment: nit: please keep the previous indentation, which is correct.
[GitHub] [spark] zhengruifeng commented on pull request #41444: [SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API
zhengruifeng commented on PR #41444: URL: https://github.com/apache/spark/pull/41444#issuecomment-1575995156 Where are `percentile_cont` and `percentile_disc` from? I cannot find them in https://spark.apache.org/docs/latest/api/sql/index.html or in `FunctionRegistry`.
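For reference, `percentile_cont` and `percentile_disc` are the SQL standard's inverse distribution functions, usually written as `percentile_cont(p) WITHIN GROUP (ORDER BY col)`. The sketch below implements their textbook definitions over an in-memory sequence to show how they differ. It is not Spark's implementation, and `Percentiles` is a hypothetical name.

```scala
object Percentiles {
  // Continuous percentile: linear interpolation between the two nearest ranks,
  // at 0-based position p * (n - 1) in the sorted input.
  def percentileCont(values: Seq[Double], p: Double): Double = {
    require(values.nonEmpty && p >= 0.0 && p <= 1.0)
    val sorted = values.sorted(Ordering.Double.TotalOrdering)
    val pos = p * (sorted.length - 1)
    val lo = math.floor(pos).toInt
    val hi = math.ceil(pos).toInt
    sorted(lo) + (pos - lo) * (sorted(hi) - sorted(lo))
  }

  // Discrete percentile: the first sorted value whose cumulative distribution
  // (i + 1) / n reaches p. The result is always one of the inputs.
  def percentileDisc(values: Seq[Double], p: Double): Double = {
    require(values.nonEmpty && p >= 0.0 && p <= 1.0)
    val sorted = values.sorted(Ordering.Double.TotalOrdering)
    val n = sorted.length
    val idx = sorted.indices.find(i => (i + 1).toDouble / n >= p).getOrElse(n - 1)
    sorted(idx)
  }
}
```

On `1, 2, 3, 4` with `p = 0.5`, the continuous version interpolates to `2.5`. The discrete version returns `2.0`, which is an actual input value.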
[GitHub] [spark] amaliujia commented on pull request #41425: [SPARK-43919][SQL] Extract JSON functionality out of Row
amaliujia commented on PR #41425: URL: https://github.com/apache/spark/pull/41425#issuecomment-1576068930 @cloud-fan Done. Waiting for CI to pass again.
[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
cloud-fan commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217474065 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala: ## @@ -145,7 +145,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { // handles InsertIntoStatement specially as the table in InsertIntoStatement is not added in its // children, hence not matched directly by previous HiveTableRelation case. -case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _) +case i@InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, _) Review Comment: ```suggestion case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, _) ``` ## sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala: ## @@ -226,12 +226,12 @@ case class RelationConversions( plan resolveOperators { // Write path case InsertIntoStatement( - r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists) + r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists, byName) Review Comment: ```suggestion r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists, byName) ```
[GitHub] [spark] allisonwang-db commented on a diff in pull request #41316: [SPARK-43798][SQL][PYTHON] Support Python user-defined table functions
allisonwang-db commented on code in PR #41316: URL: https://github.com/apache/spark/pull/41316#discussion_r1217487382 ## python/pyspark/sql/functions.py: ## @@ -10403,6 +10405,82 @@ def udf( return _create_py_udf(f=f, returnType=returnType, useArrow=useArrow) +def udtf( +f: Optional[Type] = None, +returnType: Union[StructType, str] = None, Review Comment: I think it's more readable if we require this to be invoked like `@udtf(returnType="...")`, and it doesn't make sense for UDTFs to have a default value for the return type (unlike udfs which use `StringType`).
[GitHub] [spark] cloud-fan commented on pull request #41332: [SPARK-43801][SQL] Support unwrap date type to string type in UnwrapCastInBinaryComparison
cloud-fan commented on PR #41332: URL: https://github.com/apache/spark/pull/41332#issuecomment-1575962729 My suggestion for this problem is not to abuse the string type. If the column holds timestamp values, it should be of timestamp type. If you know that your string-typed "timestamp" column always contains standard timestamp strings, you can also manually rewrite your predicate to compare the strings directly.
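cloud-fan's caveat is that comparing strings only works when the values are standard timestamp strings. The reason is the gap between lexicographic and chronological order. This is a minimal sketch that assumes the fixed-width `yyyy-MM-dd HH:mm:ss` layout. `StringTimestampCompare` is a hypothetical name.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object StringTimestampCompare {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Chronological comparison: parse both strings, then compare the date-times.
  def beforeParsed(a: String, b: String): Boolean =
    LocalDateTime.parse(a, fmt).isBefore(LocalDateTime.parse(b, fmt))

  // Lexicographic comparison of the raw strings, as a rewritten string predicate would do.
  def beforeLexical(a: String, b: String): Boolean = a.compareTo(b) < 0
}
```

For zero-padded, fixed-width values the two comparisons agree. A single unpadded value breaks this: `2023-6-01 00:00:00` sorts after `2023-06-30 00:00:00` lexically, even though it is earlier in time.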
[GitHub] [spark] cloud-fan commented on a diff in pull request #41449: [SPARK-43959][SQL] Make RowLevelOperationSuiteBase and AlignAssignmentsSuite abstract
cloud-fan commented on code in PR #41449: URL: https://github.com/apache/spark/pull/41449#discussion_r1217394475 ## sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuite.scala: ## @@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BooleanType, IntegerType, StructType} -class AlignAssignmentsSuite extends AnalysisTest { +abstract class AlignAssignmentsSuite extends AnalysisTest { Review Comment: nit: if this is a base suite, probably add a `Base` suffix in the name, to be consistent with `RowLevelOperationSuiteBase`
[GitHub] [spark] cloud-fan commented on pull request #41425: [SPARK-43919][SQL] Extract JSON functionality out of Row
cloud-fan commented on PR #41425: URL: https://github.com/apache/spark/pull/41425#issuecomment-1575992736 @amaliujia can you fix conflicts? I think this PR is ready to go
[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
cloud-fan commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217470181
## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala: ##
@@ -165,19 +165,25 @@ case class QualifiedColType(
  * would have Map('a' -> Some('1'), 'b' -> None).
  * @param ifPartitionNotExists If true, only write if the partition does not exist.
  * Only valid for static partitions.
+ * @param byName If true, before the data is written, it will be sorted according to

Review Comment:
```suggestion
 * @param byName If true, reorder the data columns to match the column names of the target table.
```
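A minimal Scala sketch of what "reorder the data columns to match the column names of the target table" means for a single row, assuming case-insensitive name matching and an error when a target column has no matching data column (both are assumptions, not confirmed by the PR). `ByNameReorder` is hypothetical and is not how Spark's analyzer implements insert-by-name.

```scala
// Hypothetical sketch: by-name insertion reorders incoming values to the
// target table's column order by matching names, not positions.
object ByNameReorder {
  def reorder(
      targetColumns: Seq[String],
      dataColumns: Seq[String],
      row: Seq[Any]): Seq[Any] = {
    require(dataColumns.size == row.size, "row width must match data columns")
    // Name -> position in the incoming data (case-insensitive by assumption).
    val index: Map[String, Int] =
      dataColumns.map(_.toLowerCase).zipWithIndex.toMap
    targetColumns.map { col =>
      val i = index.getOrElse(
        col.toLowerCase,
        throw new IllegalArgumentException(s"Cannot find data for column: $col"))
      row(i)
    }
  }
}
```

For example, writing data with columns `(name, id)` into a table declared as `(id, name)` turns the row `("a", 1)` into `(1, "a")`; a purely positional insert would have put `"a"` into `id`.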
[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement
cloud-fan commented on code in PR #40908: URL: https://github.com/apache/spark/pull/40908#discussion_r1217470419
## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala: ##
@@ -151,8 +151,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
         if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
       CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name))
-    case InsertIntoStatement(l @ LogicalRelation(_: InsertableRelation, _, _, _),
-        parts, _, query, overwrite, false) if parts.isEmpty =>
+    case InsertIntoStatement(l@LogicalRelation(_: InsertableRelation, _, _, _),

Review Comment:
```suggestion
    case InsertIntoStatement(l @ LogicalRelation(_: InsertableRelation, _, _, _),
```
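For context on the `l @ LogicalRelation(...)` form, here is a small Scala sketch of a pattern binder, using simplified hypothetical plan types rather than Spark's. The pattern after `@` is matched structurally, and on success the name before `@` is bound to the whole matched value. `r@Relation(...)` compiles the same way; the spaced form is just the conventional style the suggestion asks for.

```scala
// Hypothetical, simplified plan types (not Spark's LogicalPlan classes).
sealed trait Plan
final case class Relation(table: String, insertable: Boolean) extends Plan
final case class Insert(target: Plan, overwrite: Boolean) extends Plan

object BinderDemo {
  def insertTarget(plan: Plan): Option[Relation] = plan match {
    // `r @ Relation(_, true)` checks the shape of the child node and also
    // keeps a handle on the entire node as `r`.
    case Insert(r @ Relation(_, true), false) => Some(r)
    case _ => None
  }
}
```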
[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources
aokolnychyi commented on code in PR #41448: URL: https://github.com/apache/spark/pull/41448#discussion_r1216253601
## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala: ##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.types.DataType
+
+case class MergeRows(
+    isSourceRowPresent: Expression,
+    isTargetRowPresent: Expression,
+    matchedInstructions: Seq[Instruction],
+    notMatchedInstructions: Seq[Instruction],
+    notMatchedBySourceInstructions: Seq[Instruction],
+    checkCardinality: Boolean,
+    output: Seq[Attribute],
+    child: LogicalPlan) extends UnaryNode {
+
+  override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  override lazy val references: AttributeSet = child.outputSet
+
+  override def simpleString(maxFields: Int): String = {
+    s"MergeRows${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+    copy(child = newChild)
+  }
+}
+
+object MergeRows {
+  final val ROW_ID = "__row_id"
+
+  sealed trait Instruction extends Expression with Unevaluable {

Review Comment: Each `MergeAction` gets converted into a particular instance of `Instruction`.
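A simplified Scala model of "each `MergeAction` gets converted into a particular instance of `Instruction`". Every type here is hypothetical and deliberately not Spark's: conditions and outputs are plain strings, and only a one-row "keep" instruction is modelled (the PR also has a `Split(cond, output, otherOutput)` variant, which this sketch leaves out). The point is the shape: a total, per-action conversion applied to each action list in order.

```scala
// Hypothetical merge actions, one per MERGE clause kind.
sealed trait Action { def condition: String }
final case class UpdateAction(condition: String, assignments: Map[String, String]) extends Action
final case class DeleteAction(condition: String) extends Action
final case class InsertAction(condition: String, values: Map[String, String]) extends Action

// Hypothetical instruction: emit one row tagged with an operation when the
// condition holds.
sealed trait Instr { def condition: String }
final case class KeepInstr(condition: String, operation: String, output: Map[String, String]) extends Instr

object ActionToInstruction {
  // One instruction per action; the match is exhaustive over Action.
  def convert(action: Action): Instr = action match {
    case UpdateAction(cond, assignments) => KeepInstr(cond, "update", assignments)
    case DeleteAction(cond) => KeepInstr(cond, "delete", Map.empty)
    case InsertAction(cond, values) => KeepInstr(cond, "insert", values)
  }
}
```

Applying `convert` with `map` to the matched, not-matched and not-matched-by-source action lists would give three instruction lists, mirroring the three `Seq[Instruction]` fields on `MergeRows`.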
[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources
aokolnychyi commented on code in PR #41448: URL: https://github.com/apache/spark/pull/41448#discussion_r1216255732
## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala: ##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.roaringbitmap.longlong.Roaring64Bitmap
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.expressions.BasePredicate
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Projection
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction, Keep, ROW_ID, Split}
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.UnaryExecNode
+
+case class MergeRowsExec(
+    isSourceRowPresent: Expression,
+    isTargetRowPresent: Expression,
+    matchedInstructions: Seq[Instruction],
+    notMatchedInstructions: Seq[Instruction],
+    notMatchedBySourceInstructions: Seq[Instruction],
+    checkCardinality: Boolean,
+    output: Seq[Attribute],
+    child: SparkPlan) extends UnaryExecNode {
+
+  @transient override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  @transient override lazy val references: AttributeSet = child.outputSet
+
+  override def simpleString(maxFields: Int): String = {
+    s"MergeRowsExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
+    copy(child = newChild)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions(processPartition)
+  }
+
+  private def processPartition(rowIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+    val isSourceRowPresentPred = createPredicate(isSourceRowPresent)
+    val isTargetRowPresentPred = createPredicate(isTargetRowPresent)
+
+    val matchedInstructionExecs = planInstructions(matchedInstructions)
+    val notMatchedInstructionExecs = planInstructions(notMatchedInstructions)
+    val notMatchedBySourceInstructionExecs = planInstructions(notMatchedBySourceInstructions)
+
+    val cardinalityValidator = if (checkCardinality) {
+      val rowIdOrdinal = child.output.indexWhere(attr => conf.resolver(attr.name, ROW_ID))
+      assert(rowIdOrdinal != -1, "Cannot find row ID attr")
+      BitmapCardinalityValidator(rowIdOrdinal)
+    } else {
+      NoopCardinalityValidator
+    }
+
+    val mergeIterator = new MergeRowIterator(
+      rowIterator, cardinalityValidator, isTargetRowPresentPred, isSourceRowPresentPred,
+      matchedInstructionExecs, notMatchedInstructionExecs, notMatchedBySourceInstructionExecs)
+
+    // null indicates a record must be discarded
+    mergeIterator.filter(_ != null)
+  }
+
+  private def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+    UnsafeProjection.create(exprs, child.output)
+  }
+
+  private def createPredicate(expr: Expression): BasePredicate = {
+    GeneratePredicate.generate(expr, child.output)
+  }
+
+  private def planInstructions(instructions: Seq[Instruction]): Seq[InstructionExec] = {
+    instructions.map {
+      case Keep(cond, output) =>
+        KeepExec(createPredicate(cond), createProjection(output))
+      case Split(cond, output, otherOutput) =>
+        SplitExec(createPredicate(cond), createProjection(output), createProjection(otherOutput))
+      case other =>
+        throw new AnalysisException(s"Unexpected instruction: $other")
+    }
+  }
+
+  sealed trait InstructionExec {
+    def condition: BasePredicate
+  }
+
+  case class
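A Scala sketch of the cardinality check and null-discarding steps in `processPartition`. It assumes, as the names `checkCardinality`, `ROW_ID` and `BitmapCardinalityValidator` suggest, that the check fails when one target row (identified by its row ID) is matched by more than one source row. A `mutable.HashSet[Long]` stands in for the `Roaring64Bitmap` used in the PR, `Option` stands in for the `null` "discard" marker, and every trait, class and object name below is hypothetical.

```scala
import scala.collection.mutable

// Hypothetical validator interface mirroring the Bitmap/Noop split.
sealed trait CardinalityCheck {
  def validate(rowId: Long): Unit
}

// Used when cardinality checking is disabled.
object NoopCheck extends CardinalityCheck {
  override def validate(rowId: Long): Unit = ()
}

// Remembers every matched target row ID and fails on a repeat.
final class SetCardinalityCheck extends CardinalityCheck {
  private val matchedRowIds = mutable.HashSet.empty[Long]

  override def validate(rowId: Long): Unit = {
    // `add` returns false when the ID was already present.
    if (!matchedRowIds.add(rowId)) {
      throw new IllegalStateException(
        s"Target row $rowId matched more than one source row")
    }
  }
}

object MergeSketch {
  // Each element is (targetRowId, output) for a matched pair; None marks a
  // row to discard, like the null filtered out in MergeRowsExec.
  def process(
      matched: Iterator[(Long, Option[String])],
      check: CardinalityCheck): List[String] =
    matched
      .map { case (rowId, out) =>
        check.validate(rowId)
        out
      }
      .collect { case Some(v) => v }
      .toList
}
```

A bitmap is a natural fit in the real operator because row IDs are longs and a compressed bitmap is likely to use far less memory per ID than a hash set; the hash set here only keeps the sketch dependency-free.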
[GitHub] [spark] dongjoon-hyun closed pull request #41438: [SPARK-43953][CONNECT] Remove `pass`
dongjoon-hyun closed pull request #41438: [SPARK-43953][CONNECT] Remove `pass` URL: https://github.com/apache/spark/pull/41438
[GitHub] [spark] dongjoon-hyun closed pull request #41437: [SPARK-43917][PS][INFRA] Upgrade `pandas` to 2.0.2
dongjoon-hyun closed pull request #41437: [SPARK-43917][PS][INFRA] Upgrade `pandas` to 2.0.2 URL: https://github.com/apache/spark/pull/41437
[GitHub] [spark] dongjoon-hyun commented on pull request #41409: [SPARK-43901][SQL] Avro to Support custom decimal type backed by Long
dongjoon-hyun commented on PR #41409: URL: https://github.com/apache/spark/pull/41409#issuecomment-1575421708 Could you resolve the conflict, @siying ?
[GitHub] [spark] wangyum closed pull request #41419: [SPARK-43911] [SQL] Use toSet to deduplicate the iterator data to prevent the creation of large Array
wangyum closed pull request #41419: [SPARK-43911] [SQL] Use toSet to deduplicate the iterator data to prevent the creation of large Array URL: https://github.com/apache/spark/pull/41419
[GitHub] [spark] wangyum commented on pull request #41419: [SPARK-43911] [SQL] Use toSet to deduplicate the iterator data to prevent the creation of large Array
wangyum commented on PR #41419: URL: https://github.com/apache/spark/pull/41419#issuecomment-1575436568 Merged to master and branch-3.4.
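A minimal Scala sketch of the idea in the SPARK-43911 title: deduplicate iterator data with `toSet` rather than first materializing every element into an Array. The "before" shape (`toArray.distinct`) is an assumption chosen for comparison, not necessarily the code the PR replaced, and `DedupSketch` is hypothetical.

```scala
import scala.reflect.ClassTag

object DedupSketch {
  // Assumed "before": builds an Array holding all n elements (duplicates
  // included), then allocates a second, deduplicated Array.
  def viaArray[T: ClassTag](it: Iterator[T]): Array[T] =
    it.toArray.distinct

  // Streams elements straight into a Set, so only distinct elements are
  // ever retained.
  def viaSet[T](it: Iterator[T]): Set[T] = it.toSet
}
```

Both produce the same distinct elements, but with heavily duplicated input the `toSet` version never holds more than the distinct values in memory.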