[GitHub] [spark] LuciferYang opened a new pull request, #39135: [SPARK-41427][UI][FOLLOWUP] Remove duplicate `getMetricValue` from `ExecutorMetricsSerializer#serialize`

2022-12-19 Thread GitBox


LuciferYang opened a new pull request, #39135:
URL: https://github.com/apache/spark/pull/39135

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on a diff in pull request #39100: [SPARK-41427][UI] Protobuf serializer for ExecutorStageSummaryWrapper

2022-12-19 Thread GitBox


LuciferYang commented on code in PR #39100:
URL: https://github.com/apache/spark/pull/39100#discussion_r1053000909


##
core/src/main/scala/org/apache/spark/status/protobuf/ExecutorMetricsSerializer.scala:
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.metrics.ExecutorMetricType
+
+object ExecutorMetricsSerializer {
+  def serialize(e: ExecutorMetrics): StoreTypes.ExecutorMetrics = {
+val builder = StoreTypes.ExecutorMetrics.newBuilder()
+ExecutorMetricType.metricToOffset.foreach { case (metric, _) =>
+  builder.putMetrics(metric, e.getMetricValue(metric))
+  metric -> e.getMetricValue(metric)

Review Comment:
   @gengliangwang 
   Line 27 already calls `putMetrics(metric, e.getMetricValue(metric))`.
   
   What is the purpose of line 28 (`metric -> e.getMetricValue(metric)`)? I don't understand it. Could you explain? Thanks.
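   For reference, a minimal sketch of what the de-duplicated `serialize` presumably looks like once the stray line is removed in the follow-up (#39135); the trailing `builder.build()` is an assumption, since the quoted hunk is cut off:
   
   ```scala
   package org.apache.spark.status.protobuf
   
   import org.apache.spark.executor.ExecutorMetrics
   import org.apache.spark.metrics.ExecutorMetricType
   
   object ExecutorMetricsSerializer {
     def serialize(e: ExecutorMetrics): StoreTypes.ExecutorMetrics = {
       val builder = StoreTypes.ExecutorMetrics.newBuilder()
       ExecutorMetricType.metricToOffset.foreach { case (metric, _) =>
         // putMetrics already records the value; the stray
         // `metric -> e.getMetricValue(metric)` tuple was a no-op and is dropped.
         builder.putMetrics(metric, e.getMetricValue(metric))
       }
       builder.build()
     }
   }
   ```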
   
   






[GitHub] [spark] rxin commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


rxin commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052995619


##
sql/core/src/test/resources/sql-tests/results/group-by-star-mosha.sql.out:
##
@@ -0,0 +1,141 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+create temporary view stuff as select * from values
+  (42, 9.75, 'hello world', '1970-08-07', '13.37', array(1,20,300)),
+  (1337, 1.2345, 'oh no', '2000-01-01', '42.0', array(4000,5,60)),
+  (42, 13.37, 'test', '1970-08-07', '1234567890', 
array(700,8000,9))
+  as stuff(i, f, s, t, d, a)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT 100 * SUM(i) + SUM(f) / COUNT(s) AS f1, i AS f2 FROM stuff GROUP BY * 
ORDER BY f2
+-- !query schema
+struct
+-- !query output
+8411.56000 42
+133701.23450   1337
+
+
+-- !query
+SELECT i + 1 AS i1, COUNT(i - 2) ci, f / i AS fi, SUM(i + f) sif FROM stuff 
GROUP BY * ORDER BY 1, 3
+-- !query schema
+struct
+-- !query output
+43 1   0.232142857142857   51.7500
+43 1   0.318   55.3700
+1338   1   0.000923335826477   1338.2345
+
+
+-- !query
+SELECT i AS i, COUNT(i) ci, f AS f, SUM(i + f) sif FROM stuff GROUP BY * ORDER 
BY 1, i, 2, ci, 3, f, 4, sif
+-- !query schema
+struct
+-- !query output
+42 1   9.7500  51.7500
+42 1   13.3700 55.3700
+1337   1   1.2345  1338.2345
+
+
+-- !query
+SELECT i + 1, f / i, substring(s, 2, 3), extract(year from t), d / 2, size(a) 
FROM stuff
+GROUP BY * ORDER BY 1, 3, 4, 5, 6, 2
+-- !query schema
+struct<(i + 1):int,(f / i):decimal(17,15),substring(s, 2, 
3):string,extract(year FROM t):int,(d / 2):double,size(a):int>
+-- !query output
+43 0.232142857142857   ell 19706.685   3
+43 0.318   est 19706.17283945E83
+1338   0.000923335826477   h n 200021.03
+
+
+-- !query
+SELECT i + SUM(f) FROM stuff GROUP BY *
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "MISSING_GROUP_BY",

Review Comment:
   This is now fixed.






[GitHub] [spark] rxin commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


rxin commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052993548


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupByStar.scala:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, 
AGGREGATE_EXPRESSION, UNRESOLVED_STAR}
+
+/**
+ * Resolve the star in the group by statement in the following SQL pattern:
+ *  `select col1, col2, agg_expr(...) from table group by *`.
+ *
+ * The star is expanded to include all non-aggregate columns in the select 
clause.
+ */
+object ResolveGroupByStar extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+_.containsAllPatterns(UNRESOLVED_STAR, AGGREGATE), ruleId) {
+// Match a group by with a single unresolved star
+case a: Aggregate if a.groupingExpressions == UnresolvedStar(None) :: Nil 
=>
+  // Only makes sense to do the rewrite once all the aggregate expressions 
have been resolved.
+  // Otherwise, we might incorrectly pull an actual aggregate expression 
over to the grouping
+  // expression list (because we don't know they would be aggregate 
expressions until resolved).
+  if (a.aggregateExpressions.forall(_.resolved)) {
+val groupingExprs =
+  
a.aggregateExpressions.filter(!_.containsPattern(AGGREGATE_EXPRESSION))

Review Comment:
   @gengliangwang I learned that this doesn't handle PythonUDF aggregates properly. I think we should fix that issue separately rather than changing it here, though.






[GitHub] [spark] rxin commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


rxin commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052993302


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupByStar.scala:
##
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, 
AGGREGATE_EXPRESSION, UNRESOLVED_STAR}
+
+/**
+ * Resolve the star in the group by statement in the following SQL pattern:
+ *  `select col1, col2, agg_expr(...) from table group by *`.
+ *
+ * The star is expanded to include all non-aggregate columns in the select 
clause.
+ */
+object ResolveGroupByStar extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+_.containsAllPatterns(UNRESOLVED_STAR, AGGREGATE), ruleId) {
+// Match a group by with a single unresolved star
+case a: Aggregate if a.groupingExpressions == UnresolvedStar(None) :: Nil 
=>
+  // Only makes sense to do the rewrite once all the aggregate expressions 
have been resolved.
+  // Otherwise, we might incorrectly pull an actual aggregate expression 
over to the grouping
+  // expression list (because we don't know they would be aggregate 
expressions until resolved).
+  if (a.aggregateExpressions.forall(_.resolved)) {
+val groupingExprs =
+  
a.aggregateExpressions.filter(!_.containsPattern(AGGREGATE_EXPRESSION))
+
+// If the grouping exprs are empty, this could either be (1) a valid 
global aggregate, or
+// (2) we simply fail to infer the grouping columns. As an example, in 
"i + sum(j)", we will
+// not automatically infer the grouping column to be "i".
+if (groupingExprs.isEmpty && 
a.aggregateExpressions.exists(containsAttribute)) {
+  // Case (2): don't replace the star. We will eventually tell the 
user in checkAnalysis
+  // that we cannot resolve the star in group by.
+  a
+} else {
+  // Case (1): this is a valid global aggregate.
+  a.copy(groupingExpressions = groupingExprs)
+}
+  } else {
+a
+  }
+  }
+
+  /**
+   * Returns true if the expression includes an Attribute outside the 
aggregate expression part.
+   * For example:
+   *  "i" -> true
+   *  "i + 2" -> true
+   *  "i + sum(j)" -> true
+   *  "sum(j)" -> false
+   *  "sum(j) / 2" -> false
+   */
+  private def containsAttribute(expr: Expression): Boolean = expr match {
+case _ if AggregateExpression.isAggregate(expr) =>
+  // Don't recurse into AggregateExpressions
+  false
+case _: Attribute =>
+  true
+  // TODO: do we need to worry about ScalarSubquery here?

Review Comment:
   cc @cloud-fan do we need to worry about ScalarSubquery here?
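   For concreteness, one query shape the question presumably has in mind (an illustration of mine, not from the thread) is a scalar subquery appearing in a select item of a `GROUP BY *` query, where it is unclear whether the subquery should count as an attribute when inferring grouping columns:
   
   ```scala
   // Hypothetical example; assumes a SparkSession `spark` on a build that includes
   // this WIP patch, and the `stuff` view from the quoted golden file. The select
   // item contains a scalar subquery and an aggregate, but no outer attribute.
   spark.sql("SELECT (SELECT MAX(i) FROM stuff) + SUM(f) FROM stuff GROUP BY *")
   ```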






[GitHub] [spark] HeartSaVioR closed pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR closed pull request #38517: [SPARK-39591][SS] Async Progress 
Tracking
URL: https://github.com/apache/spark/pull/38517





[GitHub] [spark] HeartSaVioR commented on pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR commented on PR #38517:
URL: https://github.com/apache/spark/pull/38517#issuecomment-1358951698

   https://github.com/jerrypeng/spark/actions/runs/3737519049 
   The above build passed for the last commit [23210ec](https://github.com/apache/spark/pull/38517/commits/23210ecea2bf9b39267613d6b4f356dd057cf890), and [03222c2](https://github.com/apache/spark/pull/38517/commits/03222c21f2f9d0829a0baf326d9a12626c3547d4) only addressed minor comments.
   
   Thanks! Merging to master.





[GitHub] [spark] amaliujia commented on pull request #38984: [SPARK-41349][CONNECT][PYTHON] Implement DataFrame.hint

2022-12-19 Thread GitBox


amaliujia commented on PR #38984:
URL: https://github.com/apache/spark/pull/38984#issuecomment-1358948232

   Many thanks @dengziming for tackling this work!





[GitHub] [spark] cloud-fan commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

2022-12-19 Thread GitBox


cloud-fan commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1052986240


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##
@@ -244,6 +303,61 @@ object ResolveLateralColumnAliasReference extends 
Rule[LogicalPlan] {
   child = Project(innerProjectList.toSeq, child)
 )
   }
+
+case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if 
agg.resolved
+&& 
aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) 
=>
+
+  // Check if current Aggregate is eligible to lift up with Project: 
the aggregate
+  // expression only contains: 1) aggregate functions, 2) grouping 
expressions, 3) lateral
+  // column alias reference or 4) literals.
+  // This check is to prevent unnecessary transformation on invalid 
plan, to guarantee it
+  // throws the same exception. For example, cases like non-aggregate 
expressions not
+  // in group by, once transformed, will throw a different exception: 
missing input.
+  def eligibleToLiftUp(exp: Expression): Boolean = {
+exp match {
+  case e: AggregateExpression if 
AggregateExpression.isAggregate(e) => true

Review Comment:
   ```suggestion
 case e: Expression if AggregateExpression.isAggregate(e) => 
true
   ```



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##
@@ -244,6 +303,61 @@ object ResolveLateralColumnAliasReference extends 
Rule[LogicalPlan] {
   child = Project(innerProjectList.toSeq, child)
 )
   }
+
+case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if 
agg.resolved
+&& 
aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) 
=>
+
+  // Check if current Aggregate is eligible to lift up with Project: 
the aggregate
+  // expression only contains: 1) aggregate functions, 2) grouping 
expressions, 3) lateral
+  // column alias reference or 4) literals.
+  // This check is to prevent unnecessary transformation on invalid 
plan, to guarantee it
+  // throws the same exception. For example, cases like non-aggregate 
expressions not
+  // in group by, once transformed, will throw a different exception: 
missing input.
+  def eligibleToLiftUp(exp: Expression): Boolean = {
+exp match {
+  case e: AggregateExpression if 
AggregateExpression.isAggregate(e) => true

Review Comment:
   ```suggestion
 case e if AggregateExpression.isAggregate(e) => true
   ```






[GitHub] [spark] EnricoMi commented on pull request #39131: [SPARK-41162][SQL] Fix anti- and semi-join for self-join with aggregations

2022-12-19 Thread GitBox


EnricoMi commented on PR #39131:
URL: https://github.com/apache/spark/pull/39131#issuecomment-1358939935

   @cloud-fan I think this is a better approach to fixing the correctness bug SPARK-41162 than #38676.





[GitHub] [spark-docker] Yikun commented on pull request #27: [SPARK-40513] Add support to generate DOI manifest

2022-12-19 Thread GitBox


Yikun commented on PR #27:
URL: https://github.com/apache/spark-docker/pull/27#issuecomment-1358933985

   cc @HyukjinKwon 





[GitHub] [spark-docker] Yikun opened a new pull request, #27: [SPARK-40513] Add support to generate DOI manifest

2022-12-19 Thread GitBox


Yikun opened a new pull request, #27:
URL: https://github.com/apache/spark-docker/pull/27

   ### What changes were proposed in this pull request?
   This patch adds support for generating the DOI manifest from versions.json.
   
   
   ### Why are the changes needed?
   To help generate the DOI manifest.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   ```shell
   $ black ./tools/manifest.py
   All done! ✨  ✨
   1 file left unchanged.
   ```
   
   ```shell
   $ tools/manifest.py manifest
   Maintainers: Apache Spark Developers  (@ApacheSpark)
   GitRepo: https://github.com/apache/spark-docker.git
   
   Tags: 3.3.1-scala2.12-java11-python3-ubuntu, 3.3.1-python3, 3.3.1, python3, 
latest
   Architectures: amd64, arm64v8
   GitCommit: 496edb6dee0ade08bc5d180d7a6da0ff8b5d91ff
   Directory: ./3.3.1/scala2.12-java11-python3-ubuntu
   
   Tags: 3.3.1-scala2.12-java11-r-ubuntu, 3.3.1-r, r
   Architectures: amd64, arm64v8
   GitCommit: 496edb6dee0ade08bc5d180d7a6da0ff8b5d91ff
   Directory: ./3.3.1/scala2.12-java11-r-ubuntu
   
   // ... ...
   ```





[GitHub] [spark] cloud-fan commented on pull request #39054: [SPARK-27561][SQL][FOLLOWUP] Move the two rules for Later column alias into one file

2022-12-19 Thread GitBox


cloud-fan commented on PR #39054:
URL: https://github.com/apache/spark/pull/39054#issuecomment-1358931368

   Reverted at 
https://github.com/apache/spark/commit/52082d3906bc3813cd3ff4447f7c75beb4f28612





[GitHub] [spark] cloud-fan commented on pull request #39054: [SPARK-27561][SQL][FOLLOWUP] Move the two rules for Later column alias into one file

2022-12-19 Thread GitBox


cloud-fan commented on PR #39054:
URL: https://github.com/apache/spark/pull/39054#issuecomment-1358929659

   Since this PR introduced a regression (a case-insensitivity problem), and it's actually not necessary after my refactor https://github.com/apache/spark/pull/3 , I'm reverting it.





[GitHub] [spark] cloud-fan commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

2022-12-19 Thread GitBox


cloud-fan commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1052973933


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##
@@ -182,6 +183,157 @@ object AnalysisContext {
   }
 }
 
+object Analyzer extends Logging {

Review Comment:
   This is an even bigger change than reverting https://github.com/apache/spark/pull/39054 ...






[GitHub] [spark] venkyvb commented on pull request #37738: add Support Java Class with circular references

2022-12-19 Thread GitBox


venkyvb commented on PR #37738:
URL: https://github.com/apache/spark/pull/37738#issuecomment-1358924659

   Hey all,
   I'm wondering whether this PR (or some similar fix) got merged. I have similar issues with circular references, and it would be great to have an option to skip the check.
   Thanks.





[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052964642


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -78,7 +72,9 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   test("async WAL commits happy path") {
 val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
 
-val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+//val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)

Review Comment:
   already done



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##
@@ -283,3 +264,23 @@ class AsyncProgressTrackingMicroBatchExecution(
 }
   }
 }
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+"asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+"_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+   extraOptions: 
Map[String, String]): Long = {

Review Comment:
   already done






[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052963923


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##
@@ -275,7 +298,7 @@ object AsyncProgressTrackingMicroBatchExecution {
 "_asyncProgressTrackingOverrideSinkSupportCheck"
 
   private def getAsyncProgressTrackingCheckpointingIntervalMs(
-   extraOptions: 
Map[String, String]): Long = {
+extraOptions: Map[String, String]): Long = {

Review Comment:
   will fix






[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052964187


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##
@@ -157,8 +172,17 @@ class AsyncProgressTrackingMicroBatchExecution(
 watermarkTracker.updateWatermark(lastExecution.executedPlan)
 reportTimeTaken("commitOffsets") {
   // check if current batch there is a async write for the offset log is 
issued for this batch
-  // if so, we should do the same for commit log
-  if (offsetLog.getAsyncOffsetWrite(currentBatchId).nonEmpty) {
+  // if so, we should do the same for commit log.  However, if this is the 
first batch executed
+  // in this run we should always persis to the commit log.  There can be 
situations in which

Review Comment:
   will fix






[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052963344


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -1762,4 +1344,173 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }
 }
   }
+
+  test("test gaps in offset log") {
+val inputData = MemoryStream[Int]
+val streamEvent = inputData.toDF().select("value")
+
+val resourceUri = this.getClass.getResource(
+  
"/structured-streaming/checkpoint-test-offsetId-commitId-inconsistent/").toURI
+val checkpointDir = Utils.createTempDir().getCanonicalFile
+// Copy the checkpoint to a temp dir to prevent changes to the original.
+// Not doing this will lead to the test passing on the first run, but fail 
subsequent runs.
+FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+// Not doing this will lead to the test passing on the first run, but fail 
subsequent runs.
+FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+testStream(streamEvent, extraOptions = Map(
+  ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
+  ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS -> "0"
+))(
+  AddData(inputData, 0),
+  AddData(inputData, 1),
+  AddData(inputData, 2),
+  AddData(inputData, 3),
+  AddData(inputData, 4),
+  StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+  CheckAnswer(3, 4)
+)
+
+  }
+
+  test("test multiple gaps in offset and commit logs") {
+val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+val ds = inputData.toDS()
+
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+// create a scenario in which the offset log only
+// contains batch 0, 2, 5 and commit log only contain 0, 2
+testStream(ds, extraOptions = Map(
+  ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
+  ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS -> "0"
+))(
+  StartStream(checkpointLocation = checkpointLocation),
+  AddData(inputData, 0),
+  CheckNewAnswer(0),
+  AddData(inputData, 1),
+  CheckNewAnswer(1),
+  AddData(inputData, 2),
+  CheckNewAnswer(2),
+  AddData(inputData, 3),
+  CheckNewAnswer(3),
+  AddData(inputData, 4),
+  CheckNewAnswer(4),
+  AddData(inputData, 5),
+  CheckNewAnswer(5),
+
+  StopStream
+)
+
+// delete all offset files except for batch 0, 2, 5
+getListOfFiles(checkpointLocation + "/offsets")
+  .filterNot(f => f.getName.startsWith("0")
+|| f.getName.startsWith("2")
+|| f.getName.startsWith("5"))
+  .foreach(_.delete())
+
+// delete all commit log files except for batch 0, 2
+getListOfFiles(checkpointLocation + "/commits")
+  .filterNot(f => f.getName.startsWith("0") || f.getName.startsWith("2"))
+  .foreach(_.delete())
+
+getBatchIdsSortedFromLog(checkpointLocation + "/offsets") should 
equal(Array(0, 2, 5))
+getBatchIdsSortedFromLog(checkpointLocation + "/commits") should 
equal(Array(0, 2))
+
+/**
+ * start new stream
+ */
+val inputData2 = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+val ds2 = inputData2.toDS()
+testStream(ds2, extraOptions = Map(
+  ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
+  ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS -> "0"
+))(
+  // add back old data
+  AddData(inputData2, 0),
+  AddData(inputData2, 1),
+  AddData(inputData2, 2),
+  AddData(inputData2, 3),
+  AddData(inputData2, 4),
+  AddData(inputData2, 5),
+  StartStream(checkpointLocation = checkpointLocation),
+  // since the offset log contains batches 0, 2, 5 and the commit log 
contains
+  // batches 0, 2.  This indicates that batch we have successfully 
processed up to batch 2.
+  // Thus the data we need to process / re-process is batches 3, 4, 5
+  CheckNewAnswer(3, 4, 5),
+  Execute { q =>
+waitPendingOffsetWrites(q)
+eventually(timeout(Span(5, Seconds))) {
+  getBatchIdsSortedFromLog(checkpointLocation + "/offsets") should 
equal(Array(0, 2, 5))
+  getBatchIdsSortedFromLog(checkpointLocation + "/commits") should 
equal(Array(0, 2, 5))
+}
+  },
+  StopStream
+)
+
+getBatchIdsSortedFromLog(checkpointLocation + "/offsets") should 
equal(Array(0, 2, 5))
+getBatchIdsSortedFromLog(checkpointLocation + "/commits") should 
equal(Array(0, 2, 5))
+  }
+
+  test("recovery when gaps exist in offset and commit log") {
+val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+val ds = inputData.toDS()
+
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+// create a scenario in which the offset log only
+// contains batch 0, 

[GitHub] [spark] jerrypeng commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052960918


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -68,11 +64,42 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
 }
   }
 
+  class MemoryStreamCapture[A: Encoder](
+ id: Int,

Review Comment:
   will fix






[GitHub] [spark] mridulm commented on pull request #39131: [SPARK-41162][SQL] Fix anti- and semi-join for self-join with aggregations

2022-12-19 Thread GitBox


mridulm commented on PR #39131:
URL: https://github.com/apache/spark/pull/39131#issuecomment-1358901058

   +CC @shardulm94 





[GitHub] [spark] HeartSaVioR commented on pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR commented on PR #38517:
URL: https://github.com/apache/spark/pull/38517#issuecomment-1358900740

   Let me give a +1 once the builds have passed rather than waiting for all the minor/nit comments to be addressed. We can deal with them in a follow-up PR.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052937189


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -68,11 +64,42 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
 }
   }
 
+  class MemoryStreamCapture[A: Encoder](
+ id: Int,

Review Comment:
   nit: 4 spaces






[GitHub] [spark] HeartSaVioR commented on pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR commented on PR #38517:
URL: https://github.com/apache/spark/pull/38517#issuecomment-1358899601

   (You can ignore outdated comments; I mistakenly reviewed only the two most recent commits and may have left some comments that only apply to an old commit.)





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052941322


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+  

[GitHub] [spark] rxin commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


rxin commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052933954


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupByStar.scala:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_STAR
+
+/**
+ * Resolve the star in the group by statement in the following SQL pattern:
+ *  `select col1, col2, agg_expr(...) from table group by *`.
+ *
+ * The star is expanded to include all non-aggregate columns in the select 
clause.
+ */
+object ResolveGroupByStar extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+_.containsPattern(UNRESOLVED_STAR), ruleId) {
+// Match a group by with a single unresolved star
+case a: Aggregate if a.groupingExpressions == UnresolvedStar(None) :: Nil 
=>
+  // Only makes sense to do the rewrite once all the aggregate expressions 
have been resolved.
+  // Otherwise, we might incorrectly pull an actual aggregate expression 
over to the grouping
+  // expression list (because we don't know they would be aggregate 
expressions until resolved).
+  if (a.aggregateExpressions.forall(_.resolved)) {
+val groupingExprs =
+  
a.aggregateExpressions.filter(!_.exists(_.isInstanceOf[AggregateExpression]))
+a.copy(groupingExpressions = groupingExprs)

Review Comment:
   BTW, we cannot just return the original plan, because then it wouldn't work for the legitimate case of global aggregates.
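   To make the two cases concrete, here is a small sketch (assuming a build that includes this WIP patch; the `stuff` data below is a trimmed stand-in for the view in the quoted golden file):
   
   ```scala
   import org.apache.spark.sql.{AnalysisException, SparkSession}
   
   object GroupByStarCases {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().master("local[*]").appName("group-by-star-cases").getOrCreate()
       import spark.implicits._
   
       Seq((42, 9.75), (1337, 1.2345), (42, 13.37)).toDF("i", "f").createOrReplaceTempView("stuff")
   
       // Legitimate global aggregate: every select item is an aggregate expression,
       // so the inferred grouping list is empty and the query should still resolve.
       spark.sql("SELECT COUNT(i), SUM(f) FROM stuff GROUP BY *").show()
   
       // No grouping column can be inferred: `i` sits outside the aggregate inside
       // `i + SUM(f)`, so the star is left unresolved and analysis is expected to
       // fail (MISSING_GROUP_BY in the quoted golden file).
       try {
         spark.sql("SELECT i + SUM(f) FROM stuff GROUP BY *").show()
       } catch {
         case e: AnalysisException => println(s"Failed as expected: ${e.getMessage}")
       }
   
       spark.stop()
     }
   }
   ```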






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052931956


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##
@@ -275,7 +298,7 @@ object AsyncProgressTrackingMicroBatchExecution {
 "_asyncProgressTrackingOverrideSinkSupportCheck"
 
   private def getAsyncProgressTrackingCheckpointingIntervalMs(
-   extraOptions: 
Map[String, String]): Long = {
+extraOptions: Map[String, String]): Long = {

Review Comment:
   nit: 4 spaces






[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052927996


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -78,7 +72,9 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   test("async WAL commits happy path") {
 val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
 
-val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+//val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)

Review Comment:
   nit: remove comment



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##
@@ -283,3 +264,23 @@ class AsyncProgressTrackingMicroBatchExecution(
 }
   }
 }
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+"asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+"_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+   extraOptions: 
Map[String, String]): Long = {

Review Comment:
   nit: indentation is off



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##
@@ -157,8 +172,17 @@ class AsyncProgressTrackingMicroBatchExecution(
 watermarkTracker.updateWatermark(lastExecution.executedPlan)
 reportTimeTaken("commitOffsets") {
   // check if current batch there is a async write for the offset log is 
issued for this batch
-  // if so, we should do the same for commit log
-  if (offsetLog.getAsyncOffsetWrite(currentBatchId).nonEmpty) {
+  // if so, we should do the same for commit log.  However, if this is the 
first batch executed
+  // in this run we should always persis to the commit log.  There can be 
situations in which

Review Comment:
   nit: persist






[GitHub] [spark] infoankitp commented on pull request #38865: [SPARK-41232][SQL][PYTHON] Adding array_append function

2022-12-19 Thread GitBox


infoankitp commented on PR #38865:
URL: https://github.com/apache/spark/pull/38865#issuecomment-1358869906

   @beliefer @LuciferYang Friendly ping! Please review the changes. 





[GitHub] [spark] rxin commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


rxin commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052926462


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupByStar.scala:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_STAR
+
+/**
+ * Resolve the star in the group by statement in the following SQL pattern:
+ *  `select col1, col2, agg_expr(...) from table group by *`.
+ *
+ * The star is expanded to include all non-aggregate columns in the select 
clause.
+ */
+object ResolveGroupByStar extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+_.containsPattern(UNRESOLVED_STAR), ruleId) {

Review Comment:
   This is cool!






[GitHub] [spark] gengliangwang commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


gengliangwang commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052920528


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupByStar.scala:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_STAR
+
+/**
+ * Resolve the star in the group by statement in the following SQL pattern:
+ *  `select col1, col2, agg_expr(...) from table group by *`.
+ *
+ * The star is expanded to include all non-aggregate columns in the select 
clause.
+ */
+object ResolveGroupByStar extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+_.containsPattern(UNRESOLVED_STAR), ruleId) {
+// Match a group by with a single unresolved star
+case a: Aggregate if a.groupingExpressions == UnresolvedStar(None) :: Nil 
=>
+  // Only makes sense to do the rewrite once all the aggregate expressions 
have been resolved.
+  // Otherwise, we might incorrectly pull an actual aggregate expression 
over to the grouping
+  // expression list (because we don't know they would be aggregate 
expressions until resolved).
+  if (a.aggregateExpressions.forall(_.resolved)) {
+val groupingExprs =
+  
a.aggregateExpressions.filter(!_.exists(_.isInstanceOf[AggregateExpression]))

Review Comment:
   Nitpick to make the analyzer rule more efficient.






[GitHub] [spark] gengliangwang commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


gengliangwang commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052920262


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupByStar.scala:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_STAR

Review Comment:
   ```suggestion
   import org.apache.spark.sql.catalyst.trees.TreePattern.{AGGREGATE, 
AGGREGATE_EXPRESSION, UNRESOLVED_STAR}
   ```






[GitHub] [spark] gengliangwang commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


gengliangwang commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052919912


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupByStar.scala:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_STAR
+
+/**
+ * Resolve the star in the group by statement in the following SQL pattern:
+ *  `select col1, col2, agg_expr(...) from table group by *`.
+ *
+ * The star is expanded to include all non-aggregate columns in the select 
clause.
+ */
+object ResolveGroupByStar extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+_.containsPattern(UNRESOLVED_STAR), ruleId) {
+// Match a group by with a single unresolved star
+case a: Aggregate if a.groupingExpressions == UnresolvedStar(None) :: Nil 
=>
+  // Only makes sense to do the rewrite once all the aggregate expressions 
have been resolved.
+  // Otherwise, we might incorrectly pull an actual aggregate expression 
over to the grouping
+  // expression list (because we don't know they would be aggregate 
expressions until resolved).
+  if (a.aggregateExpressions.forall(_.resolved)) {
+val groupingExprs =
+  
a.aggregateExpressions.filter(!_.exists(_.isInstanceOf[AggregateExpression]))

Review Comment:
   ```suggestion
 
a.aggregateExpressions.filter(!_.containsPattern(AGGREGATE_EXPRESSION))
   ```






[GitHub] [spark] gengliangwang commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


gengliangwang commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052919759


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupByStar.scala:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_STAR
+
+/**
+ * Resolve the star in the group by statement in the following SQL pattern:
+ *  `select col1, col2, agg_expr(...) from table group by *`.
+ *
+ * The star is expanded to include all non-aggregate columns in the select 
clause.
+ */
+object ResolveGroupByStar extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+_.containsPattern(UNRESOLVED_STAR), ruleId) {

Review Comment:
   ```suggestion
   _.containsAllPatterns(UNRESOLVED_STAR, AGGREGATE), ruleId) {
   ```






[GitHub] [spark] gengliangwang commented on pull request #39104: [SPARK-41425][UI] Protobuf serializer for RDDStorageInfoWrapper

2022-12-19 Thread GitBox


gengliangwang commented on PR #39104:
URL: https://github.com/apache/spark/pull/39104#issuecomment-1358850152

   Thanks, merging to master





[GitHub] [spark] gengliangwang closed pull request #39104: [SPARK-41425][UI] Protobuf serializer for RDDStorageInfoWrapper

2022-12-19 Thread GitBox


gengliangwang closed pull request #39104: [SPARK-41425][UI] Protobuf serializer 
for RDDStorageInfoWrapper
URL: https://github.com/apache/spark/pull/39104





[GitHub] [spark] rxin commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


rxin commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052911704


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupByStar.scala:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_STAR
+
+/**
+ * Resolve the star in the group by statement in the following SQL pattern:
+ *  `select col1, col2, agg_expr(...) from table group by *`.
+ *
+ * The star is expanded to include all non-aggregate columns in the select 
clause.
+ */
+object ResolveGroupByStar extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+_.containsPattern(UNRESOLVED_STAR), ruleId) {
+// Match a group by with a single unresolved star
+case a: Aggregate if a.groupingExpressions == UnresolvedStar(None) :: Nil 
=>
+  // Only makes sense to do the rewrite once all the aggregate expressions 
have been resolved.
+  // Otherwise, we might incorrectly pull an actual aggregate expression 
over to the grouping
+  // expression list (because we don't know they would be aggregate 
expressions until resolved).
+  if (a.aggregateExpressions.forall(_.resolved)) {
+val groupingExprs =
+  
a.aggregateExpressions.filter(!_.exists(_.isInstanceOf[AggregateExpression]))
+a.copy(groupingExpressions = groupingExprs)

Review Comment:
   race condition. see my comment above.






[GitHub] [spark] gengliangwang closed pull request #39100: [SPARK-41427][UI] Protobuf serializer for ExecutorStageSummaryWrapper

2022-12-19 Thread GitBox


gengliangwang closed pull request #39100: [SPARK-41427][UI] Protobuf serializer 
for ExecutorStageSummaryWrapper
URL: https://github.com/apache/spark/pull/39100





[GitHub] [spark] cloud-fan commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


cloud-fan commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052911386


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupByStar.scala:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_STAR
+
+/**
+ * Resolve the star in the group by statement in the following SQL pattern:
+ *  `select col1, col2, agg_expr(...) from table group by *`.
+ *
+ * The star is expanded to include all non-aggregate columns in the select 
clause.
+ */
+object ResolveGroupByStar extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+_.containsPattern(UNRESOLVED_STAR), ruleId) {
+// Match a group by with a single unresolved star
+case a: Aggregate if a.groupingExpressions == UnresolvedStar(None) :: Nil 
=>
+  // Only makes sense to do the rewrite once all the aggregate expressions 
have been resolved.
+  // Otherwise, we might incorrectly pull an actual aggregate expression 
over to the grouping
+  // expression list (because we don't know they would be aggregate 
expressions until resolved).
+  if (a.aggregateExpressions.forall(_.resolved)) {
+val groupingExprs =
+  
a.aggregateExpressions.filter(!_.exists(_.isInstanceOf[AggregateExpression]))
+a.copy(groupingExpressions = groupingExprs)

Review Comment:
   If `groupingExprs` is empty, shall we return the original plan? Then we can throw a better error message later for a `GROUP BY *` that doesn't match anything.
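   
   As an illustration of that suggestion (a sketch based on the rule body in the diff above, not the actual patch), the rewrite could fall back to the original plan when nothing can be inferred:
   
   ```scala
   // Sketch only: keep the original Aggregate when no grouping expressions can
   // be inferred, so a later check can report a clearer error than an empty
   // GROUP BY.
   if (a.aggregateExpressions.forall(_.resolved)) {
     val groupingExprs =
       a.aggregateExpressions.filter(!_.exists(_.isInstanceOf[AggregateExpression]))
     if (groupingExprs.isEmpty) {
       a // leave `GROUP BY *` as-is; checkAnalysis can surface a dedicated error
     } else {
       a.copy(groupingExpressions = groupingExprs)
     }
   } else {
     a
   }
   ```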



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupByStar.scala:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_STAR
+
+/**
+ * Resolve the star in the group by statement in the following SQL pattern:
+ *  `select col1, col2, agg_expr(...) from table group by *`.
+ *
+ * The star is expanded to include all non-aggregate columns in the select 
clause.
+ */
+object ResolveGroupByStar extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+_.containsPattern(UNRESOLVED_STAR), ruleId) {
+// Match a group by with a single unresolved star
+case a: Aggregate if a.groupingExpressions == UnresolvedStar(None) :: Nil 
=>
+  // Only makes sense to do the rewrite once all the aggregate expressions 
have been resolved.
+  // Otherwise, we might incorrectly pull an actual aggregate expression 
over to the grouping
+  // expression list (because we don't know they would be aggregate 
expressions until resolved).
+  if (a.aggregateExpressions.forall(_.resolved)) {
+val groupingExprs =
+  

[GitHub] [spark] gengliangwang commented on pull request #39100: [SPARK-41427][UI] Protobuf serializer for ExecutorStageSummaryWrapper

2022-12-19 Thread GitBox


gengliangwang commented on PR #39100:
URL: https://github.com/apache/spark/pull/39100#issuecomment-1358848179

   Merging to master





[GitHub] [spark] cloud-fan commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


cloud-fan commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052911008


##
sql/core/src/test/resources/sql-tests/results/group-by-star-mosha.sql.out:
##
@@ -0,0 +1,141 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+create temporary view stuff as select * from values
+  (42, 9.75, 'hello world', '1970-08-07', '13.37', array(1,20,300)),
+  (1337, 1.2345, 'oh no', '2000-01-01', '42.0', array(4000,5,60)),
+  (42, 13.37, 'test', '1970-08-07', '1234567890', 
array(700,8000,9))
+  as stuff(i, f, s, t, d, a)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT 100 * SUM(i) + SUM(f) / COUNT(s) AS f1, i AS f2 FROM stuff GROUP BY * 
ORDER BY f2
+-- !query schema
+struct
+-- !query output
+8411.56000 42
+133701.23450   1337
+
+
+-- !query
+SELECT i + 1 AS i1, COUNT(i - 2) ci, f / i AS fi, SUM(i + f) sif FROM stuff 
GROUP BY * ORDER BY 1, 3
+-- !query schema
+struct
+-- !query output
+43 1   0.232142857142857   51.7500
+43 1   0.318   55.3700
+1338   1   0.000923335826477   1338.2345
+
+
+-- !query
+SELECT i AS i, COUNT(i) ci, f AS f, SUM(i + f) sif FROM stuff GROUP BY * ORDER 
BY 1, i, 2, ci, 3, f, 4, sif
+-- !query schema
+struct
+-- !query output
+42 1   9.7500  51.7500
+42 1   13.3700 55.3700
+1337   1   1.2345  1338.2345
+
+
+-- !query
+SELECT i + 1, f / i, substring(s, 2, 3), extract(year from t), d / 2, size(a) 
FROM stuff
+GROUP BY * ORDER BY 1, 3, 4, 5, 6, 2
+-- !query schema
+struct<(i + 1):int,(f / i):decimal(17,15),substring(s, 2, 
3):string,extract(year FROM t):int,(d / 2):double,size(a):int>
+-- !query output
+43 0.232142857142857   ell 19706.685   3
+43 0.318   est 19706.17283945E83
+1338   0.000923335826477   h n 200021.03
+
+
+-- !query
+SELECT i + SUM(f) FROM stuff GROUP BY *
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "MISSING_GROUP_BY",

Review Comment:
   The error message is
   ```
 "MISSING_GROUP_BY" : {
   "message" : [
 "The query does not include a GROUP BY clause. Add GROUP BY or turn it 
into the window functions using OVER clauses."
   ]
 },
   ```
   It's a bit misleading as users do specify the `GROUP BY *` clause.






[GitHub] [spark] rxin commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


rxin commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052910941


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveGroupByStar.scala:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_STAR
+
+/**
+ * Resolve the star in the group by statement in the following SQL pattern:
+ *  `select col1, col2, agg_expr(...) from table group by *`.
+ *
+ * The star is expanded to include all non-aggregate columns in the select 
clause.
+ */
+object ResolveGroupByStar extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
+_.containsPattern(UNRESOLVED_STAR), ruleId) {
+// Match a group by with a single unresolved star
+case a: Aggregate if a.groupingExpressions == UnresolvedStar(None) :: Nil 
=>
+  // Only makes sense to do the rewrite once all the aggregate expressions 
have been resolved.
+  // Otherwise, we might incorrectly pull an actual aggregate expression 
over to the grouping
+  // expression list (because we don't know they would be aggregate 
expressions until resolved).
+  if (a.aggregateExpressions.forall(_.resolved)) {
+val groupingExprs =
+  
a.aggregateExpressions.filter(!_.exists(_.isInstanceOf[AggregateExpression]))
+a.copy(groupingExpressions = groupingExprs)

Review Comment:
   There's a usability issue here: the error message is kind of terrible when this rule cannot infer the columns and simply returns Nil. The error message will complain about an empty GROUP BY, which is confusing. Once we figure out the syntax, we should throw a better error in checkAnalysis.
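   
   Assuming the rule falls back to the original plan when it cannot infer any grouping columns (as suggested above), a minimal sketch of such a check could look like the following; `checkGroupByStar`, the exception type, and the message are placeholders, not the API this PR adds:
   
   ```scala
   import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
   import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
   
   // Hypothetical post-analysis check: if a `GROUP BY *` survived because no
   // grouping columns could be inferred, fail with a targeted message instead
   // of a confusing "empty/missing GROUP BY" error.
   def checkGroupByStar(plan: LogicalPlan): Unit = plan.foreach {
     case a: Aggregate if a.groupingExpressions.exists(_.isInstanceOf[UnresolvedStar]) =>
       throw new IllegalStateException(
         "GROUP BY * could not be resolved: every expression in the SELECT list " +
           "is an aggregate expression, so there are no columns to group by.")
     case _ =>
   }
   ```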
   






[GitHub] [spark] rxin commented on a diff in pull request #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


rxin commented on code in PR #39134:
URL: https://github.com/apache/spark/pull/39134#discussion_r1052909245


##
sql/core/src/test/resources/sql-tests/inputs/group-by-star.sql:
##
@@ -0,0 +1,45 @@
+-- group by all

Review Comment:
   do we need a test case for window functions?






[GitHub] [spark] rxin opened a new pull request, #39134: [WIP] Implement group by star (aka group by all)

2022-12-19 Thread GitBox


rxin opened a new pull request, #39134:
URL: https://github.com/apache/spark/pull/39134

   ### What changes were proposed in this pull request?
   This patch implements group by star. This is similar to the "group by all" 
implemented in DuckDB. Note that I'm not done yet. We need to decide if the 
appropriate syntax is star, all, or both. We also need to decide if we want to 
throw a nicer error message when we cannot infer the right columns. Right now 
it just says invalid group by expression.
   
   ### Why are the changes needed?
   It's a nice piece of syntactic sugar for interactive SQL: it avoids repeating the grouping columns/expressions in GROUP BY when they can be implicitly inferred. It actually brings SQL a little bit closer to the DataFrame API in terms of usability for aggregations.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. This is a user facing syntactic sugar.
   
   ### How was this patch tested?
   Added homegrown test cases, as well as test cases from DuckDB and Mosha.
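   
   For a concrete feel of the sugar (using a hypothetical `employees(dept, job, salary)` table; none of these names come from the patch), the two queries below are equivalent, since the star expands to every non-aggregate expression in the SELECT list:
   
   ```scala
   // Hypothetical example run in spark-shell; `employees` is a made-up table.
   spark.sql("SELECT dept, job, SUM(salary) AS total FROM employees GROUP BY *").show()
   spark.sql("SELECT dept, job, SUM(salary) AS total FROM employees GROUP BY dept, job").show()
   ```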
   





[GitHub] [spark] jerrypeng commented on pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


jerrypeng commented on PR #38517:
URL: https://github.com/apache/spark/pull/38517#issuecomment-1358828963

   @HeartSaVioR I have addressed your comments; please take another look.





[GitHub] [spark] zhengruifeng commented on pull request #38984: [SPARK-41349][CONNECT][PYTHON] Implement DataFrame.hint

2022-12-19 Thread GitBox


zhengruifeng commented on PR #38984:
URL: https://github.com/apache/spark/pull/38984#issuecomment-1358823132

   merged into master, thank you @dengziming for working on it!





[GitHub] [spark] zhengruifeng closed pull request #38984: [SPARK-41349][CONNECT][PYTHON] Implement DataFrame.hint

2022-12-19 Thread GitBox


zhengruifeng closed pull request #38984: [SPARK-41349][CONNECT][PYTHON] 
Implement DataFrame.hint
URL: https://github.com/apache/spark/pull/38984





[GitHub] [spark] itholic commented on pull request #39128: [SPARK-41586][PYTHON] Introduce new PySpark package: `pyspark.errors` and error classes.

2022-12-19 Thread GitBox


itholic commented on PR #39128:
URL: https://github.com/apache/spark/pull/39128#issuecomment-1358810969

   Let me close it for now, and re-create the PR to change the logic to re-use the JVM.





[GitHub] [spark] itholic closed pull request #39128: [SPARK-41586][PYTHON] Introduce new PySpark package: `pyspark.errors` and error classes.

2022-12-19 Thread GitBox


itholic closed pull request #39128: [SPARK-41586][PYTHON] Introduce new PySpark 
package: `pyspark.errors` and error classes.
URL: https://github.com/apache/spark/pull/39128





[GitHub] [spark] LuciferYang commented on pull request #39124: [DON'T MERGE] Test build and test with hadoop 3.3.5-RC0

2022-12-19 Thread GitBox


LuciferYang commented on PR #39124:
URL: https://github.com/apache/spark/pull/39124#issuecomment-1358800857

   also cc @wangyum 





[GitHub] [spark] LuciferYang commented on pull request #39124: [DON'T MERGE] Test build and test with hadoop 3.3.5-RC0

2022-12-19 Thread GitBox


LuciferYang commented on PR #39124:
URL: https://github.com/apache/spark/pull/39124#issuecomment-1358794388

   Many tests failed as follows:
   
   ```
   2022-12-20T03:15:37.0609530Z [info] 
org.apache.spark.sql.hive.execution.command.AlterTableAddColumnsSuite *** 
ABORTED *** (28 milliseconds)
   2022-12-20T03:15:37.0701184Z [info]   
java.lang.reflect.InvocationTargetException:
   2022-12-20T03:15:37.0701846Z [info]   at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
   2022-12-20T03:15:37.0702983Z [info]   at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
   2022-12-20T03:15:37.0703732Z [info]   at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
   2022-12-20T03:15:37.0704398Z [info]   at 
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
   2022-12-20T03:15:37.0705400Z [info]   at 
org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:315)
   2022-12-20T03:15:37.0706077Z [info]   at 
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:514)
   2022-12-20T03:15:37.0706751Z [info]   at 
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:374)
   2022-12-20T03:15:37.0707378Z [info]   at 
org.apache.spark.sql.hive.test.TestHiveExternalCatalog.$anonfun$client$1(TestHive.scala:90)
   2022-12-20T03:15:37.0707917Z [info]   at 
scala.Option.getOrElse(Option.scala:189)
   2022-12-20T03:15:37.0708804Z [info]   at 
org.apache.spark.sql.hive.test.TestHiveExternalCatalog.client$lzycompute(TestHive.scala:90)
   2022-12-20T03:15:37.0709589Z [info]   at 
org.apache.spark.sql.hive.test.TestHiveExternalCatalog.client(TestHive.scala:88)
   2022-12-20T03:15:37.0710320Z [info]   at 
org.apache.spark.sql.hive.test.TestHiveSingleton.$init$(TestHiveSingleton.scala:33)
   2022-12-20T03:15:37.0711253Z [info]   at 
org.apache.spark.sql.hive.execution.command.AlterTableAddColumnsSuite.(AlterTableAddColumnsSuite.scala:27)
   2022-12-20T03:15:37.0712160Z [info]   at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
   2022-12-20T03:15:37.0712844Z [info]   at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
   2022-12-20T03:15:37.0713829Z [info]   at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
   2022-12-20T03:15:37.0714480Z [info]   at 
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
   2022-12-20T03:15:37.0714972Z [info]   at 
java.lang.Class.newInstance(Class.java:442)
   2022-12-20T03:15:37.0715625Z [info]   at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:454)
   2022-12-20T03:15:37.0716141Z [info]   at 
sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:413)
   2022-12-20T03:15:37.0716638Z [info]   at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
   2022-12-20T03:15:37.0717222Z [info]   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   2022-12-20T03:15:37.0718079Z [info]   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   2022-12-20T03:15:37.0718637Z [info]   at 
java.lang.Thread.run(Thread.java:750)
   2022-12-20T03:15:37.0719260Z [info]   Cause: java.lang.RuntimeException: 
Failed to initialize default Hive configuration variables!
   2022-12-20T03:15:37.0719939Z [info]   at 
org.apache.hadoop.hive.conf.HiveConf.getConfVarInputStream(HiveConf.java:3638)
   2022-12-20T03:15:37.0720558Z [info]   at 
org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:4057)
   2022-12-20T03:15:37.0721115Z [info]   at 
org.apache.hadoop.hive.conf.HiveConf.(HiveConf.java:4014)
   2022-12-20T03:15:37.0721873Z [info]   at 
org.apache.spark.sql.hive.client.HiveClientImpl$.newHiveConf(HiveClientImpl.scala:1309)
   2022-12-20T03:15:37.0722615Z [info]   at 
org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:176)
   2022-12-20T03:15:37.0723562Z [info]   at 
org.apache.spark.sql.hive.client.HiveClientImpl.(HiveClientImpl.scala:141)
   2022-12-20T03:15:37.0724265Z [info]   at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
   2022-12-20T03:15:37.0725154Z [info]   at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
   2022-12-20T03:15:37.0815583Z [info]   at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
   2022-12-20T03:15:37.0816308Z [info]   at 
java.lang.reflect.Constructor.newInstance(Constructor.java:423)
   2022-12-20T03:15:37.0817005Z [info]   at 
org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:315)
   2022-12-20T03:15:37.0817691Z [info]   at 
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:514)
   2022-12-20T03:15:37.0818294Z [info]   at 

[GitHub] [spark] amaliujia commented on a diff in pull request #39078: [SPARK-41534][CONNECT][SQL] Setup initial client module for Spark Connect

2022-12-19 Thread GitBox


amaliujia commented on code in PR #39078:
URL: https://github.com/apache/spark/pull/39078#discussion_r1052834300


##
connector/connect/client/src/main/scala/org/apache/spark/sql/connect/client/SparkSession.scala:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.language.existentials
+
+import io.grpc.{ManagedChannel, ManagedChannelBuilder}
+import org.apache.arrow.memory.RootAllocator
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.connect.proto
+
+
+class SparkSession(
+private val userContext: proto.UserContext,
+private val channel: ManagedChannel)
+  extends AutoCloseable {
+  private[this] val stub = 
proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+
+  private[this] val allocator = new RootAllocator()
+
+  /**
+   * The version of Spark on which this application is running.
+   */
+  def version: String = SPARK_VERSION
+
+  /**
+   * Returns a `DataFrame` with no rows or columns.
+   *
+   * @since 3.4.0
+   */
+  @transient
+  lazy val emptyDataFrame: Dataset = newDataset { builder =>
+builder.getLocalRelationBuilder
+  }
+
+  /**
+   * Creates a [[Dataset]] with a single `LongType` column named `id`, 
containing elements
+   * in a range from `start` to `end` (exclusive) with a step value, with 
partition number
+   * specified.
+   *
+   * @since 2.0.0
+   */
+  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset = 
{
+range(start, end, step, Option(numPartitions))
+  }
+
+  private def range(start: Long, end: Long, step: Long, numPartitions: 
Option[Int]): Dataset = {
+newDataset { builder =>
+  val rangeBuilder = builder.getRangeBuilder
+.setStart(start)
+.setEnd(end)
+.setStep(step)
+  numPartitions.foreach(rangeBuilder.setNumPartitions)
+}
+  }
+
+  /**
+   * Executes a SQL query using Spark, returning the result as a `DataFrame`.
+   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+   *
+   * @since 2.0.0
+   */
+  def sql(query: String): Dataset = newDataset { builder =>
+builder.setSql(proto.SQL.newBuilder().setQuery(query))
+  }
+
+  private[client] def newDataset(f: proto.Relation.Builder => Unit): Dataset = 
{
+val builder = proto.Relation.newBuilder()
+f(builder)
+val plan = proto.Plan.newBuilder().setRoot(builder).build()
+new Dataset(this, plan)
+  }
+
+  private[client] def analyze(plan: proto.Plan): proto.AnalyzePlanResponse = {
+val request = proto.AnalyzePlanRequest.newBuilder()
+  .setPlan(plan)
+  .setUserContext(userContext)
+  .build()
+stub.analyzePlan(request)
+  }
+
+  override def close(): Unit = {
+channel.shutdownNow()
+allocator.close()
+  }
+}
+
+object SparkSession {
+  def builder(): Builder = new Builder()
+
+  class Builder() {

Review Comment:
   > I also don't quite understand the comment about the connection string. 
That is just an option we should support (URI parsing is very much a solved 
problem).
   
   I am not sure if it is an option or a must. Having different clients that do not share a standard way to connect to the server does not sound right.
   
   We may not need it in the first version though. If we ultimately agree that all clients should connect in the same way, it would be good to at least have a JIRA to track it, or a TODO here saying that the Scala side will follow the connection string.
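   
   Just to make the idea concrete, here is a rough sketch of what a shared connection-string entry point might look like on the Scala side (the `sc://host:port` scheme, the class name, and the `connectionString` method are all made up for illustration; nothing here is in this PR):
   
   ```scala
   import java.net.URI
   
   import io.grpc.{ManagedChannel, ManagedChannelBuilder}
   
   // Hypothetical builder: parse a connection string shared across clients,
   // e.g. "sc://localhost:15002", into the host/port used for the gRPC channel.
   class ConnectionStringBuilder {
     private var host: String = "localhost"
     private var port: Int = 15002
   
     def connectionString(conn: String): ConnectionStringBuilder = {
       val uri = new URI(conn)
       require(uri.getScheme == "sc", s"Unsupported scheme: ${uri.getScheme}")
       host = uri.getHost
       if (uri.getPort != -1) port = uri.getPort
       this
     }
   
     def build(): ManagedChannel = {
       val channelBuilder = ManagedChannelBuilder.forAddress(host, port)
       channelBuilder.usePlaintext()
       channelBuilder.build()
     }
   }
   ```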






[GitHub] [spark] amaliujia commented on a diff in pull request #39078: [SPARK-41534][CONNECT][SQL] Setup initial client module for Spark Connect

2022-12-19 Thread GitBox


amaliujia commented on code in PR #39078:
URL: https://github.com/apache/spark/pull/39078#discussion_r1052834300


##
connector/connect/client/src/main/scala/org/apache/spark/sql/connect/client/SparkSession.scala:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.language.existentials
+
+import io.grpc.{ManagedChannel, ManagedChannelBuilder}
+import org.apache.arrow.memory.RootAllocator
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.connect.proto
+
+
+class SparkSession(
+private val userContext: proto.UserContext,
+private val channel: ManagedChannel)
+  extends AutoCloseable {
+  private[this] val stub = 
proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+
+  private[this] val allocator = new RootAllocator()
+
+  /**
+   * The version of Spark on which this application is running.
+   */
+  def version: String = SPARK_VERSION
+
+  /**
+   * Returns a `DataFrame` with no rows or columns.
+   *
+   * @since 3.4.0
+   */
+  @transient
+  lazy val emptyDataFrame: Dataset = newDataset { builder =>
+builder.getLocalRelationBuilder
+  }
+
+  /**
+   * Creates a [[Dataset]] with a single `LongType` column named `id`, 
containing elements
+   * in a range from `start` to `end` (exclusive) with a step value, with 
partition number
+   * specified.
+   *
+   * @since 2.0.0
+   */
+  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset = 
{
+range(start, end, step, Option(numPartitions))
+  }
+
+  private def range(start: Long, end: Long, step: Long, numPartitions: 
Option[Int]): Dataset = {
+newDataset { builder =>
+  val rangeBuilder = builder.getRangeBuilder
+.setStart(start)
+.setEnd(end)
+.setStep(step)
+  numPartitions.foreach(rangeBuilder.setNumPartitions)
+}
+  }
+
+  /**
+   * Executes a SQL query using Spark, returning the result as a `DataFrame`.
+   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+   *
+   * @since 2.0.0
+   */
+  def sql(query: String): Dataset = newDataset { builder =>
+builder.setSql(proto.SQL.newBuilder().setQuery(query))
+  }
+
+  private[client] def newDataset(f: proto.Relation.Builder => Unit): Dataset = 
{
+val builder = proto.Relation.newBuilder()
+f(builder)
+val plan = proto.Plan.newBuilder().setRoot(builder).build()
+new Dataset(this, plan)
+  }
+
+  private[client] def analyze(plan: proto.Plan): proto.AnalyzePlanResponse = {
+val request = proto.AnalyzePlanRequest.newBuilder()
+  .setPlan(plan)
+  .setUserContext(userContext)
+  .build()
+stub.analyzePlan(request)
+  }
+
+  override def close(): Unit = {
+channel.shutdownNow()
+allocator.close()
+  }
+}
+
+object SparkSession {
+  def builder(): Builder = new Builder()
+
+  class Builder() {

Review Comment:
   > I also don't quite understand the comment about the connection string. 
That is just an option we should support (URI parsing is very much a solved 
problem).
   
   I am not sure if it is an option or a must. Having different clients that do not share a standard way to connect to the server does not sound right.
   
   We may not need it in the first version though.






[GitHub] [spark] for08 commented on pull request #39111: [MINOR] Fix some typos

2022-12-19 Thread GitBox


for08 commented on PR #39111:
URL: https://github.com/apache/spark/pull/39111#issuecomment-1358783259

   I reopened this PR according to the suggestions of srowen and bjornjorgensen. As a beginner, I will continue to learn and use Spark, not just fix more typos.
   
   At 2022-12-19 09:13:59, "Hyukjin Kwon" ***@***.***> wrote:
   
   @HyukjinKwon commented on this pull request.
   
   That's fine, but mind fixing more typos since we're here? Also please keep the PR template:
   https://github.com/apache/spark/blob/master/.github/PULL_REQUEST_TEMPLATE
   





[GitHub] [spark] for08 commented on pull request #39111: [MINOR] Fix some typos

2022-12-19 Thread GitBox


for08 commented on PR #39111:
URL: https://github.com/apache/spark/pull/39111#issuecomment-1358782929

   I reopened this PR according to the suggestions of srowen and bjornjorgensen.
   
   At 2022-12-19 11:30:19, "UCB AMPLab" ***@***.***> wrote:
   
   Can one of the admins verify this patch?
   





[GitHub] [spark] beliefer commented on a diff in pull request #39084: [SPARK-41464][CONNECT][PYTHON] Implement `DataFrame.to`

2022-12-19 Thread GitBox


beliefer commented on code in PR #39084:
URL: https://github.com/apache/spark/pull/39084#discussion_r1052820825


##
python/pyspark/sql/tests/connect/test_connect_basic.py:
##
@@ -389,6 +389,21 @@ def test_schema(self):
 self.connect.sql(query).schema.__repr__(),
 )
 
+def test_to(self):
+# SPARK-41464: test DataFrame.to()

Review Comment:
   OK






[GitHub] [spark] rangadi commented on pull request #38922: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

2022-12-19 Thread GitBox


rangadi commented on PR #38922:
URL: https://github.com/apache/spark/pull/38922#issuecomment-1358763884

   Asking @HeartSaVioR to take a quick look and approve.
   @cloud-fan, take a look at the updated PR description for an example of how the Spark schema would look with the different settings for the config.





[GitHub] [spark] HyukjinKwon commented on pull request #39041: [SPARK-41528][CONNECT] Merge namespace of Spark Connect and PySpark API

2022-12-19 Thread GitBox


HyukjinKwon commented on PR #39041:
URL: https://github.com/apache/spark/pull/39041#issuecomment-1358746938

   Build: https://github.com/HyukjinKwon/spark/runs/10178697287





[GitHub] [spark] LuciferYang commented on a diff in pull request #39125: [SPARK-41584][BUILD] Upgrade RoaringBitmap to 0.9.36

2022-12-19 Thread GitBox


LuciferYang commented on code in PR #39125:
URL: https://github.com/apache/spark/pull/39125#discussion_r1052806208


##
core/benchmarks/MapStatusesConvertBenchmark-results.txt:
##
@@ -2,12 +2,12 @@
 MapStatuses Convert Benchmark
 

 
-OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1022-azure
-Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 1.8.0_352-b08 on Linux 5.15.0-1023-azure
+Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
 MapStatuses Convert:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-Num Maps: 5 Fetch partitions:500   1099   1127 
 27  0.0  1099192398.0   1.0X
-Num Maps: 5 Fetch partitions:1000  1981   1999 
 16  0.0  1981390271.0   0.6X
-Num Maps: 5 Fetch partitions:1500  2973   3011 
 39  0.0  2973029597.0   0.4X
+Num Maps: 5 Fetch partitions:500   1570   1599 
 25  0.0  1569784446.0   1.0X

Review Comment:
   Let me re-run this benchmark. There is a performance gap between `E5-2673 v3` and `Platinum 8272CL`.
   
   






[GitHub] [spark] hvanhovell commented on a diff in pull request #39078: [SPARK-41534][CONNECT][SQL] Setup initial client module for Spark Connect

2022-12-19 Thread GitBox


hvanhovell commented on code in PR #39078:
URL: https://github.com/apache/spark/pull/39078#discussion_r1052791191


##
connector/connect/client/src/main/scala/org/apache/spark/sql/connect/client/SparkSession.scala:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.language.existentials
+
+import io.grpc.{ManagedChannel, ManagedChannelBuilder}
+import org.apache.arrow.memory.RootAllocator
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.connect.proto
+
+
+class SparkSession(

Review Comment:
   Why? What do you want to test?






[GitHub] [spark] hvanhovell commented on a diff in pull request #39078: [SPARK-41534][CONNECT][SQL] Setup initial client module for Spark Connect

2022-12-19 Thread GitBox


hvanhovell commented on code in PR #39078:
URL: https://github.com/apache/spark/pull/39078#discussion_r1052791191


##
connector/connect/client/src/main/scala/org/apache/spark/sql/connect/client/SparkSession.scala:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.language.existentials
+
+import io.grpc.{ManagedChannel, ManagedChannelBuilder}
+import org.apache.arrow.memory.RootAllocator
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.connect.proto
+
+
+class SparkSession(

Review Comment:
   Why? What else do you want to test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #39078: [SPARK-41534][CONNECT][SQL] Setup initial client module for Spark Connect

2022-12-19 Thread GitBox


hvanhovell commented on code in PR #39078:
URL: https://github.com/apache/spark/pull/39078#discussion_r1052790753


##
connector/connect/client/src/main/scala/org/apache/spark/sql/connect/client/SparkSession.scala:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.language.existentials
+
+import io.grpc.{ManagedChannel, ManagedChannelBuilder}
+import org.apache.arrow.memory.RootAllocator
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.connect.proto
+
+
+class SparkSession(
+private val userContext: proto.UserContext,
+private val channel: ManagedChannel)
+  extends AutoCloseable {
+  private[this] val stub = 
proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+
+  private[this] val allocator = new RootAllocator()
+
+  /**
+   * The version of Spark on which this application is running.
+   */
+  def version: String = SPARK_VERSION
+
+  /**
+   * Returns a `DataFrame` with no rows or columns.
+   *
+   * @since 3.4.0
+   */
+  @transient
+  lazy val emptyDataFrame: Dataset = newDataset { builder =>
+builder.getLocalRelationBuilder
+  }
+
+  /**
+   * Creates a [[Dataset]] with a single `LongType` column named `id`, 
containing elements
+   * in a range from `start` to `end` (exclusive) with a step value, with 
partition number
+   * specified.
+   *
+   * @since 2.0.0
+   */
+  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset = 
{
+range(start, end, step, Option(numPartitions))
+  }
+
+  private def range(start: Long, end: Long, step: Long, numPartitions: 
Option[Int]): Dataset = {
+newDataset { builder =>
+  val rangeBuilder = builder.getRangeBuilder
+.setStart(start)
+.setEnd(end)
+.setStep(step)
+  numPartitions.foreach(rangeBuilder.setNumPartitions)
+}
+  }
+
+  /**
+   * Executes a SQL query using Spark, returning the result as a `DataFrame`.
+   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+   *
+   * @since 2.0.0
+   */
+  def sql(query: String): Dataset = newDataset { builder =>
+builder.setSql(proto.SQL.newBuilder().setQuery(query))
+  }
+
+  private[client] def newDataset(f: proto.Relation.Builder => Unit): Dataset = 
{
+val builder = proto.Relation.newBuilder()
+f(builder)
+val plan = proto.Plan.newBuilder().setRoot(builder).build()
+new Dataset(this, plan)
+  }
+
+  private[client] def analyze(plan: proto.Plan): proto.AnalyzePlanResponse = {
+val request = proto.AnalyzePlanRequest.newBuilder()
+  .setPlan(plan)
+  .setUserContext(userContext)
+  .build()
+stub.analyzePlan(request)
+  }
+
+  override def close(): Unit = {
+channel.shutdownNow()
+allocator.close()
+  }
+}
+
+object SparkSession {
+  def builder(): Builder = new Builder()
+
+  class Builder() {

Review Comment:
   Could we use a channel builder in the build method?
   
   I also don't quite understand the comment about the connection string. That 
is just an option we should support (URI parsing is very much a solved problem).
   
   Finally, sure we can make this PR smaller. It is early days for the Scala 
Client and we need to start somewhere.
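   
   For reference, a minimal sketch of the kind of URI-based parsing meant here (illustrative only, not part of this PR; the `sc://` scheme, default port, and plaintext transport are assumptions):
   
   ```
   import java.net.URI
   
   import io.grpc.{ManagedChannel, ManagedChannelBuilder}
   
   // Hypothetical helper: parse a "sc://host:port" style connection string into a channel.
   object ConnectionString {
     def toChannel(connectionString: String): ManagedChannel = {
       val uri = new URI(connectionString)
       require(uri.getScheme == "sc", s"Unsupported scheme: ${uri.getScheme}")
       val host = Option(uri.getHost).getOrElse("localhost")
       val port = if (uri.getPort == -1) 15002 else uri.getPort
       // Plaintext keeps the sketch short; a real client would also configure TLS/auth.
       ManagedChannelBuilder.forAddress(host, port).usePlaintext().build()
     }
   }
   ```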



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] allisonwang-db opened a new pull request, #39133: [SPARK-41595][SQL] Support generator function explode/explode_outer in the FROM clause

2022-12-19 Thread GitBox


allisonwang-db opened a new pull request, #39133:
URL: https://github.com/apache/spark/pull/39133

   
   
   ### What changes were proposed in this pull request?
   This PR supports using table-valued generator functions in the FROM clause 
of a query. A generator function can be registered in the table function 
registry and resolved as a table function during analysis.
   
   Note this PR only adds support for two built-in generator functions: 
`explode` and `explode_outer` with literal input values. We will support more 
generator functions and LATERAL references in separate PRs.
   
   ### Why are the changes needed?
   To make table-valued generator functions more user-friendly and consistent 
with Spark's built-in table function Range.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. Before this PR, the built-in generator function explode/explode_outer 
cannot be used in the FROM clause:
   ```
   select * from explode(array(1, 2))
   
   AnalysisException: could not resolve `explode` to a table-valued function;
   ```
   After this PR, we can support this usage:
   ```
   select * from explode(array(1, 2))
   
   +---+
   |col|
   +---+
   |  1|
   |  2|
   +---+
   ```
   
   
   ### How was this patch tested?
   
   New SQL query tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR closed pull request #39132: [MINOR][DOC] Fix for Kafka Consumer Config Link

2022-12-19 Thread GitBox


HeartSaVioR closed pull request #39132: [MINOR][DOC] Fix for Kafka Consumer 
Config Link
URL: https://github.com/apache/spark/pull/39132


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on pull request #39132: [MINOR][DOC] Fix for Kafka Consumer Config Link

2022-12-19 Thread GitBox


HeartSaVioR commented on PR #39132:
URL: https://github.com/apache/spark/pull/39132#issuecomment-1358709546

   Thanks! Merging to master. (It's just a small doc change so won't wait for 
CI build.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation

2022-12-19 Thread GitBox


zhengruifeng commented on code in PR #39068:
URL: https://github.com/apache/spark/pull/39068#discussion_r1052762202


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -534,6 +536,36 @@ class SparkConnectPlanner(session: SparkSession) {
 }
   }
 
+  /**
+   * Translates a LambdaFunction from proto to the Catalyst expression.
+   */
+  private def transformLamdaFunction(lambda: proto.Expression.LambdaFunction): 
Expression = {
+if (lambda.getArgumentsCount == 0) {

Review Comment:
   OK, let me add it on the server side.
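   
   For reference, a minimal sketch of such a server-side check (illustrative only; the bounds, error type, and wording here are assumptions and may differ in the actual change):
   
   ```
   import org.apache.spark.connect.proto
   
   // Hypothetical helper mirroring the client-side validation: reject lambdas with an
   // unsupported number of arguments before translating them to Catalyst expressions.
   def validateLambdaArguments(lambda: proto.Expression.LambdaFunction): Unit = {
     val n = lambda.getArgumentsCount
     if (n < 1 || n > 3) {
       throw new IllegalArgumentException(
         s"LambdaFunction must have between 1 and 3 arguments, but got $n")
     }
   }
   ```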



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WweiL opened a new pull request, #39132: [MINOR][DOC] Fix for Kafka Consumer Config Link

2022-12-19 Thread GitBox


WweiL opened a new pull request, #39132:
URL: https://github.com/apache/spark/pull/39132

   
   
   ### What changes were proposed in this pull request?
   
   Fix the redirect link for the Kafka consumer config: before, it pointed you to the top of the page; now it redirects you to the correct section.
   
   ### Why are the changes needed?
   
   Doc readability.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   No test needed. Doc change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #39117: [SPARK-41535][SQL] Set null correctly for calendar interval fields in `InterpretedUnsafeProjection` and `InterpretedMutableProjection`

2022-12-19 Thread GitBox


HyukjinKwon closed pull request #39117: [SPARK-41535][SQL] Set null correctly 
for calendar interval fields in `InterpretedUnsafeProjection` and 
`InterpretedMutableProjection`
URL: https://github.com/apache/spark/pull/39117


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #39117: [SPARK-41535][SQL] Set null correctly for calendar interval fields in `InterpretedUnsafeProjection` and `InterpretedMutableProjection`

2022-12-19 Thread GitBox


HyukjinKwon commented on PR #39117:
URL: https://github.com/apache/spark/pull/39117#issuecomment-1358675707

   Merged to master, branch-3.3, and branch-3.2.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #39128: [SPARK-41586][PYTHON] Introduce new PySpark package: `pyspark.errors` and error classes.

2022-12-19 Thread GitBox


HyukjinKwon commented on code in PR #39128:
URL: https://github.com/apache/spark/pull/39128#discussion_r1052752065


##
python/pyspark/sql/functions.py:
##
@@ -8122,15 +8130,13 @@ def _get_lambda_parameters(f: Callable) -> 
ValuesView[inspect.Parameter]:
 # Validate that
 # function arity is between 1 and 3
 if not (1 <= len(parameters) <= 3):
-raise ValueError(
-"f should take between 1 and 3 arguments, but provided function 
takes {}".format(
-len(parameters)
-)
+raise invalidHigherOrderFunctionArgumentNumberError(
+func_name=f.__name__, num_args=len(parameters)
 )
 
 # and all arguments can be used as positional
 if not all(p.kind in supported_parameter_types for p in parameters):
-raise ValueError("f should use only POSITIONAL or POSITIONAL OR 
KEYWORD arguments")
+raise 
invalidParameterTypeForHigherOrderFunctionError(func_name=f.__name__)

Review Comment:
   If you plan to do all of this in other places as well, please file an umbrella JIRA.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #39128: [SPARK-41586][PYTHON] Introduce new PySpark package: `pyspark.errors` and error classes.

2022-12-19 Thread GitBox


HyukjinKwon commented on code in PR #39128:
URL: https://github.com/apache/spark/pull/39128#discussion_r1052751875


##
python/pyspark/testing/utils.py:
##
@@ -138,6 +140,32 @@ def setUpClass(cls):
 def tearDownClass(cls):
 cls.sc.stop()
 
+def checkError(

Review Comment:
   `check_error`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #39128: [SPARK-41586][PYTHON] Introduce new PySpark package: `pyspark.errors` and error classes.

2022-12-19 Thread GitBox


HyukjinKwon commented on code in PR #39128:
URL: https://github.com/apache/spark/pull/39128#discussion_r1052751697


##
python/pyspark/errors/error_classes.py:
##
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+ERROR_CLASSES = {

Review Comment:
   These files should be in JSON, and should provide a way for users to avoid this before PySpark starts up, as we have done on the Scala side.
   
   Do you propose a new way to override them on the PySpark side by overwriting this in the library?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #39128: [SPARK-41586][PYTHON] Introduce new PySpark package: `pyspark.errors` and error classes.

2022-12-19 Thread GitBox


HyukjinKwon commented on code in PR #39128:
URL: https://github.com/apache/spark/pull/39128#discussion_r1052751097


##
python/pyspark/errors/__init__.py:
##
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Dict, Optional, Union, Any, Type
+from pyspark.errors.error_classes import ERROR_CLASSES
+
+
+class PySparkException(Exception):

Review Comment:
   Let's move all these into a separate file under `pyspark/errors` (e.g., 
`pyspark/errors/exceptions.py`), and only put imports here in `__init__.py`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #39128: [SPARK-41586][PYTHON] Introduce new PySpark package: `pyspark.errors` and error classes.

2022-12-19 Thread GitBox


HyukjinKwon commented on code in PR #39128:
URL: https://github.com/apache/spark/pull/39128#discussion_r1052750600


##
python/pyspark/errors/__init__.py:
##
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Dict, Optional, Union, Any, Type
+from pyspark.errors.error_classes import ERROR_CLASSES
+
+
+class PySparkException(Exception):

Review Comment:
   Should we integrate this to the exceptions defined under `pyspark.sql.utils`?



##
python/pyspark/errors/__init__.py:
##
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Dict, Optional, Union, Any, Type
+from pyspark.errors.error_classes import ERROR_CLASSES
+
+
+class PySparkException(Exception):
+"""
+Base Exception for handling the errors generated by PySpark
+"""
+
+def __init__(self, error_class: str, message_parameters: 
Optional[Dict[str, str]] = None):
+self._verify_error_class(error_class)
+self._error_class = error_class
+
+self._error_message_format = ERROR_CLASSES[error_class]
+
+self._verify_message_parameters(message_parameters)
+self._message_parameters = message_parameters
+
+def _verify_error_class(self, error_class: str) -> None:
+assert (
+error_class in ERROR_CLASSES
+), f"{error_class} is not in the list of error classes: 
{list(ERROR_CLASSES.keys())}"
+
+def _verify_message_parameters(
+self, message_parameters: Optional[Dict[str, str]] = None
+) -> None:
+required = set(self._error_message_format.__code__.co_varnames)
+given = set() if message_parameters is None else 
set(message_parameters.keys())
+assert given == required, f"Given message parameters: {given} , but 
{required} required"
+
+def getErrorClass(self) -> str:
+return self._error_class
+
+def getMessageParameters(self) -> Optional[Dict[str, str]]:
+return self._message_parameters
+
+def getErrorMessage(self) -> str:
+if self._message_parameters is None:
+message = self._error_message_format()  # type: ignore[operator]
+else:
+message = self._error_message_format(
+*self._message_parameters.values()
+)  # type: ignore[operator]
+
+return message
+
+def __str__(self) -> str:
+# The user-facing error message contains the error class and the error message
+# e.g. "[WRONG_NUM_COLUMNS] 'greatest' should take at least two 
columns"
+return f"[{self.getErrorClass()}] {self.getErrorMessage()}"
+
+
+def notColumnOrStringError(arg_name: str, arg_type: Type[Any]) -> 
"PySparkException":
+return PySparkException(
+error_class="NOT_COLUMN_OR_STRING",
+message_parameters={"arg_name": arg_name, "arg_type": 
arg_type.__name__},
+)
+
+
+def notColumnOrIntegerError(arg_name: str, arg_type: Type[Any]) -> 
"PySparkException":

Review Comment:
   Can we follow snake naming rule since these are all internals?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: 

[GitHub] [spark] HyukjinKwon commented on pull request #39129: [SPARK-41587][BUILD] Upgrade `org.scalatestplus:selenium-4-4` to `org.scalatestplus:selenium-4-7`

2022-12-19 Thread GitBox


HyukjinKwon commented on PR #39129:
URL: https://github.com/apache/spark/pull/39129#issuecomment-1358669841

   cc @sarutak FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] github-actions[bot] closed pull request #37831: [SPARK-40354][SQL] Support eliminate dynamic partition for datasource v1 writes

2022-12-19 Thread GitBox


github-actions[bot] closed pull request #37831: [SPARK-40354][SQL] Support 
eliminate dynamic partition for datasource v1 writes
URL: https://github.com/apache/spark/pull/37831


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #39041: [SPARK-41528][CONNECT] Merge namespace of Spark Connect and PySpark API

2022-12-19 Thread GitBox


HyukjinKwon commented on PR #39041:
URL: https://github.com/apache/spark/pull/39041#issuecomment-1358666500

   Let me get this in in few days if there are no more comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srielau commented on a diff in pull request #38861: [SPARK-41294][SQL] Assign a name to the error class _LEGACY_ERROR_TEMP_1203 / 1168

2022-12-19 Thread GitBox


srielau commented on code in PR #38861:
URL: https://github.com/apache/spark/pull/38861#discussion_r1052745779


##
sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out:
##
@@ -3831,12 +3831,12 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_1168",
+  "errorClass" : "NUM_COLUMNS_MISMATCH",
   "messageParameters" : {
-"insertedColumns" : "5",
-"staticPartCols" : "0",
-"tableName" : "`spark_catalog`.`default`.`num_result`",
-"targetColumns" : "3"
+"firstNumColumns" : "3",
+"invalidNumColumns" : "5",
+"invalidOrdinalNum" : "second",

Review Comment:
   Not really OK with that. I'm also confused by the "second" portion: it doesn't translate well and seems unnecessary compared to "1." and "2.".
   Should we have a dedicated error message? Especially since INSERT INTO has an optional column list, there are many ways to "fix" the error. It's not the same as a UNION ALL or an EXCEPT.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #39123: [SPARK-41583][CONNECT][PROTOBUF] Add Spark Connect and protobuf into setup.py with specifying dependencies

2022-12-19 Thread GitBox


HyukjinKwon closed pull request #39123: [SPARK-41583][CONNECT][PROTOBUF] Add 
Spark Connect and protobuf into setup.py with specifying dependencies
URL: https://github.com/apache/spark/pull/39123


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #39123: [SPARK-41583][CONNECT][PROTOBUF] Add Spark Connect and protobuf into setup.py with specifying dependencies

2022-12-19 Thread GitBox


HyukjinKwon commented on PR #39123:
URL: https://github.com/apache/spark/pull/39123#issuecomment-1358655672

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #39123: [SPARK-41583][CONNECT][PROTOBUF] Add Spark Connect and protobuf into setup.py with specifying dependencies

2022-12-19 Thread GitBox


HyukjinKwon commented on code in PR #39123:
URL: https://github.com/apache/spark/pull/39123#discussion_r1052742314


##
python/setup.py:
##
@@ -113,6 +113,7 @@ def _supports_symlinks():
 # Also don't forget to update python/docs/source/getting_started/install.rst.
 _minimum_pandas_version = "1.0.5"
 _minimum_pyarrow_version = "1.0.0"
+_minimum_pandas_version = "1.48.1"

Review Comment:
   ```suggestion
   _minimum_grpc_version = "1.48.1"
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on a diff in pull request #39104: [SPARK-41425][UI] Protobuf serializer for RDDStorageInfoWrapper

2022-12-19 Thread GitBox


gengliangwang commented on code in PR #39104:
URL: https://github.com/apache/spark/pull/39104#discussion_r1052742075


##
core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala:
##
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.status.RDDStorageInfoWrapper
+import org.apache.spark.status.api.v1.{RDDDataDistribution, RDDPartitionInfo, 
RDDStorageInfo}
+import org.apache.spark.status.protobuf.Utils.getOptional
+
+object RDDStorageInfoWrapperSerializer {
+  def serialize(input: RDDStorageInfoWrapper): Array[Byte] = {
+val builder = StoreTypes.RDDStorageInfoWrapper.newBuilder()
+builder.setInfo(serializeRDDStorageInfo(input.info))
+builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): RDDStorageInfoWrapper = {
+val wrapper = StoreTypes.RDDStorageInfoWrapper.parseFrom(bytes)
+new RDDStorageInfoWrapper(
+  info = deserializeRDDStorageInfo(wrapper.getInfo)
+)
+  }
+
+  private def serializeRDDStorageInfo(info: RDDStorageInfo): 
StoreTypes.RDDStorageInfo = {
+val builder = StoreTypes.RDDStorageInfo.newBuilder()
+builder.setId(info.id)
+builder.setName(info.name)
+builder.setNumPartitions(info.numPartitions)
+builder.setNumCachedPartitions(info.numCachedPartitions)
+builder.setStorageLevel(info.storageLevel)
+builder.setMemoryUsed(info.memoryUsed)
+builder.setDiskUsed(info.diskUsed)
+
+if (info.dataDistribution.isDefined) {
+  info.dataDistribution.get.foreach { dd =>
+val dataDistributionBuilder = 
StoreTypes.RDDDataDistribution.newBuilder()
+dataDistributionBuilder.setAddress(dd.address)
+dataDistributionBuilder.setMemoryUsed(dd.memoryUsed)
+dataDistributionBuilder.setMemoryRemaining(dd.memoryRemaining)
+dataDistributionBuilder.setDiskUsed(dd.diskUsed)
+
dd.onHeapMemoryUsed.foreach(dataDistributionBuilder.setOnHeapMemoryUsed)
+
dd.offHeapMemoryUsed.foreach(dataDistributionBuilder.setOffHeapMemoryUsed)
+
dd.onHeapMemoryRemaining.foreach(dataDistributionBuilder.setOnHeapMemoryRemaining)
+
dd.offHeapMemoryRemaining.foreach(dataDistributionBuilder.setOffHeapMemoryRemaining)
+builder.addDataDistribution(dataDistributionBuilder.build())
+  }
+}
+
+if (info.partitions.isDefined) {
+  info.partitions.get.foreach { p =>
+val partitionsBuilder = StoreTypes.RDDPartitionInfo.newBuilder()
+partitionsBuilder.setBlockName(p.blockName)
+partitionsBuilder.setStorageLevel(p.storageLevel)
+partitionsBuilder.setMemoryUsed(p.memoryUsed)
+partitionsBuilder.setDiskUsed(p.diskUsed)
+p.executors.foreach(partitionsBuilder.addExecutors)
+builder.addPartitions(partitionsBuilder.build())
+  }
+}
+
+builder.build()
+  }
+
+  private def deserializeRDDStorageInfo(info: StoreTypes.RDDStorageInfo): 
RDDStorageInfo = {
+new RDDStorageInfo(
+  id = info.getId,
+  name = info.getName,
+  numPartitions = info.getNumPartitions,
+  numCachedPartitions = info.getNumCachedPartitions,
+  storageLevel = info.getStorageLevel,
+  memoryUsed = info.getMemoryUsed,
+  diskUsed = info.getDiskUsed,
+  dataDistribution =
+if (info.getDataDistributionList.isEmpty) {

Review Comment:
   Yeah the optional sequence here is tricky...
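   
   For context, the tricky part is that a protobuf `repeated` field cannot distinguish `None` from an empty sequence, so the round trip collapses `Some(Seq())` into `None`. A minimal sketch of that pattern (illustrative only; if the distinction matters, an extra boolean field in the proto message would be needed):
   
   ```
   import scala.collection.JavaConverters._
   
   // Serialize: None and Nil end up with the same wire representation (no elements).
   def serializeOptSeq[A, B](opt: Option[Seq[A]])(f: A => B): java.util.List[B] =
     opt.getOrElse(Seq.empty).map(f).asJava
   
   // Deserialize: an empty repeated field is mapped back to None, matching the code above.
   def deserializeOptSeq[A, B](list: java.util.List[A])(f: A => B): Option[Seq[B]] =
     if (list.isEmpty) None else Some(list.asScala.map(f).toSeq)
   ```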



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] anchovYu commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

2022-12-19 Thread GitBox


anchovYu commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1052738997


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##
@@ -244,7 +303,67 @@ object ResolveLateralColumnAliasReference extends 
Rule[LogicalPlan] {
   child = Project(innerProjectList.toSeq, child)
 )
   }
+
+case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if 
agg.resolved
+&& 
aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) 
=>
+
+  val newAggExprs = collection.mutable.Set.empty[NamedExpression]
+  val expressionMap = 
collection.mutable.LinkedHashMap.empty[Expression, NamedExpression]
+  val projectExprs = aggregateExpressions.map { exp =>
+exp.transformDown {
+  case aggExpr: AggregateExpression =>
+// Doesn't support referencing a lateral alias in aggregate 
function
+if (aggExpr.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+  aggExpr.collectFirst {
+case lcaRef: LateralColumnAliasReference =>
+  throw 
QueryCompilationErrors.lateralColumnAliasInAggFuncUnsupportedError(
+lcaRef.nameParts, aggExpr)
+  }
+}
+val ne = expressionMap.getOrElseUpdate(aggExpr.canonicalized, 
assignAlias(aggExpr))
+newAggExprs += ne
+ne.toAttribute
+  case e if groupingExpressions.exists(_.semanticEquals(e)) =>
+val ne = expressionMap.getOrElseUpdate(e.canonicalized, 
assignAlias(e))
+newAggExprs += ne
+ne.toAttribute
+}.asInstanceOf[NamedExpression]
+  }
+  if (newAggExprs.isEmpty) {
+agg
+  } else {
+// perform an early check on current Aggregate before any lift-up 
/ push-down to throw
+// the same exception such as non-aggregate expressions not in 
group by, which becomes
+// missing input after transformation
+earlyCheckAggregate(agg)

Review Comment:
   Yes, it is actually simplified from `checkAnalysis`. If this solution works and all tests pass, I can move this method to the `Aggregate` or `AnalysisHelper` object.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

2022-12-19 Thread GitBox


gengliangwang commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1052735930


##
sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala:
##
@@ -689,4 +713,38 @@ class LateralColumnAliasSuite extends 
LateralColumnAliasSuiteBase {
 s"after extracted from Alias. Current aggregateExpressions: 
$aggregateExpressions")
 }
   }
+
+  test("Non-aggregating expressions not in group by still throws the same 
error") {
+// query without lateral alias
+assert(

Review Comment:
   use `checkError`?
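   
   For reference, a rough sketch of what that could look like with `checkError` (the query, error class, and parameters below are purely illustrative; the real values come from whatever the test query actually raises):
   
   ```
   checkError(
     exception = intercept[AnalysisException] {
       sql("SELECT dept, avg(salary) AS a, a + 10 FROM employees GROUP BY dept")
     },
     errorClass = "MISSING_AGGREGATION",  // illustrative, not verified against this suite
     parameters = Map("expression" -> "\"dept\""))  // illustrative
   ```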



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

2022-12-19 Thread GitBox


gengliangwang commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1052735150


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##
@@ -244,7 +303,67 @@ object ResolveLateralColumnAliasReference extends 
Rule[LogicalPlan] {
   child = Project(innerProjectList.toSeq, child)
 )
   }
+
+case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if 
agg.resolved
+&& 
aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) 
=>
+
+  val newAggExprs = collection.mutable.Set.empty[NamedExpression]
+  val expressionMap = 
collection.mutable.LinkedHashMap.empty[Expression, NamedExpression]
+  val projectExprs = aggregateExpressions.map { exp =>
+exp.transformDown {
+  case aggExpr: AggregateExpression =>
+// Doesn't support referencing a lateral alias in aggregate 
function
+if (aggExpr.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+  aggExpr.collectFirst {
+case lcaRef: LateralColumnAliasReference =>
+  throw 
QueryCompilationErrors.lateralColumnAliasInAggFuncUnsupportedError(
+lcaRef.nameParts, aggExpr)
+  }
+}
+val ne = expressionMap.getOrElseUpdate(aggExpr.canonicalized, 
assignAlias(aggExpr))
+newAggExprs += ne
+ne.toAttribute
+  case e if groupingExpressions.exists(_.semanticEquals(e)) =>
+val ne = expressionMap.getOrElseUpdate(e.canonicalized, 
assignAlias(e))
+newAggExprs += ne
+ne.toAttribute
+}.asInstanceOf[NamedExpression]
+  }
+  if (newAggExprs.isEmpty) {
+agg
+  } else {
+// perform an early check on current Aggregate before any lift-up 
/ push-down to throw
+// the same exception such as non-aggregate expressions not in 
group by, which becomes
+// missing input after transformation
+earlyCheckAggregate(agg)

Review Comment:
   This seems to come from `checkAnalysis`. Could you refactor to reduce the duplicated code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052728943


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala:
##
@@ -0,0 +1,1865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{File, OutputStream}
+import java.util.concurrent.{CountDownLatch, Semaphore, ThreadPoolExecutor, 
TimeUnit}
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfter
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.{Seconds, Span}
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.connector.read.streaming
+import 
org.apache.spark.sql.execution.streaming.AsyncProgressTrackingMicroBatchExecution.{ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
 ASYNC_PROGRESS_TRACKING_ENABLED, 
ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK}
+import org.apache.spark.sql.functions.{column, window}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryException, StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.util.{Clock, Utils}
+
+class AsyncProgressTrackingMicroBatchExecutionSuite
+extends StreamTest
+with BeforeAndAfter
+with Matchers {
+
+  import testImplicits._
+
+  after {
+sqlContext.streams.active.foreach(_.stop())
+  }
+
+  def getListOfFiles(dir: String): List[File] = {
+val d = new File(dir)
+if (d.exists && d.isDirectory) {
+  d.listFiles.filter(_.isFile).toList
+} else {
+  List[File]()
+}
+  }
+
+  def waitPendingOffsetWrites(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.areWritesPendingOrInProgress() should be(false)
+}
+  }
+
+  def waitPendingPurges(streamExecution: StreamExecution): Unit = {
+
assert(streamExecution.isInstanceOf[AsyncProgressTrackingMicroBatchExecution])
+eventually(timeout(Span(5, Seconds))) {
+  streamExecution
+.asInstanceOf[AsyncProgressTrackingMicroBatchExecution]
+.arePendingAsyncPurge should be(false)
+}
+  }
+
+  // test the basic functionality i.e. happy path
+  test("async WAL commits happy path") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+val tableName = "test"
+
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.format("memory")
+.queryName(tableName)
+.option(ASYNC_PROGRESS_TRACKING_ENABLED, true)
+.option(ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS, 0)
+.option("checkpointLocation", checkpointLocation)
+.start()
+}
+val query = startQuery()
+val expected = new ListBuffer[Row]()
+for (j <- 0 until 100) {
+  for (i <- 0 until 10) {
+val v = i + (j * 10)
+inputData.addData({ v })
+expected += Row(v)
+  }
+  query.processAllAvailable()
+}
+
+checkAnswer(
+  spark.table(tableName),
+  expected.toSeq
+)
+  }
+
+  test("async WAL commits recovery") {
+val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
+
+val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+val ds = inputData.toDF()
+
+var index = 0
+// to synchronize producing and consuming messages so that
+// we can generate and read the desired number of batches
+var countDownLatch = new CountDownLatch(10)
+val sem = new Semaphore(1)
+val data = new ListBuffer[Int]()
+def startQuery(): StreamingQuery = {
+  ds.writeStream
+.foreachBatch((ds: Dataset[Row], batchId: Long) => {
+  

[GitHub] [spark] amaliujia commented on a diff in pull request #39068: [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation

2022-12-19 Thread GitBox


amaliujia commented on code in PR #39068:
URL: https://github.com/apache/spark/pull/39068#discussion_r1052727913


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -534,6 +536,36 @@ class SparkConnectPlanner(session: SparkSession) {
 }
   }
 
+  /**
+   * Translates a LambdaFunction from proto to the Catalyst expression.
+   */
+  private def transformLamdaFunction(lambda: proto.Expression.LambdaFunction): 
Expression = {
+if (lambda.getArgumentsCount == 0) {

Review Comment:
   There is validation on the client side already, but I guess we need it on the server side as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on a diff in pull request #39078: [SPARK-41534][CONNECT][SQL] Setup initial client module for Spark Connect

2022-12-19 Thread GitBox


amaliujia commented on code in PR #39078:
URL: https://github.com/apache/spark/pull/39078#discussion_r1052725981


##
connector/connect/client/src/main/scala/org/apache/spark/sql/connect/client/SparkSession.scala:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.language.existentials
+
+import io.grpc.{ManagedChannel, ManagedChannelBuilder}
+import org.apache.arrow.memory.RootAllocator
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.connect.proto
+
+
+class SparkSession(
+private val userContext: proto.UserContext,
+private val channel: ManagedChannel)
+  extends AutoCloseable {
+  private[this] val stub = 
proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+
+  private[this] val allocator = new RootAllocator()
+
+  /**
+   * The version of Spark on which this application is running.
+   */
+  def version: String = SPARK_VERSION
+
+  /**
+   * Returns a `DataFrame` with no rows or columns.
+   *
+   * @since 3.4.0
+   */
+  @transient
+  lazy val emptyDataFrame: Dataset = newDataset { builder =>
+builder.getLocalRelationBuilder
+  }
+
+  /**
+   * Creates a [[Dataset]] with a single `LongType` column named `id`, 
containing elements
+   * in a range from `start` to `end` (exclusive) with a step value, with 
partition number
+   * specified.
+   *
+   * @since 2.0.0
+   */
+  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset = 
{
+range(start, end, step, Option(numPartitions))
+  }
+
+  private def range(start: Long, end: Long, step: Long, numPartitions: 
Option[Int]): Dataset = {
+newDataset { builder =>
+  val rangeBuilder = builder.getRangeBuilder
+.setStart(start)
+.setEnd(end)
+.setStep(step)
+  numPartitions.foreach(rangeBuilder.setNumPartitions)
+}
+  }
+
+  /**
+   * Executes a SQL query using Spark, returning the result as a `DataFrame`.
+   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+   *
+   * @since 2.0.0
+   */
+  def sql(query: String): Dataset = newDataset { builder =>
+builder.setSql(proto.SQL.newBuilder().setQuery(query))
+  }
+
+  private[client] def newDataset(f: proto.Relation.Builder => Unit): Dataset = 
{
+val builder = proto.Relation.newBuilder()
+f(builder)
+val plan = proto.Plan.newBuilder().setRoot(builder).build()
+new Dataset(this, plan)
+  }
+
+  private[client] def analyze(plan: proto.Plan): proto.AnalyzePlanResponse = {
+val request = proto.AnalyzePlanRequest.newBuilder()
+  .setPlan(plan)
+  .setUserContext(userContext)
+  .build()
+stub.analyzePlan(request)
+  }
+
+  override def close(): Unit = {
+channel.shutdownNow()
+allocator.close()
+  }
+}
+
+object SparkSession {
+  def builder(): Builder = new Builder()
+
+  class Builder() {

Review Comment:
   Yeah, supporting a ChannelBuilder probably means that you should also support https://github.com/apache/spark/blob/master/connector/connect/docs/client-connection-string.md



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on a diff in pull request #39078: [SPARK-41534][CONNECT][SQL] Setup initial client module for Spark Connect

2022-12-19 Thread GitBox


amaliujia commented on code in PR #39078:
URL: https://github.com/apache/spark/pull/39078#discussion_r1052725694


##
connector/connect/client/src/main/scala/org/apache/spark/sql/connect/client/SparkSession.scala:
##
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client
+
+import scala.language.existentials
+
+import io.grpc.{ManagedChannel, ManagedChannelBuilder}
+import org.apache.arrow.memory.RootAllocator
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.connect.proto
+
+
+class SparkSession(
+private val userContext: proto.UserContext,
+private val channel: ManagedChannel)
+  extends AutoCloseable {
+  private[this] val stub = 
proto.SparkConnectServiceGrpc.newBlockingStub(channel)
+
+  private[this] val allocator = new RootAllocator()
+
+  /**
+   * The version of Spark on which this application is running.
+   */
+  def version: String = SPARK_VERSION
+
+  /**
+   * Returns a `DataFrame` with no rows or columns.
+   *
+   * @since 3.4.0
+   */
+  @transient
+  lazy val emptyDataFrame: Dataset = newDataset { builder =>
+builder.getLocalRelationBuilder
+  }
+
+  /**
+   * Creates a [[Dataset]] with a single `LongType` column named `id`, 
containing elements
+   * in a range from `start` to `end` (exclusive) with a step value, with 
partition number
+   * specified.
+   *
+   * @since 2.0.0
+   */
+  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset = 
{
+range(start, end, step, Option(numPartitions))
+  }
+
+  private def range(start: Long, end: Long, step: Long, numPartitions: 
Option[Int]): Dataset = {
+newDataset { builder =>
+  val rangeBuilder = builder.getRangeBuilder
+.setStart(start)
+.setEnd(end)
+.setStep(step)
+  numPartitions.foreach(rangeBuilder.setNumPartitions)
+}
+  }
+
+  /**
+   * Executes a SQL query using Spark, returning the result as a `DataFrame`.
+   * This API eagerly runs DDL/DML commands, but not for SELECT queries.
+   *
+   * @since 2.0.0
+   */
+  def sql(query: String): Dataset = newDataset { builder =>
+builder.setSql(proto.SQL.newBuilder().setQuery(query))
+  }
+
+  private[client] def newDataset(f: proto.Relation.Builder => Unit): Dataset = 
{
+val builder = proto.Relation.newBuilder()
+f(builder)
+val plan = proto.Plan.newBuilder().setRoot(builder).build()
+new Dataset(this, plan)
+  }
+
+  private[client] def analyze(plan: proto.Plan): proto.AnalyzePlanResponse = {
+val request = proto.AnalyzePlanRequest.newBuilder()
+  .setPlan(plan)
+  .setUserContext(userContext)
+  .build()
+stub.analyzePlan(request)
+  }
+
+  override def close(): Unit = {
+channel.shutdownNow()
+allocator.close()
+  }
+}
+
+object SparkSession {
+  def builder(): Builder = new Builder()
+
+  class Builder() {
+private val userContextBuilder = proto.UserContext.newBuilder()
+private var _host: String = "localhost"
+private var _port: Int = 15002

Review Comment:
   +1 for this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] techaddict commented on a diff in pull request #39104: [SPARK-41425][UI] Protobuf serializer for RDDStorageInfoWrapper

2022-12-19 Thread GitBox


techaddict commented on code in PR #39104:
URL: https://github.com/apache/spark/pull/39104#discussion_r1052723531


##
core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala:
##
@@ -17,7 +17,7 @@
 
 package org.apache.spark.status.protobuf
 
-import org.apache.spark.status.{ApplicationEnvironmentInfoWrapper, 
ApplicationInfoWrapper, JobDataWrapper, TaskDataWrapper}
+import org.apache.spark.status.{ApplicationEnvironmentInfoWrapper, 
ApplicationInfoWrapper, JobDataWrapper, RDDStorageInfoWrapper, TaskDataWrapper}

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] techaddict commented on pull request #39104: [SPARK-41425][UI] Protobuf serializer for RDDStorageInfoWrapper

2022-12-19 Thread GitBox


techaddict commented on PR #39104:
URL: https://github.com/apache/spark/pull/39104#issuecomment-1358568558

   @gengliangwang addressed comments


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] techaddict commented on a diff in pull request #39104: [SPARK-41425][UI] Protobuf serializer for RDDStorageInfoWrapper

2022-12-19 Thread GitBox


techaddict commented on code in PR #39104:
URL: https://github.com/apache/spark/pull/39104#discussion_r1052723424


##
core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala:
##
@@ -21,8 +21,8 @@ import java.util.Date
 
 import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
 import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
-import org.apache.spark.status.{ApplicationEnvironmentInfoWrapper, 
ApplicationInfoWrapper, JobDataWrapper, TaskDataWrapper}
-import org.apache.spark.status.api.v1.{AccumulableInfo, 
ApplicationAttemptInfo, ApplicationEnvironmentInfo, ApplicationInfo, JobData, 
ResourceProfileInfo, RuntimeInfo}
+import org.apache.spark.status.{ApplicationEnvironmentInfoWrapper, 
ApplicationInfoWrapper, JobDataWrapper, RDDStorageInfoWrapper, TaskDataWrapper}
+import org.apache.spark.status.api.v1.{AccumulableInfo, 
ApplicationAttemptInfo, ApplicationEnvironmentInfo, ApplicationInfo, JobData, 
RDDDataDistribution, RDDPartitionInfo, RDDStorageInfo, ResourceProfileInfo, 
RuntimeInfo}

Review Comment:
   done



##
core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala:
##
@@ -21,8 +21,8 @@ import java.util.Date
 
 import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
 import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
-import org.apache.spark.status.{ApplicationEnvironmentInfoWrapper, 
ApplicationInfoWrapper, JobDataWrapper, TaskDataWrapper}
-import org.apache.spark.status.api.v1.{AccumulableInfo, 
ApplicationAttemptInfo, ApplicationEnvironmentInfo, ApplicationInfo, JobData, 
ResourceProfileInfo, RuntimeInfo}
+import org.apache.spark.status.{ApplicationEnvironmentInfoWrapper, 
ApplicationInfoWrapper, JobDataWrapper, RDDStorageInfoWrapper, TaskDataWrapper}

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052722369


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##
@@ -342,17 +342,14 @@ class MicroBatchExecution(
 isCurrentBatchConstructed = true
 availableOffsets = nextOffsets.toStreamProgress(sources)
 /* Initialize committed offsets to a committed batch, which at this
- * is the second latest batch id in the offset log. */
-if (latestBatchId != 0) {
-  val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse 
{
-logError(s"The offset log for batch ${latestBatchId - 1} doesn't 
exist, " +
-  s"which is required to restart the query from the latest batch 
$latestBatchId " +
-  "from the offset log. Please ensure there are two subsequent 
offset logs " +
-  "available for the latest batch via manually deleting the offset 
file(s). " +
-  "Please also ensure the latest batch for commit log is equal or 
one batch " +
-  "earlier than the latest batch for offset log.")
-throw new IllegalStateException(s"batch ${latestBatchId - 1} 
doesn't exist")
-  }
+ * is the second latest batch id in the offset log.

Review Comment:
   How often we encounter this is not the point; what matters is that we are 
removing a guard that prevents the fault-tolerance semantics from being broken.
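
   A stripped-down illustration of the guard in question (simplified types, not 
Spark's actual `OffsetSeqLog` API): on restart, the entry needed to seed the 
committed offsets must exist, otherwise the query should fail fast rather than 
silently re-plan a wider batch.

```scala
// Simplified sketch of the removed guard, using a plain Map instead of OffsetSeqLog.
def secondLatestOffsets(offsetLog: Map[Long, Seq[Long]], latestBatchId: Long): Seq[Long] =
  if (latestBatchId == 0) {
    Seq.empty
  } else {
    offsetLog.getOrElse(
      latestBatchId - 1,
      throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist"))
  }
```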



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052722369


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##
@@ -342,17 +342,14 @@ class MicroBatchExecution(
 isCurrentBatchConstructed = true
 availableOffsets = nextOffsets.toStreamProgress(sources)
 /* Initialize committed offsets to a committed batch, which at this
- * is the second latest batch id in the offset log. */
-if (latestBatchId != 0) {
-  val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse 
{
-logError(s"The offset log for batch ${latestBatchId - 1} doesn't 
exist, " +
-  s"which is required to restart the query from the latest batch 
$latestBatchId " +
-  "from the offset log. Please ensure there are two subsequent 
offset logs " +
-  "available for the latest batch via manually deleting the offset 
file(s). " +
-  "Please also ensure the latest batch for commit log is equal or 
one batch " +
-  "earlier than the latest batch for offset log.")
-throw new IllegalStateException(s"batch ${latestBatchId - 1} 
doesn't exist")
-  }
+ * is the second latest batch id in the offset log.

Review Comment:
   How often we encounter this is not the point; what matters is that we are 
removing a guard that keeps the fault-tolerance semantics intact.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052721653


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##
@@ -342,17 +342,14 @@ class MicroBatchExecution(
 isCurrentBatchConstructed = true
 availableOffsets = nextOffsets.toStreamProgress(sources)
 /* Initialize committed offsets to a committed batch, which at this
- * is the second latest batch id in the offset log. */
-if (latestBatchId != 0) {
-  val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse 
{
-logError(s"The offset log for batch ${latestBatchId - 1} doesn't 
exist, " +
-  s"which is required to restart the query from the latest batch 
$latestBatchId " +
-  "from the offset log. Please ensure there are two subsequent 
offset logs " +
-  "available for the latest batch via manually deleting the offset 
file(s). " +
-  "Please also ensure the latest batch for commit log is equal or 
one batch " +
-  "earlier than the latest batch for offset log.")
-throw new IllegalStateException(s"batch ${latestBatchId - 1} 
doesn't exist")
-  }
+ * is the second latest batch id in the offset log.

Review Comment:
   This logic can affect the offset range of a microbatch. As your test shows, 
even without the async progress tracking flag on, a normal processing trigger 
can technically roll multiple microbatches back, composing their offsets into 
one. This breaks the exactly-once assumption: every microbatch should have its 
offset range planned before execution, and the range must not change once 
planned.
   
   This is why async progress tracking cannot work as-is for the Delta sink and 
stateful operators. We blocked this path for async progress tracking, but we are 
accidentally exposing it to the "normal" processing trigger.
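
   The planning rule described here can be sketched with made-up types (an 
illustration of the invariant, not Spark's implementation): once a batch id has 
a planned offset range, any replay of that id must see the identical range.

```scala
// Illustration only: re-planning a batch with a different range must be rejected.
final case class OffsetRange(start: Long, end: Long)

class BatchPlan {
  private val planned = scala.collection.mutable.Map.empty[Long, OffsetRange]

  def planOrReplay(batchId: Long, proposed: OffsetRange): OffsetRange =
    planned.get(batchId) match {
      case Some(existing) =>
        require(existing == proposed,
          s"batch $batchId re-planned with a different range: $existing vs $proposed")
        existing
      case None =>
        planned.update(batchId, proposed)
        proposed
    }
}
```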



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on a diff in pull request #39104: [SPARK-41425][UI] Protobuf serializer for RDDStorageInfoWrapper

2022-12-19 Thread GitBox


gengliangwang commented on code in PR #39104:
URL: https://github.com/apache/spark/pull/39104#discussion_r1052719961


##
core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala:
##
@@ -21,8 +21,8 @@ import java.util.Date
 
 import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
 import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
-import org.apache.spark.status.{ApplicationEnvironmentInfoWrapper, 
ApplicationInfoWrapper, JobDataWrapper, TaskDataWrapper}
-import org.apache.spark.status.api.v1.{AccumulableInfo, 
ApplicationAttemptInfo, ApplicationEnvironmentInfo, ApplicationInfo, JobData, 
ResourceProfileInfo, RuntimeInfo}
+import org.apache.spark.status.{ApplicationEnvironmentInfoWrapper, 
ApplicationInfoWrapper, JobDataWrapper, RDDStorageInfoWrapper, TaskDataWrapper}
+import org.apache.spark.status.api.v1.{AccumulableInfo, 
ApplicationAttemptInfo, ApplicationEnvironmentInfo, ApplicationInfo, JobData, 
RDDDataDistribution, RDDPartitionInfo, RDDStorageInfo, ResourceProfileInfo, 
RuntimeInfo}

Review Comment:
   Let's change this to `org.apache.spark.status.api.v1._` to save future 
trouble.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on a diff in pull request #39104: [SPARK-41425][UI] Protobuf serializer for RDDStorageInfoWrapper

2022-12-19 Thread GitBox


gengliangwang commented on code in PR #39104:
URL: https://github.com/apache/spark/pull/39104#discussion_r1052719752


##
core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala:
##
@@ -17,7 +17,7 @@
 
 package org.apache.spark.status.protobuf
 
-import org.apache.spark.status.{ApplicationEnvironmentInfoWrapper, 
ApplicationInfoWrapper, JobDataWrapper, TaskDataWrapper}
+import org.apache.spark.status.{ApplicationEnvironmentInfoWrapper, 
ApplicationInfoWrapper, JobDataWrapper, RDDStorageInfoWrapper, TaskDataWrapper}

Review Comment:
   Let's change this to `org.apache.spark.status._` to save future trouble.



##
core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala:
##
@@ -21,8 +21,8 @@ import java.util.Date
 
 import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
 import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
-import org.apache.spark.status.{ApplicationEnvironmentInfoWrapper, 
ApplicationInfoWrapper, JobDataWrapper, TaskDataWrapper}
-import org.apache.spark.status.api.v1.{AccumulableInfo, 
ApplicationAttemptInfo, ApplicationEnvironmentInfo, ApplicationInfo, JobData, 
ResourceProfileInfo, RuntimeInfo}
+import org.apache.spark.status.{ApplicationEnvironmentInfoWrapper, 
ApplicationInfoWrapper, JobDataWrapper, RDDStorageInfoWrapper, TaskDataWrapper}

Review Comment:
   Let's change this to `org.apache.spark.status._` to save future trouble.
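
   For reference, the consolidated form suggested here, together with the 
earlier suggestion for the `api.v1` package, would look like:

```scala
import org.apache.spark.status._
import org.apache.spark.status.api.v1._
```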



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38517: [SPARK-39591][SS] Async Progress Tracking

2022-12-19 Thread GitBox


HeartSaVioR commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1052719778


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecution.scala:
##
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.streaming.WriteToStream
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.{Clock, ThreadUtils}
+
+object AsyncProgressTrackingMicroBatchExecution {
+  val ASYNC_PROGRESS_TRACKING_ENABLED = "asyncProgressTrackingEnabled"
+  val ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS =
+"asyncProgressTrackingCheckpointIntervalMs"
+
+  // for testing purposes
+  val ASYNC_PROGRESS_TRACKING_OVERRIDE_SINK_SUPPORT_CHECK =
+"_asyncProgressTrackingOverrideSinkSupportCheck"
+
+  private def getAsyncProgressTrackingCheckpointingIntervalMs(
+  extraOptions: Map[String, String]): Long = {
+extraOptions
+  .getOrElse(
+
AsyncProgressTrackingMicroBatchExecution.ASYNC_PROGRESS_TRACKING_CHECKPOINTING_INTERVAL_MS,
+"1000"
+  )
+  .toLong
+  }
+}
+
+/**
+ * Class to execute micro-batches when async progress tracking is enabled
+ */
+class AsyncProgressTrackingMicroBatchExecution(
+sparkSession: SparkSession,
+trigger: Trigger,
+triggerClock: Clock,
+extraOptions: Map[String, String],
+plan: WriteToStream)
+extends MicroBatchExecution(sparkSession, trigger, triggerClock, 
extraOptions, plan) {
+
+  protected val asyncProgressTrackingCheckpointingIntervalMs: Long
+  = AsyncProgressTrackingMicroBatchExecution
+.getAsyncProgressTrackingCheckpointingIntervalMs(extraOptions)
+
+  // Offsets that are ready to be committed by the source.
+  // This is needed so that we can call source commit in the same thread as 
micro-batch execution
+  // to be thread safe
+  private val sourceCommitQueue = new ConcurrentLinkedQueue[OffsetSeq]()
+
+  // to cache the batch id of the last batch written to storage
+  private val lastBatchPersistedToDurableStorage = new AtomicLong(-1)
+
+  override val triggerExecutor: TriggerExecutor = validateAndGetTrigger()
+
+  private var isFirstBatch: Boolean = true
+
+  // thread pool is only one thread because we want offset
+  // writes to execute in order in a serialized fashion
+  protected val asyncWritesExecutorService
+  = ThreadUtils.newDaemonSingleThreadExecutorWithRejectedExecutionHandler(
+"async-log-write",
+2, // one for offset commit and one for completion commit
+new RejectedExecutionHandler() {
+  override def rejectedExecution(r: Runnable, executor: 
ThreadPoolExecutor): Unit = {
+try {
+  if (!executor.isShutdown) {
+val start = System.currentTimeMillis()
+executor.getQueue.put(r)
+logDebug(
+  s"Async write paused execution for " +
+s"${System.currentTimeMillis() - start} due to task queue 
being full."
+)
+  }
+} catch {
+  case e: InterruptedException =>
+Thread.currentThread.interrupt()
+throw new RejectedExecutionException("Producer interrupted", e)
+  case e: Throwable =>
+logError("Encountered error in async write executor service", e)
+errorNotifier.markError(e)
+}
+  }
+})
+
+  override val offsetLog = new AsyncOffsetSeqLog(
+sparkSession,
+checkpointFile("offsets"),
+asyncWritesExecutorService,
+asyncProgressTrackingCheckpointingIntervalMs,
+clock = triggerClock
+  )
+
+  override val commitLog =
+new AsyncCommitLog(sparkSession, checkpointFile("commits"), 
asyncWritesExecutorService)
+
+  override def markMicroBatchExecutionStart(): Unit = {
+// check if pipeline is stateful
+checkNotStatefulPipeline
+  }
+
+  override def cleanUpLastExecutedMicroBatch(): Unit = {
+// this is a no-op for async progress tracking since we want to commit sources only
+// after the 

[GitHub] [spark] gengliangwang closed pull request #39120: [SPARK-41588][SQL] Make "rule id not found" error slightly easier to debug.

2022-12-19 Thread GitBox


gengliangwang closed pull request #39120: [SPARK-41588][SQL] Make "rule id not 
found" error slightly easier to debug.
URL: https://github.com/apache/spark/pull/39120


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


