[GitHub] spark issue #19916: [SPARK-22716][SQL] Replace addReferenceObj to reduce the...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19916 @cloud-fan I think a lot of caller-side changes would be needed as well, since we currently do not pass any variable name when referencing an object with `addReferenceMinorObj`. I am not sure that adding a new argument to `addReferenceMinorObj` only for the comment is the right way to go. Sorry for the additional comment; I just want to make sure I am giving you all the details you need to choose the best option. I promise that if you now say "let's add the comment", I'll do it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156077696 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.DataType + +/** + * Defines util methods used in expression code generation. + */ +object ExpressionCodegen { + + // Type alias for a tuple representing the data type and nullable for an expression. + type ExprProperty = (DataType, Boolean) + + /** + * Given an expression, returns the all necessary parameters to evaluate it, so the generated + * code of this expression can be split in a function. + * The 1st string in returned tuple is the parameter strings used to call the function. + * The 2nd string in returned tuple is the parameter strings used to declare the function. + * + * Returns `None` if it can't produce valid parameters. + * + * Params to include: + * 1. 
Evaluated columns referred by this, children or deferred expressions. + * 2. Rows referred by this, children or deferred expressions. + * 3. Eliminated subexpressions referred by children expressions. + */ + def getExpressionInputParams( + ctx: CodegenContext, + expr: Expression): Option[(Seq[String], Seq[String])] = { +val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr) +val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, inputVars) + +val subExprs = getSubExprInChildren(ctx, expr) +val subExprCodes = getSubExprCodes(ctx, subExprs) +val subAttrs = subExprs.map(e => (e.dataType, e.nullable)) +val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, subExprCodes) + +val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr) +val paramsFromRows = inputRows.distinct.filter(_ != null).map { row => + (row, s"InternalRow $row") +} + +val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + paramsFromRows.length +// Maximum allowed parameter number for Java's method descriptor. +if (paramsLength > 255) { + None +} else { + val allParams = (paramsFromRows ++ paramsFromColumns ++ paramsFromSubExprs).unzip + val callParams = allParams._1.distinct + val declParams = allParams._2.distinct + Some((callParams, declParams)) +} + } + + /** + * Returns the eliminated subexpressions in the children expressions. + */ + def getSubExprInChildren(ctx: CodegenContext, expr: Expression): Seq[Expression] = { +expr.children.flatMap { child => + child.collect { +case e if ctx.subExprEliminationExprs.contains(e) => e + } +}.distinct + } + + /** + * A small helper function to return `ExprCode`s that represent subexpressions. + */ + def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): Seq[ExprCode] = { --- End diff -- Is it? Don't we also do sub-expr elimination in normal codegen?
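The diff's `getSubExprInChildren` walks each child subtree, keeps the nodes that are already keys in `ctx.subExprEliminationExprs`, and deduplicates the result. Below is a minimal sketch of that traversal on a toy expression tree. `Expr`, `Col`, `Add`, `Mul`, `SubExprSketch` and the `eliminated` set are hypothetical stand-ins, not Spark's `Expression` or `CodegenContext`.

```scala
// Hypothetical toy model; not Spark's Expression/TreeNode classes.
sealed trait Expr {
  def children: Seq[Expr]

  // Pre-order traversal that gathers every node matched by `pf`,
  // similar in spirit to TreeNode.collect.
  def collect[B](pf: PartialFunction[Expr, B]): Seq[B] =
    pf.lift(this).toList ++ children.flatMap(_.collect(pf))
}
final case class Col(name: String) extends Expr { def children: Seq[Expr] = Nil }
final case class Add(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }
final case class Mul(l: Expr, r: Expr) extends Expr { def children: Seq[Expr] = Seq(l, r) }

object SubExprSketch {
  // `eliminated` plays the role of ctx.subExprEliminationExprs: common
  // subexpressions whose results were already computed into variables.
  def subExprsInChildren(expr: Expr, eliminated: Set[Expr]): Seq[Expr] =
    expr.children.flatMap { child =>
      child.collect { case e if eliminated.contains(e) => e }
    }.distinct
}
```

A shared subtree that appears under several children is reported only once, which is why the diff ends with `.distinct`.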
[GitHub] spark issue #19928: [SPARK-22267][SQL][TEST] Spark SQL incorrectly reads ORC...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19928 thanks, merging to master!
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156077470 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.DataType + +/** + * Defines util methods used in expression code generation. + */ +object ExpressionCodegen { + + // Type alias for a tuple representing the data type and nullable for an expression. + type ExprProperty = (DataType, Boolean) + + /** + * Given an expression, returns the all necessary parameters to evaluate it, so the generated + * code of this expression can be split in a function. + * The 1st string in returned tuple is the parameter strings used to call the function. + * The 2nd string in returned tuple is the parameter strings used to declare the function. + * + * Returns `None` if it can't produce valid parameters. + * + * Params to include: + * 1. 
Evaluated columns referred by this, children or deferred expressions. + * 2. Rows referred by this, children or deferred expressions. + * 3. Eliminated subexpressions referred by children expressions. + */ + def getExpressionInputParams( + ctx: CodegenContext, + expr: Expression): Option[(Seq[String], Seq[String])] = { +val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr) +val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, inputVars) + +val subExprs = getSubExprInChildren(ctx, expr) +val subExprCodes = getSubExprCodes(ctx, subExprs) +val subAttrs = subExprs.map(e => (e.dataType, e.nullable)) +val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, subExprCodes) + +val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr) +val paramsFromRows = inputRows.distinct.filter(_ != null).map { row => + (row, s"InternalRow $row") +} + +val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + paramsFromRows.length +// Maximum allowed parameter number for Java's method descriptor. +if (paramsLength > 255) { + None +} else { + val allParams = (paramsFromRows ++ paramsFromColumns ++ paramsFromSubExprs).unzip + val callParams = allParams._1.distinct + val declParams = allParams._2.distinct + Some((callParams, declParams)) +} + } + + /** + * Returns the eliminated subexpressions in the children expressions. + */ + def getSubExprInChildren(ctx: CodegenContext, expr: Expression): Seq[Expression] = { +expr.children.flatMap { child => + child.collect { +case e if ctx.subExprEliminationExprs.contains(e) => e + } +}.distinct + } + + /** + * A small helper function to return `ExprCode`s that represent subexpressions. 
+ */ + def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): Seq[ExprCode] = { +subExprs.map { subExpr => + val state = ctx.subExprEliminationExprs(subExpr) + ExprCode(code = "", value = state.value, isNull = state.isNull) +} + } + + /** + * Retrieves previous input rows referred by children and deferred expressions. + */ + def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): Seq[String] = { +expr.children.flatMap(getInputRows(ctx, _)).distinct + } + + /** + * Given a child expression, retrieves previous input rows referred by it or deferred expressions + * which are needed to evaluate it. + */ + def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = { +child.flatMap {
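The quoted `getExpressionInputParams` returns `None` once the parameter length exceeds 255, which is the JVM limit for a method descriptor. Under the JVM specification, each `long` or `double` parameter takes two units, every other type takes one, and the implicit `this` of an instance method takes one more. The sketch below counts units that way. It also assumes, hypothetically, that every nullable input is passed together with an extra `boolean` null flag. `ParamSlots` is an illustrative name. The diff does not show the real `getParamLength`, so it is unknown whether that method counts `this`.

```scala
// Hypothetical sketch; not Spark's getParamLength.
object ParamSlots {
  // JVMS 4.3.3: total parameter length must be <= 255 units.
  val MaxParamSlots = 255

  // long and double occupy two units; all other types occupy one.
  def slotsFor(javaType: String): Int = javaType match {
    case "long" | "double" => 2
    case _                 => 1
  }

  // inputs: (javaType, nullable). A nullable input is assumed to add one
  // boolean isNull parameter next to its value.
  def paramLength(inputs: Seq[(String, Boolean)], rowParams: Int, isInstance: Boolean): Int = {
    val valueSlots = inputs.map { case (tpe, nullable) =>
      slotsFor(tpe) + (if (nullable) 1 else 0)
    }.sum
    valueSlots + rowParams + (if (isInstance) 1 else 0)
  }

  def fitsInOneMethod(inputs: Seq[(String, Boolean)], rowParams: Int, isInstance: Boolean): Boolean =
    paramLength(inputs, rowParams, isInstance) <= MaxParamSlots
}
```

For example, 127 non-nullable `long` inputs passed to an instance method use exactly 255 units. A 128th input pushes the total past the limit.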
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156077509 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.DataType + +/** + * Defines util methods used in expression code generation. + */ +object ExpressionCodegen { + + // Type alias for a tuple representing the data type and nullable for an expression. + type ExprProperty = (DataType, Boolean) + + /** + * Given an expression, returns the all necessary parameters to evaluate it, so the generated + * code of this expression can be split in a function. + * The 1st string in returned tuple is the parameter strings used to call the function. + * The 2nd string in returned tuple is the parameter strings used to declare the function. + * + * Returns `None` if it can't produce valid parameters. + * + * Params to include: + * 1. 
Evaluated columns referred by this, children or deferred expressions. + * 2. Rows referred by this, children or deferred expressions. + * 3. Eliminated subexpressions referred by children expressions. + */ + def getExpressionInputParams( + ctx: CodegenContext, + expr: Expression): Option[(Seq[String], Seq[String])] = { +val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr) +val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, inputVars) + +val subExprs = getSubExprInChildren(ctx, expr) +val subExprCodes = getSubExprCodes(ctx, subExprs) +val subAttrs = subExprs.map(e => (e.dataType, e.nullable)) +val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, subExprCodes) + +val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr) +val paramsFromRows = inputRows.distinct.filter(_ != null).map { row => + (row, s"InternalRow $row") +} + +val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + paramsFromRows.length +// Maximum allowed parameter number for Java's method descriptor. +if (paramsLength > 255) { + None +} else { + val allParams = (paramsFromRows ++ paramsFromColumns ++ paramsFromSubExprs).unzip + val callParams = allParams._1.distinct + val declParams = allParams._2.distinct + Some((callParams, declParams)) +} + } + + /** + * Returns the eliminated subexpressions in the children expressions. + */ + def getSubExprInChildren(ctx: CodegenContext, expr: Expression): Seq[Expression] = { +expr.children.flatMap { child => + child.collect { +case e if ctx.subExprEliminationExprs.contains(e) => e + } +}.distinct + } + + /** + * A small helper function to return `ExprCode`s that represent subexpressions. 
+ */ + def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): Seq[ExprCode] = { +subExprs.map { subExpr => + val state = ctx.subExprEliminationExprs(subExpr) + ExprCode(code = "", value = state.value, isNull = state.isNull) +} + } + + /** + * Retrieves previous input rows referred by children and deferred expressions. + */ + def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): Seq[String] = { +expr.children.flatMap(getInputRows(ctx, _)).distinct + } + + /** + * Given a child expression, retrieves previous input rows referred by it or deferred expressions + * which are needed to evaluate it. + */ + def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = { +child.flatMap {
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156077428 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.DataType + +/** + * Defines util methods used in expression code generation. + */ +object ExpressionCodegen { + + // Type alias for a tuple representing the data type and nullable for an expression. + type ExprProperty = (DataType, Boolean) + + /** + * Given an expression, returns the all necessary parameters to evaluate it, so the generated + * code of this expression can be split in a function. + * The 1st string in returned tuple is the parameter strings used to call the function. + * The 2nd string in returned tuple is the parameter strings used to declare the function. + * + * Returns `None` if it can't produce valid parameters. + * + * Params to include: + * 1. 
Evaluated columns referred by this, children or deferred expressions. + * 2. Rows referred by this, children or deferred expressions. + * 3. Eliminated subexpressions referred by children expressions. + */ + def getExpressionInputParams( + ctx: CodegenContext, + expr: Expression): Option[(Seq[String], Seq[String])] = { +val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr) +val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, inputVars) + +val subExprs = getSubExprInChildren(ctx, expr) +val subExprCodes = getSubExprCodes(ctx, subExprs) +val subAttrs = subExprs.map(e => (e.dataType, e.nullable)) +val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, subExprCodes) + +val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr) +val paramsFromRows = inputRows.distinct.filter(_ != null).map { row => + (row, s"InternalRow $row") +} + +val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + paramsFromRows.length +// Maximum allowed parameter number for Java's method descriptor. +if (paramsLength > 255) { + None +} else { + val allParams = (paramsFromRows ++ paramsFromColumns ++ paramsFromSubExprs).unzip + val callParams = allParams._1.distinct + val declParams = allParams._2.distinct + Some((callParams, declParams)) +} + } + + /** + * Returns the eliminated subexpressions in the children expressions. + */ + def getSubExprInChildren(ctx: CodegenContext, expr: Expression): Seq[Expression] = { +expr.children.flatMap { child => + child.collect { +case e if ctx.subExprEliminationExprs.contains(e) => e + } +}.distinct + } + + /** + * A small helper function to return `ExprCode`s that represent subexpressions. 
+ */ + def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): Seq[ExprCode] = { +subExprs.map { subExpr => + val state = ctx.subExprEliminationExprs(subExpr) + ExprCode(code = "", value = state.value, isNull = state.isNull) +} + } + + /** + * Retrieves previous input rows referred by children and deferred expressions. + */ + def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): Seq[String] = { +expr.children.flatMap(getInputRows(ctx, _)).distinct + } + + /** + * Given a child expression, retrieves previous input rows referred by it or deferred expressions + * which are needed to evaluate it. + */ + def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = { +child.flatMap {
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19792 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84708/ Test PASSed.
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19792 Merged build finished. Test PASSed.
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19792 **[Test build #84708 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84708/testReport)** for PR 19792 at commit [`44a1879`](https://github.com/apache/spark/commit/44a1879919a61d732eea176e26ce6a79549984a0). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19916: [SPARK-22716][SQL] Replace addReferenceObj to reduce the...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19916 Adding a local variable to generate more readable code would be better, but that needs a lot of caller-side changes, which may not be worth it.
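The options in this thread differ mainly in what the generated Java looks like where the object is used. The sketch below assumes, for illustration only, that a minor reference is read back inline from a `references` array with a cast. `ToyCodegenContext`, `referenceExpr` and `localVarDecl` are hypothetical names. The optional `comment` argument stands for the extra parameter under discussion. `localVarDecl` stands for the local-variable alternative, which every caller would have to emit itself.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy context; not Spark's CodegenContext.
class ToyCodegenContext {
  val references = ArrayBuffer.empty[Any]

  // Registers `obj` and returns an inline Java expression that reads it
  // back, cast to `className`. An optional name is emitted only as a
  // comment, so it affects readability but not behavior.
  def referenceExpr(obj: Any, className: String, comment: Option[String] = None): String = {
    val idx = references.length
    references += obj
    val hint = comment.map(c => s"/* $c */ ").getOrElse("")
    s"$hint(($className) references[$idx])"
  }
}

object ReferenceSketch {
  // Local-variable alternative: the caller binds the inline expression to a
  // readable name before using it.
  def localVarDecl(className: String, name: String, expr: String): String =
    s"$className $name = $expr;"
}
```

Only the last option changes what callers emit. That is the caller-side cost weighed in this discussion.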
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156074485 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -105,6 +105,11 @@ abstract class Expression extends TreeNode[Expression] { val isNull = ctx.freshName("isNull") val value = ctx.freshName("value") val eval = doGenCode(ctx, ExprCode("", isNull, value)) + + // Records current input row and variables of this expression. + eval.inputRow = ctx.INPUT_ROW + eval.inputVars = findInputVars(ctx, eval) --- End diff -- OK. Let me try it in the next commit.
[GitHub] spark pull request #19940: [SPARK-22750][SQL] Reuse mutable states when poss...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19940#discussion_r156073714 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -184,6 +190,27 @@ class CodegenContext { mutableStates += ((javaType, variableName, initCode)) } + /** + * Add a mutable state as a field to the generated class only if it does not exist yet a field + * with that name. This helps reducing the number of the generated class' fields, since the same + * variable can be reused by many functions. + * + * Internally, this method calls `addMutableState`. + * + * @param javaType Java type of the field. + * @param variableName Name of the field. + * @param initCode The statement(s) to put into the init() method to initialize this field. + */ + def addSingleMutableState( + javaType: String, + variableName: String, + initCode: String = ""): Unit = { +if (!singleMutableStates.contains(variableName)) { + addMutableState(javaType, variableName, initCode) --- End diff -- Please also check whether the Java type is the same. If an expression uses the same name with a different type, we should flag it early.
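Below is a minimal sketch of the suggested check, assuming a registry keyed by variable name. The first request adds the field. A repeated request with the same Java type does nothing. A request with a different Java type fails immediately. `MutableStateRegistry` is a hypothetical class, not Spark's `CodegenContext`; only the method names mirror the diff.

```scala
import scala.collection.mutable

// Hypothetical sketch of the review suggestion; not Spark's CodegenContext.
class MutableStateRegistry {
  // (javaType, variableName, initCode) in declaration order.
  val mutableStates = mutable.ArrayBuffer.empty[(String, String, String)]
  // variableName -> javaType for states that may be shared.
  private val singleStates = mutable.Map.empty[String, String]

  def addMutableState(javaType: String, variableName: String, initCode: String = ""): Unit =
    mutableStates += ((javaType, variableName, initCode))

  // Declares the field only the first time a name is seen; reusing the
  // name with a different Java type is reported right away.
  def addSingleMutableState(javaType: String, variableName: String, initCode: String = ""): Unit =
    singleStates.get(variableName) match {
      case None =>
        singleStates(variableName) = javaType
        addMutableState(javaType, variableName, initCode)
      case Some(existing) =>
        require(existing == javaType,
          s"Mutable state '$variableName' already declared as $existing, not $javaType")
    }
}
```

Failing at registration time points at the offending expression directly. Without the check, the error would surface later as a confusing compile error in the generated class.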
[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19940 Oh, the initialization does not happen directly in the declaration.
[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19940 Shall we make them `final` variables to guarantee this? I think this is an important requirement to prevent bugs caused by misusing the shared global variables.
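Java only allows a `final` instance field to be assigned in its declaration, in an instance initializer, or in a constructor. Assigning it in an ordinary method such as a generated `init()` does not compile. The hypothetical `FieldSpec`/`FieldEmitter` sketch below shows the consequence. A field can be emitted as `final` only when its initial value is available at the declaration. Otherwise it must stay non-final and be assigned in `init()`. The field model and the emitted layout are assumptions for illustration.

```scala
// Hypothetical field model; not Spark's representation of mutable states.
final case class FieldSpec(javaType: String, name: String, initExpr: String, inlineInit: Boolean)

object FieldEmitter {
  // `final` is legal only when the value is assigned in the declaration.
  def declaration(f: FieldSpec): String =
    if (f.inlineInit) s"private final ${f.javaType} ${f.name} = ${f.initExpr};"
    else s"private ${f.javaType} ${f.name};"

  // Fields that are not initialized inline are assigned in init(), which
  // rules out `final` for them.
  def initBody(fields: Seq[FieldSpec]): String =
    fields.filterNot(_.inlineInit).map(f => s"${f.name} = ${f.initExpr};").mkString("\n")
}
```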
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user gberger commented on the issue: https://github.com/apache/spark/pull/19792 Good catch @HyukjinKwon! I reverted those changes and added a test to cover this regression.
[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19940 @viirya this is the requirement I followed in this change. It ensures that it is safe to share the variable across all the operators, since all accesses are read-only and one operator cannot affect another. This might be relaxed in the future, but as long as we follow this requirement, we can be sure that sharing is safe.
[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19940 @mgaido91 Do you mean the shared global variables must be assigned only once (at initialization) and never changed afterwards?
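The invariant discussed here is that a shared field is assigned once at initialization and is read-only afterwards. It can also be stated as a runtime check. `WriteOnce` below is a hypothetical helper, not part of Spark, and it is not thread-safe. It only illustrates the rule that a second assignment is an error.

```scala
// Hypothetical helper; not thread-safe, illustration only.
final class WriteOnce[T] {
  private var value: Option[T] = None

  // The first call initializes the value; any later call is rejected.
  def set(v: T): Unit = {
    if (value.isDefined) throw new IllegalStateException("already initialized")
    value = Some(v)
  }

  // Reading before initialization is also treated as an error.
  def get: T = value.getOrElse(throw new IllegalStateException("not initialized"))
}
```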
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19792 **[Test build #84708 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84708/testReport)** for PR 19792 at commit [`44a1879`](https://github.com/apache/spark/commit/44a1879919a61d732eea176e26ce6a79549984a0).
[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/19942#discussion_r156069510 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -291,6 +291,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S if (proxyUser != null && principal != null) { SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.") } + +val executorTimeoutThreshold = Utils.timeStringAsSeconds( + sparkProperties.getOrElse("spark.network.timeout", "120s")) +val executorHeartbeatInterval = Utils.timeStringAsSeconds( + sparkProperties.getOrElse("spark.executor.heartbeatInterval", "10s")) --- End diff -- Rather than repeat a default, just perform the check if both are set.
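Below is a sketch of the suggested approach. It compares the two properties only when both are explicitly present in the property map, instead of filling in defaults. `HeartbeatCheck` and its simplified `toSeconds` parser are hypothetical stand-ins for `Utils.timeStringAsSeconds`; the parser handles only `s`, `min`, or a bare number of seconds. The rule that the heartbeat interval must be strictly less than the network timeout is an assumption based on the PR title.

```scala
// Hypothetical sketch; not SparkSubmitArguments.
object HeartbeatCheck {
  // Simplified parser: "<n>min", "<n>s", or a bare number of seconds.
  // Other units (e.g. "ms") are intentionally not handled here.
  def toSeconds(s: String): Long = {
    val t = s.trim.toLowerCase
    if (t.endsWith("min")) t.dropRight(3).toLong * 60
    else if (t.endsWith("s")) t.dropRight(1).toLong
    else t.toLong
  }

  // Per the review comment: validate only when both keys are set, so no
  // default value has to be duplicated in this check.
  def validate(props: Map[String, String]): Either[String, Unit] =
    (props.get("spark.network.timeout"), props.get("spark.executor.heartbeatInterval")) match {
      case (Some(timeout), Some(interval)) if toSeconds(interval) >= toSeconds(timeout) =>
        Left(s"spark.executor.heartbeatInterval ($interval) should be less than spark.network.timeout ($timeout)")
      case _ => Right(())
    }
}
```

If only one of the two keys is set, the check passes. The defaults then stay defined in one place instead of being repeated in the validation code.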
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156067549 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.DataType + +/** + * Defines util methods used in expression code generation. + */ +object ExpressionCodegen { + + // Type alias for a tuple representing the data type and nullable for an expression. + type ExprProperty = (DataType, Boolean) + + /** + * Given an expression, returns the all necessary parameters to evaluate it, so the generated + * code of this expression can be split in a function. + * The 1st string in returned tuple is the parameter strings used to call the function. + * The 2nd string in returned tuple is the parameter strings used to declare the function. + * + * Returns `None` if it can't produce valid parameters. + * + * Params to include: + * 1. 
Evaluated columns referred by this, children or deferred expressions. + * 2. Rows referred by this, children or deferred expressions. + * 3. Eliminated subexpressions referred by children expressions. + */ + def getExpressionInputParams( + ctx: CodegenContext, + expr: Expression): Option[(Seq[String], Seq[String])] = { +val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr) +val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, inputVars) + +val subExprs = getSubExprInChildren(ctx, expr) +val subExprCodes = getSubExprCodes(ctx, subExprs) +val subAttrs = subExprs.map(e => (e.dataType, e.nullable)) +val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, subExprCodes) + +val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr) +val paramsFromRows = inputRows.distinct.filter(_ != null).map { row => + (row, s"InternalRow $row") +} + +val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + paramsFromRows.length +// Maximum allowed parameter number for Java's method descriptor. +if (paramsLength > 255) { + None +} else { + val allParams = (paramsFromRows ++ paramsFromColumns ++ paramsFromSubExprs).unzip + val callParams = allParams._1.distinct + val declParams = allParams._2.distinct + Some((callParams, declParams)) +} + } + + /** + * Returns the eliminated subexpressions in the children expressions. + */ + def getSubExprInChildren(ctx: CodegenContext, expr: Expression): Seq[Expression] = { +expr.children.flatMap { child => + child.collect { +case e if ctx.subExprEliminationExprs.contains(e) => e + } +}.distinct + } + + /** + * A small helper function to return `ExprCode`s that represent subexpressions. + */ + def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): Seq[ExprCode] = { --- End diff -- It seems we only need it for sub-exprs in whole-stage codegen?
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156066515 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.DataType + +/** + * Defines util methods used in expression code generation. + */ +object ExpressionCodegen { + + // Type alias for a tuple representing the data type and nullable for an expression. + type ExprProperty = (DataType, Boolean) + + /** + * Given an expression, returns the all necessary parameters to evaluate it, so the generated + * code of this expression can be split in a function. + * The 1st string in returned tuple is the parameter strings used to call the function. + * The 2nd string in returned tuple is the parameter strings used to declare the function. + * + * Returns `None` if it can't produce valid parameters. + * + * Params to include: + * 1. 
Evaluated columns referred by this, children or deferred expressions. + * 2. Rows referred by this, children or deferred expressions. + * 3. Eliminated subexpressions referred by children expressions. + */ + def getExpressionInputParams( + ctx: CodegenContext, + expr: Expression): Option[(Seq[String], Seq[String])] = { +val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr) +val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, inputVars) + +val subExprs = getSubExprInChildren(ctx, expr) +val subExprCodes = getSubExprCodes(ctx, subExprs) +val subAttrs = subExprs.map(e => (e.dataType, e.nullable)) +val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, subExprCodes) + +val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr) +val paramsFromRows = inputRows.distinct.filter(_ != null).map { row => + (row, s"InternalRow $row") +} + +val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + paramsFromRows.length +// Maximum allowed parameter number for Java's method descriptor. +if (paramsLength > 255) { + None +} else { + val allParams = (paramsFromRows ++ paramsFromColumns ++ paramsFromSubExprs).unzip + val callParams = allParams._1.distinct + val declParams = allParams._2.distinct + Some((callParams, declParams)) +} + } + + /** + * Returns the eliminated subexpressions in the children expressions. + */ + def getSubExprInChildren(ctx: CodegenContext, expr: Expression): Seq[Expression] = { +expr.children.flatMap { child => + child.collect { +case e if ctx.subExprEliminationExprs.contains(e) => e + } +}.distinct + } + + /** + * A small helper function to return `ExprCode`s that represent subexpressions. + */ + def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): Seq[ExprCode] = { --- End diff -- Why do we need to add sub-exprs to the parameter list?
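The `paramsLength > 255` check above guards the JVM limit on method descriptors. Per the JVM specification (section 4.3.3), that limit counts slots rather than parameters: `long` and `double` take two slots, every other type takes one, and an instance method's implicit `this` takes one more. The `getParamLength` helper is not shown in the quoted diff, so whether it counts slots this way is not visible here. The following is a hypothetical Scala sketch of such a count, assuming (as an illustration only) that each nullable input also passes a `boolean` isNull flag.

```scala
object ParamSlots {
  // JVMS 4.3.3: a method descriptor's parameters may take at most 255 slots in total.
  val MaxSlots = 255

  // Slots taken by one parameter: long and double need two, every other type needs one.
  def slotsFor(javaType: String): Int = javaType match {
    case "long" | "double" => 2
    case _                 => 1
  }

  /** Total slots for the value parameters, one boolean isNull flag per nullable input
   *  (an assumption of this sketch), the InternalRow parameters and, for an instance
   *  method, the implicit `this`. */
  def totalSlots(inputs: Seq[(String, Boolean)], rowParams: Int, isInstanceMethod: Boolean): Int = {
    val valueSlots = inputs.map { case (javaType, _) => slotsFor(javaType) }.sum
    val nullFlagSlots = inputs.count { case (_, nullable) => nullable }
    val thisSlot = if (isInstanceMethod) 1 else 0
    valueSlots + nullFlagSlots + rowParams + thisSlot
  }

  /** None when a split-out instance method could not be declared, else its slot count. */
  def checkedSlots(inputs: Seq[(String, Boolean)], rowParams: Int): Option[Int] = {
    val n = totalSlots(inputs, rowParams, isInstanceMethod = true)
    if (n > MaxSlots) None else Some(n)
  }
}
```

Under these assumptions, 127 non-nullable `long` inputs plus one row already overflow the descriptor (254 + 1 + 1 = 256 slots), even though that is only 128 declared parameters.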
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156066138 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala --- @@ -0,0 +1,246 @@ +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.DataType + +/** + * Defines util methods used in expression code generation. + */ +object ExpressionCodegen { + + // Type alias for a tuple representing the data type and nullable for an expression. + type ExprProperty = (DataType, Boolean) + + /** + * Given an expression, returns the all necessary parameters to evaluate it, so the generated + * code of this expression can be split in a function. + * The 1st string in returned tuple is the parameter strings used to call the function. + * The 2nd string in returned tuple is the parameter strings used to declare the function. + * + * Returns `None` if it can't produce valid parameters. + * + * Params to include: + * 1. 
Evaluated columns referred by this, children or deferred expressions. + * 2. Rows referred by this, children or deferred expressions. + * 3. Eliminated subexpressions referred by children expressions. + */ + def getExpressionInputParams( + ctx: CodegenContext, + expr: Expression): Option[(Seq[String], Seq[String])] = { +val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr) +val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, inputVars) + +val subExprs = getSubExprInChildren(ctx, expr) +val subExprCodes = getSubExprCodes(ctx, subExprs) +val subAttrs = subExprs.map(e => (e.dataType, e.nullable)) +val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, subExprCodes) + +val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr) +val paramsFromRows = inputRows.distinct.filter(_ != null).map { row => + (row, s"InternalRow $row") +} + +val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + paramsFromRows.length +// Maximum allowed parameter number for Java's method descriptor. +if (paramsLength > 255) { + None +} else { + val allParams = (paramsFromRows ++ paramsFromColumns ++ paramsFromSubExprs).unzip + val callParams = allParams._1.distinct + val declParams = allParams._2.distinct + Some((callParams, declParams)) +} + } + + /** + * Returns the eliminated subexpressions in the children expressions. + */ + def getSubExprInChildren(ctx: CodegenContext, expr: Expression): Seq[Expression] = { +expr.children.flatMap { child => + child.collect { +case e if ctx.subExprEliminationExprs.contains(e) => e + } +}.distinct + } + + /** + * A small helper function to return `ExprCode`s that represent subexpressions. 
+ */ + def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): Seq[ExprCode] = { +subExprs.map { subExpr => + val state = ctx.subExprEliminationExprs(subExpr) + ExprCode(code = "", value = state.value, isNull = state.isNull) +} + } + + /** + * Retrieves previous input rows referred by children and deferred expressions. + */ + def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): Seq[String] = { +expr.children.flatMap(getInputRows(ctx, _)).distinct + } + + /** + * Given a child expression, retrieves previous input rows referred by it or deferred expressions + * which are needed to evaluate it. + */ + def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = { +
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156065731 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala --- @@ -0,0 +1,246 @@ +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.DataType + +/** + * Defines util methods used in expression code generation. + */ +object ExpressionCodegen { + + // Type alias for a tuple representing the data type and nullable for an expression. + type ExprProperty = (DataType, Boolean) + + /** + * Given an expression, returns the all necessary parameters to evaluate it, so the generated + * code of this expression can be split in a function. + * The 1st string in returned tuple is the parameter strings used to call the function. + * The 2nd string in returned tuple is the parameter strings used to declare the function. + * + * Returns `None` if it can't produce valid parameters. + * + * Params to include: + * 1. 
Evaluated columns referred by this, children or deferred expressions. + * 2. Rows referred by this, children or deferred expressions. + * 3. Eliminated subexpressions referred by children expressions. + */ + def getExpressionInputParams( + ctx: CodegenContext, + expr: Expression): Option[(Seq[String], Seq[String])] = { +val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr) +val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, inputVars) + +val subExprs = getSubExprInChildren(ctx, expr) +val subExprCodes = getSubExprCodes(ctx, subExprs) +val subAttrs = subExprs.map(e => (e.dataType, e.nullable)) +val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, subExprCodes) --- End diff -- nit: I feel it's more readable to handle sub-exprs first, then input vars, and finally input rows.
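The suggested ordering can be illustrated with a small, hypothetical Scala sketch. It assumes, as in the quoted diff, that each source yields `(argument at call site, declaration in signature)` pairs; the `ParamOrder` name and the sample values are made up. This sketch deduplicates whole pairs before `unzip`, which keeps the call arguments and the declarations aligned by construction.

```scala
object ParamOrder {
  // (argument at the call site, declaration in the function signature)
  type Param = (String, String)

  /** Builds call/declaration lists in the order: sub-expressions, input variables, input rows. */
  def assemble(
      fromSubExprs: Seq[Param],
      fromColumns: Seq[Param],
      fromRows: Seq[Param]): (Seq[String], Seq[String]) = {
    // Dedupe whole pairs first so both lists keep the same length and order.
    (fromSubExprs ++ fromColumns ++ fromRows).distinct.unzip
  }
}
```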
[GitHub] spark issue #19942: [SPARK-22754][DEPLOY] Check whether spark.executor.heart...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19942 Can one of the admins verify this patch?
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156064910 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala ---
[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...
GitHub user caneGuy opened a pull request: https://github.com/apache/spark/pull/19942 [SPARK-22754][DEPLOY] Check whether spark.executor.heartbeatInterval bigger than spark.network.timeout or not ## What changes were proposed in this pull request? If spark.executor.heartbeatInterval is bigger than spark.network.timeout, it will almost always cause the exception below. `Job aborted due to stage failure: Task 4763 in stage 3.0 failed 4 times, most recent failure: Lost task 4763.3 in stage 3.0 (TID 22383, executor id: 4761, host: c3-hadoop-prc-st2966.bj): ExecutorLostFailure (executor 4761 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 154022 ms` Since many users are not aware of this, they may set spark.executor.heartbeatInterval incorrectly. This patch checks for this case when an application is submitted. ## How was this patch tested? Tested in a cluster. You can merge this pull request into a Git repository by running: $ git pull https://github.com/caneGuy/spark zhoukang/check-heartbeat Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19942.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19942 commit 891a092ac3a95f32cb2f9e1c215b1c8324c98971 Author: zhoukang Date: 2017-12-11T12:48:32Z [SPARK][DEPLOY] Check whether spark.executor.heartbeatInterval bigger than spark.network.timeout or not
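The check this PR describes amounts to a fail-fast validation at submit time. The following is a standalone Scala sketch, not the patch itself: it reads a plain `Map[String, String]` instead of `SparkConf`, uses a deliberately simplified, hypothetical duration parser (only `ms`, `s` and `min` suffixes) in place of Spark's own time-string parsing, and assumes the documented defaults of `10s` for `spark.executor.heartbeatInterval` and `120s` for `spark.network.timeout`.

```scala
object HeartbeatCheck {
  // Hypothetical, simplified duration parser: accepts "<n>ms", "<n>s" or "<n>min".
  def toMillis(value: String): Long = {
    val v = value.trim.toLowerCase
    if (v.endsWith("ms")) v.dropRight(2).trim.toLong // check "ms" before "s"
    else if (v.endsWith("min")) v.dropRight(3).trim.toLong * 60000L
    else if (v.endsWith("s")) v.dropRight(1).trim.toLong * 1000L
    else throw new IllegalArgumentException(s"Unsupported duration: $value")
  }

  /** Fails fast when the heartbeat interval is not strictly shorter than the network timeout. */
  def validate(conf: Map[String, String]): Unit = {
    val interval = toMillis(conf.getOrElse("spark.executor.heartbeatInterval", "10s"))
    val timeout = toMillis(conf.getOrElse("spark.network.timeout", "120s"))
    require(interval < timeout,
      s"spark.executor.heartbeatInterval ($interval ms) must be smaller than " +
        s"spark.network.timeout ($timeout ms)")
  }
}
```

Rejecting the configuration up front turns a job failure that surfaces much later as an executor heartbeat timeout into an immediate, self-explanatory error.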
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156062977 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala ---
[GitHub] spark pull request #19676: [SPARK-14516][FOLLOWUP] Adding ClusteringEvaluato...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19676
[GitHub] spark issue #19893: [SPARK-16139][TEST] Add logging functionality for leaked...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/19893 The `TestHiveContext` is shared among test suites, so maybe now is not a good time to change this. Could we create a new class that inherits from `SparkFunSuite` and checks for leaked threads, and have only the test suites in the core module extend it? WDYT @gatorsmile @cloud-fan?
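The core idea of such a leak-checking base class can be sketched without any test framework: snapshot the names of live threads before a suite and compare after it. `ThreadLeakCheck` and its methods are hypothetical; a real version would presumably take the snapshots in the suite's `beforeAll`/`afterAll` hooks and log the result rather than return it.

```scala
object ThreadLeakCheck {
  // Names of all threads that are alive right now.
  def liveThreadNames(): Set[String] =
    Thread.getAllStackTraces.keySet.toArray(new Array[Thread](0)).map(_.getName).toSet

  /** Threads alive after the suite that were not alive before it, minus known-benign prefixes. */
  def leaked(before: Set[String], after: Set[String], ignorePrefixes: Seq[String] = Nil): Set[String] =
    (after -- before).filterNot(name => ignorePrefixes.exists(name.startsWith))
}
```

Comparing by name is a simplification: a thread pool that replaces a worker with an identically named one would not be reported.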
[GitHub] spark issue #19676: [SPARK-14516][FOLLOWUP] Adding ClusteringEvaluator to ex...
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19676 Merged to master
[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19193#discussion_r156060138 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1920,7 +1927,34 @@ class Analyzer( case p: LogicalPlan if !p.childrenResolved => p - // Aggregate without Having clause. + // Extract window expressions from aggregate functions. There might be an aggregate whose + // aggregate function contains a window expression as a child, which we need to extract. + // e.g., df.groupBy().agg(max(rank().over(window)) + case a @ Aggregate(groupingExprs, aggregateExprs, child) +if containsAggregateFunctionWithWindowExpression(aggregateExprs) && + a.expressions.forall(_.resolved) => + +val windowExprAliases = new ArrayBuffer[NamedExpression]() +val newAggregateExprs = aggregateExprs.map { expr => + expr.transform { --- End diff -- The code below assumes that there are no window aggregates on top of a regular aggregate, and it will push the regular aggregate into the underlying window. An example of this: `df.groupBy(a).agg(max(rank().over(window1)), sum(sum(c)).over(window2))`
[GitHub] spark issue #19919: [SPARK-22727] spark.executor.instances's default value s...
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19919 If the default is correctly described as 2, then I think there is nothing to do here and this should be closed. This change will cause other problems, at least.
[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19193#discussion_r156059425 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1920,7 +1927,34 @@ class Analyzer( case p: LogicalPlan if !p.childrenResolved => p - // Aggregate without Having clause. + // Extract window expressions from aggregate functions. There might be an aggregate whose + // aggregate function contains a window expression as a child, which we need to extract. + // e.g., df.groupBy().agg(max(rank().over(window)) + case a @ Aggregate(groupingExprs, aggregateExprs, child) --- End diff -- Can you add a test case for this scenario?
[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19193#discussion_r156059043 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1920,7 +1927,34 @@ class Analyzer( case p: LogicalPlan if !p.childrenResolved => p - // Aggregate without Having clause. + // Extract window expressions from aggregate functions. There might be an aggregate whose + // aggregate function contains a window expression as a child, which we need to extract. + // e.g., df.groupBy().agg(max(rank().over(window)) + case a @ Aggregate(groupingExprs, aggregateExprs, child) --- End diff -- To be sure: what happens if there is a window function on top of the aggregate function? This gets resolved in two passes, right?
[GitHub] spark issue #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAndRead
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19941 **[Test build #84707 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84707/testReport)** for PR 19941 at commit [`1481163`](https://github.com/apache/spark/commit/14811636692810809033bc7caf03fcecb6939aa3).
[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19193#discussion_r156058341 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1920,7 +1927,34 @@ class Analyzer( case p: LogicalPlan if !p.childrenResolved => p - // Aggregate without Having clause. + // Extract window expressions from aggregate functions. There might be an aggregate whose + // aggregate function contains a window expression as a child, which we need to extract. + // e.g., df.groupBy().agg(max(rank().over(window)) + case a @ Aggregate(groupingExprs, aggregateExprs, child) +if containsAggregateFunctionWithWindowExpression(aggregateExprs) && + a.expressions.forall(_.resolved) => + +val windowExprAliases = new ArrayBuffer[NamedExpression]() +val newAggregateExprs = aggregateExprs.map { expr => + expr.transform { +case aggExpr @ AggregateExpression(func, _, _, _) if hasWindowFunction(func.children) => + val newFuncChildren = func.children.map { funcExpr => +funcExpr.transform { + case we: WindowExpression => +// Replace window expressions with aliases to them +val windowExprAlias = Alias(we, s"_we${windowExprAliases.length}")() +windowExprAliases += windowExprAlias +windowExprAlias.toAttribute +} + } + val newFunc = func.withNewChildren(newFuncChildren).asInstanceOf[AggregateFunction] + aggExpr.copy(aggregateFunction = newFunc) + }.asInstanceOf[NamedExpression] +} +val window = addWindow(windowExprAliases, child) +// TODO do we also need a projection here? +Aggregate(groupingExprs, newAggregateExprs, window) --- End diff -- No, you don't need a Project.
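The quoted rule replaces each `WindowExpression` nested inside an aggregate function's arguments with a reference to an alias (`_we0`, `_we1`, ...) and evaluates the collected aliases in a `Window` below the `Aggregate`. The toy Scala model below reproduces only that replace-and-collect step. The `Expr` hierarchy is hypothetical and far simpler than Catalyst's `Expression` and `transform` machinery; a top-level window expression is deliberately left untouched here, which is the case raised in the earlier review comment about window aggregates on top of a regular aggregate.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy expression tree; far simpler than Catalyst's Expression.
sealed trait Expr
final case class Col(name: String) extends Expr
final case class WindowExpr(func: String, windowSpec: String) extends Expr
final case class AggFunc(name: String, children: Seq[Expr]) extends Expr
final case class AttrRef(name: String) extends Expr

object WindowExtraction {
  /** Replaces window expressions found under aggregate functions with references to
   *  aliases `_we0`, `_we1`, ... and returns the rewritten expressions together with
   *  the (alias, window expression) pairs that must be computed below the aggregate. */
  def extract(aggExprs: Seq[Expr]): (Seq[Expr], Seq[(String, WindowExpr)]) = {
    val aliases = ArrayBuffer.empty[(String, WindowExpr)]

    def replaceWindows(e: Expr): Expr = e match {
      case w: WindowExpr =>
        val alias = s"_we${aliases.length}"
        aliases += ((alias, w))
        AttrRef(alias)
      case AggFunc(name, children) => AggFunc(name, children.map(replaceWindows))
      case other => other
    }

    val rewritten = aggExprs.map {
      // Only the arguments of aggregate functions are rewritten.
      case AggFunc(name, children) => AggFunc(name, children.map(replaceWindows))
      // A top-level window expression is left untouched in this sketch.
      case other => other
    }
    (rewritten, aliases.toList)
  }
}
```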
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156057964 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -105,6 +105,11 @@ abstract class Expression extends TreeNode[Expression] { val isNull = ctx.freshName("isNull") val value = ctx.freshName("value") val eval = doGenCode(ctx, ExprCode("", isNull, value)) + + // Records current input row and variables of this expression. + eval.inputRow = ctx.INPUT_ROW + eval.inputVars = findInputVars(ctx, eval) --- End diff -- Actually, we can be more aggressive and do `val isNull = if (this.nullable) eval.isNull else "false"`, and see if there are any compile errors.
[GitHub] spark pull request #19940: [SPARK-22750][SQL] Reuse mutable states when poss...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19940#discussion_r156057818 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -184,6 +190,27 @@ class CodegenContext { mutableStates += ((javaType, variableName, initCode)) } + /** + * Add a mutable state as a field to the generated class only if it does not exist yet a field + * with that name. This helps reducing the number of the generated class' fields, since the same + * variable can be reused by many functions. + * + * Internally, this method calls `addMutableState`. + * + * @param javaType Java type of the field. + * @param variableName Name of the field. + * @param initCode The statement(s) to put into the init() method to initialize this field. + */ + def addSingleMutableState( + javaType: String, + variableName: String, + initCode: String = ""): Unit = { +if (!singleMutableStates.contains(variableName)) { + addMutableState(javaType, variableName, initCode) --- End diff -- if you want, I can add it.
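The deduplication that `addSingleMutableState` performs can be modeled with a toy context. `ToyCodegenContext` is hypothetical: the quoted diff checks `singleMutableStates.contains(variableName)` and delegates to `addMutableState`, but it is cut off before showing where the name is recorded. In this sketch the name is recorded on first insertion.

```scala
import scala.collection.mutable

class ToyCodegenContext {
  // (javaType, variableName, initCode) triples, in insertion order.
  val mutableStates = mutable.ArrayBuffer.empty[(String, String, String)]
  // Names already declared through addSingleMutableState.
  private val singleMutableStates = mutable.HashSet.empty[String]

  def addMutableState(javaType: String, variableName: String, initCode: String = ""): Unit = {
    mutableStates += ((javaType, variableName, initCode))
  }

  /** Declares the field only the first time a given name is requested. */
  def addSingleMutableState(javaType: String, variableName: String, initCode: String = ""): Unit = {
    // HashSet.add returns false when the name was already present.
    if (singleMutableStates.add(variableName)) {
      addMutableState(javaType, variableName, initCode)
    }
  }
}
```

One consequence of keying on the name alone: a later request with the same name but a different Java type is silently ignored, so every caller sharing a name must agree on its type.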
[GitHub] spark pull request #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAnd...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/19941 [SPARK-22753][SQL] Get rid of dataSource.writeAndRead ## What changes were proposed in this pull request? Following the discussion in https://github.com/apache/spark/pull/16481 and https://github.com/apache/spark/pull/18975#discussion_r155454606, the BaseRelation returned by `dataSource.writeAndRead` is currently only used in `CreateDataSourceTableAsSelect`, and planForWriting and writeAndRead share some common code paths. In this patch I removed the writeAndRead function and added a getRelation function, which is only used in `CreateDataSourceTableAsSelectCommand` when saving data to a non-existing table. ## How was this patch tested? Existing unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-22753 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19941.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19941 commit 14811636692810809033bc7caf03fcecb6939aa3 Author: Yuanjian Li Date: 2017-12-11T12:12:45Z Get rid of dataSource.writeAndRead
[GitHub] spark issue #19805: [SPARK-22649][PYTHON][SQL] Adding localCheckpoint to Dat...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19805 **[Test build #84706 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84706/testReport)** for PR 19805 at commit [`9beb375`](https://github.com/apache/spark/commit/9beb375d15d162519856d32d6bd12e3cf1860d68).
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156057427 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -1001,16 +1039,25 @@ class CodegenContext { commonExprs.foreach { e => val expr = e.head val fnName = freshName("evalExpr") - val isNull = s"${fnName}IsNull" + val isNull = if (expr.nullable) { +s"${fnName}IsNull" + } else { +"" + } val value = s"${fnName}Value" // Generate the code for this expression tree and wrap it in a function. val eval = expr.genCode(this) + val nullValue = if (expr.nullable) { --- End diff -- nit: `assignIsNull`
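The suggested `assignIsNull` name fits what the variable appears to hold: the statement that copies the evaluated null flag into the `...IsNull` field, emitted only for nullable expressions. The quoted diff is cut off before that body, so the following Scala sketch of the two pieces (the field name and its assignment) is an assumption about their shape, with made-up names such as `evalExpr1` and `isNull_0`.

```scala
object SubExprCodegen {
  /** Returns (isNull field name, assignIsNull statement) for a common-subexpression
   *  function named fnName. Non-nullable expressions get neither, mirroring the
   *  quoted diff's `""` for the isNull name. */
  def isNullParts(fnName: String, nullable: Boolean, evalIsNull: String): (String, String) =
    if (nullable) {
      val isNull = s"${fnName}IsNull"
      (isNull, s"$isNull = $evalIsNull;")
    } else {
      ("", "")
    }
}
```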
[GitHub] spark issue #19916: [SPARK-22716][SQL] Replace addReferenceObj to reduce the...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19916 @cloud-fan thanks for your answer. I don't think that something like: ``` ((MyClass) reference[3] /* variableName */).doSomething(); ``` would be readable, but that is just my opinion. Do you want me to add a local variable to generate more readable code? Or, if you think that adding the comment is the right thing to do, I can do that. Thanks.
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156056362 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -105,6 +105,11 @@ abstract class Expression extends TreeNode[Expression] { val isNull = ctx.freshName("isNull") val value = ctx.freshName("value") val eval = doGenCode(ctx, ExprCode("", isNull, value)) + + // Records current input row and variables of this expression. + eval.inputRow = ctx.INPUT_ROW + eval.inputVars = findInputVars(ctx, eval) --- End diff -- shall we do one more thing here? ``` eval.isNull = if (this.nullable) eval.isNull else "false" ```
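The `isNull` folding suggested above can be sketched in Python. This is a simplified model, not Spark's code: `ExprCodeSketch`, `gen_code`, and `null_check` are hypothetical names, and the generated Java is reduced to plain strings. The idea is that a non-nullable expression exposes the literal `"false"` as its `isNull` term, so a caller can skip emitting a runtime null check.

```python
from dataclasses import dataclass


@dataclass
class ExprCodeSketch:
    # Hypothetical stand-in for ExprCode(code, isNull, value).
    code: str
    is_null: str
    value: str


def gen_code(nullable: bool, is_null_term: str, value_term: str, body: str) -> ExprCodeSketch:
    """Wrap generated Java; fold isNull to the literal "false" when not nullable."""
    ev = ExprCodeSketch(body, is_null_term, value_term)
    # Mirrors: eval.isNull = if (this.nullable) eval.isNull else "false"
    ev.is_null = ev.is_null if nullable else "false"
    return ev


def null_check(ev: ExprCodeSketch) -> str:
    # A caller can drop the runtime check entirely when isNull is a constant.
    if ev.is_null == "false":
        return f"use({ev.value});"
    return f"if (!{ev.is_null}) {{ use({ev.value}); }}"
```

For example, `null_check(gen_code(False, "isNull_0", "value_0", ""))` yields `use(value_0);` with no branch.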
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156055243 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -55,8 +55,42 @@ import org.apache.spark.util.{ParentClassLoader, Utils} * to null. * @param value A term for a (possibly primitive) value of the result of the evaluation. Not * valid if `isNull` is set to `true`. + * @param inputRow A term that holds the input row name when generating this code. + * @param inputVars A list of [[ExprInputVar]] that holds input variables when generating this code. */ -case class ExprCode(var code: String, var isNull: String, var value: String) +case class ExprCode( +var code: String, +var isNull: String, +var value: String, +var inputRow: String = null, +var inputVars: Seq[ExprInputVar] = Seq.empty) { --- End diff -- do we have a requirement that only one of them can be not null/Nil?
[GitHub] spark pull request #19940: [SPARK-22750][SQL] Reuse mutable states when poss...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19940#discussion_r156054632 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -184,6 +190,27 @@ class CodegenContext { mutableStates += ((javaType, variableName, initCode)) } + /** + * Add a mutable state as a field to the generated class only if it does not exist yet a field + * with that name. This helps reducing the number of the generated class' fields, since the same + * variable can be reused by many functions. + * + * Internally, this method calls `addMutableState`. + * + * @param javaType Java type of the field. + * @param variableName Name of the field. + * @param initCode The statement(s) to put into the init() method to initialize this field. + */ + def addSingleMutableState( + javaType: String, + variableName: String, + initCode: String = ""): Unit = { +if (!singleMutableStates.contains(variableName)) { + addMutableState(javaType, variableName, initCode) --- End diff -- shall we add an assert here to make sure `initCode` is the same as the previous one?
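A minimal Python sketch of the dedup-by-name behaviour of `addSingleMutableState`, together with the consistency check proposed above. The class and method names are hypothetical stand-ins for `CodegenContext`. The sketch raises `ValueError` rather than using a bare `assert`, so the check still runs under `python -O`.

```python
class MutableStateRegistry:
    """Hypothetical stand-in for a codegen context that tracks class fields."""

    def __init__(self):
        self.mutable_states = []  # (java_type, name, init_code) in declaration order
        self.single_states = {}   # name -> (java_type, init_code) for shared fields

    def add_mutable_state(self, java_type, name, init_code=""):
        self.mutable_states.append((java_type, name, init_code))

    def add_single_mutable_state(self, java_type, name, init_code=""):
        previous = self.single_states.get(name)
        if previous is None:
            # First request for this name: register it and declare the field once.
            self.single_states[name] = (java_type, init_code)
            self.add_mutable_state(java_type, name, init_code)
        elif previous != (java_type, init_code):
            # The check proposed in review: a shared field must be requested identically.
            raise ValueError(f"field {name!r} requested with a different type or initCode")

    def declarations(self):
        return [f"private {t} {n};" for t, n, _ in self.mutable_states]
```

Requesting the same name twice with identical arguments yields a single field declaration. A request with a different `initCode` fails loudly instead of silently reusing the first initializer.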
[GitHub] spark issue #19916: [SPARK-22716][SQL] Replace addReferenceObj to reduce the...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19916 In a real application I think the difference is much smaller than 2.4%, so I think it's OK to remove `addReferenceObj`. One problem is the readability of the generated code; maybe we can generate `(MyClass) reference[3] /* variableName */`.
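The two readability options discussed for referencing objects in the generated code can be compared with a small Python sketch that emits Java snippets. The helper names and the array name `references` are assumptions for illustration. The inline form keeps the original name only as a comment. The local-variable form costs one extra declaration, but later code can use the readable name directly.

```python
def ref_inline(index: int, java_class: str, var_name: str) -> str:
    # Option 1: cast the array slot at each use site; the name survives only as a comment.
    return f"(({java_class}) references[{index}] /* {var_name} */)"


def ref_local(index: int, java_class: str, var_name: str) -> tuple:
    # Option 2: bind the slot to a local variable once, then refer to it by name.
    decl = f"{java_class} {var_name} = ({java_class}) references[{index}];"
    return decl, var_name
```

For instance, `ref_inline(3, "MyClass", "variableName") + ".doSomething();"` reproduces the shape quoted earlier in this thread.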
[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19937 **[Test build #84705 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84705/testReport)** for PR 19937 at commit [`9faf0a2`](https://github.com/apache/spark/commit/9faf0a2644739e9e19968c5077d6b14011aab9dd).
[GitHub] spark issue #14151: [SPARK-16496][SQL] Add wholetext as option for reading t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14151 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84704/ Test FAILed.
[GitHub] spark issue #14151: [SPARK-16496][SQL] Add wholetext as option for reading t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/14151 Merged build finished. Test FAILed.
[GitHub] spark issue #14151: [SPARK-16496][SQL] Add wholetext as option for reading t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14151 **[Test build #84704 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84704/testReport)** for PR 14151 at commit [`66d5b45`](https://github.com/apache/spark/commit/66d5b453cd2aaaea08a3843f4966fc9036451b6c). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class WholeTextFileSuite extends QueryTest with SharedSQLContext `
[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19193 cc @hvanhovell
[GitHub] spark pull request #19940: [SPARK-22750][SQL] Reuse mutable states when poss...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19940#discussion_r156052035 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -184,6 +190,27 @@ class CodegenContext { mutableStates += ((javaType, variableName, initCode)) } + /** + * Add a mutable state as a field to the generated class only if it does not exist yet a field + * with that name. This helps reducing the number of the generated class' fields, since the same + * variable can be reused by many functions. + * + * Internally, this method calls `addMutableState`. + * + * @param javaType Java type of the field. + * @param variableName Name of the field. + * @param initCode The statement(s) to put into the init() method to initialize this field. + */ + def addSingleMutableState( + javaType: String, + variableName: String, + initCode: String = ""): Unit = { --- End diff -- It is not supported.
[GitHub] spark pull request #19940: [SPARK-22750][SQL] Reuse mutable states when poss...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19940#discussion_r156051417 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -184,6 +190,27 @@ class CodegenContext { mutableStates += ((javaType, variableName, initCode)) } + /** + * Add a mutable state as a field to the generated class only if it does not exist yet a field + * with that name. This helps reducing the number of the generated class' fields, since the same + * variable can be reused by many functions. + * + * Internally, this method calls `addMutableState`. + * + * @param javaType Java type of the field. + * @param variableName Name of the field. + * @param initCode The statement(s) to put into the init() method to initialize this field. + */ + def addSingleMutableState( + javaType: String, + variableName: String, + initCode: String = ""): Unit = { --- End diff -- How can we support different initCode for the same name?
[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19937 retest this please
[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19937 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84703/ Test FAILed.
[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19937 Merged build finished. Test FAILed.
[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19937 **[Test build #84703 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84703/testReport)** for PR 19937 at commit [`9faf0a2`](https://github.com/apache/spark/commit/9faf0a2644739e9e19968c5077d6b14011aab9dd). * This patch **fails SparkR unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19940 @viirya we have seen that using arrays affects performance, so it is better to reduce their usage where we can. I don't think that debugging is harder: the variables I made shared are never assigned except during initialization. Do you have another opinion, or are you thinking of something specific?
[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r156037616 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -48,9 +48,26 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { }.isDefined } + private def isPandasGroupAggUdf(expr: Expression): Boolean = expr match { + case _ @ PythonUDF(_, _, _, _, PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF ) => true + case Alias(expr, _) => isPandasGroupAggUdf(expr) + case _ => false + } + + private def hasPandasGroupAggUdf(agg: Aggregate): Boolean = { +val actualAggExpr = agg.aggregateExpressions.drop(agg.groupingExpressions.length) +actualAggExpr.exists(isPandasGroupAggUdf) + } + + private def extract(agg: Aggregate): LogicalPlan = { val projList = new ArrayBuffer[NamedExpression]() val aggExpr = new ArrayBuffer[NamedExpression]() + +if (hasPandasGroupAggUdf(agg)) { + Aggregate(agg.groupingExpressions, agg.aggregateExpressions, agg.child) +} else { + --- End diff -- nit: style, this block needs to be indented.
[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r156028776 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala --- @@ -32,7 +31,5 @@ case class PythonUDF( evalType: Int) extends Expression with Unevaluable with NonSQLExpression with UserDefinedExpression { - override def toString: String = s"$name(${children.mkString(", ")})" --- End diff -- Why was this removed?
[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r156034167 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala --- @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import java.io.File + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, JoinedRow, NamedExpression, PythonUDF, SortOrder, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.util.Utils + +case class AggregateInPandasExec( +groupingExpressions: Seq[Expression], +udfExpressions: Seq[PythonUDF], +resultExpressions: Seq[NamedExpression], +child: SparkPlan) + extends UnaryExecNode { + + override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute) + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def producedAttributes: AttributeSet = AttributeSet(output) + + override def requiredChildDistribution: Seq[Distribution] = { +if (groupingExpressions.isEmpty) { + AllTuples :: Nil +} else { + ClusteredDistribution(groupingExpressions) :: Nil +} + } + + private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = { +udf.children match { + case Seq(u: PythonUDF) => +val (chained, children) = collectFunctions(u) +(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children) + case children => +// There should not be any other UDFs, or the children can't be evaluated directly. 
+assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty)) +(ChainedPythonFunctions(Seq(udf.func)), udf.children) +} + } + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingExpressions.map(SortOrder(_, Ascending))) + + override protected def doExecute(): RDD[InternalRow] = { +val inputRDD = child.execute() + +val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) +val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) +val sessionLocalTimeZone = conf.sessionLocalTimeZone +val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone + +val (pyFuncs, inputs) = udfExpressions.map(collectFunctions).unzip + +val allInputs = new ArrayBuffer[Expression] +val dataTypes = new ArrayBuffer[DataType] + +allInputs.appendAll(groupingExpressions) + +val argOffsets = inputs.map { input => + input.map { e => + allInputs += e + dataTypes += e.dataType + allInputs.length - 1 - groupingExpressions.length + }.toArray +}.toArray + +val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) => + StructField(s"_$i", dt) +}) + +inputRDD.mapPartitionsInternal { iter => + val grouped = if (groupingExpressions.isEmpty) { +Iterator((null, iter)) + } else { +val groupedIter = GroupedIterator(iter, groupingExpressions, child.output) + +val dropGrouping = + UnsafeProjection.create(allInputs.drop(groupingExpressions.length), child.output) + +
[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r156037157 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -48,9 +48,26 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { }.isDefined } + private def isPandasGroupAggUdf(expr: Expression): Boolean = expr match { + case _ @ PythonUDF(_, _, _, _, PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF ) => true --- End diff -- We don't need `_ @` here. nit: remove extra space after `SQL_PANDAS_GROUP_AGG_UDF`.
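A Python sketch of the matching logic in `isPandasGroupAggUdf` and `hasPandasGroupAggUdf` from the diff. `PythonUDF` and `Alias` here are simplified dataclasses, not the Catalyst classes. `SQL_PANDAS_GROUP_AGG_UDF` is a placeholder value, not the real `PythonEvalType` constant. As the review notes, the match needs no `_ @` binder; a plain type-and-field check is enough.

```python
from dataclasses import dataclass
from typing import Any

# Placeholder only; the real constant is an Int defined in PythonEvalType.
SQL_PANDAS_GROUP_AGG_UDF = "SQL_PANDAS_GROUP_AGG_UDF"


@dataclass
class PythonUDF:  # simplified: the Catalyst class has more fields
    name: str
    eval_type: Any


@dataclass
class Alias:
    child: Any
    name: str


def is_pandas_group_agg_udf(expr) -> bool:
    # Same shape as: case PythonUDF(_, _, _, _, SQL_PANDAS_GROUP_AGG_UDF) => true
    if isinstance(expr, PythonUDF):
        return expr.eval_type == SQL_PANDAS_GROUP_AGG_UDF
    if isinstance(expr, Alias):
        return is_pandas_group_agg_udf(expr.child)  # look through aliases
    return False


def has_pandas_group_agg_udf(grouping_exprs, aggregate_exprs) -> bool:
    # Skip the leading grouping columns, as agg.aggregateExpressions.drop(...) does.
    actual = aggregate_exprs[len(grouping_exprs):]
    return any(is_pandas_group_agg_udf(e) for e in actual)
```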
[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r156031375 --- Diff: python/pyspark/sql/tests.py --- @@ -4016,6 +4016,124 @@ def test_unsupported_types(self): with self.assertRaisesRegexp(Exception, 'Unsupported data type'): df.groupby('id').apply(f).collect() +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyAggTests(ReusedSQLTestCase): +def assertFramesEqual(self, expected, result): --- End diff -- nit: how about making this a common method?
[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19872#discussion_r156038036 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala --- @@ -15,10 +15,9 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.python +package org.apache.spark.sql.catalyst.expressions --- End diff -- Do we need to move the package to catalyst?
[GitHub] spark issue #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nested expr...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19813 ping @cloud-fan Previous comments are all addressed. Please review this again. Thanks.
[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19940 A high-level question: do we need to share mutable states if we can compact global variables into arrays later? Will sharing mutable states make debugging codegen harder in the future?
[GitHub] spark issue #19929: [SPARK-22629][PYTHON] Add deterministic flag to pyspark ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19929 @gatorsmile sorry, I saw that you did the patch for Scala UDFs. Could you help review this, please? Thanks.
[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19940 @cloud-fan @kiszk @viirya could you please help review this? Thanks.
[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19811 @kiszk I see, thanks. Wouldn't it be better anyway to store all the initializations in the same array, so that we ensure the ordering is the same as before this patch?
[GitHub] spark issue #19938: [SPARK-22747][SQL] Localize lifetime of mutable states i...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19938 Good point.
[GitHub] spark issue #19893: [SPARK-16139][TEST] Add logging functionality for leaked...
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/19893 I've analysed the Hive-related test flow and found SparkSession and SQLContext being shared between suites, as you mentioned. Here is the execution flow: 1. The first Hive test suite instantiates TestHive, which creates SparkSession and SQLContext 2. SparkFunSuite.beforeAll creates a thread snapshot 3. Test code runs 4. TestHiveSingleton.afterAll resets SparkSession 5. SparkFunSuite.afterAll prints out the possible leaks. Step one is executed only by the first Hive suite and never again, so I do not expect false positives on a large scale. The only false-positive threads that I foresee could come from lazy initialisation within SparkSession or SQLContext. On the leftover side, we're not tracking SparkSession and SQLContext threads, but because of their singleton nature my suggestion is to leave it like that. In the case you mentioned ``` $ grep 'POSSIBLE THREAD LEAK' unit-tests.log | wc -l 158 ``` I can imagine the following situations: 1. The test doesn't call hiveContext.reset() 2. The test creates a thread but doesn't free it 3. A production code issue 4. ... Of course there could be other issues which I've not considered; please share your ideas.
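The snapshot-and-compare flow described above (steps 2 and 5) can be sketched with Python threads. `ThreadLeakChecker` is a hypothetical name, not `SparkFunSuite`'s implementation. The sketch also shows why lazily started threads cause false positives: any thread that first starts between the snapshot and the check is reported, whether or not the test owns it.

```python
import threading


class ThreadLeakChecker:
    """Hypothetical sketch of a beforeAll/afterAll thread-snapshot check."""

    def __init__(self):
        self.snapshot = set()

    def before_all(self):
        # Step 2: remember every thread that is alive before the suite runs.
        self.snapshot = set(threading.enumerate())

    def after_all(self):
        # Step 5: report threads that are alive now but were not in the snapshot.
        return sorted(t.name for t in threading.enumerate() if t not in self.snapshot)
```

A suite-level singleton that starts its threads before the snapshot (like the shared `TestHive` session after the first suite) never shows up here. One that starts them lazily during a test does.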
[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19811 Good question, got it. There are some cases where we have to ensure the order of declarations. The current code asks developers to ensure it by using `inline = true`; see [an example](https://github.com/apache/spark/pull/19811/files#diff-90b107e5c61791e17d5b4b25021b89fdR323).
[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19811 @kiszk yes, I know, but I meant something different. I will try to explain with an example; let me know if I am not clear enough. Since we are not defining everything as an array, and we initialize everything that doesn't go into an array before everything that does, the following situation can currently happen: - we define a variable (`v1`) which goes into an array; - then we define a variable (`v2DependsOnV1`) which doesn't go into any array, but which relies on the previous variable. Before this PR, we ensured that the order of initialization was the same as the order of declaration, so it would have been: ``` // here we init v1 // here we init v2DependsOnV1 ``` After the PR, in the above situation, we don't maintain the same order and we would have: ``` // here we init v2DependsOnV1 // here we init v1 ``` which is wrong. Maybe this situation never happens, i.e. we don't have variables which depend on others during initialization, and I am worrying about something that never occurs. But this is my question: are we sure this is not a problem? Thanks.
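The ordering concern in the example above can be modelled in a few lines of Python. `emit_init` is a hypothetical emitter that initializes plain fields before array-backed ones. Each declaration is a `(name, goes_into_array, init_statement)` tuple, and `arr` is an illustrative array name. With `v1` stored in an array and `v2DependsOnV1` stored as a plain field, the emitted order is the reverse of the declaration order.

```python
def declaration_order(declarations):
    # Initialization statements in the order the states were declared.
    return [init for _, _, init in declarations]


def emit_init(declarations):
    # Hypothetical emitter: all plain fields first, then all array-backed fields.
    plain = [init for _, in_array, init in declarations if not in_array]
    arrays = [init for _, in_array, init in declarations if in_array]
    return plain + arrays


decls = [
    ("v1", True, "arr[0] = new Foo();"),
    ("v2DependsOnV1", False, "v2DependsOnV1 = arr[0].bar();"),
]
```

Emitting `declaration_order(decls)` instead, i.e. one list in declaration order as suggested earlier in the thread, keeps `v1` initialized before its dependent.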
[GitHub] spark issue #14151: [SPARK-16496][SQL] Add wholetext as option for reading t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/14151 **[Test build #84704 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84704/testReport)** for PR 14151 at commit [`66d5b45`](https://github.com/apache/spark/commit/66d5b453cd2aaaea08a3843f4966fc9036451b6c).
[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19811 @mgaido91 The latest code does not generate a loop for initializations. This is an optimization to reduce the JVM bytecode size. As @cloud-fan pointed out [here](https://github.com/apache/spark/pull/19811#issuecomment-348144586), it should be addressed by another PR since it is another story.
[GitHub] spark issue #19911: [SPARK-22729][SQL] Add getTruncateQuery to JdbcDialect
Github user danielvdende commented on the issue: https://github.com/apache/spark/pull/19911 retest this please
[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19715#discussion_r156010806 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala --- @@ -129,34 +156,102 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui @Since("2.1.0") def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + /** @group setParam */ + @Since("2.3.0") + def setNumBucketsArray(value: Array[Int]): this.type = set(numBucketsArray, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCols(value: Array[String]): this.type = set(inputCols, value) + + /** @group setParam */ + @Since("2.3.0") + def setOutputCols(value: Array[String]): this.type = set(outputCols, value) + + private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = { +if (isSet(inputCols) && isSet(inputCol)) { + logWarning("Both `inputCol` and `inputCols` are set, we ignore `inputCols` and this " + --- End diff -- According to the outcome of the discussion in JIRA SPARK-8418, shouldn't we throw an exception when both inputCol and inputCols are specified?
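A minimal Python sketch of the stricter parameter check suggested above. It raises an error instead of logging a warning and ignoring one of the params. `resolve_input_columns` is a hypothetical helper, not the `QuantileDiscretizer` API.

```python
def resolve_input_columns(input_col=None, input_cols=None):
    """Hypothetical check: exactly one of input_col / input_cols may be set."""
    if input_col is not None and input_cols is not None:
        # Fail fast rather than silently ignoring one of the two params.
        raise ValueError("Only one of inputCol and inputCols may be set, not both.")
    if input_cols is not None:
        return list(input_cols)
    if input_col is not None:
        return [input_col]
    raise ValueError("One of inputCol or inputCols must be set.")
```

Normalizing both cases to a list lets the rest of a multi-column transformer handle the single-column case through the same code path.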
[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19937 LGTM too
[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19811 hi @kiszk, have you already checked why there are Python failures? I wanted to share a concern I have with the current implementation. Maybe it is not a problem, but I think that we are currently changing the order in which we initialize variables: we initialize the arrays after the simple fields, and this might not be the original order. Do you think this might be a problem? Thanks.
[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19937 LGTM
[GitHub] spark pull request #19937: [SPARK-22746][SQL] Avoid the generation of useles...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19937#discussion_r156010028 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -513,26 +513,28 @@ case class SortMergeJoinExec( * the variables should be declared separately from accessing the columns, we can't use the * codegen of BoundReference here. */ - private def createLeftVars(ctx: CodegenContext, leftRow: String): Seq[ExprCode] = { + private def createLeftVars(ctx: CodegenContext, leftRow: String): (Seq[ExprCode], Seq[String]) = { ctx.INPUT_ROW = leftRow left.output.zipWithIndex.map { case (a, i) => val value = ctx.freshName("value") val valueCode = ctx.getValue(leftRow, a.dataType, i.toString) - // declare it as class member, so we can access the column before or in the loop. - ctx.addMutableState(ctx.javaType(a.dataType), value) if (a.nullable) { val isNull = ctx.freshName("isNull") -ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull) val code = s""" |$isNull = $leftRow.isNullAt($i); |$value = $isNull ? ${ctx.defaultValue(a.dataType)} : ($valueCode); """.stripMargin -ExprCode(code, isNull, value) +(ExprCode(code, isNull, value), --- End diff -- Yea, I see, I just ran the test and found it too. It looks like putting the declarations at the top is simplest.
[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19937 **[Test build #84703 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84703/testReport)** for PR 19937 at commit [`9faf0a2`](https://github.com/apache/spark/commit/9faf0a2644739e9e19968c5077d6b14011aab9dd).
[GitHub] spark pull request #19937: [SPARK-22746][SQL] Avoid the generation of useles...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19937#discussion_r156007853 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -513,26 +513,28 @@ case class SortMergeJoinExec( * the variables should be declared separately from accessing the columns, we can't use the * codegen of BoundReference here. */ - private def createLeftVars(ctx: CodegenContext, leftRow: String): Seq[ExprCode] = { + private def createLeftVars(ctx: CodegenContext, leftRow: String): (Seq[ExprCode], Seq[String]) = { ctx.INPUT_ROW = leftRow left.output.zipWithIndex.map { case (a, i) => val value = ctx.freshName("value") val valueCode = ctx.getValue(leftRow, a.dataType, i.toString) - // declare it as class member, so we can access the column before or in the loop. - ctx.addMutableState(ctx.javaType(a.dataType), value) if (a.nullable) { val isNull = ctx.freshName("isNull") -ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull) val code = s""" |$isNull = $leftRow.isNullAt($i); |$value = $isNull ? ${ctx.defaultValue(a.dataType)} : ($valueCode); """.stripMargin -ExprCode(code, isNull, value) +(ExprCode(code, isNull, value), --- End diff -- Good question. If we declare the variable types only in `code`, it may lead to a compilation error in `janino`. Through `leftVar`, the `code` you pointed out may end up in `$leftAfter` inside `$condCheck`, which sits in an `if-then` block in the innermost loop of the generated code. The variables in `leftVars` are also referred to in `${consume(ctx, leftVars ++ rightVars)}`. In the following example, if we declared the variable types only in `code`, we would drop line 145 and declare `int` for `smj_value8` at line 162. Since `smj_value8` is referenced at line 169, outside the `if` block where it would then be declared, the generated code would fail to compile. WDYT?
```
/* 143 */ protected void processNext() throws java.io.IOException {
/* 144 */   while (findNextInnerJoinRows(smj_leftInput, smj_rightInput)) {
/* 145 */     int smj_value8 = -1;
/* 146 */     boolean smj_isNull6 = false;
/* 147 */     int smj_value9 = -1;
/* 148 */     boolean smj_loaded = false;
/* 149 */     smj_isNull6 = smj_leftRow.isNullAt(1);
/* 150 */     smj_value9 = smj_isNull6 ? -1 : (smj_leftRow.getInt(1));
/* 151 */     scala.collection.Iterator smj_iterator = smj_matches.generateIterator();
/* 152 */     while (smj_iterator.hasNext()) {
...
/* 160 */       if (!smj_loaded) {
/* 161 */         smj_loaded = true;
/* 162 */         smj_value8 = smj_leftRow.getInt(0);
/* 163 */       }
/* 164 */       int smj_value10 = smj_rightRow1.getInt(0);
/* 165 */       smj_numOutputRows.add(1);
/* 166 */
/* 167 */       smj_rowWriter.zeroOutNullBytes();
/* 168 */
/* 169 */       smj_rowWriter.write(0, smj_value8);
...
/* 185 */
/* 186 */     }
/* 187 */     if (shouldStop()) return;
/* 188 */   }
/* 189 */ }
```
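The hoisting pattern kiszk describes can be sketched as a toy string generator. This is not Spark's codegen: `SimpleExprCode`, `Column` and `generateLoop` are hypothetical names, the `createLeftVars` here is only loosely modeled on the one in the diff, and every column is treated as nullable. It returns the assignment code and the declarations separately, emits the declarations once at the top of the outer loop body, and leaves the assignments inside the lazily evaluated `if (!loaded)` block, so a later reference such as `rowWriter.write(0, ...)` still sees the variable.

```scala
object HoistDeclarations {
  // Hypothetical stand-in for Spark's ExprCode: assignment code plus the variable names it writes.
  final case class SimpleExprCode(code: String, isNull: String, value: String)

  // Hypothetical column description: Java type, row getter to call, and default value.
  final case class Column(javaType: String, getter: String, default: String)

  /** Returns assignment-only code per column, plus the declarations that must be hoisted. */
  def createLeftVars(leftRow: String, cols: Seq[Column]): (Seq[SimpleExprCode], Seq[String]) =
    cols.zipWithIndex.map { case (c, i) =>
      val value = s"smj_value$i"
      val isNull = s"smj_isNull$i"
      // Assignments only: these may run lazily, deep inside the inner loop.
      val code =
        s"""$isNull = $leftRow.isNullAt($i);
           |$value = $isNull ? ${c.default} : ($leftRow.${c.getter}($i));""".stripMargin
      // Declarations: emitted once at the top of the outer loop body.
      val decl =
        s"""boolean $isNull = false;
           |${c.javaType} $value = ${c.default};""".stripMargin
      (SimpleExprCode(code, isNull, value), decl)
    }.unzip

  /** Puts the declarations before the inner loop and the assignments inside the lazy block. */
  def generateLoop(leftRow: String, cols: Seq[Column]): String = {
    val (vars, decls) = createLeftVars(leftRow, cols)
    // These writes sit outside the `if (!loaded)` block, like line 169 in the example above.
    val writes = vars.zipWithIndex.map { case (v, i) => s"  rowWriter.write($i, ${v.value});" }
    s"""while (findNextInnerJoinRows(leftInput, rightInput)) {
       |${decls.mkString("\n")}
       |boolean loaded = false;
       |while (iterator.hasNext()) {
       |  if (!loaded) {
       |    loaded = true;
       |${vars.map(_.code).mkString("\n")}
       |  }
       |${writes.mkString("\n")}
       |}
       |}""".stripMargin
  }
}
```

If `decl` were instead folded into `code`, the `int smj_value0` declaration would be scoped to the `if (!loaded)` block and the `rowWriter.write` line would no longer compile, which is the failure mode described above.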
[GitHub] spark pull request #19937: [SPARK-22746][SQL] Avoid the generation of useles...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19937#discussion_r156005276 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -513,26 +513,28 @@ case class SortMergeJoinExec( * the variables should be declared separately from accessing the columns, we can't use the * codegen of BoundReference here. */ - private def createLeftVars(ctx: CodegenContext, leftRow: String): Seq[ExprCode] = { + private def createLeftVars(ctx: CodegenContext, leftRow: String): (Seq[ExprCode], Seq[String]) = { ctx.INPUT_ROW = leftRow left.output.zipWithIndex.map { case (a, i) => val value = ctx.freshName("value") val valueCode = ctx.getValue(leftRow, a.dataType, i.toString) - // declare it as class member, so we can access the column before or in the loop. - ctx.addMutableState(ctx.javaType(a.dataType), value) if (a.nullable) { val isNull = ctx.freshName("isNull") -ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull) val code = s""" |$isNull = $leftRow.isNullAt($i); |$value = $isNull ? ${ctx.defaultValue(a.dataType)} : ($valueCode); """.stripMargin -ExprCode(code, isNull, value) +(ExprCode(code, isNull, value), --- End diff -- good catch! Makes sense to me.
[GitHub] spark pull request #19937: [SPARK-22746][SQL] Avoid the generation of useles...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/19937#discussion_r156005086 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -617,6 +619,7 @@ case class SortMergeJoinExec( s""" |while (findNextInnerJoinRows($leftInput, $rightInput)) { + | ${leftVarDecl.mkString("\n")} --- End diff -- @kiszk, thanks for the test. Then I agree this is the best option.
[GitHub] spark pull request #19937: [SPARK-22746][SQL] Avoid the generation of useles...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19937#discussion_r156004320 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -513,26 +513,28 @@ case class SortMergeJoinExec( * the variables should be declared separately from accessing the columns, we can't use the * codegen of BoundReference here. */ - private def createLeftVars(ctx: CodegenContext, leftRow: String): Seq[ExprCode] = { + private def createLeftVars(ctx: CodegenContext, leftRow: String): (Seq[ExprCode], Seq[String]) = { ctx.INPUT_ROW = leftRow left.output.zipWithIndex.map { case (a, i) => val value = ctx.freshName("value") val valueCode = ctx.getValue(leftRow, a.dataType, i.toString) - // declare it as class member, so we can access the column before or in the loop. - ctx.addMutableState(ctx.javaType(a.dataType), value) if (a.nullable) { val isNull = ctx.freshName("isNull") -ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull) val code = s""" |$isNull = $leftRow.isNullAt($i); |$value = $isNull ? ${ctx.defaultValue(a.dataType)} : ($valueCode); """.stripMargin -ExprCode(code, isNull, value) +(ExprCode(code, isNull, value), --- End diff -- Why split the variable declarations and the code into two parts? Previously that was necessary because the variables were global. Now that we use local variables, I think we can simply do:
```scala
val code = s"""
  |boolean $isNull = $leftRow.isNullAt($i);
  |${ctx.javaType(a.dataType)} $value = $isNull ? ${ctx.defaultValue(a.dataType)} : ($valueCode);
  """.stripMargin
```
Can't we?
[GitHub] spark pull request #19811: [SPARK-18016][SQL] Code Generation: Constant Pool...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19811#discussion_r156003506 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -177,11 +189,64 @@ class CodegenContext { * the list of default imports available. * Also, generic type arguments are accepted but ignored. * @param variableName Name of the field. - * @param initCode The statement(s) to put into the init() method to initialize this field. + * @param codeFunctions Function includes statement(s) to put into the init() method to + * initialize this field. An argument is the name of the mutable state variable * If left blank, the field will be default-initialized. + * @param inline whether the declaration and initialization code may be inlined rather than + * compacted. + * @param useFreshName If false and inline is true, the name is not changed + * @return the name of the mutable state variable, which is either the original name if the + * variable is inlined to the outer class, or an array access if the variable is to be + * stored in an array of variables of the same type and initialization. + * There are two use cases. One is to use the original name for global variable instead + * of fresh name. Second is to use the original initialization statement since it is + * complex (e.g. allocate multi-dimensional array or object constructor has varibles). + * Primitive type variables will be inlined into outer class when the total number of + * mutable variables is less than `CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD` + * the max size of an array for compaction is given by + * `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`. 
*/ - def addMutableState(javaType: String, variableName: String, initCode: String = ""): Unit = { -mutableStates += ((javaType, variableName, initCode)) + def addMutableState( + javaType: String, + variableName: String, + codeFunctions: String => String = _ => "", --- End diff -- `initFunc`?
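To illustrate why the initializer is passed as a function of the variable name, here is a toy registry (the hypothetical `ToyStateRegistry`, not Spark's `CodegenContext`). It assumes, for simplicity, that any type is inlined as a field until `inlineThreshold` states exist, and that later states go into per-type arrays of at most `arraySizeLimit` slots; the real rules described in the doc comment above differ in detail. Because the caller only learns the final access expression (a field name or something like `mutableStateArray_2[0]`) when `addMutableState` returns, the init code has to be built from that expression rather than from the requested name.

```scala
import scala.collection.mutable

// Toy model, not Spark's CodegenContext. Simplifying assumption: any type is inlined as a
// field until `inlineThreshold` states exist; afterwards states go into per-type arrays
// holding at most `arraySizeLimit` slots each.
final class ToyStateRegistry(inlineThreshold: Int, arraySizeLimit: Int) {
  private var counter = 0
  private var inlinedCount = 0
  val fieldDecls: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty
  val initCode: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty
  // Java type -> arrays created for that type, each paired with the number of slots used.
  private val arrays = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[(String, Int)]]

  private def freshName(prefix: String): String = { counter += 1; s"${prefix}_$counter" }

  /** Registers a state and returns the Java expression generated code must use to access it. */
  def addMutableState(javaType: String, name: String, initFunc: String => String = _ => ""): String = {
    val access =
      if (inlinedCount < inlineThreshold) {
        inlinedCount += 1
        val field = freshName(name)
        fieldDecls += s"private $javaType $field;"
        field
      } else {
        val slots = arrays.getOrElseUpdate(javaType, mutable.ArrayBuffer.empty)
        // Start a new array when there is none yet or the current one is full.
        if (slots.isEmpty || slots.last._2 >= arraySizeLimit) {
          slots += ((freshName("mutableStateArray"), 0))
        }
        val (arrayName, used) = slots.last
        slots(slots.length - 1) = (arrayName, used + 1)
        s"$arrayName[$used]"
      }
    // The init code is built from the access expression, so it stays valid whether the
    // state became a field or an array slot.
    val init = initFunc(access)
    if (init.nonEmpty) initCode += init
    access
  }

  /** Declarations for the compacted arrays, sized by the number of slots actually used. */
  def arrayDecls: Seq[String] =
    arrays.toSeq.flatMap { case (javaType, slots) =>
      slots.map { case (arrayName, used) => s"private $javaType[] $arrayName = new $javaType[$used];" }
    }
}
```

With `inlineThreshold = 1` and `arraySizeLimit = 2`, the first state becomes a field and the following ones land in array slots; the same kind of `initFunc` yields valid init code in both cases, which a plain `initCode: String` written against the original name could not.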
[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19082#discussion_r156002166 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -805,26 +908,44 @@ case class HashAggregateExec( def updateRowInFastHashMap(isVectorized: Boolean): Option[String] = { - ctx.INPUT_ROW = fastRowBuffer + // We need to copy the aggregation row buffer to a local row first because each aggregate + // function directly updates the buffer when it finishes. + val localRowBuffer = ctx.freshName("localFastRowBuffer") + val initLocalRowBuffer = s"InternalRow $localRowBuffer = $fastRowBuffer.copy();" + + ctx.INPUT_ROW = localRowBuffer val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr)) val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) val effectiveCodes = subExprs.codes.mkString("\n") val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) { boundUpdateExpr.map(_.genCode(ctx)) } - val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) => + + val evalAndUpdateCodes = fastRowEvals.zipWithIndex.map { case (ev, i) => val dt = updateExpr(i).dataType -ctx.updateColumn(fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorized) +val updateColumnCode = ctx.updateColumn( + fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorized) +s""" + | // evaluate aggregate function + | ${ev.code} + | // update fast row + | $updateColumnCode + """.stripMargin } + + val updateAggValCode = splitAggregateExpressions( +ctx, boundUpdateExpr, evalAndUpdateCodes, subExprs.states, +Seq(("InternalRow", fastRowBuffer))) --- End diff -- indents
[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19082#discussion_r15593 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -1070,6 +1071,24 @@ class CodegenContext { } } +object CodegenContext { + + private val javaKeywords = Set( +"abstract", "assert", "boolean", "break", "byte", "case", "catch", "char", "class", "const", +"continue", "default", "do", "double", "else", "extends", "false", "final", "finally", "float", +"for", "goto", "if", "implements", "import", "instanceof", "int", "interface", "long", "native", +"new", "null", "package", "private", "protected", "public", "return", "short", "static", +"strictfp", "super", "switch", "synchronized", "this", "throw", "throws", "transient", "true", +"try", "void", "volatile", "while" + ) + + def isJavaIdentifier(str: String): Boolean = str match { +case null | "" => false +case _ => !javaKeywords.contains(str) && isJavaIdentifierStart(str.charAt(0)) && + (1 until str.length).forall(i => isJavaIdentifierPart(str.charAt(i))) + } --- End diff --
```Scala
/**
 * Returns true if the given `str` is a valid java identifier.
 */
def isJavaIdentifier(str: String): Boolean = str match {
  case null | "" => false
  case _ => !javaKeywords.contains(str) && isJavaIdentifierStart(str.charAt(0)) &&
    (1 until str.length).forall(i => isJavaIdentifierPart(str.charAt(i)))
}
```
[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19082#discussion_r156003342 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -256,6 +258,85 @@ case class HashAggregateExec( """.stripMargin } + // Extracts all the input variable references for a given `aggExpr`. This result will be used + // to split aggregation into small functions. + private def getInputVariableReferences( + ctx: CodegenContext, + aggExpr: Expression, + subExprs: Map[Expression, SubExprEliminationState]): Set[(String, String)] = { +// `argSet` collects all the pairs of variable names and their types, the first in the pair is +// a type name and the second is a variable name. +val argSet = mutable.Set[(String, String)]() +val stack = mutable.Stack[Expression](aggExpr) +while (stack.nonEmpty) { + stack.pop() match { +case e if subExprs.contains(e) => + val exprCode = subExprs(e) + if (CodegenContext.isJavaIdentifier(exprCode.value)) { +argSet += ((ctx.javaType(e.dataType), exprCode.value)) + } + if (CodegenContext.isJavaIdentifier(exprCode.isNull)) { +argSet += (("boolean", exprCode.isNull)) + } + // Since the children possibly has common expressions, we push them here + stack.pushAll(e.children) +case ref: BoundReference +if ctx.currentVars != null && ctx.currentVars(ref.ordinal) != null => + val value = ctx.currentVars(ref.ordinal).value + val isNull = ctx.currentVars(ref.ordinal).isNull + if (CodegenContext.isJavaIdentifier(value)) { +argSet += ((ctx.javaType(ref.dataType), value)) + } + if (CodegenContext.isJavaIdentifier(isNull)) { +argSet += (("boolean", isNull)) + } +case _: BoundReference => + argSet += (("InternalRow", ctx.INPUT_ROW)) +case e => + stack.pushAll(e.children) + } +} + +argSet.toSet + } + + // Splits aggregate code into small functions because JVMs does not compile too long functions + private def splitAggregateExpressions( + ctx: CodegenContext, + aggExprs: 
Seq[Expression], + evalAndUpdateCodes: Seq[String], + subExprs: Map[Expression, SubExprEliminationState], + otherArgs: Seq[(String, String)] = Seq.empty): Seq[String] = { +aggExprs.zipWithIndex.map { case (aggExpr, i) => + // The maximum length of parameters in non-static Java methods is 254, but a parameter of + // type long or double contributes two units to the length. So, this method gives up + // splitting the code if the parameter length goes over 127. + val args = (getInputVariableReferences(ctx, aggExpr, subExprs) ++ otherArgs).toSeq + + // This is for testing/benchmarking only + val maxParamNumInJavaMethod = + sqlContext.getConf("spark.sql.codegen.aggregate.maxParamNumInJavaMethod", null) match { --- End diff -- Let us introduce an internal SQLConf. If the number is high enough, we can disable this feature.
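The parameter-length rule in the diff comment above can be made concrete with a small helper. Per the JVM specification's rules for method descriptors, a descriptor may use at most 255 parameter units, the implicit `this` takes one unit in an instance method, and each `long` or `double` parameter takes two. `ParamSlots` and its methods are hypothetical names; the `limit` argument merely stands in for whatever value the suggested internal SQLConf would supply.

```scala
object ParamSlots {
  // JVM method descriptors allow at most 255 parameter units; an instance method spends
  // one of them on the implicit `this`.
  val MaxInstanceMethodSlots: Int = 254

  /** Units used by the (javaType, variableName) pairs collected for a split function. */
  def slots(args: Seq[(String, String)]): Int =
    args.map { case (javaType, _) =>
      if (javaType == "long" || javaType == "double") 2 else 1
    }.sum

  /** Exact check: whether a split instance method with these parameters is legal. */
  def canSplit(args: Seq[(String, String)], limit: Int = MaxInstanceMethodSlots): Boolean =
    slots(args) <= limit

  /** Conservative check from the diff comment: at most 127 parameters of any type. */
  def canSplitConservatively(args: Seq[(String, String)]): Boolean =
    args.size <= MaxInstanceMethodSlots / 2
}
```

The exact count accepts, for example, 254 `int` parameters that the conservative 127-parameter rule rejects; the conservative rule avoids inspecting the types at the cost of giving up on some splittable functions.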
[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19082#discussion_r156000426 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala --- @@ -380,4 +380,19 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { s"Incorrect Evaluation: expressions: $exprAnd, actual: $actualAnd, expected: $expectedAnd") } } + + test("SPARK-21870 check if CodegenContext.isJavaIdentifier works correctly") { +assert(CodegenContext.isJavaIdentifier("agg_value") === true) +assert(CodegenContext.isJavaIdentifier("agg_value1") === true) +assert(CodegenContext.isJavaIdentifier("bhj_value4") === true) +assert(CodegenContext.isJavaIdentifier("smj_value6") === true) +assert(CodegenContext.isJavaIdentifier("rdd_value7") === true) +assert(CodegenContext.isJavaIdentifier("scan_isNull") === true) +assert(CodegenContext.isJavaIdentifier("test") === true) +assert(CodegenContext.isJavaIdentifier("true") === false) +assert(CodegenContext.isJavaIdentifier("false") === false) +assert(CodegenContext.isJavaIdentifier("390239") === false) +assert(CodegenContext.isJavaIdentifier(literal) === false) +assert(CodegenContext.isJavaIdentifier(double) === false) --- End diff --
```Scala
import CodegenContext.isJavaIdentifier

// positive cases
assert(isJavaIdentifier("agg_value"))
assert(isJavaIdentifier("agg_value1"))
assert(isJavaIdentifier("bhj_value4"))
assert(isJavaIdentifier("smj_value6"))
assert(isJavaIdentifier("rdd_value7"))
assert(isJavaIdentifier("scan_isNull"))
assert(isJavaIdentifier("test"))

// negative cases
assert(!isJavaIdentifier("true"))
assert(!isJavaIdentifier("false"))
assert(!isJavaIdentifier("390239"))
assert(!isJavaIdentifier(literal))
assert(!isJavaIdentifier(double))
```
[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19082#discussion_r156002134 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -863,25 +984,43 @@ case class HashAggregateExec( } val updateRowInUnsafeRowMap: String = { - ctx.INPUT_ROW = unsafeRowBuffer + // We need to copy the aggregation row buffer to a local row first because each aggregate + // function directly updates the buffer when it finishes. + val localRowBuffer = ctx.freshName("localUnsafeRowBuffer") + val initLocalRowBuffer = s"InternalRow $localRowBuffer = $unsafeRowBuffer.copy();" + + ctx.INPUT_ROW = localRowBuffer val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr)) val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) val effectiveCodes = subExprs.codes.mkString("\n") val unsafeRowBufferEvals = ctx.withSubExprEliminationExprs(subExprs.states) { boundUpdateExpr.map(_.genCode(ctx)) } - val updateUnsafeRowBuffer = unsafeRowBufferEvals.zipWithIndex.map { case (ev, i) => + + val evalAndUpdateCodes = unsafeRowBufferEvals.zipWithIndex.map { case (ev, i) => val dt = updateExpr(i).dataType -ctx.updateColumn(unsafeRowBuffer, dt, i, ev, updateExpr(i).nullable) +val updateColumnCode = ctx.updateColumn(unsafeRowBuffer, dt, i, ev, updateExpr(i).nullable) +s""" + | // evaluate aggregate function + | ${ev.code} + | // update unsafe row buffer + | $updateColumnCode + """.stripMargin } + + val updateAggValCode = splitAggregateExpressions( +ctx, boundUpdateExpr, evalAndUpdateCodes, subExprs.states, +Seq(("InternalRow", unsafeRowBuffer))) --- End diff -- indents.
[GitHub] spark issue #19938: [SPARK-22747][SQL] Localize lifetime of mutable states i...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19938 Actually, there is one real problem: after we fold many global variables into an array, a variable name may become something like `arr[1]`, which can't be used as a parameter name. Localizing the global variables in the current expression/operator is one solution; another is to generate parameter names instead of reusing the input variable names.
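A minimal sketch of the second option, generating parameter names instead of reusing the input variable names, assuming a hypothetical helper `FreshParams.split` that receives `(javaType, expression)` pairs. The call site passes the original expressions, which may be array slots such as `agg_mutableStateArray[1]`, while the function declares `arg0`, `arg1`, ... and the body is generated against those names. The names `agg_doAggregate` and `agg_total` in the usage are likewise made up.

```scala
object FreshParams {
  /**
   * Builds a split function whose parameters get fresh names, while the call site passes
   * the original expressions (which need not be valid Java identifiers).
   *
   * @param args (javaType, expression) pairs the function body needs
   * @param body generates the body given the parameter names chosen here
   * @return (function definition, call statement)
   */
  def split(funcName: String, args: Seq[(String, String)])(body: Seq[String] => String): (String, String) = {
    // Fresh, always-valid identifiers, one per argument.
    val paramNames = args.indices.map(i => s"arg$i")
    val params = args.map(_._1).zip(paramNames).map { case (t, p) => s"$t $p" }.mkString(", ")
    val definition =
      s"""private void $funcName($params) {
         |  ${body(paramNames)}
         |}""".stripMargin
    // The caller still passes the original expressions, e.g. an array slot.
    val call = s"$funcName(${args.map(_._2).mkString(", ")});"
    (definition, call)
  }
}
```

For example, `FreshParams.split("agg_doAggregate", Seq(("int", "agg_mutableStateArray[1]"), ("boolean", "agg_isNull")))(ps => s"agg_total += ${ps(1)} ? 0 : ${ps(0)};")` yields a function declared as `agg_doAggregate(int arg0, boolean arg1)` and the call `agg_doAggregate(agg_mutableStateArray[1], agg_isNull);`. The trade-off is that the body must be generated against the new names rather than reusing code already emitted for the original variables.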
[GitHub] spark issue #19939: [SPARK-20557] [SQL] Only support TIMESTAMP WITH TIME ZON...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19939 LGTM
[GitHub] spark pull request #19939: [SPARK-20557] [SQL] Only support TIMESTAMP WITH T...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19939#discussion_r156002148 --- Diff: external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala --- @@ -235,6 +239,61 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo assert(types(1).equals("class java.sql.Timestamp")) } + test("Column type TIMESTAMP with SESSION_LOCAL_TIMEZONE is different from default") { +val defaultJVMTimeZone = TimeZone.getDefault +// Pick the timezone different from the current default time zone of JVM +val sofiaTimeZone = TimeZone.getTimeZone("Europe/Sofia") +val shanghaiTimeZone = TimeZone.getTimeZone("Asia/Shanghai") +val localSessionTimeZone = + if (defaultJVMTimeZone == shanghaiTimeZone) sofiaTimeZone else shanghaiTimeZone + +withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> localSessionTimeZone.getID) { + val e = intercept[java.sql.SQLException] { +val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties) +dfRead.collect() + }.getMessage + assert(e.contains("Unrecognized SQL type -101")) +} + } + + /** + * Change the Time Zone `timeZoneId` of JVM before executing `f`, then switches back to the + * original after `f` returns. + * @param timeZoneId the ID for a TimeZone, either an abbreviation such as "PST", a full name such + * as "America/Los_Angeles", or a custom ID such as "GMT-8:00". 
+ */ + private def withTimeZone(timeZoneId: String)(f: => Unit): Unit = { +val originalLocale = TimeZone.getDefault +try { + // Add Locale setting + TimeZone.setDefault(TimeZone.getTimeZone(timeZoneId)) + f +} finally { + TimeZone.setDefault(originalLocale) +} + } + + test("Column TIMESTAMP with TIME ZONE(JVM timezone)") { +def checkRow(row: Row, ts: String): Unit = { + assert(row.getTimestamp(1).equals(Timestamp.valueOf(ts))) +} + +val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties) +withTimeZone("PST") { --- End diff -- I think it would be safer to also set the session-local time zone in `withTimeZone`.
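One way to follow this suggestion is to switch both settings inside the helper and restore both in `finally`. In the suite itself this would presumably mean wrapping `f` in `withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZoneId)`, which the earlier test already uses. The sketch below instead uses a hypothetical mutable map (`sessionConf`, with the made-up key `session.timeZone`) so that it runs without Spark.

```scala
import java.util.TimeZone
import scala.collection.mutable

object TimeZoneHelpers {
  // Hypothetical stand-in for the session configuration (in Spark, the SQL conf).
  val sessionConf: mutable.Map[String, String] = mutable.Map.empty

  // Hypothetical key used only by this sketch.
  val SessionTimeZoneKey: String = "session.timeZone"

  /** Runs `f` with both the JVM default and the session time zone set to `timeZoneId`,
    * then restores both, even if `f` throws. */
  def withTimeZone[T](timeZoneId: String)(f: => T): T = {
    val originalDefault = TimeZone.getDefault
    val originalSession = sessionConf.get(SessionTimeZoneKey)
    try {
      TimeZone.setDefault(TimeZone.getTimeZone(timeZoneId))
      sessionConf(SessionTimeZoneKey) = timeZoneId
      f
    } finally {
      TimeZone.setDefault(originalDefault)
      // Restore the previous session value, or remove the key if it was unset.
      originalSession match {
        case Some(previous) => sessionConf(SessionTimeZoneKey) = previous
        case None => sessionConf.remove(SessionTimeZoneKey)
      }
    }
  }
}
```

Keeping the two settings in sync inside one helper means a test body cannot accidentally observe a JVM time zone that disagrees with the session time zone.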