[GitHub] [spark] aminebag commented on pull request #41423: [SPARK-43523][CORE] Fix Spark UI LiveTask memory leak

2023-06-04 Thread via GitHub


aminebag commented on PR #41423:
URL: https://github.com/apache/spark/pull/41423#issuecomment-1575691248

   @srowen 
   > Are you saying that these objects are not actually usable and will never 
be collected because events are dropped?
   
   Yes, that's exactly what I'm saying. For example, many LiveTask objects (corresponding to tasks that have already finished) can remain in memory forever even though they are no longer of any use.
   
   > it would be to clean up this state even when something is dropped
   
   How can we clean up the state if the onTaskEnd event is lost? Is there any way to determine whether a task is finished if we have missed its onTaskEnd event? If there is, that would be the most appropriate way to clean up.
   
   > It's just a hack that happens to avoid this in your setup by potentially 
reporting incorrect info.
   
   Yes, with this fix we would potentially report incorrect info, but that is already the case without the fix: when we miss an onTaskEnd event we report that the task is still running when it actually is not. The purpose of the fix is not to report monitoring information correctly; its purpose is simply to contain the leak. It's better to have incorrect monitoring data (which, again, we already have) than to make the application unstable.
   
   > That's why I'm also wondering if there are simply ways to make the hot 
path in this listener faster, to keep up and not drop stuff in the first place.
   
   Sure, I think it's a good idea to make the listener faster and I'm open to any ideas for achieving that. Still, no matter how much faster we make the listener, we can never guarantee that it will always be able to keep up with the rate of incoming events. It's not sufficient to just improve the listener and hope that it never gets swamped by events. In my opinion, the listener should be able to survive any flood of events without leaving a leak behind.
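
   A minimal sketch of the containment idea being discussed, assuming a cap similar to the proposed `spark.ui.refreshedTasks` (the class and config names below are placeholders, not the actual patch):
   ```scala
   import java.util.{LinkedHashMap => JLinkedHashMap}
   import java.util.Map.Entry

   // Size-bounded map: once more than `maxRetainedTasks` entries are present, the
   // oldest entry is evicted, so tasks whose onTaskEnd event was dropped cannot
   // accumulate forever. `maxRetainedTasks` stands in for a config such as the
   // proposed spark.ui.refreshedTasks.
   class BoundedTaskMap[K, V](maxRetainedTasks: Int)
     extends JLinkedHashMap[K, V](16, 0.75f, false) {

     override protected def removeEldestEntry(eldest: Entry[K, V]): Boolean =
       size() > maxRetainedTasks
   }
   ```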
   
   





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41440: [SPARK-43952][CORE][CONNECT] Add SparkContext APIs for query cancellation by tag

2023-06-04 Thread via GitHub


HyukjinKwon commented on code in PR #41440:
URL: https://github.com/apache/spark/pull/41440#discussion_r1217271474


##
core/src/main/scala/org/apache/spark/SparkContext.scala:
##
@@ -2851,6 +2907,14 @@ object SparkContext extends Logging {
*/
   private[spark] val DRIVER_IDENTIFIER = "driver"
 
+  /** Separator of tags in SPARK_JOB_TAGS property */
+  private[spark] val SPARK_JOB_TAGS_SEP = ","
+
+  private[spark] def throwIfInvalidTagName(tagName: String) = {
+    if (tagName.contains(SPARK_JOB_TAGS_SEP)) {
+      throw new IllegalArgumentException("Spark job tag cannot contain '$SPARK_JOB_TAG_NAME_SEP'.")

Review Comment:
   ```suggestion
      throw new IllegalArgumentException(s"Spark job tag cannot contain '$SPARK_JOB_TAG_NAME_SEP'.")
   ```
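
   For context, a small standalone sketch (not from the PR) of why the `s` prefix matters here: without it, Scala leaves `$...` placeholders as literal text in the message.
   ```scala
   object InterpolationDemo extends App {
     val SPARK_JOB_TAGS_SEP = ","
     // Without the `s` prefix the placeholder is not substituted:
     println("Spark job tag cannot contain '$SPARK_JOB_TAGS_SEP'.")  // literal $SPARK_JOB_TAGS_SEP
     // With the `s` prefix the separator value is interpolated:
     println(s"Spark job tag cannot contain '$SPARK_JOB_TAGS_SEP'.") // ... cannot contain ','.
   }
   ```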






[GitHub] [spark] siying commented on pull request #41409: [SPARK-43901][SQL] Avro to Support custom decimal type backed by Long

2023-06-04 Thread via GitHub


siying commented on PR #41409:
URL: https://github.com/apache/spark/pull/41409#issuecomment-1575861442

   @dongjoon-hyun done.





[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources

2023-06-04 Thread via GitHub


aokolnychyi commented on code in PR #41448:
URL: https://github.com/apache/spark/pull/41448#discussion_r1216255732


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.roaringbitmap.longlong.Roaring64Bitmap
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.expressions.BasePredicate
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Projection
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction, Keep, ROW_ID, Split}
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.UnaryExecNode
+
+case class MergeRowsExec(
+    isSourceRowPresent: Expression,
+    isTargetRowPresent: Expression,
+    matchedInstructions: Seq[Instruction],
+    notMatchedInstructions: Seq[Instruction],
+    notMatchedBySourceInstructions: Seq[Instruction],
+    checkCardinality: Boolean,
+    output: Seq[Attribute],
+    child: SparkPlan) extends UnaryExecNode {
+
+  @transient override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  @transient override lazy val references: AttributeSet = child.outputSet
+
+  override def simpleString(maxFields: Int): String = {
+    s"MergeRowsExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
+    copy(child = newChild)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions(processPartition)
+  }
+
+  private def processPartition(rowIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+    val isSourceRowPresentPred = createPredicate(isSourceRowPresent)
+    val isTargetRowPresentPred = createPredicate(isTargetRowPresent)
+
+    val matchedInstructionExecs = planInstructions(matchedInstructions)
+    val notMatchedInstructionExecs = planInstructions(notMatchedInstructions)
+    val notMatchedBySourceInstructionExecs = planInstructions(notMatchedBySourceInstructions)
+
+    val cardinalityValidator = if (checkCardinality) {
+      val rowIdOrdinal = child.output.indexWhere(attr => conf.resolver(attr.name, ROW_ID))
+      assert(rowIdOrdinal != -1, "Cannot find row ID attr")
+      BitmapCardinalityValidator(rowIdOrdinal)
+    } else {
+      NoopCardinalityValidator
+    }
+
+    val mergeIterator = new MergeRowIterator(
+      rowIterator, cardinalityValidator, isTargetRowPresentPred, isSourceRowPresentPred,
+      matchedInstructionExecs, notMatchedInstructionExecs, notMatchedBySourceInstructionExecs)
+
+    // null indicates a record must be discarded
+    mergeIterator.filter(_ != null)
+  }
+
+  private def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+    UnsafeProjection.create(exprs, child.output)
+  }
+
+  private def createPredicate(expr: Expression): BasePredicate = {
+    GeneratePredicate.generate(expr, child.output)
+  }
+
+  private def planInstructions(instructions: Seq[Instruction]): Seq[InstructionExec] = {
+    instructions.map {
+      case Keep(cond, output) =>
+        KeepExec(createPredicate(cond), createProjection(output))
+      case Split(cond, output, otherOutput) =>
+        SplitExec(createPredicate(cond), createProjection(output), createProjection(otherOutput))
+      case other =>
+        throw new AnalysisException(s"Unexpected instruction: $other")
+    }
+  }
+
+  sealed trait InstructionExec {
+    def condition: BasePredicate
+  }
+
+  case class 

[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources

2023-06-04 Thread via GitHub


aokolnychyi commented on code in PR #41448:
URL: https://github.com/apache/spark/pull/41448#discussion_r1216253601


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.types.DataType
+
+case class MergeRows(
+    isSourceRowPresent: Expression,
+    isTargetRowPresent: Expression,
+    matchedInstructions: Seq[Instruction],
+    notMatchedInstructions: Seq[Instruction],
+    notMatchedBySourceInstructions: Seq[Instruction],
+    checkCardinality: Boolean,
+    output: Seq[Attribute],
+    child: LogicalPlan) extends UnaryNode {
+
+  override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  override lazy val references: AttributeSet = child.outputSet
+
+  override def simpleString(maxFields: Int): String = {
+    s"MergeRows${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+    copy(child = newChild)
+  }
+}
+
+object MergeRows {
+  final val ROW_ID = "__row_id"
+
+  sealed trait Instruction extends Expression with Unevaluable {

Review Comment:
   Each `MergeAction` gets converted into a particular instance of `Instruction`. Each such instruction is a valid expression, so it will be properly transformed by the analyzer and optimizer.
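
   A hedged sketch of that conversion (the `Keep`/`Instruction` names follow the quoted code, but the helper below is illustrative rather than the PR's actual implementation):
   ```scala
   import org.apache.spark.sql.AnalysisException
   import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
   import org.apache.spark.sql.catalyst.plans.logical.{InsertAction, MergeAction, UpdateAction}
   import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction, Keep}

   object MergeActionToInstruction {
     // Each MERGE action becomes an Instruction whose condition and output are
     // ordinary expressions, so analyzer/optimizer rules can rewrite them.
     def toInstruction(action: MergeAction): Instruction = action match {
       case UpdateAction(cond, assignments) =>
         Keep(cond.getOrElse(TrueLiteral), assignments.map(_.value))
       case InsertAction(cond, assignments) =>
         Keep(cond.getOrElse(TrueLiteral), assignments.map(_.value))
       case other =>
         throw new AnalysisException(s"Unexpected action: $other")
     }
   }
   ```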






[GitHub] [spark] LorenzoMartini commented on pull request #40018: [SPARK-42439][SQL] In v2 writes, make createJobDescription in FileWrite.toBatch not lazy

2023-06-04 Thread via GitHub


LorenzoMartini commented on PR #40018:
URL: https://github.com/apache/spark/pull/40018#issuecomment-1575674518

   Maybe @MaxGekk ?





[GitHub] [spark] HyukjinKwon commented on pull request #41450: [SPARK-43960][PS][TESTS] DataFrameConversionTestsMixin is not tested properly

2023-06-04 Thread via GitHub


HyukjinKwon commented on PR #41450:
URL: https://github.com/apache/spark/pull/41450#issuecomment-1575790826

   Merged to master.





[GitHub] [spark] HyukjinKwon closed pull request #41450: [SPARK-43960][PS][TESTS] DataFrameConversionTestsMixin is not tested properly

2023-06-04 Thread via GitHub


HyukjinKwon closed pull request #41450: [SPARK-43960][PS][TESTS] 
DataFrameConversionTestsMixin is not tested properly
URL: https://github.com/apache/spark/pull/41450





[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources

2023-06-04 Thread via GitHub


aokolnychyi commented on code in PR #41448:
URL: https://github.com/apache/spark/pull/41448#discussion_r1217358049


##
sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala:
##
@@ -92,6 +93,67 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     operators.head
   }
 
+  test("NO_BROADCAST_AND_REPLICATION hint is respected in cross joins") {

Review Comment:
   These 3 tests cover scenarios when it is not safe to broadcast or replicate 
the target table to perform the cardinality check. The newly added internal 
hint handles this. There are MERGE tests for this too.






[GitHub] [spark] itholic opened a new pull request, #41450: [SPARK-43960][PS][TESTS] DataFrameConversionTestsMixin is not tested properly

2023-06-04 Thread via GitHub


itholic opened a new pull request, #41450:
URL: https://github.com/apache/spark/pull/41450

   ### What changes were proposed in this pull request?
   
   This PR proposes to fix test which is not tested properly.
   
   ### Why are the changes needed?
   
   To test properly.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, it's test only
   
   
   ### How was this patch tested?
   
   The existing CI should pass





[GitHub] [spark] aminebag commented on pull request #41423: [SPARK-43523][CORE] Fix Spark UI LiveTask memory leak

2023-06-04 Thread via GitHub


aminebag commented on PR #41423:
URL: https://github.com/apache/spark/pull/41423#issuecomment-1575506059

   @srowen 
   I have applied and tested the modification you suggested (replacing toList with toArray) and it had no observable impact on the issue. I can still include it in the change if you want.
   
   @sarutak 
   Increasing the value of `spark.scheduler.listenerbus.eventqueue.capacity` would reduce the severity of the leak, but I don't think it would entirely prevent it, especially during peaks of activity. I suppose we would have fewer event drops by increasing the size of the queue, but tasks would still leak and keep adding up in memory until eventually there's no more space for the driver to operate.
   What I have noticed in the heap dumps, both for our application in production and for the tests, is that the most significant leak is the liveTasks map of the AppStatusListener object. As I have illustrated using the tests, the leak was contained by limiting the size of the liveTasks map. I think this fix addresses the problem directly: no matter how many events are dropped, the number of leaked tasks will never exceed the value of `spark.ui.refreshedTasks`. Thus, the application can live for days and we would never have to worry about the liveTasks map taking over all the memory, especially since the role of AppStatusListener is just to provide monitoring information about the state of the application, and once events start being dropped, AppStatusListener's state is already incoherent.
   This new property, `spark.ui.refreshedTasks`, is optional and disabled by default; we would only need to enable it when we observe that AppStatusListener is leaking.
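
   As a hedged illustration only (the values are placeholders), the two knobs discussed here could be set together: a larger queue to reduce drops, plus the proposed cap to bound the damage when drops still happen.
   ```scala
   import org.apache.spark.sql.SparkSession

   // spark.ui.refreshedTasks is the property proposed in this PR.
   val spark = SparkSession.builder()
     .config("spark.scheduler.listenerbus.eventqueue.capacity", "20000") // fewer dropped events
     .config("spark.ui.refreshedTasks", "100000")                        // cap on retained live tasks
     .getOrCreate()
   ```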
   
   





[GitHub] [spark] bjornjorgensen commented on pull request #40420: [SPARK-42617][PS] Support `isocalendar` from the pandas 2.0.0

2023-06-04 Thread via GitHub


bjornjorgensen commented on PR #40420:
URL: https://github.com/apache/spark/pull/40420#issuecomment-1575528446

   @dzhigimont we have upgraded main branch to pandas 2.0.2 now. 





[GitHub] [spark] HyukjinKwon closed pull request #41421: [SPARK-43881][SQL][PYTHON][CONNECT] Add optional pattern for Catalog.listDatabases

2023-06-04 Thread via GitHub


HyukjinKwon closed pull request #41421: [SPARK-43881][SQL][PYTHON][CONNECT] Add 
optional pattern for Catalog.listDatabases
URL: https://github.com/apache/spark/pull/41421





[GitHub] [spark] HyukjinKwon commented on pull request #41421: [SPARK-43881][SQL][PYTHON][CONNECT] Add optional pattern for Catalog.listDatabases

2023-06-04 Thread via GitHub


HyukjinKwon commented on PR #41421:
URL: https://github.com/apache/spark/pull/41421#issuecomment-1575788667

   Merged to master.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41415: [SPARK-43906][PYTHON][CONNECT] Implement the file support in SparkSession.addArtifacts

2023-06-04 Thread via GitHub


HyukjinKwon commented on code in PR #41415:
URL: https://github.com/apache/spark/pull/41415#discussion_r1217286686


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala:
##
@@ -154,6 +154,8 @@ class SparkConnectArtifactManager private[connect] {
       val canonicalUri =
         fragment.map(UriBuilder.fromUri(target.toUri).fragment).getOrElse(target.toUri)
       sessionHolder.session.sparkContext.addArchive(canonicalUri.toString)
+    } else if (remoteRelativePath.startsWith(s"files${File.separator}")) {
+      sessionHolder.session.sparkContext.addFile(target.toString)

Review Comment:
   For regular files and archives, I don't intend to expose `org.apache.spark.SparkFiles` for now.
   Since the files and archives are always stored in the current working directory of executors in production, I was simply thinking of creating a session-dedicated directory and changing the current working directory to that.
   
   That means end users would continue accessing their files with `./myfile.txt` or `./myarchive`.
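
   A hedged sketch of that access pattern (assuming a SparkSession `spark`, a file `myfile.txt` already shipped to the executors, and the working directory set up as described; the upload call itself is not shown):
   ```scala
   import scala.io.Source

   // Read the first line of the uploaded file from an executor's working directory.
   val firstLine = spark.range(1).rdd.map { _ =>
     val src = Source.fromFile("./myfile.txt") // relative path, resolved in the executor's cwd
     try src.getLines().next() finally src.close()
   }.collect().head
   ```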






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41415: [SPARK-43906][PYTHON][CONNECT] Implement the file support in SparkSession.addArtifacts

2023-06-04 Thread via GitHub


HyukjinKwon commented on code in PR #41415:
URL: https://github.com/apache/spark/pull/41415#discussion_r1217286896


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala:
##
@@ -154,6 +154,8 @@ class SparkConnectArtifactManager private[connect] {
       val canonicalUri =
         fragment.map(UriBuilder.fromUri(target.toUri).fragment).getOrElse(target.toUri)
       sessionHolder.session.sparkContext.addArchive(canonicalUri.toString)
+    } else if (remoteRelativePath.startsWith(s"files${File.separator}")) {
+      sessionHolder.session.sparkContext.addFile(target.toString)

Review Comment:
   (`SparkFiles` is used in the test case here, but that's a sort of hack to make sure things are cleaned up, etc.)






[GitHub] [spark] beliefer commented on pull request #41421: [SPARK-43881][SQL][PYTHON][CONNECT] Add optional pattern for Catalog.listDatabases

2023-06-04 Thread via GitHub


beliefer commented on PR #41421:
URL: https://github.com/apache/spark/pull/41421#issuecomment-1575891405

   @HyukjinKwon Thank you.





[GitHub] [spark] beliefer commented on a diff in pull request #41444: [SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API

2023-06-04 Thread via GitHub


beliefer commented on code in PR #41444:
URL: https://github.com/apache/spark/pull/41444#discussion_r1217334172


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##
@@ -812,6 +812,69 @@ object functions {
    */
   def min_by(e: Column, ord: Column): Column = Column.fn("min_by", e, ord)
 
+  /**
+   * Aggregate function: returns the exact percentile(s) of numeric column `expr` at the given
+   * percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile(e: Column, percentage: Column): Column = Column.fn("percentile", e, percentage)
+
+  /**
+   * Aggregate function: returns the exact percentile(s) of numeric column `expr` at the given
+   * percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile(e: Column, percentage: Column, frequency: Column): Column =
+    Column.fn("percentile", e, percentage, frequency)
+
+  /**
+   * Aggregate function: returns a percentile value based on a continuous distribution of numeric
+   * or ANSI interval column at the given percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_cont(e: Column, percentage: Column): Column =
+    Column.fn("percentile_cont", e, percentage)
+
+  /**
+   * Aggregate function: returns a percentile value based on a continuous distribution of numeric
+   * or ANSI interval column at the given percentage(s) with value range in [0.0, 1.0].
+   *
+   * Note: reverse used to specify whether to reverse calculate percentile value.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_cont(e: Column, percentage: Column, reverse: Boolean): Column =
+    Column.fn("percentile_cont", e, percentage, lit(reverse))
+
+  /**
+   * Aggregate function: returns the percentile(s) based on a discrete distribution of numeric
+   * column `expr` at the given percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_disc(e: Column, percentage: Column): Column =
+    Column.fn("percentile_disc", e, percentage)
+
+  /**
+   * Aggregate function: returns the percentile(s) based on a discrete distribution of numeric
+   * column `expr` at the given percentage(s) with value range in [0.0, 1.0].
+   *
+   * Note: reverse used to specify whether to reverse calculate percentile value.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_disc(e: Column, percentage: Column, reverse: Boolean): Column =

Review Comment:
   I know that.
   But there already exist many similar cases, such as:
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L473
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L489
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L506
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L621
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L633
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L686
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L702
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L1093
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L1164
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L1179
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L2745
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L2759
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L3498
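
   For reference, a hedged usage sketch of the new functions quoted above (assuming a SparkSession `spark` and the final merged signatures):
   ```scala
   import org.apache.spark.sql.functions.{col, lit, percentile, percentile_cont, percentile_disc}

   val df = spark.range(0, 100).toDF("v")
   df.agg(
     percentile(col("v"), lit(0.5)),       // exact median
     percentile_cont(col("v"), lit(0.25)), // percentile over a continuous distribution
     percentile_disc(col("v"), lit(0.25))  // percentile over a discrete distribution
   ).show()
   ```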
   

[GitHub] [spark] wForget commented on pull request #41332: [SPARK-43801][SQL] Support unwrap date type to string type in UnwrapCastInBinaryComparison

2023-06-04 Thread via GitHub


wForget commented on PR #41332:
URL: https://github.com/apache/spark/pull/41332#issuecomment-1575952608

   I found the same problem in the Hive data source: the partition filter cannot be pushed down due to the cast expression. Can we push down date-type filters in `Shim_v0_13#getPartitionsByFilter`? In addition, is it possible to use an unquoted date string to pass the date filter? The Hive metastore will parse an unquoted date string into a date type. Refer to https://github.com/apache/hive/blob/7cd3107a76d633ef5fae2ffb8ec16953ac968092/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/parser/Filter.g#L546
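
   A hedged illustration of the situation being described (assuming a Hive-enabled SparkSession `spark`; the table and values are made up):
   ```scala
   // A string partition column compared to a date literal forces a cast on the
   // partition column, and a cast expression cannot be converted into a Hive
   // metastore filter string, so the partition filter is not pushed down.
   spark.sql("CREATE TABLE t (id INT) PARTITIONED BY (dt STRING)")
   spark.sql("SELECT * FROM t WHERE dt = DATE'2023-05-05'").explain(true)
   // A pushable filter would need to look like: dt = '2023-05-05' (no cast).
   ```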





[GitHub] [spark] dongjoon-hyun commented on pull request #41136: [SPARK-43356][K8S] Migrate deprecated createOrReplace to serverSideApply

2023-06-04 Thread via GitHub


dongjoon-hyun commented on PR #41136:
URL: https://github.com/apache/spark/pull/41136#issuecomment-1575607354

   We are now using `6.7.0`. Could you rebase it to the master branch?





[GitHub] [spark] gengliangwang commented on a diff in pull request #41191: [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values

2023-06-04 Thread via GitHub


gengliangwang commented on code in PR #41191:
URL: https://github.com/apache/spark/pull/41191#discussion_r1217103535


##
sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala:
##
@@ -158,30 +158,37 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
    // session catalog and the table provider is not v2.
-    case c @ CreateTable(ResolvedV1Identifier(ident), _, _, _, _) =>
+    case c @ CreateTable(ResolvedV1Identifier(ident), _, _, u: UnresolvedTableSpec, _,

Review Comment:
   Nit: shall we have a single rule for resolving the table spec? This rule is 
for resolving session catalogs. 






[GitHub] [spark] HyukjinKwon commented on pull request #41440: [SPARK-43952][CORE][CONNECT] Add SparkContext APIs for query cancellation by tag

2023-06-04 Thread via GitHub


HyukjinKwon commented on PR #41440:
URL: https://github.com/apache/spark/pull/41440#issuecomment-1575805298

   > If we don't want to add public APIs like that, I'm also fine with having 
all that as private[spark]; my planned use of it is inside Spark in Spark 
Connect.
   
   I am fine with adding them as a public API





[GitHub] [spark] gengliangwang commented on a diff in pull request #41191: [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values

2023-06-04 Thread via GitHub


gengliangwang commented on code in PR #41191:
URL: https://github.com/apache/spark/pull/41191#discussion_r1217279661


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableSpec.scala:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal}
+import org.apache.spark.sql.catalyst.optimizer.ConstantFolding
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/**
+ * This object is responsible for processing unresolved table specifications in commands with
+ * OPTIONS lists. The parser produces such lists as maps from strings to unresolved expressions.
+ * After otherwise resolving such expressions in the analyzer, here we convert them to resolved
+ * table specifications wherein these OPTIONS list values are represented as strings instead, for
+ * convenience.
+ */
+object ResolveTableSpec {
+  def apply(u: UnresolvedTableSpec, opts: UnresolvedOptionsList): ResolvedTableSpec = {
+    val newOptions: Seq[(String, String)] = opts.options.map {
+      case (key: String, null) =>
+        (key, null)
+      case (key: String, value: Expression) =>
+        val newValue: String = try {
+          constantFold(value) match {

Review Comment:
   Shall we just use `value.eval()` since it is foldable?
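
   A hedged sketch of that suggestion (standalone and illustrative only, not the final code): for a foldable expression, `eval()` can be called directly without a separate constant-folding pass.
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{Add, Expression, Literal}

   val value: Expression = Add(Literal(40), Literal(2))       // foldable
   val newValue: String = Option(value.eval()).map(_.toString).orNull // "42"
   ```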






[GitHub] [spark] HyukjinKwon opened a new pull request, #41452: [DO-NOT-MERGE] Testing revert 1

2023-06-04 Thread via GitHub


HyukjinKwon opened a new pull request, #41452:
URL: https://github.com/apache/spark/pull/41452

   ### What changes were proposed in this pull request?
   
   TBD
   
   ### Why are the changes needed?
   TBD
   
   ### Does this PR introduce _any_ user-facing change?
   TBD
   
   ### How was this patch tested?
   TBD
   
   





[GitHub] [spark] HyukjinKwon opened a new pull request, #41453: [DO-NOT-MERGE] Testing revert 2

2023-06-04 Thread via GitHub


HyukjinKwon opened a new pull request, #41453:
URL: https://github.com/apache/spark/pull/41453

   ### What changes were proposed in this pull request?
   
   TBD
   
   ### Why are the changes needed?
   TBD
   
   ### Does this PR introduce _any_ user-facing change?
   TBD
   
   ### How was this patch tested?
   TBD
   
   





[GitHub] [spark] gengliangwang commented on a diff in pull request #41385: [SPARK-43205][SQL][FOLLOWUP] add ExpressionWithUnresolvedIdentifier to simplify code

2023-06-04 Thread via GitHub


gengliangwang commented on code in PR #41385:
URL: https://github.com/apache/spark/pull/41385#discussion_r1217279071


##
sql/core/src/test/resources/sql-tests/analyzer-results/ansi/double-quoted-identifiers-disabled.sql.out:
##
@@ -218,8 +218,8 @@ org.apache.spark.sql.AnalysisException
     "objectType" : "",
     "objectName" : "",
     "startIndex" : 15,
-    "stopIndex" : 37,
-    "fragment" : "not_exist AS X(`hello`)"
+    "stopIndex" : 23,

Review Comment:
   Awesome, the context is accurate now.






[GitHub] [spark] wangyum commented on a diff in pull request #41370: [SPARK-43866] Partition filter condition should pushed down to metastore query if it is equivalence Predicate

2023-06-04 Thread via GitHub


wangyum commented on code in PR #41370:
URL: https://github.com/apache/spark/pull/41370#discussion_r1217318011


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala:
##
@@ -1041,6 +1053,9 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
           ExtractableLiteral(value), ExtractAttribute(SupportedAttribute(name))) =>
         Some(s"$value ${op.symbol} $name")
 
+      case EqualTo(ExtractStringAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
+        Some(s"$name = $value")
+

Review Comment:
   A test like this:
   ```scala
   test("SPARK-43866:xxx") {
     withTable("test_tb") {
       sql("CREATE TABLE test_tb (id int) PARTITIONED BY (dt string)")
       sql("insert into test_tb partition(dt = '20230505.0') select 1")
       val df = sql("select * from test_tb where dt=20230505")
       checkAnswer(df, Row(1, "20230505.0"))
     }
   }
   ```






[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources

2023-06-04 Thread via GitHub


aokolnychyi commented on code in PR #41448:
URL: https://github.com/apache/spark/pull/41448#discussion_r1216251279


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, Expression, IsNotNull, MetadataAttribute, MonotonicallyIncreasingID, OuterReference, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, JoinType, LeftAnti, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteAction, Filter, HintInfo, InsertAction, Join, JoinHint, LogicalPlan, MergeAction, MergeIntoTable, MergeRows, NO_BROADCAST_AND_REPLICATION, Project, UpdateAction, WriteDelta}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction, Keep, ROW_ID, Split}
+import org.apache.spark.sql.catalyst.util.RowDeltaUtils.OPERATION_COLUMN
+import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
+import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
+import org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * A rule that rewrites MERGE operations using plans that operate on individual or groups of rows.
+ *
+ * This rule assumes the commands have been fully resolved and all assignments have been aligned.
+ */
+object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper {
+
+  private final val ROW_FROM_SOURCE = "__row_from_source"
+  private final val ROW_FROM_TARGET = "__row_from_target"
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
+        notMatchedBySourceActions) if m.resolved && m.rewritable && m.aligned &&
+        matchedActions.isEmpty && notMatchedActions.size == 1 &&
+        notMatchedBySourceActions.isEmpty =>
+
+      EliminateSubqueryAliases(aliasedTable) match {
+        case r: DataSourceV2Relation =>
+          // NOT MATCHED conditions may only refer to columns in source so they can be pushed down
+          val insertAction = notMatchedActions.head.asInstanceOf[InsertAction]
+          val filteredSource = insertAction.condition match {
+            case Some(insertCond) => Filter(insertCond, source)
+            case None => source
+          }
+
+          // there is only one NOT MATCHED action, use a left anti join to remove any matching rows
+          // and switch to using a regular append instead of a row-level MERGE operation
+          // only unmatched source rows that match the condition are appended to the table
+          val joinPlan = Join(filteredSource, r, LeftAnti, Some(cond), JoinHint.NONE)
+
+          val output = insertAction.assignments.map(_.value)
+          val outputColNames = r.output.map(_.name)
+          val projectList = output.zip(outputColNames).map { case (expr, name) =>
+            Alias(expr, name)()
+          }
+          val project = Project(projectList, joinPlan)
+
+          AppendData.byPosition(r, project)
+
+        case _ =>
+          m
+      }
+
+    case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,

Review Comment:
   This is a special case when there are only NOT MATCHED actions (having just 
1 such action is handled above).
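
   For context, a hedged example of a statement that falls under the first pattern above (no MATCHED or NOT MATCHED BY SOURCE actions and exactly one NOT MATCHED action), which the rule rewrites into a left anti join plus a regular append (table names are made up, and the target must be a v2 table that supports MERGE):
   ```scala
   spark.sql("""
     MERGE INTO target t
     USING source s
     ON t.id = s.id
     WHEN NOT MATCHED THEN INSERT *
   """)
   ```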






[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources

2023-06-04 Thread via GitHub


aokolnychyi commented on code in PR #41448:
URL: https://github.com/apache/spark/pull/41448#discussion_r1216251279


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, 
AttributeReference, Expression, IsNotNull, MetadataAttribute, 
MonotonicallyIncreasingID, OuterReference, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, JoinType, 
LeftAnti, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteAction, 
Filter, HintInfo, InsertAction, Join, JoinHint, LogicalPlan, MergeAction, 
MergeIntoTable, MergeRows, NO_BROADCAST_AND_REPLICATION, Project, UpdateAction, 
WriteDelta}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction, 
Keep, ROW_ID, Split}
+import org.apache.spark.sql.catalyst.util.RowDeltaUtils.OPERATION_COLUMN
+import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
+import org.apache.spark.sql.connector.write.{RowLevelOperationTable, 
SupportsDelta}
+import org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * A rule that rewrites MERGE operations using plans that operate on 
individual or groups of rows.
+ *
+ * This rule assumes the commands have been fully resolved and all assignments 
have been aligned.
+ */
+object RewriteMergeIntoTable extends RewriteRowLevelCommand with 
PredicateHelper {
+
+  private final val ROW_FROM_SOURCE = "__row_from_source"
+  private final val ROW_FROM_TARGET = "__row_from_target"
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, 
notMatchedActions,
+notMatchedBySourceActions) if m.resolved && m.rewritable && m.aligned 
&&
+matchedActions.isEmpty && notMatchedActions.size == 1 &&
+notMatchedBySourceActions.isEmpty =>
+
+  EliminateSubqueryAliases(aliasedTable) match {
+case r: DataSourceV2Relation =>
+  // NOT MATCHED conditions may only refer to columns in source so 
they can be pushed down
+  val insertAction = notMatchedActions.head.asInstanceOf[InsertAction]
+  val filteredSource = insertAction.condition match {
+case Some(insertCond) => Filter(insertCond, source)
+case None => source
+  }
+
+  // there is only one NOT MATCHED action, use a left anti join to 
remove any matching rows
+  // and switch to using a regular append instead of a row-level MERGE 
operation
+  // only unmatched source rows that match the condition are appended 
to the table
+  val joinPlan = Join(filteredSource, r, LeftAnti, Some(cond), 
JoinHint.NONE)
+
+  val output = insertAction.assignments.map(_.value)
+  val outputColNames = r.output.map(_.name)
+  val projectList = output.zip(outputColNames).map { case (expr, name) 
=>
+Alias(expr, name)()
+  }
+  val project = Project(projectList, joinPlan)
+
+  AppendData.byPosition(r, project)
+
+case _ =>
+  m
+  }
+
+case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, 
notMatchedActions,

Review Comment:
   This is a special case when there are only NOT MATCHED actions (just 1 is 
handled above).






[GitHub] [spark] aminebag commented on pull request #41423: [SPARK-43523][CORE] Fix Spark UI LiveTask memory leak

2023-06-04 Thread via GitHub


aminebag commented on PR #41423:
URL: https://github.com/apache/spark/pull/41423#issuecomment-1575726835

   I don't think the answer is more resources. If we had more memory or CPU power we would just delay the issue, not prevent it. Also, if we had more CPU power the listener would become faster at consuming events, but the producer could likewise become faster at producing them.
   
   Besides, increasing resources for the driver is vertical scaling, which, I believe, is against the spirit of Spark and distributed computing.
   
   One solution would be to make the event queue blocking and never drop events (block until there's space in the queue to put the event). But this would mean that the scheduler (the event producer) would be impacted by any slowness of the listener. I don't think it's a good idea to have a secondary feature (application monitoring) impact the performance of a main feature (task scheduling).
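
   A minimal standalone sketch of that trade-off (not Spark's actual listener bus code):
   ```scala
   import java.util.concurrent.ArrayBlockingQueue

   object QueueTradeoff {
     private val queue = new ArrayBlockingQueue[AnyRef](10000)

     // Blocking post: never drops an event, but stalls the producer (the scheduler) when full.
     def postBlocking(event: AnyRef): Unit = queue.put(event)

     // Dropping post: never stalls the producer, but may drop events when full,
     // which is what leaves LiveTask entries stranded in the listener's state.
     def postDropping(event: AnyRef): Boolean = queue.offer(event)
   }
   ```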





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41444: [WIP][SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API

2023-06-04 Thread via GitHub


HyukjinKwon commented on code in PR #41444:
URL: https://github.com/apache/spark/pull/41444#discussion_r1217278535


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##
@@ -812,6 +812,69 @@ object functions {
*/
   def min_by(e: Column, ord: Column): Column = Column.fn("min_by", e, ord)
 
+  /**
+   * Aggregate function: returns the exact percentile(s) of numeric column 
`expr` at the given
+   * percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile(e: Column, percentage: Column): Column = 
Column.fn("percentile", e, percentage)
+
+  /**
+   * Aggregate function: returns the exact percentile(s) of numeric column 
`expr` at the given
+   * percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile(e: Column, percentage: Column, frequency: Column): Column =
+Column.fn("percentile", e, percentage, frequency)
+
+  /**
+   * Aggregate function: returns a percentile value based on a continuous 
distribution of numeric
+   * or ANSI interval column at the given percentage(s) with value range in 
[0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_cont(e: Column, percentage: Column): Column =
+Column.fn("percentile_cont", e, percentage)
+
+  /**
+   * Aggregate function: returns a percentile value based on a continuous 
distribution of numeric
+   * or ANSI interval column at the given percentage(s) with value range in 
[0.0, 1.0].
+   *
+   * Note: reverse used to specify whether to reverse calculate percentile 
value.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_cont(e: Column, percentage: Column, reverse: Boolean): Column 
=
+Column.fn("percentile_cont", e, percentage, lit(reverse))
+
+  /**
+   * Aggregate function: returns the percentile(s) based on a discrete 
distribution of numeric
+   * column `expr` at the given percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_disc(e: Column, percentage: Column): Column =
+Column.fn("percentile_disc", e, percentage)
+
+  /**
+   * Aggregate function: returns the percentile(s) based on a discrete 
distribution of numeric
+   * column `expr` at the given percentage(s) with value range in [0.0, 1.0].
+   *
+   * Note: reverse used to specify whether to reverse calculate percentile 
value.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_disc(e: Column, percentage: Column, reverse: Boolean): Column 
=

Review Comment:
   Can we have them as all with `Column` signature? See also the comments on 
the top of this file
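   For illustration, a `Column`-typed variant of the `reverse` overload (a hypothetical sketch of this suggestion, not the final API) could look like:
   
   ```scala
   // Hypothetical sketch: take `reverse` as a Column so all overloads share the
   // Column-based signature; callers can pass lit(true) / lit(false).
   def percentile_cont(e: Column, percentage: Column, reverse: Column): Column =
     Column.fn("percentile_cont", e, percentage, reverse)
   ```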



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on a diff in pull request #41385: [SPARK-43205][SQL][FOLLOWUP] add ExpressionWithUnresolvedIdentifier to simplify code

2023-06-04 Thread via GitHub


gengliangwang commented on code in PR #41385:
URL: https://github.com/apache/spark/pull/41385#discussion_r1217278997


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala:
##
@@ -18,39 +18,51 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_IDENTIFIER
 import org.apache.spark.sql.types.StringType
 
 /**
- * Resolves the catalog of the name parts for table/view/function/namespace.
+ * Resolves the identifier expressions and builds the original 
plans/expressions.
  */
-object IdentifierClauseUtil {
-  private def getNotNullFoldableString(clauseName: String, expr: Expression): 
String = {
+object ResolveIdentifierClause extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+_.containsAnyPattern(UNRESOLVED_IDENTIFIER)) {
+case p: PlanWithUnresolvedIdentifier if p.identifierExpr.resolved =>
+  p.planBuilder.apply(evalIdentifierExpr(p.identifierExpr))
+case other =>
+  
other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER))
 {
+case e: ExpressionWithUnresolvedIdentifier if 
e.identifierExpr.resolved =>
+  e.exprBuilder.apply(evalIdentifierExpr(e.identifierExpr))
+  }
+  }
+
+  private def evalIdentifierExpr(expr: Expression): Seq[String] = {
 expr match {
   case e if !e.foldable => expr.failAnalysis(
 errorClass = "NOT_A_CONSTANT_STRING.NOT_CONSTANT",
 messageParameters = Map(
-  "name" -> clauseName,
+  "name" -> "IDENTIFIER",
   "expr" -> expr.sql))
   case e if e.dataType != StringType => expr.failAnalysis(
 errorClass = "NOT_A_CONSTANT_STRING.WRONG_TYPE",
 messageParameters = Map(
-  "name" -> clauseName,
+  "name" -> "IDENTIFIER",
   "expr" -> expr.sql,
   "dataType" -> e.dataType.catalogString))
   case e =>
 e.eval() match {
   case null => expr.failAnalysis(
 errorClass = "NOT_A_CONSTANT_STRING.NULL",
 messageParameters = Map(
-  "name" -> clauseName,
+  "name" -> "IDENTIFIER",
   "expr" -> expr.sql))
-  case other => other.toString // OK
+  case other =>
+// Parse the identifier string to name parts.
+UnresolvedAttribute(other.toString).nameParts

Review Comment:
   This one is a bit confusing. Shall we just use 
   ```
   CatalystSqlParser.parseMultipartIdentifier(name)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41415: [SPARK-43906][PYTHON][CONNECT] Implement the file support in SparkSession.addArtifacts

2023-06-04 Thread via GitHub


HyukjinKwon commented on code in PR #41415:
URL: https://github.com/apache/spark/pull/41415#discussion_r1217286686


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala:
##
@@ -154,6 +154,8 @@ class SparkConnectArtifactManager private[connect] {
 val canonicalUri =
   
fragment.map(UriBuilder.fromUri(target.toUri).fragment).getOrElse(target.toUri)
 sessionHolder.session.sparkContext.addArchive(canonicalUri.toString)
+  } else if (remoteRelativePath.startsWith(s"files${File.separator}")) {
+sessionHolder.session.sparkContext.addFile(target.toString)

Review Comment:
   For regular files and archives, I don't intend to expose `org.apache.spark.SparkFiles` for now.
   Since the files and archives are always stored in the current working directory of executors in production, I was simply thinking about creating a session-dedicated directory and changing the current working directory to that (during Python UDF execution).
   
   Meaning that end users would continue accessing their files with `./myfile.txt` or `./myarchive`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #41444: [SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API

2023-06-04 Thread via GitHub


HyukjinKwon commented on code in PR #41444:
URL: https://github.com/apache/spark/pull/41444#discussion_r1217343932


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##
@@ -812,6 +812,69 @@ object functions {
*/
   def min_by(e: Column, ord: Column): Column = Column.fn("min_by", e, ord)
 
+  /**
+   * Aggregate function: returns the exact percentile(s) of numeric column 
`expr` at the given
+   * percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile(e: Column, percentage: Column): Column = 
Column.fn("percentile", e, percentage)
+
+  /**
+   * Aggregate function: returns the exact percentile(s) of numeric column 
`expr` at the given
+   * percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile(e: Column, percentage: Column, frequency: Column): Column =
+Column.fn("percentile", e, percentage, frequency)
+
+  /**
+   * Aggregate function: returns a percentile value based on a continuous 
distribution of numeric
+   * or ANSI interval column at the given percentage(s) with value range in 
[0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_cont(e: Column, percentage: Column): Column =
+Column.fn("percentile_cont", e, percentage)
+
+  /**
+   * Aggregate function: returns a percentile value based on a continuous 
distribution of numeric
+   * or ANSI interval column at the given percentage(s) with value range in 
[0.0, 1.0].
+   *
+   * Note: reverse used to specify whether to reverse calculate percentile 
value.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_cont(e: Column, percentage: Column, reverse: Boolean): Column 
=
+Column.fn("percentile_cont", e, percentage, lit(reverse))
+
+  /**
+   * Aggregate function: returns the percentile(s) based on a discrete 
distribution of numeric
+   * column `expr` at the given percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_disc(e: Column, percentage: Column): Column =
+Column.fn("percentile_disc", e, percentage)
+
+  /**
+   * Aggregate function: returns the percentile(s) based on a discrete 
distribution of numeric
+   * column `expr` at the given percentage(s) with value range in [0.0, 1.0].
+   *
+   * Note: reverse used to specify whether to reverse calculate percentile 
value.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_disc(e: Column, percentage: Column, reverse: Boolean): Column 
=

Review Comment:
   They are mostly there for legacy reasons.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on a diff in pull request #40610: [SPARK-42626][CONNECT] Add Destructive Iterator for SparkResult

2023-06-04 Thread via GitHub


ivoson commented on code in PR #40610:
URL: https://github.com/apache/spark/pull/40610#discussion_r1216559940


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkResult.scala:
##
@@ -46,7 +46,14 @@ private[sql] class SparkResult[T](
   private[this] var numRecords: Int = 0
   private[this] var structType: StructType = _
   private[this] var boundEncoder: ExpressionEncoder[T] = _
-  private[this] val batches = mutable.Buffer.empty[ColumnarBatch]
+  private[this] var nextBatchIndex: Int = 0
+  private[this] val idxToBatches = mutable.Map.empty[Int, ColumnarBatch]
+
+  // Exposed for UT.
+  private[sql] def existingBatches(): Seq[ColumnarBatch] = {

Review Comment:
   thanks, done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a diff in pull request #41424: [SPARK-43913][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2426-2432]

2023-06-04 Thread via GitHub


beliefer commented on code in PR #41424:
URL: https://github.com/apache/spark/pull/41424#discussion_r1216524958


##
core/src/main/resources/error/error-classes.json:
##
@@ -1834,6 +1849,11 @@
 ],
 "sqlState" : "42K05"
   },
+  "RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT" : {
+"message" : [
+  ""

Review Comment:
   Got it. But it seems this is not about `UNRESOLVED_COLUMN`. How about `MISSING_ATTRIBUTES`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] panbingkun commented on pull request #41451: [SPARK-43948][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[0050|0057|0058|0059]

2023-06-04 Thread via GitHub


panbingkun commented on PR #41451:
URL: https://github.com/apache/spark/pull/41451#issuecomment-1575582851

   cc @MaxGekk 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a diff in pull request #41444: [SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API

2023-06-04 Thread via GitHub


beliefer commented on code in PR #41444:
URL: https://github.com/apache/spark/pull/41444#discussion_r1217345052


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##
@@ -812,6 +812,69 @@ object functions {
*/
   def min_by(e: Column, ord: Column): Column = Column.fn("min_by", e, ord)
 
+  /**
+   * Aggregate function: returns the exact percentile(s) of numeric column 
`expr` at the given
+   * percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile(e: Column, percentage: Column): Column = 
Column.fn("percentile", e, percentage)
+
+  /**
+   * Aggregate function: returns the exact percentile(s) of numeric column 
`expr` at the given
+   * percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile(e: Column, percentage: Column, frequency: Column): Column =
+Column.fn("percentile", e, percentage, frequency)
+
+  /**
+   * Aggregate function: returns a percentile value based on a continuous 
distribution of numeric
+   * or ANSI interval column at the given percentage(s) with value range in 
[0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_cont(e: Column, percentage: Column): Column =
+Column.fn("percentile_cont", e, percentage)
+
+  /**
+   * Aggregate function: returns a percentile value based on a continuous 
distribution of numeric
+   * or ANSI interval column at the given percentage(s) with value range in 
[0.0, 1.0].
+   *
+   * Note: reverse used to specify whether to reverse calculate percentile 
value.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_cont(e: Column, percentage: Column, reverse: Boolean): Column 
=
+Column.fn("percentile_cont", e, percentage, lit(reverse))
+
+  /**
+   * Aggregate function: returns the percentile(s) based on a discrete 
distribution of numeric
+   * column `expr` at the given percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_disc(e: Column, percentage: Column): Column =
+Column.fn("percentile_disc", e, percentage)
+
+  /**
+   * Aggregate function: returns the percentile(s) based on a discrete 
distribution of numeric
+   * column `expr` at the given percentage(s) with value range in [0.0, 1.0].
+   *
+   * Note: reverse used to specify whether to reverse calculate percentile 
value.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_disc(e: Column, percentage: Column, reverse: Boolean): Column 
=

Review Comment:
   Got it. Let me update it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a diff in pull request #41424: [SPARK-43913][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2426-2432]

2023-06-04 Thread via GitHub


beliefer commented on code in PR #41424:
URL: https://github.com/apache/spark/pull/41424#discussion_r1216524958


##
core/src/main/resources/error/error-classes.json:
##
@@ -1834,6 +1849,11 @@
 ],
 "sqlState" : "42K05"
   },
+  "RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT" : {
+"message" : [
+  ""

Review Comment:
   Got it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on a diff in pull request #41191: [SPARK-43529][SQL] Support general constant expressions as CREATE/REPLACE TABLE OPTIONS values

2023-06-04 Thread via GitHub


gengliangwang commented on code in PR #41191:
URL: https://github.com/apache/spark/pull/41191#discussion_r1217099067


##
sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala:
##
@@ -158,30 +158,37 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
 
 // For CREATE TABLE [AS SELECT], we should use the v1 command if the 
catalog is resolved to the
 // session catalog and the table provider is not v2.
-case c @ CreateTable(ResolvedV1Identifier(ident), _, _, _, _) =>
+case c @ CreateTable(ResolvedV1Identifier(ident), _, _, u: 
UnresolvedTableSpec, _,

Review Comment:
   QQ: why separate UnresolvedOptionsList from UnresolvedTableSpec? Now the 
rule has to match both UnresolvedTableSpec and UnresolvedOptionsList.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on pull request #41444: [WIP][SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API

2023-06-04 Thread via GitHub


beliefer commented on PR #41444:
URL: https://github.com/apache/spark/pull/41444#issuecomment-1575920159

   ping @cloud-fan @HyukjinKwon @zhengruifeng 
   The GA failure is unrelated to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] panbingkun opened a new pull request, #41451: [SPARK-43948][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[0050|0057|0058|0059]

2023-06-04 Thread via GitHub


panbingkun opened a new pull request, #41451:
URL: https://github.com/apache/spark/pull/41451

   ### What changes were proposed in this pull request?
   The PR aims to assign names to the error classes `_LEGACY_ERROR_TEMP_[0050|0057|0058|0059]`, details as follows:
   - _LEGACY_ERROR_TEMP_0050 => LOCAL_MUST_WITH_SCHEMA_FILE
   - _LEGACY_ERROR_TEMP_0057 => UNSUPPORTED_DEFAULT_VALUE.WITHOUT_SUGGESTION
   - _LEGACY_ERROR_TEMP_0058 => UNSUPPORTED_DEFAULT_VALUE.WITH_SUGGESTION
   - _LEGACY_ERROR_TEMP_0059 => REF_DEFAULT_VALUE_IS_NOT_ALLOWED_IN_PARTITION
   
   ### Why are the changes needed?
   The changes improve the error framework.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   - Manually test.
   - Pass GA.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen commented on pull request #41423: [SPARK-43523][CORE] Fix Spark UI LiveTask memory leak

2023-06-04 Thread via GitHub


srowen commented on PR #41423:
URL: https://github.com/apache/spark/pull/41423#issuecomment-1575682910

   I'm still not understanding why you analyze this as a leak. Are you saying that these objects are not actually usable and will never be collected because events are dropped? That would be a leak, but then this isn't the fix - it would be to clean up this state even when something is dropped. Is that what you're arguing, though?
   
   This can't itself be the fix, to just arbitrarily drop state. It's just a 
hack that happens to avoid this in your setup by potentially reporting 
incorrect info. That's why I'm also wondering if there are simply ways to make 
the hot path in this listener faster, to keep up and not drop stuff in the 
first place. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources

2023-06-04 Thread via GitHub


aokolnychyi commented on code in PR #41448:
URL: https://github.com/apache/spark/pull/41448#discussion_r1216250498


##
core/src/main/resources/error/error-classes.json:
##
@@ -1513,6 +1513,13 @@
   "Parse Mode: . To process malformed records as null 
result, try setting the option 'mode' as 'PERMISSIVE'."
 ]
   },
+  "MERGE_CARDINALITY_VIOLATION" : {
+"message" : [
+  "The ON search condition of the MERGE statement matched a single row 
from the target table with multiple rows of the source table.",
+  "This could result in the target row being operated on more than once 
with an update or delete operation and is not allowed."
+],
+"sqlState" : "23000"

Review Comment:
   I used the `23` class as it is a constraint violation, but I wasn't sure about the subclass. It is not defined in the SQL standard, so I used `000`, meaning no subclass. I am not sure how Spark assigns subclasses in these cases.
   
   Here is an example of this error in SAP docs:
   https://dcx.sap.com/sqla170/en/html/80ca9fd06ce21014bc30ac05c444ee4d.html
   
   Here is the original JIRA for this error in Hive:
   https://issues.apache.org/jira/browse/HIVE-14949
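   For context, a minimal sketch of the situation this error class describes (hypothetical table and column names, assuming a SparkSession named `spark`):
   
   ```scala
   // Two source rows share id = 1, so the single matching target row would be
   // updated more than once -> MERGE_CARDINALITY_VIOLATION.
   spark.sql("""
     MERGE INTO target t
     USING source s
     ON t.id = s.id
     WHEN MATCHED THEN UPDATE SET t.value = s.value
   """)
   ```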



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ulysses-you opened a new pull request, #41454: [SPARK-43376][SQL][FOLLOWUP] lazy construct subquery to improve reuse subquery

2023-06-04 Thread via GitHub


ulysses-you opened a new pull request, #41454:
URL: https://github.com/apache/spark/pull/41454

   
   
   ### What changes were proposed in this pull request?
   
   https://github.com/apache/spark/pull/41046 made `ReuseAdaptiveSubquery` non-idempotent. This PR reverts that change in `ReuseAdaptiveSubquery`.
   
   To solve the same-instance issue when planning and reusing subqueries in AQE, this PR makes `subqueryMap` hold a function that builds the subquery. Then, in `PlanAdaptiveSubqueries`, each logical subquery plan can build its own instance of the physical subquery plan, as sketched below.
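   A minimal sketch of the idea (illustrative only, not the actual `PlanAdaptiveSubqueries` code; names are hypothetical):
   
   ```scala
   import scala.collection.mutable
   
   // Cache a builder function per expression ID instead of a single built
   // instance, so each logical subquery reference gets its own physical plan.
   class LazySubqueryRegistry[PhysicalPlan] {
     private val subqueryMap = mutable.Map.empty[Long, () => PhysicalPlan]
   
     def register(exprId: Long, build: () => PhysicalPlan): Unit =
       subqueryMap(exprId) = build
   
     def planReference(exprId: Long): PhysicalPlan =
       subqueryMap(exprId)() // fresh instance per reference
   }
   ```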
   
   ### Why are the changes needed?
   
   To improve subquery reuse in AQE.
   
   ### Does this PR introduce _any_ user-facing change?
   
   no
   
   ### How was this patch tested?
   
   Pass CI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on pull request #41446: [SPARK-43956][SQL][3.3] Fix the bug doesn't display column's sql for Percentile[Cont|Disc]

2023-06-04 Thread via GitHub


beliefer commented on PR #41446:
URL: https://github.com/apache/spark/pull/41446#issuecomment-1575508970

   @dongjoon-hyun @MaxGekk Thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on pull request #41445: [SPARK-43956][SQL][3.4] Fix the bug doesn't display column's sql for Percentile[Cont|Disc]

2023-06-04 Thread via GitHub


beliefer commented on PR #41445:
URL: https://github.com/apache/spark/pull/41445#issuecomment-1575508866

   @MaxGekk Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark-connect-go] HyukjinKwon closed pull request #8: [SPARK-43958] Adding support for Channel Builder

2023-06-04 Thread via GitHub


HyukjinKwon closed pull request #8: [SPARK-43958] Adding support for Channel 
Builder
URL: https://github.com/apache/spark-connect-go/pull/8


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark-connect-go] HyukjinKwon commented on pull request #8: [SPARK-43958] Adding support for Channel Builder

2023-06-04 Thread via GitHub


HyukjinKwon commented on PR #8:
URL: https://github.com/apache/spark-connect-go/pull/8#issuecomment-1575537525

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zzzzming95 commented on a diff in pull request #41370: [SPARK-43866] Partition filter condition should pushed down to metastore query if it is equivalence Predicate

2023-06-04 Thread via GitHub


zzzzming95 commented on code in PR #41370:
URL: https://github.com/apache/spark/pull/41370#discussion_r1216785699


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala:
##
@@ -1041,6 +1053,9 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   ExtractableLiteral(value), 
ExtractAttribute(SupportedAttribute(name))) =>
 Some(s"$value ${op.symbol} $name")
 
+  case EqualTo(ExtractStringAttribute(SupportedAttribute(name)), 
ExtractableLiteral(value)) =>
+Some(s"$name = $value")
+

Review Comment:
   This problem seems to occur only when a database such as MySQL is used as the metastore database. Spark's unit tests seem to use Derby (this case cannot occur in `PruneHiveTablePartitionsSuite`), which cannot reproduce the problem. So it is difficult to implement an end-to-end unit test on the Spark side.
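   For illustration, a hedged sketch of the kind of query the new pattern is meant to cover (hypothetical table and partition column names, assuming a SparkSession named `spark`): the matched shape is an equality between a cast of a string partition column to an integral type and a literal.
   
   ```scala
   // `dt` is assumed to be a string-typed partition column; the filter below
   // presumably reaches HiveShim as EqualTo(Cast(dt AS INT), Literal(20230604)),
   // which is the shape the new ExtractStringAttribute case matches.
   spark.sql("SELECT * FROM db.events WHERE CAST(dt AS INT) = 20230604")
   ```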



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen commented on pull request #41442: [SPARK-43955][BUILD] Upgrade `scalafmt` from 3.7.3 to 3.7.4

2023-06-04 Thread via GitHub


srowen commented on PR #41442:
URL: https://github.com/apache/spark/pull/41442#issuecomment-1575681053

   Merged to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen closed pull request #41442: [SPARK-43955][BUILD] Upgrade `scalafmt` from 3.7.3 to 3.7.4

2023-06-04 Thread via GitHub


srowen closed pull request #41442: [SPARK-43955][BUILD] Upgrade `scalafmt` from 
3.7.3 to 3.7.4
URL: https://github.com/apache/spark/pull/41442


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen commented on pull request #41423: [SPARK-43523][CORE] Fix Spark UI LiveTask memory leak

2023-06-04 Thread via GitHub


srowen commented on PR #41423:
URL: https://github.com/apache/spark/pull/41423#issuecomment-1575695964

   I get it, but this trades one incorrectness for another. I don't know of another good way here. Is this resolvable with simply more resources - more cores, more memory? Like, is part of the problem GC thrashing or swap? If so, I think I'd categorize this as just a symptom of that problem, and really the answer is more resources.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a diff in pull request #41444: [WIP][SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API

2023-06-04 Thread via GitHub


beliefer commented on code in PR #41444:
URL: https://github.com/apache/spark/pull/41444#discussion_r1217334172


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala:
##
@@ -812,6 +812,69 @@ object functions {
*/
   def min_by(e: Column, ord: Column): Column = Column.fn("min_by", e, ord)
 
+  /**
+   * Aggregate function: returns the exact percentile(s) of numeric column 
`expr` at the given
+   * percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile(e: Column, percentage: Column): Column = 
Column.fn("percentile", e, percentage)
+
+  /**
+   * Aggregate function: returns the exact percentile(s) of numeric column 
`expr` at the given
+   * percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile(e: Column, percentage: Column, frequency: Column): Column =
+Column.fn("percentile", e, percentage, frequency)
+
+  /**
+   * Aggregate function: returns a percentile value based on a continuous 
distribution of numeric
+   * or ANSI interval column at the given percentage(s) with value range in 
[0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_cont(e: Column, percentage: Column): Column =
+Column.fn("percentile_cont", e, percentage)
+
+  /**
+   * Aggregate function: returns a percentile value based on a continuous 
distribution of numeric
+   * or ANSI interval column at the given percentage(s) with value range in 
[0.0, 1.0].
+   *
+   * Note: reverse used to specify whether to reverse calculate percentile 
value.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_cont(e: Column, percentage: Column, reverse: Boolean): Column 
=
+Column.fn("percentile_cont", e, percentage, lit(reverse))
+
+  /**
+   * Aggregate function: returns the percentile(s) based on a discrete 
distribution of numeric
+   * column `expr` at the given percentage(s) with value range in [0.0, 1.0].
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_disc(e: Column, percentage: Column): Column =
+Column.fn("percentile_disc", e, percentage)
+
+  /**
+   * Aggregate function: returns the percentile(s) based on a discrete 
distribution of numeric
+   * column `expr` at the given percentage(s) with value range in [0.0, 1.0].
+   *
+   * Note: reverse used to specify whether to reverse calculate percentile 
value.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def percentile_disc(e: Column, percentage: Column, reverse: Boolean): Column 
=

Review Comment:
   I know that.
   But there are many existing cases like this, such as:
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L473
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L489
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L506
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L621
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L633
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L686
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L702
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L1093
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L1164
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L1179
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L2745
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L2759
   
https://github.com/apache/spark/blob/b00210bd0320afef282b68c8ef7f8d972e9e19f5/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala#L3498
   

[GitHub] [spark] yaooqinn commented on pull request #41181: [SPARK-43504][K8S] Mounts the hadoop config map on the executor pod

2023-06-04 Thread via GitHub


yaooqinn commented on PR #41181:
URL: https://github.com/apache/spark/pull/41181#issuecomment-1575994314

   thanks, @dongjoon-hyun and @turboFei. Late +1 from my side.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] degant commented on a diff in pull request #41428: [SPARK-41958][CORE][3.3] Disallow arbitrary custom classpath with proxy user in cluster mode

2023-06-04 Thread via GitHub


degant commented on code in PR #41428:
URL: https://github.com/apache/spark/pull/41428#discussion_r1217444147


##
docs/core-migration-guide.md:
##
@@ -25,6 +25,7 @@ license: |
 ## Upgrading from Core 3.2 to 3.3
 
 - Since Spark 3.3, Spark migrates its log4j dependency from 1.x to 2.x because 
log4j 1.x has reached end of life and is no longer supported by the community. 
Vulnerabilities reported after August 2015 against log4j 1.x were not checked 
and will not be fixed. Users should rewrite original log4j properties files 
using log4j2 syntax (XML, JSON, YAML, or properties format). Spark rewrites the 
`conf/log4j.properties.template` which is included in Spark distribution, to 
`conf/log4j2.properties.template` with log4j2 properties format.
+- Since Spark 3.3.3, 
`spark.submit.proxyUser.allowCustomClasspathInClusterMode` allows users to 
disable custom class path in cluster mode by proxy users. It still defaults to 
`true` to maintain backward compatibility.  

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on pull request #41444: [SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API

2023-06-04 Thread via GitHub


beliefer commented on PR #41444:
URL: https://github.com/apache/spark/pull/41444#issuecomment-1576020831

   @zhengruifeng The two functions are used with SQL syntax like `percentile_cont(0.5) WITHIN GROUP (ORDER BY v)`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] itholic commented on pull request #41455: [SPARK-43962][SQL] Improve error messages: `CANNOT_DECODE_URL`, `CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE`, `CANNOT_PARSE_DECIMAL`, `CANNOT_READ_F

2023-06-04 Thread via GitHub


itholic commented on PR #41455:
URL: https://github.com/apache/spark/pull/41455#issuecomment-1576034146

   cc @MaxGekk @srielau @cloud-fan Please review this when you find some time  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217472430


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##
@@ -397,7 +397,8 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
 }
 val newQuery = try {
   TableOutputResolver.resolveOutputColumns(
-tblName, expectedColumns, query, byName = isByName, conf, 
supportColDefaultValue = true)
+tblName, expectedColumns, query, byName = isByName || insert.byName, 
conf,

Review Comment:
   let's rename the `isByName` variable to `hasColumnList`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] vinodkc commented on a diff in pull request #41144: [SPARK-43470][CORE] Add OS, Java, Python version information to application log

2023-06-04 Thread via GitHub


vinodkc commented on code in PR #41144:
URL: https://github.com/apache/spark/pull/41144#discussion_r1217472619


##
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:
##
@@ -106,6 +106,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   protected val envVars: java.util.Map[String, String] = 
funcs.head.funcs.head.envVars
   protected val pythonExec: String = funcs.head.funcs.head.pythonExec
   protected val pythonVer: String = funcs.head.funcs.head.pythonVer
+  logInfo(s"Python version info: $pythonExec($pythonVer)")

Review Comment:
   Updated the code to print version and package details only once per executor.
   Added a bit more detail to also log the list of installed packages.
   A new property `spark.executor.python.worker.log.details` has been added to enable this Python info logging.
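   A minimal sketch of the "log once per executor" guard (illustrative only, not the actual patch; the surrounding config plumbing is assumed):
   
   ```scala
   import java.util.concurrent.atomic.AtomicBoolean
   
   object PythonEnvLogGuard {
     private val logged = new AtomicBoolean(false)
   
     // Even if many Python runners start on the same executor, only the first
     // call actually emits the version details.
     def logOnce(pythonExec: String, pythonVer: String, enabled: Boolean)(
         log: String => Unit): Unit = {
       if (enabled && logged.compareAndSet(false, true)) {
         log(s"Python version info: $pythonExec($pythonVer)")
       }
     }
   }
   ```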
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] itholic opened a new pull request, #41455: [SPARK-43962][SQL] Improve error messages: `CANNOT_DECODE_URL`, `CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE`, `CANNOT_PARSE_DECIMAL`, `CANNOT_READ_

2023-06-04 Thread via GitHub


itholic opened a new pull request, #41455:
URL: https://github.com/apache/spark/pull/41455

   ### What changes were proposed in this pull request?
   
   This PR proposes to improve error messages for `CANNOT_DECODE_URL`, 
`CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE`, `CANNOT_PARSE_DECIMAL`, 
`CANNOT_READ_FILE_FOOTER`, `CANNOT_RECOGNIZE_HIVE_TYPE`.
   
   **NOTE:** This PR is experimental work that utilizes an LLM to enhance error messages. The script was created using the `openai` Python library from OpenAI, and minimal review was conducted by the author after executing the script.
   
   
   
   ### Why are the changes needed?
   
   For improving errors to make them more actionable and usable.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No API changes, only error message improvement.
   
   
   ### How was this patch tested?
   
   The existing CI should pass.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217471748


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##
@@ -274,15 +274,16 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
 
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, 
false),
-_, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
+case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, 
false),
+_, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
   i.copy(table = readDataSourceTable(tableMeta, options))
 
-case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, 
false), _, _, _, _, _) =>
+case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false),
+_, _, _, _, _, _) =>
   i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
 case UnresolvedCatalogRelation(tableMeta, options, false)
-if DDLUtils.isDatasourceTable(tableMeta) =>
+  if DDLUtils.isDatasourceTable(tableMeta) =>

Review Comment:
   ```suggestion
   if DDLUtils.isDatasourceTable(tableMeta) =>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217472203


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##
@@ -397,7 +397,8 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
 }
 val newQuery = try {
   TableOutputResolver.resolveOutputColumns(
-tblName, expectedColumns, query, byName = isByName, conf, 
supportColDefaultValue = true)
+tblName, expectedColumns, query, byName = isByName || insert.byName, 
conf,

Review Comment:
   let's rename the `isByName` variable to `hasColumnList`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] pan3793 commented on pull request #41136: [SPARK-43356][K8S] Migrate deprecated createOrReplace to serverSideApply

2023-06-04 Thread via GitHub


pan3793 commented on PR #41136:
URL: https://github.com/apache/spark/pull/41136#issuecomment-1576003455

   rebased on the latest master branch


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217470812


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##
@@ -151,8 +151,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
 if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
   CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, 
query.output.map(_.name))
 
-case InsertIntoStatement(l @ LogicalRelation(_: InsertableRelation, _, _, 
_),
-parts, _, query, overwrite, false) if parts.isEmpty =>
+case InsertIntoStatement(l@LogicalRelation(_: InsertableRelation, _, _, _),
+parts, _, query, overwrite, false, _) if parts.isEmpty =>

Review Comment:
   ```suggestion
   parts, _, query, overwrite, false, _) if parts.isEmpty =>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217471514


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##
@@ -274,15 +274,16 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
 
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, 
false),
-_, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
+case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, 
false),
+_, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
   i.copy(table = readDataSourceTable(tableMeta, options))
 
-case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, 
false), _, _, _, _, _) =>
+case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false),

Review Comment:
   ```suggestion
   case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, 
false),
   ```



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##
@@ -274,15 +274,16 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
 
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, 
false),
-_, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
+case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, 
false),
+_, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
   i.copy(table = readDataSourceTable(tableMeta, options))
 
-case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, 
false), _, _, _, _, _) =>
+case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false),
+_, _, _, _, _, _) =>

Review Comment:
   ```suggestion
   _, _, _, _, _, _) =>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217471285


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##
@@ -274,15 +274,16 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
 
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, 
false),
-_, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
+case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, 
false),

Review Comment:
   ```suggestion
   case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, 
options, false),
   ```



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##
@@ -274,15 +274,16 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
 
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, 
false),
-_, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
+case i@InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, 
false),
+_, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>

Review Comment:
   ```suggestion
   _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #41370: [SPARK-43866] Partition filter condition should pushed down to metastore query if it is equivalence Predicate

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #41370:
URL: https://github.com/apache/spark/pull/41370#discussion_r1217481906


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala:
##
@@ -994,6 +994,18 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   }
 }
 
+object ExtractStringAttribute {
+  @scala.annotation.tailrec
+  def unapply(expr: Expression): Option[Attribute] = {
+expr match {
+  case attr: Attribute => Some(attr)
+  case Cast(child, dt: IntegralType, _, _)
+if child.dataType.isInstanceOf[StringType] => unapply(child)

Review Comment:
   This assumes Spark and Hive have the same behavior for implicit casts. Is that always true? For example, casting a string to int may fail in Spark with ANSI mode on; does Hive have the same behavior?
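   For reference, a small sketch of the Spark-side behavior in question (assuming a SparkSession named `spark`):
   
   ```scala
   // With ANSI mode off, an invalid string-to-int cast evaluates to NULL.
   spark.conf.set("spark.sql.ansi.enabled", "false")
   spark.sql("SELECT CAST('abc' AS INT)").show()   // NULL
   
   // With ANSI mode on, the same cast fails at runtime instead of returning NULL,
   // which is why matching Spark and Hive cast semantics is the concern here.
   spark.conf.set("spark.sql.ansi.enabled", "true")
   // spark.sql("SELECT CAST('abc' AS INT)").show()   // throws a cast error
   ```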



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ulysses-you commented on a diff in pull request #41407: [SPARK-43900][SQL] Support optimize skewed partitions even if introduce extra shuffle

2023-06-04 Thread via GitHub


ulysses-you commented on code in PR #41407:
URL: https://github.com/apache/spark/pull/41407#discussion_r1217406788


##
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##
@@ -104,7 +104,10 @@ case class AdaptiveSparkPlanExec(
   @transient private val costEvaluator =
 conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match {
   case Some(className) => CostEvaluator.instantiate(className, 
session.sparkContext.getConf)
-  case _ => 
SimpleCostEvaluator(conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN))
+  case _ =>
+val forceOptimizeSkewed = 
conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN) ||
+  conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_IN_REBALANCE_PARTITIONS)

Review Comment:
   it seems fragile; what happens if the user enables one and disables the other? 
The resulting behavior is uncontrolled.
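   A minimal sketch of the concern, with plain booleans standing in for the two 
SQL confs (illustration only):
   ```scala
   // Because the two flags are combined with ||, disabling one of them has no
   // visible effect as long as the other stays enabled.
   val forceOptimizeSkewedJoin = false            // user turns this off ...
   val forceOptimizeInRebalancePartitions = true  // ... but leaves this on
   val forceOptimizeSkewed =
     forceOptimizeSkewedJoin || forceOptimizeInRebalancePartitions
   // forceOptimizeSkewed == true: the cost evaluator still forces skew handling,
   // so the user's "disable" on the first flag is silently ignored.
   ```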



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] pan3793 commented on a diff in pull request #41428: [SPARK-41958][CORE][3.3] Disallow arbitrary custom classpath with proxy user in cluster mode

2023-06-04 Thread via GitHub


pan3793 commented on code in PR #41428:
URL: https://github.com/apache/spark/pull/41428#discussion_r1217437344


##
docs/core-migration-guide.md:
##
@@ -25,6 +25,7 @@ license: |
 ## Upgrading from Core 3.2 to 3.3
 
 - Since Spark 3.3, Spark migrates its log4j dependency from 1.x to 2.x because 
log4j 1.x has reached end of life and is no longer supported by the community. 
Vulnerabilities reported after August 2015 against log4j 1.x were not checked 
and will not be fixed. Users should rewrite original log4j properties files 
using log4j2 syntax (XML, JSON, YAML, or properties format). Spark rewrites the 
`conf/log4j.properties.template` which is included in Spark distribution, to 
`conf/log4j2.properties.template` with log4j2 properties format.
+- Since Spark 3.3.3, 
`spark.submit.proxyUser.allowCustomClasspathInClusterMode` allows users to 
disable custom class path in cluster mode by proxy users. It still defaults to 
`true` to maintain backward compatibility.  

Review Comment:
   nit: an empty line between items



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] aokolnychyi commented on pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources

2023-06-04 Thread via GitHub


aokolnychyi commented on PR #41448:
URL: https://github.com/apache/spark/pull/41448#issuecomment-1576080545

   The test failures don't seem related. I'll need to take a closer look at 
what happened in `sql - other tests`, though.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Hisoka-X commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


Hisoka-X commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217529014


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##
@@ -397,7 +397,8 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
 }
 val newQuery = try {
   TableOutputResolver.resolveOutputColumns(
-tblName, expectedColumns, query, byName = isByName, conf, 
supportColDefaultValue = true)
+tblName, expectedColumns, query, byName = isByName || insert.byName, 
conf,

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] allisonwang-db commented on a diff in pull request #41316: [SPARK-43798][SQL][PYTHON] Support Python user-defined table functions

2023-06-04 Thread via GitHub


allisonwang-db commented on code in PR #41316:
URL: https://github.com/apache/spark/pull/41316#discussion_r1217498437


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala:
##
@@ -171,6 +186,18 @@ case class ArrowEvalPython(
 copy(child = newChild)
 }
 
+/**
+ * A logical plan that evaluates a [[PythonUDTF]].
+ */
+case class BatchEvalPythonUDTF(

Review Comment:
   Yes, batch eval is the baseline implementation and we should enable Arrow by 
default for UDTFs. I am going to work on that next, given that we are adding 
support for complex types in Arrow-based UDFs. Created a follow-up ticket: 
https://issues.apache.org/jira/browse/SPARK-43964.
   
   > It's not worthwhile refactoring the batch eval code path 
   
   @HyukjinKwon that's a good point. Should I consider duplicating the code 
instead of refactoring it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] aokolnychyi commented on a diff in pull request #41449: [SPARK-43959][SQL] Make RowLevelOperationSuiteBase and AlignAssignmentsSuite abstract

2023-06-04 Thread via GitHub


aokolnychyi commented on code in PR #41449:
URL: https://github.com/apache/spark/pull/41449#discussion_r1217400261


##
sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuite.scala:
##
@@ -36,7 +36,7 @@ import 
org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, IntegerType, StructType}
 
-class AlignAssignmentsSuite extends AnalysisTest {
+abstract class AlignAssignmentsSuite extends AnalysisTest {

Review Comment:
   Agree, updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] vinodkc commented on pull request #41144: [SPARK-43470][CORE] Add OS, Java, Python version information to application log

2023-06-04 Thread via GitHub


vinodkc commented on PR #41144:
URL: https://github.com/apache/spark/pull/41144#issuecomment-1576034499

   > For the Python, the information is printed at every task execution. Could 
you find a proper place to print that info once, @vinodkc ?
   > 
   > ```
   > 23/06/03 18:29:46 INFO PythonRunner: Python version info: python3(3.11)
   > 23/06/03 18:29:46 INFO PythonRunner: Python version info: python3(3.11)
   > 23/06/03 18:29:46 INFO PythonRunner: Times: total = 38, boot = -8568, init 
= 8606, finish = 0
   > 23/06/03 18:29:46 INFO PythonRunner: Times: total = 38, boot = -8571, init 
= 8609, finish = 0
   > 23/06/03 18:29:46 INFO Executor: Finished task 0.0 in stage 18.0 (TID 26). 
1393 bytes result sent to driver
   > 23/06/03 18:29:46 INFO Executor: Finished task 1.0 in stage 18.0 (TID 27). 
1396 bytes result sent to driver
   > 23/06/03 18:29:50 INFO CoarseGrainedExecutorBackend: Got assigned task 28
   > 23/06/03 18:29:50 INFO CoarseGrainedExecutorBackend: Got assigned task 29
   > 23/06/03 18:29:50 INFO Executor: Running task 0.0 in stage 19.0 (TID 28)
   > 23/06/03 18:29:50 INFO Executor: Running task 1.0 in stage 19.0 (TID 29)
   > 23/06/03 18:29:50 INFO TorrentBroadcast: Started reading broadcast 
variable 14 with 1 pieces (estimated total size 4.0 MiB)
   > 23/06/03 18:29:50 INFO MemoryStore: Block broadcast_14_piece0 stored as 
bytes in memory (estimated size 3.7 KiB, free 366.2 MiB)
   > 23/06/03 18:29:50 INFO TorrentBroadcast: Reading broadcast variable 14 
took 20 ms
   > 23/06/03 18:29:50 INFO MemoryStore: Block broadcast_14 stored as values in 
memory (estimated size 5.7 KiB, free 366.2 MiB)
   > 23/06/03 18:29:50 INFO PythonRunner: Python version info: python3(3.11)
   > 23/06/03 18:29:50 INFO PythonRunner: Python version info: python3(3.11)
   > ```
   
   Done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217473051


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##
@@ -506,7 +507,8 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
 
   def apply(plan: LogicalPlan): Unit = {
 plan.foreach {
-  case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), 
partition, _, query, _, _) =>
+  case InsertIntoStatement(l@LogicalRelation(relation, _, _, _), partition,
+  _, query, _, _, _) =>

Review Comment:
   ```suggestion
 _, query, _, _, _) =>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217472863


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##
@@ -425,7 +426,7 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-case i @ InsertIntoStatement(table, _, _, query, _, _) if table.resolved 
&& query.resolved =>
+case i@InsertIntoStatement(table, _, _, query, _, _, _) if table.resolved 
&& query.resolved =>

Review Comment:
   ```suggestion
   case i @ InsertIntoStatement(table, _, _, query, _, _, _) if 
table.resolved && query.resolved =>
   ```



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##
@@ -506,7 +507,8 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
 
   def apply(plan: LogicalPlan): Unit = {
 plan.foreach {
-  case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), 
partition, _, query, _, _) =>
+  case InsertIntoStatement(l@LogicalRelation(relation, _, _, _), partition,

Review Comment:
   ```suggestion
 case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), 
partition,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217473411


##
sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala:
##
@@ -46,21 +47,24 @@ trait SQLInsertTestSuite extends QueryTest with 
SQLTestUtils {
   }
 
   protected def processInsert(
-  tableName: String,
-  input: DataFrame,
-  cols: Seq[String] = Nil,
-  partitionExprs: Seq[String] = Nil,
-  overwrite: Boolean): Unit = {
+   tableName: String,

Review Comment:
   nit: please keep the previous indentation which is correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #41444: [SPARK-43916][SQL][PYTHON][CONNECT] Add percentile like functions to Scala and Python API

2023-06-04 Thread via GitHub


zhengruifeng commented on PR #41444:
URL: https://github.com/apache/spark/pull/41444#issuecomment-1575995156

   Where are `percentile_cont` and `percentile_disc` from?
   I cannot find them in 
https://spark.apache.org/docs/latest/api/sql/index.html or in `FunctionRegistry`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on pull request #41425: [SPARK-43919][SQL] Extract JSON functionality out of Row

2023-06-04 Thread via GitHub


amaliujia commented on PR #41425:
URL: https://github.com/apache/spark/pull/41425#issuecomment-1576068930

   @cloud-fan done. Waiting for the CI to pass again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217474065


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala:
##
@@ -145,7 +145,7 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
 
 // handles InsertIntoStatement specially as the table in 
InsertIntoStatement is not added in its
 // children, hence not matched directly by previous HiveTableRelation case.
-case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _)
+case i@InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, _)

Review Comment:
   ```suggestion
   case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, 
_)
   ```



##
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala:
##
@@ -226,12 +226,12 @@ case class RelationConversions(
 plan resolveOperators {
   // Write path
   case InsertIntoStatement(
-  r: HiveTableRelation, partition, cols, query, overwrite, 
ifPartitionNotExists)
+  r: HiveTableRelation, partition, cols, query, overwrite, 
ifPartitionNotExists, byName)

Review Comment:
   ```suggestion
 r: HiveTableRelation, partition, cols, query, overwrite, 
ifPartitionNotExists, byName)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] allisonwang-db commented on a diff in pull request #41316: [SPARK-43798][SQL][PYTHON] Support Python user-defined table functions

2023-06-04 Thread via GitHub


allisonwang-db commented on code in PR #41316:
URL: https://github.com/apache/spark/pull/41316#discussion_r1217487382


##
python/pyspark/sql/functions.py:
##
@@ -10403,6 +10405,82 @@ def udf(
 return _create_py_udf(f=f, returnType=returnType, useArrow=useArrow)
 
 
+def udtf(
+f: Optional[Type] = None,
+returnType: Union[StructType, str] = None,

Review Comment:
   I think it's more readable if we require this to be invoked like 
`@udtf(returnType="...")`, and it doesn't make sense for UDTFs to have a 
default value for the return type (unlike UDFs, which use `StringType` as the 
default).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on pull request #41332: [SPARK-43801][SQL] Support unwrap date type to string type in UnwrapCastInBinaryComparison

2023-06-04 Thread via GitHub


cloud-fan commented on PR #41332:
URL: https://github.com/apache/spark/pull/41332#issuecomment-1575962729

   My suggestion for this problem is not to abuse the string type. If the 
column holds timestamp values, it should be of timestamp type. If you know that 
your string-typed "timestamp" column always contains standard timestamp 
strings, you can also manually rewrite your predicate to just compare strings.
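   A hedged sketch of that manual rewrite, assuming a hypothetical table `events` 
whose string column `event_time` always holds `yyyy-MM-dd HH:mm:ss` values (so 
lexicographic order matches temporal order):
   ```scala
   // Instead of comparing through a cast on the column, compare the strings
   // directly, relying on the lexicographic order of the fixed-width format.
   val withCast = spark.table("events")
     .where("CAST(event_time AS TIMESTAMP) >= TIMESTAMP'2023-06-01 00:00:00'")
   val rewritten = spark.table("events")
     .where("event_time >= '2023-06-01 00:00:00'")
   ```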


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #41449: [SPARK-43959][SQL] Make RowLevelOperationSuiteBase and AlignAssignmentsSuite abstract

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #41449:
URL: https://github.com/apache/spark/pull/41449#discussion_r1217394475


##
sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignAssignmentsSuite.scala:
##
@@ -36,7 +36,7 @@ import 
org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, IntegerType, StructType}
 
-class AlignAssignmentsSuite extends AnalysisTest {
+abstract class AlignAssignmentsSuite extends AnalysisTest {

Review Comment:
   nit: if this is a base suite, probably add a `Base` suffix in the name, to 
be consistent with `RowLevelOperationSuiteBase`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on pull request #41425: [SPARK-43919][SQL] Extract JSON functionality out of Row

2023-06-04 Thread via GitHub


cloud-fan commented on PR #41425:
URL: https://github.com/apache/spark/pull/41425#issuecomment-1575992736

   @amaliujia can you fix conflicts? I think this PR is ready to go


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217470181


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala:
##
@@ -165,19 +165,25 @@ case class QualifiedColType(
  * would have Map('a' -> Some('1'), 'b' -> None).
  * @param ifPartitionNotExists If true, only write if the partition does not 
exist.
  * Only valid for static partitions.
+ * @param byName   If true, before the data is written, it will be 
sorted according to

Review Comment:
   ```suggestion
* @param byName   If true, reorder the data columns to match 
the column names of the target table.
   ```
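   For context, a hedged sketch of what the flag corresponds to at the SQL level 
(going by the PR title; `t` is a hypothetical table, not one from the PR):
   ```scala
   // With BY NAME, the query's output columns are matched to the target table's
   // columns by name rather than by position.
   spark.sql("CREATE TABLE t (col1 INT, col2 STRING) USING parquet")
   spark.sql("INSERT INTO t BY NAME SELECT 'a' AS col2, 1 AS col1")
   spark.sql("SELECT * FROM t").show()   // col1 = 1, col2 = 'a'
   ```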



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #40908: [SPARK-42750][SQL] Support Insert By Name statement

2023-06-04 Thread via GitHub


cloud-fan commented on code in PR #40908:
URL: https://github.com/apache/spark/pull/40908#discussion_r1217470419


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##
@@ -151,8 +151,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
 if query.resolved && DDLUtils.isDatasourceTable(tableDesc) =>
   CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, 
query.output.map(_.name))
 
-case InsertIntoStatement(l @ LogicalRelation(_: InsertableRelation, _, _, 
_),
-parts, _, query, overwrite, false) if parts.isEmpty =>
+case InsertIntoStatement(l@LogicalRelation(_: InsertableRelation, _, _, _),

Review Comment:
   ```suggestion
   case InsertIntoStatement(l @ LogicalRelation(_: InsertableRelation, _, 
_, _),
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources

2023-06-04 Thread via GitHub


aokolnychyi commented on code in PR #41448:
URL: https://github.com/apache/spark/pull/41448#discussion_r1216253601


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala:
##
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Instruction
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.types.DataType
+
+case class MergeRows(
+isSourceRowPresent: Expression,
+isTargetRowPresent: Expression,
+matchedInstructions: Seq[Instruction],
+notMatchedInstructions: Seq[Instruction],
+notMatchedBySourceInstructions: Seq[Instruction],
+checkCardinality: Boolean,
+output: Seq[Attribute],
+child: LogicalPlan) extends UnaryNode {
+
+  override lazy val producedAttributes: AttributeSet = {
+AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  override lazy val references: AttributeSet = child.outputSet
+
+  override def simpleString(maxFields: Int): String = {
+s"MergeRows${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan = {
+copy(child = newChild)
+  }
+}
+
+object MergeRows {
+  final val ROW_ID = "__row_id"
+
+  sealed trait Instruction extends Expression with Unevaluable {

Review Comment:
   Each `MergeAction` gets converted into a particular instance of 
`Instruction`.
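   A hedged sketch of that conversion idea, using the `Keep` instruction visible 
in this PR's diff (the names `cond` and `updatedOutput` are hypothetical, and 
this is not the PR's actual conversion code):
   ```scala
   import org.apache.spark.sql.catalyst.expressions.Expression
   import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep

   // A matched UPDATE action with condition `cond` and the row's updated output
   // expressions could be lowered into a single Keep instruction.
   def updateActionToInstruction(cond: Expression, updatedOutput: Seq[Expression]): Keep =
     Keep(cond, updatedOutput)
   ```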



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] aokolnychyi commented on a diff in pull request #41448: [SPARK-43885][SQL] DataSource V2: Handle MERGE commands for delta-based sources

2023-06-04 Thread via GitHub


aokolnychyi commented on code in PR #41448:
URL: https://github.com/apache/spark/pull/41448#discussion_r1216255732


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.roaringbitmap.longlong.Roaring64Bitmap
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.expressions.BasePredicate
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Projection
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Instruction, 
Keep, ROW_ID, Split}
+import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.UnaryExecNode
+
+case class MergeRowsExec(
+isSourceRowPresent: Expression,
+isTargetRowPresent: Expression,
+matchedInstructions: Seq[Instruction],
+notMatchedInstructions: Seq[Instruction],
+notMatchedBySourceInstructions: Seq[Instruction],
+checkCardinality: Boolean,
+output: Seq[Attribute],
+child: SparkPlan) extends UnaryExecNode {
+
+  @transient override lazy val producedAttributes: AttributeSet = {
+AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  @transient override lazy val references: AttributeSet = child.outputSet
+
+  override def simpleString(maxFields: Int): String = {
+s"MergeRowsExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan 
= {
+copy(child = newChild)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+child.execute().mapPartitions(processPartition)
+  }
+
+  private def processPartition(rowIterator: Iterator[InternalRow]): 
Iterator[InternalRow] = {
+val isSourceRowPresentPred = createPredicate(isSourceRowPresent)
+val isTargetRowPresentPred = createPredicate(isTargetRowPresent)
+
+val matchedInstructionExecs = planInstructions(matchedInstructions)
+val notMatchedInstructionExecs = planInstructions(notMatchedInstructions)
+val notMatchedBySourceInstructionExecs = 
planInstructions(notMatchedBySourceInstructions)
+
+val cardinalityValidator = if (checkCardinality) {
+  val rowIdOrdinal = child.output.indexWhere(attr => 
conf.resolver(attr.name, ROW_ID))
+  assert(rowIdOrdinal != -1, "Cannot find row ID attr")
+  BitmapCardinalityValidator(rowIdOrdinal)
+} else {
+  NoopCardinalityValidator
+}
+
+val mergeIterator = new MergeRowIterator(
+  rowIterator, cardinalityValidator, isTargetRowPresentPred, 
isSourceRowPresentPred,
+  matchedInstructionExecs, notMatchedInstructionExecs, 
notMatchedBySourceInstructionExecs)
+
+// null indicates a record must be discarded
+mergeIterator.filter(_ != null)
+  }
+
+  private def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+UnsafeProjection.create(exprs, child.output)
+  }
+
+  private def createPredicate(expr: Expression): BasePredicate = {
+GeneratePredicate.generate(expr, child.output)
+  }
+
+  private def planInstructions(instructions: Seq[Instruction]): 
Seq[InstructionExec] = {
+instructions.map {
+  case Keep(cond, output) =>
+KeepExec(createPredicate(cond), createProjection(output))
+  case Split(cond, output, otherOutput) =>
+SplitExec(createPredicate(cond), createProjection(output), 
createProjection(otherOutput))
+  case other =>
+throw new AnalysisException(s"Unexpected instruction: $other")
+}
+  }
+
+  sealed trait InstructionExec {
+def condition: BasePredicate
+  }
+
+  case class 

[GitHub] [spark] dongjoon-hyun closed pull request #41438: [SPARK-43953][CONNECT] Remove `pass`

2023-06-04 Thread via GitHub


dongjoon-hyun closed pull request #41438: [SPARK-43953][CONNECT] Remove `pass`
URL: https://github.com/apache/spark/pull/41438


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun closed pull request #41437: [SPARK-43917][PS][INFRA] Upgrade `pandas` to 2.0.2

2023-06-04 Thread via GitHub


dongjoon-hyun closed pull request #41437: [SPARK-43917][PS][INFRA] Upgrade 
`pandas` to 2.0.2
URL: https://github.com/apache/spark/pull/41437


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on pull request #41409: [SPARK-43901][SQL] Avro to Support custom decimal type backed by Long

2023-06-04 Thread via GitHub


dongjoon-hyun commented on PR #41409:
URL: https://github.com/apache/spark/pull/41409#issuecomment-1575421708

   Could you resolve the conflict, @siying?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum closed pull request #41419: [SPARK-43911] [SQL] Use toSet to deduplicate the iterator data to prevent the creation of large Array

2023-06-04 Thread via GitHub


wangyum closed pull request #41419: [SPARK-43911] [SQL] Use toSet to 
deduplicate the iterator data to prevent the creation of large Array
URL: https://github.com/apache/spark/pull/41419


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on pull request #41419: [SPARK-43911] [SQL] Use toSet to deduplicate the iterator data to prevent the creation of large Array

2023-06-04 Thread via GitHub


wangyum commented on PR #41419:
URL: https://github.com/apache/spark/pull/41419#issuecomment-1575436568

   Merged to master and branch-3.4.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org