[GitHub] spark issue #19916: [SPARK-22716][SQL] Replace addReferenceObj to reduce the...

2017-12-11 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/19916
  
@cloud-fan I think a lot of caller-side changes would be needed as well, since we no longer pass any variable name when referencing with `addReferenceMinorObj`. I am not sure that adding a new argument to `addReferenceMinorObj` only for the comment is the right way to go.

Sorry for this additional comment; I just want to make sure I am giving you all the details you need to choose the best option. I promise that if you now say "let's add the comment", I'll do it.
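For illustration only, here is a hypothetical sketch of what a comment-only argument on `addReferenceMinorObj` could look like; the actual helper proposed in the PR may be shaped differently, and the `references` buffer here is an assumption:

```scala
import scala.collection.mutable

// Hypothetical sketch only -- not the actual method from PR #19916. It shows what an
// extra, comment-only argument could look like, assuming a CodegenContext-style
// `references` buffer holding the referenced objects.
class ReferenceSketch {
  val references = mutable.ArrayBuffer.empty[Any]

  def addReferenceMinorObj(obj: Any, className: String = null, comment: String = ""): String = {
    val idx = references.length
    references += obj
    val clsName = Option(className).getOrElse(obj.getClass.getName)
    // With no named local variable at the call site, an inline comment is the only
    // place left to say what the reference actually is.
    val commentStr = if (comment.nonEmpty) s" /* $comment */" else ""
    s"(($clsName) references[$idx])$commentStr"
  }
}
```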


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156077696
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Defines util methods used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  // Type alias for a tuple representing the data type and nullable for an 
expression.
+  type ExprProperty = (DataType, Boolean)
+
+  /**
+   * Given an expression, returns the all necessary parameters to evaluate 
it, so the generated
+   * code of this expression can be split in a function.
+   * The 1st string in returned tuple is the parameter strings used to 
call the function.
+   * The 2nd string in returned tuple is the parameter strings used to 
declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred 
expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val subAttrs = subExprs.map(e => (e.dataType, e.nullable))
+val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, 
subExprCodes)
+
+val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr)
+val paramsFromRows = inputRows.distinct.filter(_ != null).map { row =>
+  (row, s"InternalRow $row")
+}
+
+val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + 
paramsFromRows.length
+// Maximum allowed parameter number for Java's method descriptor.
+if (paramsLength > 255) {
+  None
+} else {
+  val allParams = (paramsFromRows ++ paramsFromColumns ++ 
paramsFromSubExprs).unzip
+  val callParams = allParams._1.distinct
+  val declParams = allParams._2.distinct
+  Some((callParams, declParams))
+}
+  }
+
+  /**
+   * Returns the eliminated subexpressions in the children expressions.
+   */
+  def getSubExprInChildren(ctx: CodegenContext, expr: Expression): 
Seq[Expression] = {
+expr.children.flatMap { child =>
+  child.collect {
+case e if ctx.subExprEliminationExprs.contains(e) => e
+  }
+}.distinct
+  }
+
+  /**
+   * A small helper function to return `ExprCode`s that represent 
subexpressions.
+   */
+  def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): 
Seq[ExprCode] = {
--- End diff --

Is it? Don't we also do sub-expr elimination in normal codegen?


---




[GitHub] spark issue #19928: [SPARK-22267][SQL][TEST] Spark SQL incorrectly reads ORC...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19928
  
thanks, merging to master!


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156077470
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Defines util methods used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  // Type alias for a tuple representing the data type and nullable for an 
expression.
+  type ExprProperty = (DataType, Boolean)
+
+  /**
+   * Given an expression, returns the all necessary parameters to evaluate 
it, so the generated
+   * code of this expression can be split in a function.
+   * The 1st string in returned tuple is the parameter strings used to 
call the function.
+   * The 2nd string in returned tuple is the parameter strings used to 
declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred 
expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val subAttrs = subExprs.map(e => (e.dataType, e.nullable))
+val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, 
subExprCodes)
+
+val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr)
+val paramsFromRows = inputRows.distinct.filter(_ != null).map { row =>
+  (row, s"InternalRow $row")
+}
+
+val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + 
paramsFromRows.length
+// Maximum allowed parameter number for Java's method descriptor.
+if (paramsLength > 255) {
+  None
+} else {
+  val allParams = (paramsFromRows ++ paramsFromColumns ++ 
paramsFromSubExprs).unzip
+  val callParams = allParams._1.distinct
+  val declParams = allParams._2.distinct
+  Some((callParams, declParams))
+}
+  }
+
+  /**
+   * Returns the eliminated subexpressions in the children expressions.
+   */
+  def getSubExprInChildren(ctx: CodegenContext, expr: Expression): 
Seq[Expression] = {
+expr.children.flatMap { child =>
+  child.collect {
+case e if ctx.subExprEliminationExprs.contains(e) => e
+  }
+}.distinct
+  }
+
+  /**
+   * A small helper function to return `ExprCode`s that represent 
subexpressions.
+   */
+  def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): 
Seq[ExprCode] = {
+subExprs.map { subExpr =>
+  val state = ctx.subExprEliminationExprs(subExpr)
+  ExprCode(code = "", value = state.value, isNull = state.isNull)
+}
+  }
+
+  /**
+   * Retrieves previous input rows referred by children and deferred 
expressions.
+   */
+  def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): 
Seq[String] = {
+expr.children.flatMap(getInputRows(ctx, _)).distinct
+  }
+
+  /**
+   * Given a child expression, retrieves previous input rows referred by 
it or deferred expressions
+   * which are needed to evaluate it.
+   */
+  def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = {
+child.flatMap {

[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156077509
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Defines util methods used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  // Type alias for a tuple representing the data type and nullable for an 
expression.
+  type ExprProperty = (DataType, Boolean)
+
+  /**
+   * Given an expression, returns the all necessary parameters to evaluate 
it, so the generated
+   * code of this expression can be split in a function.
+   * The 1st string in returned tuple is the parameter strings used to 
call the function.
+   * The 2nd string in returned tuple is the parameter strings used to 
declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred 
expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val subAttrs = subExprs.map(e => (e.dataType, e.nullable))
+val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, 
subExprCodes)
+
+val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr)
+val paramsFromRows = inputRows.distinct.filter(_ != null).map { row =>
+  (row, s"InternalRow $row")
+}
+
+val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + 
paramsFromRows.length
+// Maximum allowed parameter number for Java's method descriptor.
+if (paramsLength > 255) {
+  None
+} else {
+  val allParams = (paramsFromRows ++ paramsFromColumns ++ 
paramsFromSubExprs).unzip
+  val callParams = allParams._1.distinct
+  val declParams = allParams._2.distinct
+  Some((callParams, declParams))
+}
+  }
+
+  /**
+   * Returns the eliminated subexpressions in the children expressions.
+   */
+  def getSubExprInChildren(ctx: CodegenContext, expr: Expression): 
Seq[Expression] = {
+expr.children.flatMap { child =>
+  child.collect {
+case e if ctx.subExprEliminationExprs.contains(e) => e
+  }
+}.distinct
+  }
+
+  /**
+   * A small helper function to return `ExprCode`s that represent 
subexpressions.
+   */
+  def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): 
Seq[ExprCode] = {
+subExprs.map { subExpr =>
+  val state = ctx.subExprEliminationExprs(subExpr)
+  ExprCode(code = "", value = state.value, isNull = state.isNull)
+}
+  }
+
+  /**
+   * Retrieves previous input rows referred by children and deferred 
expressions.
+   */
+  def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): 
Seq[String] = {
+expr.children.flatMap(getInputRows(ctx, _)).distinct
+  }
+
+  /**
+   * Given a child expression, retrieves previous input rows referred by 
it or deferred expressions
+   * which are needed to evaluate it.
+   */
+  def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = {
+child.flatMap {

[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156077428
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Defines util methods used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  // Type alias for a tuple representing the data type and nullable for an 
expression.
+  type ExprProperty = (DataType, Boolean)
+
+  /**
+   * Given an expression, returns the all necessary parameters to evaluate 
it, so the generated
+   * code of this expression can be split in a function.
+   * The 1st string in returned tuple is the parameter strings used to 
call the function.
+   * The 2nd string in returned tuple is the parameter strings used to 
declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred 
expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val subAttrs = subExprs.map(e => (e.dataType, e.nullable))
+val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, 
subExprCodes)
+
+val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr)
+val paramsFromRows = inputRows.distinct.filter(_ != null).map { row =>
+  (row, s"InternalRow $row")
+}
+
+val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + 
paramsFromRows.length
+// Maximum allowed parameter number for Java's method descriptor.
+if (paramsLength > 255) {
+  None
+} else {
+  val allParams = (paramsFromRows ++ paramsFromColumns ++ 
paramsFromSubExprs).unzip
+  val callParams = allParams._1.distinct
+  val declParams = allParams._2.distinct
+  Some((callParams, declParams))
+}
+  }
+
+  /**
+   * Returns the eliminated subexpressions in the children expressions.
+   */
+  def getSubExprInChildren(ctx: CodegenContext, expr: Expression): 
Seq[Expression] = {
+expr.children.flatMap { child =>
+  child.collect {
+case e if ctx.subExprEliminationExprs.contains(e) => e
+  }
+}.distinct
+  }
+
+  /**
+   * A small helper function to return `ExprCode`s that represent 
subexpressions.
+   */
+  def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): 
Seq[ExprCode] = {
+subExprs.map { subExpr =>
+  val state = ctx.subExprEliminationExprs(subExpr)
+  ExprCode(code = "", value = state.value, isNull = state.isNull)
+}
+  }
+
+  /**
+   * Retrieves previous input rows referred by children and deferred 
expressions.
+   */
+  def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): 
Seq[String] = {
+expr.children.flatMap(getInputRows(ctx, _)).distinct
+  }
+
+  /**
+   * Given a child expression, retrieves previous input rows referred by 
it or deferred expressions
+   * which are needed to evaluate it.
+   */
+  def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = {
+child.flatMap {

[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...

2017-12-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19792
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84708/
Test PASSed.


---




[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...

2017-12-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19792
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...

2017-12-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19792
  
**[Test build #84708 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84708/testReport)** for PR 19792 at commit [`44a1879`](https://github.com/apache/spark/commit/44a1879919a61d732eea176e26ce6a79549984a0).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19916: [SPARK-22716][SQL] Replace addReferenceObj to reduce the...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19916
  
Adding a local variable to generate more readable code would be better, but that needs a lot of caller-side changes, which may not be worth it.


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156074485
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -105,6 +105,11 @@ abstract class Expression extends TreeNode[Expression] 
{
   val isNull = ctx.freshName("isNull")
   val value = ctx.freshName("value")
   val eval = doGenCode(ctx, ExprCode("", isNull, value))
+
+  // Records current input row and variables of this expression.
+  eval.inputRow = ctx.INPUT_ROW
+  eval.inputVars = findInputVars(ctx, eval)
--- End diff --

Ok. Let me try it in next commit.


---




[GitHub] spark pull request #19940: [SPARK-22750][SQL] Reuse mutable states when poss...

2017-12-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19940#discussion_r156073714
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -184,6 +190,27 @@ class CodegenContext {
 mutableStates += ((javaType, variableName, initCode))
   }
 
+  /**
+   * Add a mutable state as a field to the generated class only if it does 
not exist yet a field
+   * with that name. This helps reducing the number of the generated 
class' fields, since the same
+   * variable can be reused by many functions.
+   *
+   * Internally, this method calls `addMutableState`.
+   *
+   * @param javaType Java type of the field.
+   * @param variableName Name of the field.
+   * @param initCode The statement(s) to put into the init() method to 
initialize this field.
+   */
+  def addSingleMutableState(
+  javaType: String,
+  variableName: String,
+  initCode: String = ""): Unit = {
+if (!singleMutableStates.contains(variableName)) {
+  addMutableState(javaType, variableName, initCode)
--- End diff --

Please also check that the Java type is the same. If an expression uses the same name with a different type, we should catch it early.
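A minimal sketch of that guard, assuming the `addMutableState`/`singleMutableStates` members from the quoted diff; tracking the declared type in a map is an assumption:

```scala
import scala.collection.mutable

// Hypothetical sketch of the suggested check, not the PR's actual implementation.
class MutableStateSketch {
  private val mutableStates = mutable.ArrayBuffer.empty[(String, String, String)]
  // Track the Java type each shared field was first declared with.
  private val singleMutableStates = mutable.Map.empty[String, String]

  def addMutableState(javaType: String, variableName: String, initCode: String = ""): Unit =
    mutableStates += ((javaType, variableName, initCode))

  def addSingleMutableState(javaType: String, variableName: String, initCode: String = ""): Unit =
    singleMutableStates.get(variableName) match {
      case Some(existingType) =>
        // Reusing a shared field with a different type is a bug; fail fast instead of
        // generating Java code that will not compile or that silently misbehaves.
        require(existingType == javaType,
          s"Mutable state '$variableName' was declared as $existingType, not $javaType")
      case None =>
        addMutableState(javaType, variableName, initCode)
        singleMutableStates(variableName) = javaType
    }
}
```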


---




[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible

2017-12-11 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19940
  
Oh, the initialization does not happen right away in the declaration.


---




[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible

2017-12-11 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19940
  
Shall we make them `final` variables to guarantee this? I think this is an important requirement, to prevent things from going wrong when the shared global variables are used incorrectly.


---




[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...

2017-12-11 Thread gberger
Github user gberger commented on the issue:

https://github.com/apache/spark/pull/19792
  
Good catch @HyukjinKwon! I reverted those changes and added a test to cover 
this regression.


---




[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible

2017-12-11 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/19940
  
@viirya this is the requirement I followed in this change. It ensures that it is safe to share the variable across all the operators, since all accesses are read-only and they cannot influence each other. Maybe this can be relaxed in the future, but if we follow this requirement, we are sure that this is safe.


---




[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible

2017-12-11 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19940
  
@mgaido91 Do you mean the shared global variables are required to be assigned only once (at initialization) and never changed again?


---




[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...

2017-12-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19792
  
**[Test build #84708 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84708/testReport)** for PR 19792 at commit [`44a1879`](https://github.com/apache/spark/commit/44a1879919a61d732eea176e26ce6a79549984a0).


---




[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...

2017-12-11 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19942#discussion_r156069510
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -291,6 +291,18 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
 if (proxyUser != null && principal != null) {
   SparkSubmit.printErrorAndExit("Only one of --proxy-user or 
--principal can be provided.")
 }
+
+val executorTimeoutThreshold = Utils.timeStringAsSeconds(
+  sparkProperties.getOrElse("spark.network.timeout", "120s"))
+val executorHeartbeatInterval = Utils.timeStringAsSeconds(
+  sparkProperties.getOrElse("spark.executor.heartbeatInterval", "10s"))
--- End diff --

Rather than repeat a default, just perform the check if both are set.
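A minimal sketch of that refinement, reusing the `sparkProperties`, `Utils.timeStringAsSeconds` and `SparkSubmit.printErrorAndExit` pieces shown in the quoted diff; the surrounding validation method in `SparkSubmitArguments` is assumed:

```scala
// Only compare the two timeouts when both properties are explicitly set, so the
// defaults are not hard-coded a second time in SparkSubmitArguments.
(sparkProperties.get("spark.network.timeout"),
  sparkProperties.get("spark.executor.heartbeatInterval")) match {
  case (Some(timeout), Some(interval))
      if Utils.timeStringAsSeconds(interval) > Utils.timeStringAsSeconds(timeout) =>
    SparkSubmit.printErrorAndExit(
      "spark.executor.heartbeatInterval should be less than spark.network.timeout.")
  case _ => // either property is unset, or the relationship already holds
}
```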


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156067549
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Defines util methods used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  // Type alias for a tuple representing the data type and nullable for an 
expression.
+  type ExprProperty = (DataType, Boolean)
+
+  /**
+   * Given an expression, returns the all necessary parameters to evaluate 
it, so the generated
+   * code of this expression can be split in a function.
+   * The 1st string in returned tuple is the parameter strings used to 
call the function.
+   * The 2nd string in returned tuple is the parameter strings used to 
declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred 
expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val subAttrs = subExprs.map(e => (e.dataType, e.nullable))
+val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, 
subExprCodes)
+
+val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr)
+val paramsFromRows = inputRows.distinct.filter(_ != null).map { row =>
+  (row, s"InternalRow $row")
+}
+
+val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + 
paramsFromRows.length
+// Maximum allowed parameter number for Java's method descriptor.
+if (paramsLength > 255) {
+  None
+} else {
+  val allParams = (paramsFromRows ++ paramsFromColumns ++ 
paramsFromSubExprs).unzip
+  val callParams = allParams._1.distinct
+  val declParams = allParams._2.distinct
+  Some((callParams, declParams))
+}
+  }
+
+  /**
+   * Returns the eliminated subexpressions in the children expressions.
+   */
+  def getSubExprInChildren(ctx: CodegenContext, expr: Expression): 
Seq[Expression] = {
+expr.children.flatMap { child =>
+  child.collect {
+case e if ctx.subExprEliminationExprs.contains(e) => e
+  }
+}.distinct
+  }
+
+  /**
+   * A small helper function to return `ExprCode`s that represent 
subexpressions.
+   */
+  def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): 
Seq[ExprCode] = {
--- End diff --

seems we only need it for sub-expr of whole stage codegen?


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156066515
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Defines util methods used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  // Type alias for a tuple representing the data type and nullable for an 
expression.
+  type ExprProperty = (DataType, Boolean)
+
+  /**
+   * Given an expression, returns the all necessary parameters to evaluate 
it, so the generated
+   * code of this expression can be split in a function.
+   * The 1st string in returned tuple is the parameter strings used to 
call the function.
+   * The 2nd string in returned tuple is the parameter strings used to 
declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred 
expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val subAttrs = subExprs.map(e => (e.dataType, e.nullable))
+val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, 
subExprCodes)
+
+val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr)
+val paramsFromRows = inputRows.distinct.filter(_ != null).map { row =>
+  (row, s"InternalRow $row")
+}
+
+val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + 
paramsFromRows.length
+// Maximum allowed parameter number for Java's method descriptor.
+if (paramsLength > 255) {
+  None
+} else {
+  val allParams = (paramsFromRows ++ paramsFromColumns ++ 
paramsFromSubExprs).unzip
+  val callParams = allParams._1.distinct
+  val declParams = allParams._2.distinct
+  Some((callParams, declParams))
+}
+  }
+
+  /**
+   * Returns the eliminated subexpressions in the children expressions.
+   */
+  def getSubExprInChildren(ctx: CodegenContext, expr: Expression): 
Seq[Expression] = {
+expr.children.flatMap { child =>
+  child.collect {
+case e if ctx.subExprEliminationExprs.contains(e) => e
+  }
+}.distinct
+  }
+
+  /**
+   * A small helper function to return `ExprCode`s that represent 
subexpressions.
+   */
+  def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): 
Seq[ExprCode] = {
--- End diff --

why do we need to add sub-expr to parameter list? 


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156066138
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Defines util methods used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  // Type alias for a tuple representing the data type and nullable for an 
expression.
+  type ExprProperty = (DataType, Boolean)
+
+  /**
+   * Given an expression, returns the all necessary parameters to evaluate 
it, so the generated
+   * code of this expression can be split in a function.
+   * The 1st string in returned tuple is the parameter strings used to 
call the function.
+   * The 2nd string in returned tuple is the parameter strings used to 
declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred 
expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val subAttrs = subExprs.map(e => (e.dataType, e.nullable))
+val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, 
subExprCodes)
+
+val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr)
+val paramsFromRows = inputRows.distinct.filter(_ != null).map { row =>
+  (row, s"InternalRow $row")
+}
+
+val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + 
paramsFromRows.length
+// Maximum allowed parameter number for Java's method descriptor.
+if (paramsLength > 255) {
+  None
+} else {
+  val allParams = (paramsFromRows ++ paramsFromColumns ++ 
paramsFromSubExprs).unzip
+  val callParams = allParams._1.distinct
+  val declParams = allParams._2.distinct
+  Some((callParams, declParams))
+}
+  }
+
+  /**
+   * Returns the eliminated subexpressions in the children expressions.
+   */
+  def getSubExprInChildren(ctx: CodegenContext, expr: Expression): 
Seq[Expression] = {
+expr.children.flatMap { child =>
+  child.collect {
+case e if ctx.subExprEliminationExprs.contains(e) => e
+  }
+}.distinct
+  }
+
+  /**
+   * A small helper function to return `ExprCode`s that represent 
subexpressions.
+   */
+  def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): 
Seq[ExprCode] = {
+subExprs.map { subExpr =>
+  val state = ctx.subExprEliminationExprs(subExpr)
+  ExprCode(code = "", value = state.value, isNull = state.isNull)
+}
+  }
+
+  /**
+   * Retrieves previous input rows referred by children and deferred 
expressions.
+   */
+  def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): 
Seq[String] = {
+expr.children.flatMap(getInputRows(ctx, _)).distinct
+  }
+
+  /**
+   * Given a child expression, retrieves previous input rows referred by 
it or deferred expressions
+   * which are needed to evaluate it.
+   */
+  def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = {
+

[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156065731
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Defines util methods used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  // Type alias for a tuple representing the data type and nullable for an 
expression.
+  type ExprProperty = (DataType, Boolean)
+
+  /**
+   * Given an expression, returns the all necessary parameters to evaluate 
it, so the generated
+   * code of this expression can be split in a function.
+   * The 1st string in returned tuple is the parameter strings used to 
call the function.
+   * The 2nd string in returned tuple is the parameter strings used to 
declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred 
expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val subAttrs = subExprs.map(e => (e.dataType, e.nullable))
+val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, 
subExprCodes)
--- End diff --

nit: I feel it's more readable to handle sub-expressions first, then input variables, and finally input rows.


---




[GitHub] spark issue #19942: [SPARK-22754][DEPLOY] Check whether spark.executor.heart...

2017-12-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19942
  
Can one of the admins verify this patch?


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156064910
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Defines util methods used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  // Type alias for a tuple representing the data type and nullable for an 
expression.
+  type ExprProperty = (DataType, Boolean)
+
+  /**
+   * Given an expression, returns the all necessary parameters to evaluate 
it, so the generated
+   * code of this expression can be split in a function.
+   * The 1st string in returned tuple is the parameter strings used to 
call the function.
+   * The 2nd string in returned tuple is the parameter strings used to 
declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred 
expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val subAttrs = subExprs.map(e => (e.dataType, e.nullable))
+val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, 
subExprCodes)
+
+val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr)
+val paramsFromRows = inputRows.distinct.filter(_ != null).map { row =>
+  (row, s"InternalRow $row")
+}
+
+val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + 
paramsFromRows.length
+// Maximum allowed parameter number for Java's method descriptor.
+if (paramsLength > 255) {
+  None
+} else {
+  val allParams = (paramsFromRows ++ paramsFromColumns ++ 
paramsFromSubExprs).unzip
+  val callParams = allParams._1.distinct
+  val declParams = allParams._2.distinct
+  Some((callParams, declParams))
+}
+  }
+
+  /**
+   * Returns the eliminated subexpressions in the children expressions.
+   */
+  def getSubExprInChildren(ctx: CodegenContext, expr: Expression): 
Seq[Expression] = {
+expr.children.flatMap { child =>
+  child.collect {
+case e if ctx.subExprEliminationExprs.contains(e) => e
+  }
+}.distinct
+  }
+
+  /**
+   * A small helper function to return `ExprCode`s that represent 
subexpressions.
+   */
+  def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): 
Seq[ExprCode] = {
+subExprs.map { subExpr =>
+  val state = ctx.subExprEliminationExprs(subExpr)
+  ExprCode(code = "", value = state.value, isNull = state.isNull)
+}
+  }
+
+  /**
+   * Retrieves previous input rows referred by children and deferred 
expressions.
+   */
+  def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): 
Seq[String] = {
+expr.children.flatMap(getInputRows(ctx, _)).distinct
+  }
+
+  /**
+   * Given a child expression, retrieves previous input rows referred by 
it or deferred expressions
+   * which are needed to evaluate it.
+   */
+  def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = {
+

[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...

2017-12-11 Thread caneGuy
GitHub user caneGuy opened a pull request:

https://github.com/apache/spark/pull/19942

[SPARK-22754][DEPLOY] Check whether spark.executor.heartbeatInterval is bigger than spark.network.timeout or not

## What changes were proposed in this pull request?

If spark.executor.heartbeatInterval is bigger than spark.network.timeout, it will almost always cause the exception below.
`Job aborted due to stage failure: Task 4763 in stage 3.0 failed 4 times, most recent failure: Lost task 4763.3 in stage 3.0 (TID 22383, executor id: 4761, host: c3-hadoop-prc-st2966.bj): ExecutorLostFailure (executor 4761 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 154022 ms`
Since many users do not realize this, they may set spark.executor.heartbeatInterval incorrectly.
This patch checks for this case when applications are submitted.

## How was this patch tested?
Test in cluster


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/caneGuy/spark zhoukang/check-heartbeat

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19942.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19942


commit 891a092ac3a95f32cb2f9e1c215b1c8324c98971
Author: zhoukang 
Date:   2017-12-11T12:48:32Z

[SPARK][DEPLOY] Check whether spark.executor.heartbeatInterval bigger than 
spark.network.timeout or not




---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156062977
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala
 ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Defines util methods used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  // Type alias for a tuple representing the data type and nullable for an 
expression.
+  type ExprProperty = (DataType, Boolean)
+
+  /**
+   * Given an expression, returns the all necessary parameters to evaluate 
it, so the generated
+   * code of this expression can be split in a function.
+   * The 1st string in returned tuple is the parameter strings used to 
call the function.
+   * The 2nd string in returned tuple is the parameter strings used to 
declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   * 1. Evaluated columns referred by this, children or deferred 
expressions.
+   * 2. Rows referred by this, children or deferred expressions.
+   * 3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+  ctx: CodegenContext,
+  expr: Expression): Option[(Seq[String], Seq[String])] = {
+val (inputAttrs, inputVars) = getInputVarsForChildren(ctx, expr)
+val paramsFromColumns = prepareFunctionParams(ctx, inputAttrs, 
inputVars)
+
+val subExprs = getSubExprInChildren(ctx, expr)
+val subExprCodes = getSubExprCodes(ctx, subExprs)
+val subAttrs = subExprs.map(e => (e.dataType, e.nullable))
+val paramsFromSubExprs = prepareFunctionParams(ctx, subAttrs, 
subExprCodes)
+
+val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr)
+val paramsFromRows = inputRows.distinct.filter(_ != null).map { row =>
+  (row, s"InternalRow $row")
+}
+
+val paramsLength = getParamLength(ctx, inputAttrs ++ subAttrs) + 
paramsFromRows.length
+// Maximum allowed parameter number for Java's method descriptor.
+if (paramsLength > 255) {
+  None
+} else {
+  val allParams = (paramsFromRows ++ paramsFromColumns ++ 
paramsFromSubExprs).unzip
+  val callParams = allParams._1.distinct
+  val declParams = allParams._2.distinct
+  Some((callParams, declParams))
+}
+  }
+
+  /**
+   * Returns the eliminated subexpressions in the children expressions.
+   */
+  def getSubExprInChildren(ctx: CodegenContext, expr: Expression): 
Seq[Expression] = {
+expr.children.flatMap { child =>
+  child.collect {
+case e if ctx.subExprEliminationExprs.contains(e) => e
+  }
+}.distinct
+  }
+
+  /**
+   * A small helper function to return `ExprCode`s that represent 
subexpressions.
+   */
+  def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): 
Seq[ExprCode] = {
+subExprs.map { subExpr =>
+  val state = ctx.subExprEliminationExprs(subExpr)
+  ExprCode(code = "", value = state.value, isNull = state.isNull)
+}
+  }
+
+  /**
+   * Retrieves previous input rows referred by children and deferred 
expressions.
+   */
+  def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): 
Seq[String] = {
+expr.children.flatMap(getInputRows(ctx, _)).distinct
+  }
+
+  /**
+   * Given a child expression, retrieves previous input rows referred by 
it or deferred expressions
+   * which are needed to evaluate it.
+   */
+  def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = {
+

[GitHub] spark pull request #19676: [SPARK-14516][FOLLOWUP] Adding ClusteringEvaluato...

2017-12-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19676


---




[GitHub] spark issue #19893: [SPARK-16139][TEST] Add logging functionality for leaked...

2017-12-11 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/19893
  
The `TestHiveContext` is shared among test suites, so maybe it's not a good time to change this for now. Could we create a new class that inherits from `SparkFunSuite` and examines leaked threads, and only have test suites in the core module extend from this new class? WDYT @gatorsmile @cloud-fan ?
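A minimal sketch of that idea; the class name `ThreadAuditSparkFunSuite` and the snapshot-and-diff approach are assumptions for illustration, not the PR's actual implementation:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.SparkFunSuite

// Hypothetical sketch: a base class that snapshots live threads around a suite and
// logs anything left over, so only core-module suites need to opt in by extending it.
abstract class ThreadAuditSparkFunSuite extends SparkFunSuite {
  private var threadsBefore: Set[String] = Set.empty

  override def beforeAll(): Unit = {
    threadsBefore = liveThreadNames()
    super.beforeAll()
  }

  override def afterAll(): Unit = {
    try super.afterAll() finally {
      val leaked = liveThreadNames() -- threadsBefore
      if (leaked.nonEmpty) {
        logWarning(s"Potentially leaked threads in ${getClass.getSimpleName}: " +
          leaked.mkString(", "))
      }
    }
  }

  private def liveThreadNames(): Set[String] =
    Thread.getAllStackTraces.keySet().asScala.map(_.getName).toSet
}
```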


---




[GitHub] spark issue #19676: [SPARK-14516][FOLLOWUP] Adding ClusteringEvaluator to ex...

2017-12-11 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/19676
  
Merged to master


---




[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...

2017-12-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19193#discussion_r156060138
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1920,7 +1927,34 @@ class Analyzer(
 
   case p: LogicalPlan if !p.childrenResolved => p
 
-  // Aggregate without Having clause.
+  // Extract window expressions from aggregate functions. There might 
be an aggregate whose
+  // aggregate function contains a window expression as a child, which 
we need to extract.
+  // e.g., df.groupBy().agg(max(rank().over(window))
+  case a @ Aggregate(groupingExprs, aggregateExprs, child)
+if containsAggregateFunctionWithWindowExpression(aggregateExprs) &&
+   a.expressions.forall(_.resolved) =>
+
+val windowExprAliases = new ArrayBuffer[NamedExpression]()
+val newAggregateExprs = aggregateExprs.map { expr =>
+  expr.transform {
--- End diff --

The code below assumes that there are no window aggregates on top of a 
regular aggregate, and it will push the regular aggregate into the underlying 
window. An example of this:
`df.groupBy(a).agg(max(rank().over(window1)), sum(sum(c)).over(window2))`
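For reference, a self-contained sketch of the query shape described above. The data, column names and window specs are illustrative assumptions, and whether the query analyzes cleanly before this patch is exactly the case the rule has to handle:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object WindowInAggregateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 10, 100), (1, 20, 200), (2, 30, 300)).toDF("a", "b", "c")
    val window1 = Window.partitionBy($"a").orderBy($"b")
    val window2 = Window.orderBy($"a")

    // max(rank().over(window1)): a window expression nested inside an aggregate function.
    // sum(sum(c)).over(window2): a window aggregate on top of a regular aggregate.
    val q = df.groupBy($"a").agg(
      max(rank().over(window1)).as("max_rank"),
      sum(sum($"c")).over(window2).as("running_total"))
    q.explain(true)
    spark.stop()
  }
}
```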


---




[GitHub] spark issue #19919: [SPARK-22727] spark.executor.instances's default value s...

2017-12-11 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/19919
  
If the default is correctly described as 2, then I think there is nothing 
to do here and this should be closed. This change will cause other problems, at 
least.


---




[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...

2017-12-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19193#discussion_r156059425
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1920,7 +1927,34 @@ class Analyzer(
 
   case p: LogicalPlan if !p.childrenResolved => p
 
-  // Aggregate without Having clause.
+  // Extract window expressions from aggregate functions. There might 
be an aggregate whose
+  // aggregate function contains a window expression as a child, which 
we need to extract.
+  // e.g., df.groupBy().agg(max(rank().over(window))
+  case a @ Aggregate(groupingExprs, aggregateExprs, child)
--- End diff --

Can you add a test case for this scenario?


---




[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...

2017-12-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19193#discussion_r156059043
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1920,7 +1927,34 @@ class Analyzer(
 
   case p: LogicalPlan if !p.childrenResolved => p
 
-  // Aggregate without Having clause.
+  // Extract window expressions from aggregate functions. There might 
be an aggregate whose
+  // aggregate function contains a window expression as a child, which 
we need to extract.
+  // e.g., df.groupBy().agg(max(rank().over(window))
+  case a @ Aggregate(groupingExprs, aggregateExprs, child)
--- End diff --

To be sure: what happens if there is a window function on top of the 
aggregate function? This gets resolved in two passes, right?


---




[GitHub] spark issue #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAndRead

2017-12-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19941
  
**[Test build #84707 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84707/testReport)**
 for PR 19941 at commit 
[`1481163`](https://github.com/apache/spark/commit/14811636692810809033bc7caf03fcecb6939aa3).


---




[GitHub] spark pull request #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when w...

2017-12-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/19193#discussion_r156058341
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1920,7 +1927,34 @@ class Analyzer(
 
   case p: LogicalPlan if !p.childrenResolved => p
 
-  // Aggregate without Having clause.
+  // Extract window expressions from aggregate functions. There might 
be an aggregate whose
+  // aggregate function contains a window expression as a child, which 
we need to extract.
+  // e.g., df.groupBy().agg(max(rank().over(window))
+  case a @ Aggregate(groupingExprs, aggregateExprs, child)
+if containsAggregateFunctionWithWindowExpression(aggregateExprs) &&
+   a.expressions.forall(_.resolved) =>
+
+val windowExprAliases = new ArrayBuffer[NamedExpression]()
+val newAggregateExprs = aggregateExprs.map { expr =>
+  expr.transform {
+case aggExpr @ AggregateExpression(func, _, _, _) if 
hasWindowFunction(func.children) =>
+  val newFuncChildren = func.children.map { funcExpr =>
+funcExpr.transform {
+  case we: WindowExpression =>
+// Replace window expressions with aliases to them
+val windowExprAlias = Alias(we, 
s"_we${windowExprAliases.length}")()
+windowExprAliases += windowExprAlias
+windowExprAlias.toAttribute
+}
+  }
+  val newFunc = 
func.withNewChildren(newFuncChildren).asInstanceOf[AggregateFunction]
+  aggExpr.copy(aggregateFunction = newFunc)
+  }.asInstanceOf[NamedExpression]
+}
+val window = addWindow(windowExprAliases, child)
+// TODO do we also need a projection here?
+Aggregate(groupingExprs, newAggregateExprs, window)
--- End diff --

No you don't need a Project.


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156057964
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -105,6 +105,11 @@ abstract class Expression extends TreeNode[Expression] 
{
   val isNull = ctx.freshName("isNull")
   val value = ctx.freshName("value")
   val eval = doGenCode(ctx, ExprCode("", isNull, value))
+
+  // Records current input row and variables of this expression.
+  eval.inputRow = ctx.INPUT_ROW
+  eval.inputVars = findInputVars(ctx, eval)
--- End diff --

actually we can be more aggressive and do `val isNull = if (this.nullable) 
eval.isNull else "false"`, and see if there are any compile errors.


---




[GitHub] spark pull request #19940: [SPARK-22750][SQL] Reuse mutable states when poss...

2017-12-11 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19940#discussion_r156057818
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -184,6 +190,27 @@ class CodegenContext {
 mutableStates += ((javaType, variableName, initCode))
   }
 
+  /**
+   * Add a mutable state as a field to the generated class only if a field with that name
+   * does not exist yet. This helps reduce the number of the generated class' fields, since
+   * the same variable can be reused by many functions.
+   *
+   * Internally, this method calls `addMutableState`.
+   *
+   * @param javaType Java type of the field.
+   * @param variableName Name of the field.
+   * @param initCode The statement(s) to put into the init() method to 
initialize this field.
+   */
+  def addSingleMutableState(
+  javaType: String,
+  variableName: String,
+  initCode: String = ""): Unit = {
+if (!singleMutableStates.contains(variableName)) {
+  addMutableState(javaType, variableName, initCode)
--- End diff --

if you want, I can add it.


---




[GitHub] spark pull request #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAnd...

2017-12-11 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/19941

[SPARK-22753][SQL] Get rid of dataSource.writeAndRead

## What changes were proposed in this pull request?

As discussed in https://github.com/apache/spark/pull/16481 and 
https://github.com/apache/spark/pull/18975#discussion_r155454606, 
currently the BaseRelation returned by `dataSource.writeAndRead` is only used 
in `CreateDataSourceTableAsSelect`, and planForWriting and writeAndRead have 
some common code paths. 
In this patch I removed the writeAndRead function and added the getRelation 
function, which is only used in `CreateDataSourceTableAsSelectCommand` while 
saving data to a non-existing table.

## How was this patch tested?

Existing UT


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-22753

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19941.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19941


commit 14811636692810809033bc7caf03fcecb6939aa3
Author: Yuanjian Li 
Date:   2017-12-11T12:12:45Z

Get rid of dataSource.writeAndRead




---




[GitHub] spark issue #19805: [SPARK-22649][PYTHON][SQL] Adding localCheckpoint to Dat...

2017-12-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19805
  
**[Test build #84706 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84706/testReport)**
 for PR 19805 at commit 
[`9beb375`](https://github.com/apache/spark/commit/9beb375d15d162519856d32d6bd12e3cf1860d68).


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156057427
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -1001,16 +1039,25 @@ class CodegenContext {
 commonExprs.foreach { e =>
   val expr = e.head
   val fnName = freshName("evalExpr")
-  val isNull = s"${fnName}IsNull"
+  val isNull = if (expr.nullable) {
+s"${fnName}IsNull"
+  } else {
+""
+  }
   val value = s"${fnName}Value"
 
   // Generate the code for this expression tree and wrap it in a 
function.
   val eval = expr.genCode(this)
+  val nullValue = if (expr.nullable) {
--- End diff --

nit: `assignIsNull`


---




[GitHub] spark issue #19916: [SPARK-22716][SQL] Replace addReferenceObj to reduce the...

2017-12-11 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/19916
  
@cloud-fan thanks for your answer. I don't think that something like:
```
((MyClass) reference[3] /* variableName */).doSomething();
```
would be readable. But it is just my opinion. Do you want me to add a local 
variable to generate more readable code? Or if you think that adding the 
comment is the right thing to do, I can do that.
Thanks.
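For illustration, a tiny standalone sketch (not Spark's actual codegen API; the class and variable names are made up) contrasting the two options being discussed here, the inline cast with a comment versus a named local variable:

```scala
object ReferenceCodegenSketch {
  // Option 1: inline cast everywhere; readability relies on the trailing comment.
  def inlineRef(idx: Int, clazz: String, name: String): String =
    s"(($clazz) references[$idx] /* $name */)"

  // Option 2: emit one readable local per referenced object, then use its name.
  def localRef(idx: Int, clazz: String, name: String): String =
    s"$clazz $name = ($clazz) references[$idx];"

  def main(args: Array[String]): Unit = {
    println(inlineRef(3, "MyClass", "myHelper") + ".doSomething();")
    println(localRef(3, "MyClass", "myHelper"))
    println("myHelper.doSomething();")
  }
}
```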


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156056362
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -105,6 +105,11 @@ abstract class Expression extends TreeNode[Expression] 
{
   val isNull = ctx.freshName("isNull")
   val value = ctx.freshName("value")
   val eval = doGenCode(ctx, ExprCode("", isNull, value))
+
+  // Records current input row and variables of this expression.
+  eval.inputRow = ctx.INPUT_ROW
+  eval.inputVars = findInputVars(ctx, eval)
--- End diff --

shall we do one more thing here?
```
eval.isNull = if (this.nullable) eval.isNull else "false"
```


---




[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19813#discussion_r156055243
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -55,8 +55,42 @@ import org.apache.spark.util.{ParentClassLoader, Utils}
  * to null.
  * @param value A term for a (possibly primitive) value of the result of 
the evaluation. Not
  *  valid if `isNull` is set to `true`.
+ * @param inputRow A term that holds the input row name when generating 
this code.
+ * @param inputVars A list of [[ExprInputVar]] that holds input variables 
when generating this code.
  */
-case class ExprCode(var code: String, var isNull: String, var value: 
String)
+case class ExprCode(
+var code: String,
+var isNull: String,
+var value: String,
+var inputRow: String = null,
+var inputVars: Seq[ExprInputVar] = Seq.empty) {
--- End diff --

do we have a requirement that only one of them can be not null/Nil?


---




[GitHub] spark pull request #19940: [SPARK-22750][SQL] Reuse mutable states when poss...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19940#discussion_r156054632
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -184,6 +190,27 @@ class CodegenContext {
 mutableStates += ((javaType, variableName, initCode))
   }
 
+  /**
+   * Add a mutable state as a field to the generated class only if a field with that name
+   * does not exist yet. This helps reduce the number of the generated class' fields, since
+   * the same variable can be reused by many functions.
+   *
+   * Internally, this method calls `addMutableState`.
+   *
+   * @param javaType Java type of the field.
+   * @param variableName Name of the field.
+   * @param initCode The statement(s) to put into the init() method to 
initialize this field.
+   */
+  def addSingleMutableState(
+  javaType: String,
+  variableName: String,
+  initCode: String = ""): Unit = {
+if (!singleMutableStates.contains(variableName)) {
+  addMutableState(javaType, variableName, initCode)
--- End diff --

shall we add an assert here to make sure `initCode` is the same as the 
previous one?
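A minimal sketch of what such an assert could look like; this is only an assumption of the shape, not the PR's code, and `singleMutableStates` plus the `addMutableState` stub here are simplified stand-ins for the real `CodegenContext` members:

```scala
import scala.collection.mutable

object SingleMutableStateSketch {
  // Sketch only: map from variable name to (javaType, initCode) of the first registration.
  private val singleMutableStates = mutable.Map.empty[String, (String, String)]

  // Stub standing in for CodegenContext.addMutableState.
  private def addMutableState(javaType: String, variableName: String, initCode: String): Unit =
    println(s"declare $javaType $variableName; init: $initCode")

  def addSingleMutableState(javaType: String, variableName: String, initCode: String = ""): Unit =
    singleMutableStates.get(variableName) match {
      case Some((existingType, existingInit)) =>
        // Reusing the field: type and init code must match, otherwise the second caller
        // would silently share a differently-initialized variable.
        assert(existingType == javaType && existingInit == initCode,
          s"Conflicting declarations for shared mutable state $variableName")
      case None =>
        singleMutableStates(variableName) = (javaType, initCode)
        addMutableState(javaType, variableName, initCode)
    }

  def main(args: Array[String]): Unit = {
    addSingleMutableState("UTF8String", "buffer", "buffer = UTF8String.fromString(\"\");")
    addSingleMutableState("UTF8String", "buffer", "buffer = UTF8String.fromString(\"\");") // reused, ok
  }
}
```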


---




[GitHub] spark issue #19916: [SPARK-22716][SQL] Replace addReferenceObj to reduce the...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19916
  
In a real application I think the difference is much smaller than 2.4%. So 
I think it's ok to remove `addReferenceObj`.

One problem is the readability of the generated code; maybe we can generate
`(MyClass) reference[3] /* variableName */`


---




[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...

2017-12-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19937
  
**[Test build #84705 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84705/testReport)**
 for PR 19937 at commit 
[`9faf0a2`](https://github.com/apache/spark/commit/9faf0a2644739e9e19968c5077d6b14011aab9dd).


---




[GitHub] spark issue #14151: [SPARK-16496][SQL] Add wholetext as option for reading t...

2017-12-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/14151
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84704/
Test FAILed.


---




[GitHub] spark issue #14151: [SPARK-16496][SQL] Add wholetext as option for reading t...

2017-12-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/14151
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #14151: [SPARK-16496][SQL] Add wholetext as option for reading t...

2017-12-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/14151
  
**[Test build #84704 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84704/testReport)**
 for PR 14151 at commit 
[`66d5b45`](https://github.com/apache/spark/commit/66d5b453cd2aaaea08a3843f4966fc9036451b6c).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class WholeTextFileSuite extends QueryTest with SharedSQLContext `


---




[GitHub] spark issue #19193: [WIP][SPARK-21896][SQL] Fix Stack Overflow when window f...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19193
  
cc @hvanhovell 


---




[GitHub] spark pull request #19940: [SPARK-22750][SQL] Reuse mutable states when poss...

2017-12-11 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19940#discussion_r156052035
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -184,6 +190,27 @@ class CodegenContext {
 mutableStates += ((javaType, variableName, initCode))
   }
 
+  /**
+   * Add a mutable state as a field to the generated class only if a field with that name
+   * does not exist yet. This helps reduce the number of the generated class' fields, since
+   * the same variable can be reused by many functions.
+   *
+   * Internally, this method calls `addMutableState`.
+   *
+   * @param javaType Java type of the field.
+   * @param variableName Name of the field.
+   * @param initCode The statement(s) to put into the init() method to 
initialize this field.
+   */
+  def addSingleMutableState(
+  javaType: String,
+  variableName: String,
+  initCode: String = ""): Unit = {
--- End diff --

It is not supported.


---




[GitHub] spark pull request #19940: [SPARK-22750][SQL] Reuse mutable states when poss...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19940#discussion_r156051417
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -184,6 +190,27 @@ class CodegenContext {
 mutableStates += ((javaType, variableName, initCode))
   }
 
+  /**
+   * Add a mutable state as a field to the generated class only if a field with that name
+   * does not exist yet. This helps reduce the number of the generated class' fields, since
+   * the same variable can be reused by many functions.
+   *
+   * Internally, this method calls `addMutableState`.
+   *
+   * @param javaType Java type of the field.
+   * @param variableName Name of the field.
+   * @param initCode The statement(s) to put into the init() method to 
initialize this field.
+   */
+  def addSingleMutableState(
+  javaType: String,
+  variableName: String,
+  initCode: String = ""): Unit = {
--- End diff --

How can we support different initCode for the same name?


---




[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19937
  
retest this please


---




[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...

2017-12-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19937
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84703/
Test FAILed.


---




[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...

2017-12-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19937
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...

2017-12-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19937
  
**[Test build #84703 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84703/testReport)**
 for PR 19937 at commit 
[`9faf0a2`](https://github.com/apache/spark/commit/9faf0a2644739e9e19968c5077d6b14011aab9dd).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible

2017-12-11 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/19940
  
@viirya we have seen that using arrays affects performance. Thus, if we can 
reduce their usage, it is better.

I don't think that debugging is harder. The variables I made shared are 
never assigned except in the initialization. Do you have another opinion? Or 
are you thinking of something specific?


---




[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...

2017-12-11 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19872#discussion_r156037616
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -48,9 +48,26 @@ object ExtractPythonUDFFromAggregate extends 
Rule[LogicalPlan] {
 }.isDefined
   }
 
+  private def isPandasGroupAggUdf(expr: Expression): Boolean = expr match {
+  case _ @ PythonUDF(_, _, _, _, 
PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF ) => true
+  case Alias(expr, _) => isPandasGroupAggUdf(expr)
+  case _ => false
+  }
+
+  private def hasPandasGroupAggUdf(agg: Aggregate): Boolean = {
+val actualAggExpr = 
agg.aggregateExpressions.drop(agg.groupingExpressions.length)
+actualAggExpr.exists(isPandasGroupAggUdf)
+  }
+
+
   private def extract(agg: Aggregate): LogicalPlan = {
 val projList = new ArrayBuffer[NamedExpression]()
 val aggExpr = new ArrayBuffer[NamedExpression]()
+
+if (hasPandasGroupAggUdf(agg)) {
+  Aggregate(agg.groupingExpressions, agg.aggregateExpressions, 
agg.child)
+} else {
+
--- End diff --

nit: style, we need indent for this block.


---




[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...

2017-12-11 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19872#discussion_r156028776
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
 ---
@@ -32,7 +31,5 @@ case class PythonUDF(
 evalType: Int)
   extends Expression with Unevaluable with NonSQLExpression with 
UserDefinedExpression {
 
-  override def toString: String = s"$name(${children.mkString(", ")})"
--- End diff --

Why was this removed?


---




[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...

2017-12-11 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19872#discussion_r156034167
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
AttributeSet, Expression, JoinedRow, NamedExpression, PythonUDF, SortOrder, 
UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.Utils
+
+case class AggregateInPandasExec(
+groupingExpressions: Seq[Expression],
+udfExpressions: Seq[PythonUDF],
+resultExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def output: Seq[Attribute] = 
resultExpressions.map(_.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def producedAttributes: AttributeSet = AttributeSet(output)
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+if (groupingExpressions.isEmpty) {
+  AllTuples :: Nil
+} else {
+  ClusteredDistribution(groupingExpressions) :: Nil
+}
+  }
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
+udf.children match {
+  case Seq(u: PythonUDF) =>
+val (chained, children) = collectFunctions(u)
+(ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+  case children =>
+// There should not be any other UDFs, or the children can't be 
evaluated directly.
+assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+(ChainedPythonFunctions(Seq(udf.func)), udf.children)
+}
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingExpressions.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute()
+
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val sessionLocalTimeZone = conf.sessionLocalTimeZone
+val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
+
+val (pyFuncs, inputs) = udfExpressions.map(collectFunctions).unzip
+
+val allInputs = new ArrayBuffer[Expression]
+val dataTypes = new ArrayBuffer[DataType]
+
+allInputs.appendAll(groupingExpressions)
+
+val argOffsets = inputs.map { input =>
+  input.map { e =>
+  allInputs += e
+  dataTypes += e.dataType
+  allInputs.length - 1 - groupingExpressions.length
+  }.toArray
+}.toArray
+
+val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
+  StructField(s"_$i", dt)
+})
+
+inputRDD.mapPartitionsInternal { iter =>
+  val grouped = if (groupingExpressions.isEmpty) {
+Iterator((null, iter))
+  } else {
+val groupedIter = GroupedIterator(iter, groupingExpressions, 
child.output)
+
+val dropGrouping =
+  
UnsafeProjection.create(allInputs.drop(groupingExpressions.length), 
child.output)
+
+

[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...

2017-12-11 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19872#discussion_r156037157
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -48,9 +48,26 @@ object ExtractPythonUDFFromAggregate extends 
Rule[LogicalPlan] {
 }.isDefined
   }
 
+  private def isPandasGroupAggUdf(expr: Expression): Boolean = expr match {
+  case _ @ PythonUDF(_, _, _, _, 
PythonEvalType.SQL_PANDAS_GROUP_AGG_UDF ) => true
--- End diff --

We don't need `_ @` here.
nit: remove extra space after `SQL_PANDAS_GROUP_AGG_UDF`.


---




[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...

2017-12-11 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19872#discussion_r156031375
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4016,6 +4016,124 @@ def test_unsupported_types(self):
 with self.assertRaisesRegexp(Exception, 'Unsupported data 
type'):
 df.groupby('id').apply(f).collect()
 
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyAggTests(ReusedSQLTestCase):
+def assertFramesEqual(self, expected, result):
--- End diff --

nit: how about making this the common method?


---




[GitHub] spark pull request #19872: WIP: [SPARK-22274][PySpark] User-defined aggregat...

2017-12-11 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19872#discussion_r156038036
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
 ---
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.python
+package org.apache.spark.sql.catalyst.expressions
--- End diff --

Do we need to move package to catalyst?


---




[GitHub] spark issue #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nested expr...

2017-12-11 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19813
  
ping @cloud-fan Previous comments are all addressed. Please review this 
again. Thanks.


---




[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible

2017-12-11 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19940
  
A high-level question is: do we need to share mutable states if we can 
compact global variables into arrays later?

Will sharing mutable states increase the difficulty of debugging codegen in 
the future?


---




[GitHub] spark issue #19929: [SPARK-22629][PYTHON] Add deterministic flag to pyspark ...

2017-12-11 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/19929
  
@gatorsmile sorry, I saw that you did the patch for the Scala UDF. Might you 
help review this please? Thanks.


---




[GitHub] spark issue #19940: [SPARK-22750][SQL] Reuse mutable states when possible

2017-12-11 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/19940
  
@cloud-fan @kiszk @viirya might you please help review this? Thanks.


---




[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...

2017-12-11 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/19811
  
@kiszk I see, thanks. Wouldn't it be better anyway to store all the 
initializations in the same array, so that we ensure that the ordering is the 
same as before this patch?
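A minimal sketch of that idea, as an assumption rather than the PR's code: keep every init statement in one ordered buffer, so initialization order always matches registration order whether or not the field is compacted into an array.

```scala
import scala.collection.mutable.ArrayBuffer

object OrderedInitSketch {
  // Declarations are handled elsewhere; the point is that init statements are appended
  // in registration order, whether or not the field is array-compacted.
  private val initStatements = ArrayBuffer.empty[String]

  def registerMutableState(initCode: String): Unit =
    if (initCode.nonEmpty) initStatements += initCode

  def initBody(): String = initStatements.mkString("\n")

  def main(args: Array[String]): Unit = {
    registerMutableState("arr[0] = new java.util.ArrayList();") // a compacted variable
    registerMutableState("other = arr[0].size();")               // a plain field depending on it
    println(initBody()) // printed in the same order they were registered
  }
}
```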


---




[GitHub] spark issue #19938: [SPARK-22747][SQL] Localize lifetime of mutable states i...

2017-12-11 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19938
  
Good point.


---




[GitHub] spark issue #19893: [SPARK-16139][TEST] Add logging functionality for leaked...

2017-12-11 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/19893
  
I've analysed the Hive-related test flow and found the SparkSession and 
SQLContext sharing between suites that you mentioned. Here is the execution flow:

1. The first hive test suite instantiates TestHive which creates 
SparkSession and SQLContext
2. SparkFunSuite.beforeAll creates a thread snapshot
3. Test code runs
4. TestHiveSingleton.afterAll resets SparkSession
5. SparkFunSuite.afterAll prints out the possible leaks

Step one is executed only by the first Hive suite and never again.

Here I do not see false positives at a large scale. The only possible 
false-positive threads I foresee could come from lazy initialisation within 
SparkSession or SQLContext. On the leftover side, we're not tracking 
SparkSession and SQLContext threads, but because of the singleton nature my 
suggestion is to leave it like that.

In this case you mentioned
```
$ grep 'POSSIBLE THREAD LEAK' unit-tests.log  | wc -l
158
```
I can imagine the following situations:
1. Test doesn't call hiveContext.reset()
2. Test creates a thread but does not free it up
3. Production code issue
4. ...
Of course there could be other issues which I've not considered, please 
share your ideas.
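As a rough illustration of steps 2 and 5 above, a minimal thread-snapshot sketch; the trait name and the log wording are assumptions, not the PR's actual code:

```scala
import scala.collection.JavaConverters._
import org.scalatest.{BeforeAndAfterAll, Suite}

trait ThreadLeakAudit extends BeforeAndAfterAll { self: Suite =>
  private var snapshot: Set[String] = Set.empty

  private def runningThreadNames(): Set[String] =
    Thread.getAllStackTraces.keySet().asScala.map(_.getName).toSet

  override protected def beforeAll(): Unit = {
    snapshot = runningThreadNames() // step 2: take a snapshot before the suite runs
    super.beforeAll()
  }

  override protected def afterAll(): Unit = {
    try super.afterAll() finally {
      // step 5: anything still running that was not in the snapshot is a possible leak
      val leaked = runningThreadNames() -- snapshot
      leaked.foreach(name => println(s"POSSIBLE THREAD LEAK IN SUITE $suiteName, thread: $name"))
    }
  }
}
```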



---




[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...

2017-12-11 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/19811
  
Good question, got it. There are some cases where we have to ensure the order 
of declarations.
The current code asks developers to ensure it by using `inline = true`. We 
can see [an 
example](https://github.com/apache/spark/pull/19811/files#diff-90b107e5c61791e17d5b4b25021b89fdR323).


---




[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...

2017-12-11 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/19811
  
@kiszk yes, I know, but I meant a different thing. I will try to explain 
with an example, let me know if I am not clear enough.

Since we are not defining everything as an array, and we are initializing 
everything which doesn't go into an array before everything which does, the 
following situation can currently happen:
 - we define a variable (`v1`) which goes into an array;
 - then we define a variable (`v2DependsOnV1`) which doesn't go into any 
array, but which relies on the previous variable;
Before this PR, we were ensuring that the order of the initialization was 
the same as the order of the declarations, so it would have been:
```
 // here we init v1
 // here we init v2DependsOnV1
```
After the PR, in the above condition, we don't maintain the same order and 
we would have:
```
 // here we init v2DependsOnV1
 // here we init v1
```
which is a wrong situation.

Maybe this is a condition which never happens, i.e. we don't have variables 
which depend on others during the initialization, so maybe I am worried about 
a situation which never happens, but this is my question: are we sure this is 
not a problem?

Thanks.


---




[GitHub] spark issue #14151: [SPARK-16496][SQL] Add wholetext as option for reading t...

2017-12-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/14151
  
**[Test build #84704 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84704/testReport)**
 for PR 14151 at commit 
[`66d5b45`](https://github.com/apache/spark/commit/66d5b453cd2aaaea08a3843f4966fc9036451b6c).


---




[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...

2017-12-11 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/19811
  
@mgaido91 The latest code does not generate a loop for initializations. 
This is an optimization to reduce the JVM bytecode size. 
As @cloud-fan pointed out 
[here](https://github.com/apache/spark/pull/19811#issuecomment-348144586), it 
should be addressed by another PR since it is another story.


---




[GitHub] spark issue #19911: [SPARK-22729][SQL] Add getTruncateQuery to JdbcDialect

2017-12-11 Thread danielvdende
Github user danielvdende commented on the issue:

https://github.com/apache/spark/pull/19911
  
retest this please


---




[GitHub] spark pull request #19715: [SPARK-22397][ML]add multiple columns support to ...

2017-12-11 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19715#discussion_r156010806
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala ---
@@ -129,34 +156,102 @@ final class QuantileDiscretizer @Since("1.6.0") 
(@Since("1.6.0") override val ui
   @Since("2.1.0")
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setNumBucketsArray(value: Array[Int]): this.type = 
set(numBucketsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  private[feature] def isQuantileDiscretizeMultipleColumns(): Boolean = {
+if (isSet(inputCols) && isSet(inputCol)) {
+  logWarning("Both `inputCol` and `inputCols` are set, we ignore 
`inputCols` and this " +
--- End diff --

According to the discussion result in JIRA SPARK-8418, should we throw an 
exception when both `inputCol` and `inputCols` are specified?
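A small standalone sketch of the "throw instead of warn" variant being suggested; `isSet` and the params are stubbed as plain booleans here, and the message wording is an assumption, not the PR's code:

```scala
object QuantileDiscretizerParamCheckSketch {
  // Returns whether the multi-column code path should be used, failing fast when
  // both the single-column and multi-column params are set.
  def checkSingleVsMultiColumn(inputColSet: Boolean, inputColsSet: Boolean): Boolean = {
    if (inputColSet && inputColsSet) {
      throw new IllegalArgumentException(
        "Both `inputCol` and `inputCols` are set; only one of them may be specified.")
    }
    inputColsSet
  }

  def main(args: Array[String]): Unit = {
    println(checkSingleVsMultiColumn(inputColSet = false, inputColsSet = true)) // true
    // checkSingleVsMultiColumn(inputColSet = true, inputColsSet = true) would throw
  }
}
```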


---




[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...

2017-12-11 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/19937
  
LGTM too


---




[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...

2017-12-11 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/19811
  
hi @kiszk, have you already checked why there are Python failures? I wanted 
to share a concern I have with the current implementation. Maybe it is not a 
problem, but I think that currently we are changing the order in which we 
initialize variables, since we initialize the arrays after the simple fields, 
and this might not be the original order. Do you think this might be a 
problem?
Thanks.


---




[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...

2017-12-11 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19937
  
LGTM


---




[GitHub] spark pull request #19937: [SPARK-22746][SQL] Avoid the generation of useles...

2017-12-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19937#discussion_r156010028
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -513,26 +513,28 @@ case class SortMergeJoinExec(
* the variables should be declared separately from accessing the 
columns, we can't use the
* codegen of BoundReference here.
*/
-  private def createLeftVars(ctx: CodegenContext, leftRow: String): 
Seq[ExprCode] = {
+  private def createLeftVars(ctx: CodegenContext, leftRow: String): 
(Seq[ExprCode], Seq[String]) = {
 ctx.INPUT_ROW = leftRow
 left.output.zipWithIndex.map { case (a, i) =>
   val value = ctx.freshName("value")
   val valueCode = ctx.getValue(leftRow, a.dataType, i.toString)
-  // declare it as class member, so we can access the column before or 
in the loop.
-  ctx.addMutableState(ctx.javaType(a.dataType), value)
   if (a.nullable) {
 val isNull = ctx.freshName("isNull")
-ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull)
 val code =
   s"""
  |$isNull = $leftRow.isNullAt($i);
  |$value = $isNull ? ${ctx.defaultValue(a.dataType)} : 
($valueCode);
""".stripMargin
-ExprCode(code, isNull, value)
+(ExprCode(code, isNull, value),
--- End diff --

Yea, I see, I just ran the test and found it too. Looks like putting the 
declaration at the top is simplest.


---




[GitHub] spark issue #19937: [SPARK-22746][SQL] Avoid the generation of useless mutab...

2017-12-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19937
  
**[Test build #84703 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84703/testReport)**
 for PR 19937 at commit 
[`9faf0a2`](https://github.com/apache/spark/commit/9faf0a2644739e9e19968c5077d6b14011aab9dd).


---




[GitHub] spark pull request #19937: [SPARK-22746][SQL] Avoid the generation of useles...

2017-12-11 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19937#discussion_r156007853
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -513,26 +513,28 @@ case class SortMergeJoinExec(
* the variables should be declared separately from accessing the 
columns, we can't use the
* codegen of BoundReference here.
*/
-  private def createLeftVars(ctx: CodegenContext, leftRow: String): 
Seq[ExprCode] = {
+  private def createLeftVars(ctx: CodegenContext, leftRow: String): 
(Seq[ExprCode], Seq[String]) = {
 ctx.INPUT_ROW = leftRow
 left.output.zipWithIndex.map { case (a, i) =>
   val value = ctx.freshName("value")
   val valueCode = ctx.getValue(leftRow, a.dataType, i.toString)
-  // declare it as class member, so we can access the column before or 
in the loop.
-  ctx.addMutableState(ctx.javaType(a.dataType), value)
   if (a.nullable) {
 val isNull = ctx.freshName("isNull")
-ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull)
 val code =
   s"""
  |$isNull = $leftRow.isNullAt($i);
  |$value = $isNull ? ${ctx.defaultValue(a.dataType)} : 
($valueCode);
""".stripMargin
-ExprCode(code, isNull, value)
+(ExprCode(code, isNull, value),
--- End diff --

Good question. If we declare variable types only in `code`, it may lead to a 
compilation error in `janino`.

The `code` that you pointed out may go to `$leftAfter` in `$condCheck`, which 
exists in an `if-then` block at the innermost level of the loop in the generated 
code, by using `leftVar`. The variable in `leftVars` is also referred to at 
`${consume(ctx, leftVars ++ rightVars)}`.

In the following example, if we declare variable types only in `code`, we 
will drop line 145 and will declare `int` for `smj_value8` at line 162. Since 
`smj_value8` is referred to at line 169, the generated code would cause a 
compilation error.

WDYT?

```
/* 143 */   protected void processNext() throws java.io.IOException {
/* 144 */ while (findNextInnerJoinRows(smj_leftInput, smj_rightInput)) {
/* 145 */   int smj_value8 = -1;
/* 146 */   boolean smj_isNull6 = false;
/* 147 */   int smj_value9 = -1;
/* 148 */   boolean smj_loaded = false;
/* 149 */   smj_isNull6 = smj_leftRow.isNullAt(1);
/* 150 */   smj_value9 = smj_isNull6 ? -1 : (smj_leftRow.getInt(1));
/* 151 */   scala.collection.Iterator smj_iterator = 
smj_matches.generateIterator();
/* 152 */   while (smj_iterator.hasNext()) {
...
/* 160 */ if (!smj_loaded) {
/* 161 */   smj_loaded = true;
/* 162 */   smj_value8 = smj_leftRow.getInt(0);
/* 163 */ }
/* 164 */ int smj_value10 = smj_rightRow1.getInt(0);
/* 165 */ smj_numOutputRows.add(1);
/* 166 */
/* 167 */ smj_rowWriter.zeroOutNullBytes();
/* 168 */
/* 169 */ smj_rowWriter.write(0, smj_value8);
...
/* 185 */
/* 186 */   }
/* 187 */   if (shouldStop()) return;
/* 188 */ }
/* 189 */   }
```



---




[GitHub] spark pull request #19937: [SPARK-22746][SQL] Avoid the generation of useles...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19937#discussion_r156005276
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -513,26 +513,28 @@ case class SortMergeJoinExec(
* the variables should be declared separately from accessing the 
columns, we can't use the
* codegen of BoundReference here.
*/
-  private def createLeftVars(ctx: CodegenContext, leftRow: String): 
Seq[ExprCode] = {
+  private def createLeftVars(ctx: CodegenContext, leftRow: String): 
(Seq[ExprCode], Seq[String]) = {
 ctx.INPUT_ROW = leftRow
 left.output.zipWithIndex.map { case (a, i) =>
   val value = ctx.freshName("value")
   val valueCode = ctx.getValue(leftRow, a.dataType, i.toString)
-  // declare it as class member, so we can access the column before or 
in the loop.
-  ctx.addMutableState(ctx.javaType(a.dataType), value)
   if (a.nullable) {
 val isNull = ctx.freshName("isNull")
-ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull)
 val code =
   s"""
  |$isNull = $leftRow.isNullAt($i);
  |$value = $isNull ? ${ctx.defaultValue(a.dataType)} : 
($valueCode);
""".stripMargin
-ExprCode(code, isNull, value)
+(ExprCode(code, isNull, value),
--- End diff --

good catch! Makes sense to me.


---




[GitHub] spark pull request #19937: [SPARK-22746][SQL] Avoid the generation of useles...

2017-12-11 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19937#discussion_r156005086
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -617,6 +619,7 @@ case class SortMergeJoinExec(
 
 s"""
|while (findNextInnerJoinRows($leftInput, $rightInput)) {
+   |  ${leftVarDecl.mkString("\n")}
--- End diff --

@kiszk thanks for the test. Then I agree this is the best option, thanks.


---




[GitHub] spark pull request #19937: [SPARK-22746][SQL] Avoid the generation of useles...

2017-12-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19937#discussion_r156004320
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -513,26 +513,28 @@ case class SortMergeJoinExec(
* the variables should be declared separately from accessing the 
columns, we can't use the
* codegen of BoundReference here.
*/
-  private def createLeftVars(ctx: CodegenContext, leftRow: String): 
Seq[ExprCode] = {
+  private def createLeftVars(ctx: CodegenContext, leftRow: String): 
(Seq[ExprCode], Seq[String]) = {
 ctx.INPUT_ROW = leftRow
 left.output.zipWithIndex.map { case (a, i) =>
   val value = ctx.freshName("value")
   val valueCode = ctx.getValue(leftRow, a.dataType, i.toString)
-  // declare it as class member, so we can access the column before or 
in the loop.
-  ctx.addMutableState(ctx.javaType(a.dataType), value)
   if (a.nullable) {
 val isNull = ctx.freshName("isNull")
-ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull)
 val code =
   s"""
  |$isNull = $leftRow.isNullAt($i);
  |$value = $isNull ? ${ctx.defaultValue(a.dataType)} : 
($valueCode);
""".stripMargin
-ExprCode(code, isNull, value)
+(ExprCode(code, isNull, value),
--- End diff --

Why separate the variable declaration and the code into two parts? Previously 
it was because it used global variables. Now, since we use local variables, I 
think we can simply do:

```scala
 val code =
   s"""
  |boolean $isNull = $leftRow.isNullAt($i);
  |${ctx.javaType(a.dataType)} $value = $isNull ? ${ctx.defaultValue(a.dataType)} : ($valueCode);
""".stripMargin
```

Don't we?


---




[GitHub] spark pull request #19811: [SPARK-18016][SQL] Code Generation: Constant Pool...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19811#discussion_r156003506
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -177,11 +189,64 @@ class CodegenContext {
* the list of default imports available.
* Also, generic type arguments are accepted but ignored.
* @param variableName Name of the field.
-   * @param initCode The statement(s) to put into the init() method to 
initialize this field.
+   * @param codeFunctions Function includes statement(s) to put into the 
init() method to
+   * initialize this field. An argument is the name of the 
mutable state variable
* If left blank, the field will be default-initialized.
+   * @param inline whether the declaration and initialization code may be 
inlined rather than
+   *   compacted.
+   * @param useFreshName If false and inline is true, the name is not 
changed
+   * @return the name of the mutable state variable, which is either the 
original name if the
+   * variable is inlined to the outer class, or an array access if 
the variable is to be
+   * stored in an array of variables of the same type and 
initialization.
+   * There are two use cases. One is to use the original name for 
global variable instead
+   * of fresh name. Second is to use the original initialization 
statement since it is
+   * complex (e.g. allocate multi-dimensional array or object 
constructor has varibles).
+   * Primitive type variables will be inlined into outer class 
when the total number of
+   * mutable variables is less than 
`CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD`
+   * the max size of an array for compaction is given by
+   * `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
*/
-  def addMutableState(javaType: String, variableName: String, initCode: 
String = ""): Unit = {
-mutableStates += ((javaType, variableName, initCode))
+  def addMutableState(
+  javaType: String,
+  variableName: String,
+  codeFunctions: String => String = _ => "",
--- End diff --

`initFunc`?


---




[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...

2017-12-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19082#discussion_r156002166
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -805,26 +908,44 @@ case class HashAggregateExec(
 
 
 def updateRowInFastHashMap(isVectorized: Boolean): Option[String] = {
-  ctx.INPUT_ROW = fastRowBuffer
+  // We need to copy the aggregation row buffer to a local row first 
because each aggregate
+  // function directly updates the buffer when it finishes.
+  val localRowBuffer = ctx.freshName("localFastRowBuffer")
+  val initLocalRowBuffer = s"InternalRow $localRowBuffer = 
$fastRowBuffer.copy();"
+
+  ctx.INPUT_ROW = localRowBuffer
   val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, 
inputAttr))
   val subExprs = 
ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
   val effectiveCodes = subExprs.codes.mkString("\n")
   val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
 boundUpdateExpr.map(_.genCode(ctx))
   }
-  val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+
+  val evalAndUpdateCodes = fastRowEvals.zipWithIndex.map { case (ev, 
i) =>
 val dt = updateExpr(i).dataType
-ctx.updateColumn(fastRowBuffer, dt, i, ev, updateExpr(i).nullable, 
isVectorized)
+val updateColumnCode = ctx.updateColumn(
+  fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorized)
+s"""
+   | // evaluate aggregate function
+   | ${ev.code}
+   | // update fast row
+   | $updateColumnCode
+ """.stripMargin
   }
+
+  val updateAggValCode = splitAggregateExpressions(
+ctx, boundUpdateExpr, evalAndUpdateCodes, subExprs.states,
+Seq(("InternalRow", fastRowBuffer)))
--- End diff --

indents


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...

2017-12-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19082#discussion_r15593
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -1070,6 +1071,24 @@ class CodegenContext {
   }
 }
 
+object CodegenContext {
+
+  private val javaKeywords = Set(
+"abstract", "assert", "boolean", "break", "byte", "case", "catch", 
"char", "class", "const",
+"continue", "default", "do", "double", "else", "extends", "false", 
"final", "finally", "float",
+"for", "goto", "if", "implements", "import", "instanceof", "int", 
"interface", "long", "native",
+"new", "null", "package", "private", "protected", "public", "return", 
"short", "static",
+"strictfp", "super", "switch", "synchronized", "this", "throw", 
"throws", "transient", "true",
+"try", "void", "volatile", "while"
+  )
+
+  def isJavaIdentifier(str: String): Boolean = str match {
+case null | "" => false
+case _ => !javaKeywords.contains(str) && 
isJavaIdentifierStart(str.charAt(0)) &&
+  (1 until str.length).forall(i => isJavaIdentifierPart(str.charAt(i)))
+  }
--- End diff --

```Scala
  /**
   * Returns true if the given `str` is a valid java identifier.
   */
  def isJavaIdentifier(str: String): Boolean = str match {
case null | "" =>
  false
case _ =>
  !javaKeywords.contains(str) && isJavaIdentifierStart(str.charAt(0)) &&
(1 until str.length).forall(i => 
isJavaIdentifierPart(str.charAt(i)))
  }
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...

2017-12-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19082#discussion_r156003342
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -256,6 +258,85 @@ case class HashAggregateExec(
  """.stripMargin
   }
 
+  // Extracts all the input variable references for a given `aggExpr`. 
This result will be used
+  // to split aggregation into small functions.
+  private def getInputVariableReferences(
+  ctx: CodegenContext,
+  aggExpr: Expression,
+  subExprs: Map[Expression, SubExprEliminationState]): Set[(String, 
String)] = {
+// `argSet` collects all the pairs of variable names and their types; the first element in
+// each pair is a type name and the second is a variable name.
+val argSet = mutable.Set[(String, String)]()
+val stack = mutable.Stack[Expression](aggExpr)
+while (stack.nonEmpty) {
+  stack.pop() match {
+case e if subExprs.contains(e) =>
+  val exprCode = subExprs(e)
+  if (CodegenContext.isJavaIdentifier(exprCode.value)) {
+argSet += ((ctx.javaType(e.dataType), exprCode.value))
+  }
+  if (CodegenContext.isJavaIdentifier(exprCode.isNull)) {
+argSet += (("boolean", exprCode.isNull))
+  }
+  // Since the children possibly have common expressions, we push them here
+  stack.pushAll(e.children)
+case ref: BoundReference
+if ctx.currentVars != null && ctx.currentVars(ref.ordinal) != 
null =>
+  val value = ctx.currentVars(ref.ordinal).value
+  val isNull = ctx.currentVars(ref.ordinal).isNull
+  if (CodegenContext.isJavaIdentifier(value)) {
+argSet += ((ctx.javaType(ref.dataType), value))
+  }
+  if (CodegenContext.isJavaIdentifier(isNull)) {
+argSet += (("boolean", isNull))
+  }
+case _: BoundReference =>
+  argSet += (("InternalRow", ctx.INPUT_ROW))
+case e =>
+  stack.pushAll(e.children)
+  }
+}
+
+argSet.toSet
+  }
+
+  // Splits aggregate code into small functions because the JVM does not JIT-compile functions that are too long
+  private def splitAggregateExpressions(
+  ctx: CodegenContext,
+  aggExprs: Seq[Expression],
+  evalAndUpdateCodes: Seq[String],
+  subExprs: Map[Expression, SubExprEliminationState],
+  otherArgs: Seq[(String, String)] = Seq.empty): Seq[String] = {
+aggExprs.zipWithIndex.map { case (aggExpr, i) =>
+  // The maximum length of parameters in non-static Java methods is 
254, but a parameter of
+  // type long or double contributes two units to the length. So, this 
method gives up
+  // splitting the code if the parameter length goes over 127.
+  val args = (getInputVariableReferences(ctx, aggExpr, subExprs) ++ 
otherArgs).toSeq
+
+  // This is for testing/benchmarking only
+  val maxParamNumInJavaMethod =
+  
sqlContext.getConf("spark.sql.codegen.aggregate.maxParamNumInJavaMethod", null) 
match {
--- End diff --

Let us introduce an internal SQLConf. If the number is high enough, we can 
disable this feature. 
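
For reference, a rough sketch of what such an internal conf could look like in 
`SQLConf.scala` (reusing the key from the diff; the doc string and default below are 
placeholders, not a final choice):

```Scala
  val AGG_CODEGEN_MAX_PARAM_NUM =
    buildConf("spark.sql.codegen.aggregate.maxParamNumInJavaMethod")
      .internal()
      .doc("The maximum number of Java method parameters allowed when splitting the " +
        "generated aggregate code into small functions. A large enough value effectively " +
        "disables the splitting.")
      .intConf
      .createWithDefault(127)
```

The caller side could then read it with something like 
`sqlContext.conf.getConf(SQLConf.AGG_CODEGEN_MAX_PARAM_NUM)` instead of the string-based 
lookup.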


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...

2017-12-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19082#discussion_r156000426
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
 ---
@@ -380,4 +380,19 @@ class CodeGenerationSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 s"Incorrect Evaluation: expressions: $exprAnd, actual: $actualAnd, 
expected: $expectedAnd")
 }
   }
+
+  test("SPARK-21870 check if CodegenContext.isJavaIdentifier works 
correctly") {
+assert(CodegenContext.isJavaIdentifier("agg_value") === true)
+assert(CodegenContext.isJavaIdentifier("agg_value1") === true)
+assert(CodegenContext.isJavaIdentifier("bhj_value4") === true)
+assert(CodegenContext.isJavaIdentifier("smj_value6") === true)
+assert(CodegenContext.isJavaIdentifier("rdd_value7") === true)
+assert(CodegenContext.isJavaIdentifier("scan_isNull") === true)
+assert(CodegenContext.isJavaIdentifier("test") === true)
+assert(CodegenContext.isJavaIdentifier("true") === false)
+assert(CodegenContext.isJavaIdentifier("false") === false)
+assert(CodegenContext.isJavaIdentifier("390239") === false)
+assert(CodegenContext.isJavaIdentifier(literal) === false)
+assert(CodegenContext.isJavaIdentifier(double) === false)
--- End diff --

```Scala
import CodegenContext.isJavaIdentifier
// positive cases
assert(isJavaIdentifier("agg_value"))
assert(isJavaIdentifier("agg_value1"))
assert(isJavaIdentifier("bhj_value4"))
assert(isJavaIdentifier("smj_value6"))
assert(isJavaIdentifier("rdd_value7"))
assert(isJavaIdentifier("scan_isNull"))
assert(isJavaIdentifier("test"))
// negative cases
assert(!isJavaIdentifier("true"))
assert(!isJavaIdentifier("false"))
assert(!isJavaIdentifier("390239"))
assert(!isJavaIdentifier(literal))
assert(!isJavaIdentifier(double))
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19082: [SPARK-21870][SQL] Split aggregation code into sm...

2017-12-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19082#discussion_r156002134
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 ---
@@ -863,25 +984,43 @@ case class HashAggregateExec(
 }
 
 val updateRowInUnsafeRowMap: String = {
-  ctx.INPUT_ROW = unsafeRowBuffer
+  // We need to copy the aggregation row buffer to a local row first 
because each aggregate
+  // function directly updates the buffer when it finishes.
+  val localRowBuffer = ctx.freshName("localUnsafeRowBuffer")
+  val initLocalRowBuffer = s"InternalRow $localRowBuffer = 
$unsafeRowBuffer.copy();"
+
+  ctx.INPUT_ROW = localRowBuffer
   val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, 
inputAttr))
   val subExprs = 
ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
   val effectiveCodes = subExprs.codes.mkString("\n")
   val unsafeRowBufferEvals = 
ctx.withSubExprEliminationExprs(subExprs.states) {
 boundUpdateExpr.map(_.genCode(ctx))
   }
-  val updateUnsafeRowBuffer = unsafeRowBufferEvals.zipWithIndex.map { 
case (ev, i) =>
+
+  val evalAndUpdateCodes = unsafeRowBufferEvals.zipWithIndex.map { 
case (ev, i) =>
 val dt = updateExpr(i).dataType
-ctx.updateColumn(unsafeRowBuffer, dt, i, ev, 
updateExpr(i).nullable)
+val updateColumnCode = ctx.updateColumn(unsafeRowBuffer, dt, i, 
ev, updateExpr(i).nullable)
+s"""
+   | // evaluate aggregate function
+   | ${ev.code}
+   | // update unsafe row buffer
+   | $updateColumnCode
+ """.stripMargin
   }
+
+  val updateAggValCode = splitAggregateExpressions(
+ctx, boundUpdateExpr, evalAndUpdateCodes, subExprs.states,
+Seq(("InternalRow", unsafeRowBuffer)))
--- End diff --

indents.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19938: [SPARK-22747][SQL] Localize lifetime of mutable states i...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19938
  
actually there is one real problem: after we fold many global variables 
into an array, the variable name may become something like `arr[1]`, which 
can't be used as the parameter name.

Localizing the global variables in the current expression/operator is one 
solution; another is generating parameter names instead of reusing the 
input variable name.
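
To sketch the second option concretely (a hypothetical helper, not existing Spark code): 
generate fresh, always-legal parameter names for the split function, and keep the original 
terms (which may be array accesses like `arr[1]`) only as call-site arguments:

```Scala
object ParamNameGen {
  private var id = 0
  private def freshName(prefix: String): String = { id += 1; s"${prefix}_$id" }

  /**
   * Given a code body and its inputs as (javaType, term) pairs, returns the parameter
   * declarations, the call-site arguments, and the body rewritten to use the generated
   * parameter names.
   */
  def bindInputs(
      body: String,
      inputs: Seq[(String, String)]): (Seq[String], Seq[String], String) = {
    val renamed = inputs.map { case (javaType, term) =>
      // `arr[1]` is not a legal Java identifier, so it cannot be a parameter name,
      // but a generated name always is.
      (javaType, term, freshName("param"))
    }
    val decls = renamed.map { case (t, _, p) => s"$t $p" }
    val args = renamed.map { case (_, term, _) => term }
    val newBody = renamed.foldLeft(body) { case (b, (_, term, p)) => b.replace(term, p) }
    (decls, args, newBody)
  }
}

// Example:
//   ParamNameGen.bindInputs("agg_sum += arr[1];", Seq(("long", "arr[1]")))
//   => (Seq("long param_1"), Seq("arr[1]"), "agg_sum += param_1;")
```

The textual `replace` here is naive (a term that is a prefix of another could misfire), but 
it is enough to show the renaming idea.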


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19939: [SPARK-20557] [SQL] Only support TIMESTAMP WITH TIME ZON...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19939
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19939: [SPARK-20557] [SQL] Only support TIMESTAMP WITH T...

2017-12-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19939#discussion_r156002148
  
--- Diff: 
external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
 ---
@@ -235,6 +239,61 @@ class OracleIntegrationSuite extends 
DockerJDBCIntegrationSuite with SharedSQLCo
 assert(types(1).equals("class java.sql.Timestamp"))
   }
 
+  test("Column type TIMESTAMP with SESSION_LOCAL_TIMEZONE is different 
from default") {
+val defaultJVMTimeZone = TimeZone.getDefault
+// Pick the timezone different from the current default time zone of 
JVM
+val sofiaTimeZone = TimeZone.getTimeZone("Europe/Sofia")
+val shanghaiTimeZone = TimeZone.getTimeZone("Asia/Shanghai")
+val localSessionTimeZone =
+  if (defaultJVMTimeZone == shanghaiTimeZone) sofiaTimeZone else 
shanghaiTimeZone
+
+withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> 
localSessionTimeZone.getID) {
+  val e = intercept[java.sql.SQLException] {
+val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new 
Properties)
+dfRead.collect()
+  }.getMessage
+  assert(e.contains("Unrecognized SQL type -101"))
+}
+  }
+
+  /**
+   * Changes the default JVM time zone to `timeZoneId` before executing `f`, then switches back
+   * to the original after `f` returns.
+   * @param timeZoneId the ID for a TimeZone, either an abbreviation such 
as "PST", a full name such
+   *   as "America/Los_Angeles", or a custom ID such as 
"GMT-8:00".
+   */
+  private def withTimeZone(timeZoneId: String)(f: => Unit): Unit = {
+    val originalTimeZone = TimeZone.getDefault
+    try {
+      // Switch the JVM default time zone for the duration of `f`
+      TimeZone.setDefault(TimeZone.getTimeZone(timeZoneId))
+      f
+    } finally {
+      TimeZone.setDefault(originalTimeZone)
+    }
+  }
+
+  test("Column TIMESTAMP with TIME ZONE(JVM timezone)") {
+def checkRow(row: Row, ts: String): Unit = {
+  assert(row.getTimestamp(1).equals(Timestamp.valueOf(ts)))
+}
+
+val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new 
Properties)
+withTimeZone("PST") {
--- End diff --

I feel it's safer if we also set session local time zone in `withTimeZone`
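
For example, a sketch of that, using only the helpers already visible in this test 
(`withSQLConf` and `SQLConf.SESSION_LOCAL_TIMEZONE`):

```Scala
  private def withTimeZone(timeZoneId: String)(f: => Unit): Unit = {
    val originalTimeZone = TimeZone.getDefault
    try {
      TimeZone.setDefault(TimeZone.getTimeZone(timeZoneId))
      // Also pin the session local time zone so the JDBC read and the JVM
      // default cannot diverge during the test.
      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZoneId) {
        f
      }
    } finally {
      TimeZone.setDefault(originalTimeZone)
    }
  }
```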


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


