[GitHub] spark issue #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nested expr...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19813 **[Test build #84756 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84756/testReport)** for PR 19813 at commit [`1251dfa`](https://github.com/apache/spark/commit/1251dfa305f4f1f8e34d7deb235bfa500d057fb4). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156291613
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
```diff
@@ -55,8 +55,45 @@ import org.apache.spark.util.{ParentClassLoader, Utils}
  * to null.
  * @param value A term for a (possibly primitive) value of the result of the evaluation. Not
  * valid if `isNull` is set to `true`.
+ * @param inputRow A term that holds the input row name when generating this code.
+ * @param inputVars A list of [[ExprInputVar]] that holds input variables when generating this code.
  */
-case class ExprCode(var code: String, var isNull: String, var value: String)
+case class ExprCode(
+    var code: String,
+    var isNull: String,
+    var value: String,
+    var inputRow: String = null,
+    var inputVars: Seq[ExprInputVar] = Seq.empty) {
+
+  // Returns true if this value is a literal.
+  def isLiteral(): Boolean = {
+    assert(value.nonEmpty, "ExprCode.value can't be empty string.")
+
+    if (value == "true" || value == "false" || value == "null") {
+      true
+    } else {
+      // The valid characters for the first character of a Java variable is [a-zA-Z_$].
+      value.head match {
+        case v if v >= 'a' && v <= 'z' => false
```
--- End diff --
> another idea, let's move isLiteral and isEvaluated to ExpressionCodegen. Seems these 2 are not general definition of ExprCode, and only makes sense for ctx.currentVars.

Ok.
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156291349
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
```diff
@@ -55,8 +55,45 @@ import org.apache.spark.util.{ParentClassLoader, Utils}
+      value.head match {
+        case v if v >= 'a' && v <= 'z' => false
```
--- End diff --
or `isGloballyAccess`?
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156291204
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
```diff
@@ -55,8 +55,45 @@ import org.apache.spark.util.{ParentClassLoader, Utils}
+      value.head match {
+        case v if v >= 'a' && v <= 'z' => false
```
--- End diff --
It is doable. Instead of `isLiteral`, we may call it `isValidJavaVariable`.
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156291182
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
```diff
@@ -55,8 +55,45 @@ import org.apache.spark.util.{ParentClassLoader, Utils}
+      value.head match {
+        case v if v >= 'a' && v <= 'z' => false
```
--- End diff --
Another idea: let's move `isLiteral` and `isEvaluated` to `ExpressionCodegen`. These two don't seem to be general properties of `ExprCode`; they only make sense for `ctx.currentVars`.
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156290690
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
```diff
@@ -55,8 +55,45 @@ import org.apache.spark.util.{ParentClassLoader, Utils}
+      value.head match {
+        case v if v >= 'a' && v <= 'z' => false
```
--- End diff --
Can we do a regex check to make sure `value` is a valid Java variable name? We can assume that if it's not a valid Java variable name, it is globally accessible, e.g. `123`, or `arr[1]` where `arr` is a global variable.
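A minimal Scala sketch of the regex check proposed above. It assumes ASCII-only identifiers. Real Java identifiers may also contain Unicode letters, and they must not be keywords, which this sketch does not check. `TermCheck` and `isValidJavaVariable` are hypothetical names, not Spark APIs. Under the proposed assumption, any term the check rejects would be treated as globally accessible.

```scala
// Hypothetical helper (not part of Spark): classifies a codegen term string.
object TermCheck {
  // Simple Java identifier: first char in [a-zA-Z_$], then [a-zA-Z0-9_$]*.
  // ASCII-only by assumption; Java keywords such as `int` are NOT rejected here.
  private val JavaIdentifier = "^[a-zA-Z_$][a-zA-Z0-9_$]*$".r

  // Words that match the identifier pattern but are literals, not variables.
  private val literalWords = Set("true", "false", "null")

  def isValidJavaVariable(term: String): Boolean =
    JavaIdentifier.pattern.matcher(term).matches() && !literalWords.contains(term)
}
```

The check also rejects dotted accesses such as `this.x` or `obj.field`. Under the proposed assumption those would be treated as globally accessible, which may not always hold in generated code.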
[GitHub] spark issue #19948: [SPARK-19809][SQL][TEST] NullPointerException on zero-si...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19948 **[Test build #84755 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84755/testReport)** for PR 19948 at commit [`492a30f`](https://github.com/apache/spark/commit/492a30ff673372b04455cd3bb454701961c65760).
[GitHub] spark issue #19948: [SPARK-19809][SQL][TEST] NullPointerException on zero-si...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19948 retest this please
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156288913
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala ---
```diff
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Defines util methods used in expression code generation.
+ */
+object ExpressionCodegen {
+
+  /**
+   * Given an expression, returns the all necessary parameters to evaluate it, so the generated
+   * code of this expression can be split in a function.
+   * The 1st string in returned tuple is the parameter strings used to call the function.
+   * The 2nd string in returned tuple is the parameter strings used to declare the function.
+   *
+   * Returns `None` if it can't produce valid parameters.
+   *
+   * Params to include:
+   *   1. Evaluated columns referred by this, children or deferred expressions.
+   *   2. Rows referred by this, children or deferred expressions.
+   *   3. Eliminated subexpressions referred by children expressions.
+   */
+  def getExpressionInputParams(
+      ctx: CodegenContext,
+      expr: Expression): Option[(Seq[String], Seq[String])] = {
+    val subExprs = getSubExprInChildren(ctx, expr)
+    val subExprCodes = getSubExprCodes(ctx, subExprs)
+    val subVars = subExprs.zip(subExprCodes).map { case (subExpr, subExprCode) =>
+      ExprInputVar(subExprCode, subExpr.dataType, subExpr.nullable)
+    }
+    val paramsFromSubExprs = prepareFunctionParams(ctx, subVars)
+
+    val inputVars = getInputVarsForChildren(ctx, expr)
+    val paramsFromColumns = prepareFunctionParams(ctx, inputVars)
+
+    val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr)
+    val paramsFromRows = inputRows.distinct.filter(_ != null).map { row =>
+      (row, s"InternalRow $row")
+    }
+
+    val paramsLength = getParamLength(ctx, inputVars ++ subVars) + paramsFromRows.length
+    // Maximum allowed parameter number for Java's method descriptor.
+    if (paramsLength > 255) {
+      None
+    } else {
+      val allParams = (paramsFromRows ++ paramsFromColumns ++ paramsFromSubExprs).unzip
+      val callParams = allParams._1.distinct
+      val declParams = allParams._2.distinct
+      Some((callParams, declParams))
+    }
+  }
+
+  /**
+   * Returns the eliminated subexpressions in the children expressions.
+   */
+  def getSubExprInChildren(ctx: CodegenContext, expr: Expression): Seq[Expression] = {
+    expr.children.flatMap { child =>
+      child.collect {
+        case e if ctx.subExprEliminationExprs.contains(e) => e
+      }
+    }.distinct
+  }
+
+  /**
+   * A small helper function to return `ExprCode`s that represent subexpressions.
+   */
+  def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): Seq[ExprCode] = {
+    subExprs.map { subExpr =>
+      val state = ctx.subExprEliminationExprs(subExpr)
+      ExprCode(code = "", value = state.value, isNull = state.isNull)
+    }
+  }
+
+  /**
+   * Retrieves previous input rows referred by children and deferred expressions.
+   */
+  def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): Seq[String] = {
+    expr.children.flatMap(getInputRows(ctx, _)).distinct
+  }
+
+  /**
+   * Given a child expression, retrieves previous input rows referred by it or deferred expressions
+   * which are needed to evaluate it.
+   */
+  def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = {
+    child.flatMap {
+      // An expression directly evaluates on current input row.
+      case Bo
```
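The `paramsLength > 255` guard above matches the JVM rule that a method descriptor may use at most 255 parameter units. Under that rule, `long` and `double` count as two units, every other type counts as one, and an instance method's implicit `this` also counts as one (JVMS §4.3.3). The body of `getParamLength` is not shown, so the sketch below is not that helper. It is a hypothetical counter that assumes each nullable input also needs a separate `boolean` isNull parameter.

```scala
// Hypothetical sketch (not Spark's getParamLength): counts JVM parameter units.
object ParamSlots {
  // JVMS 4.3.3: long and double contribute two units; any other type contributes one.
  def slotsOf(javaType: String): Int = javaType match {
    case "long" | "double" => 2
    case _ => 1
  }

  // inputs: (Java type of the value, nullable). A nullable input is ASSUMED to also
  // pass a boolean isNull parameter. `rows` counts InternalRow parameters.
  def totalSlots(inputs: Seq[(String, Boolean)], rows: Int, isStatic: Boolean): Int = {
    val valueSlots = inputs.map { case (t, nullable) =>
      slotsOf(t) + (if (nullable) 1 else 0)
    }.sum
    val thisSlot = if (isStatic) 0 else 1 // implicit `this` for instance methods
    valueSlots + rows + thisSlot
  }

  def fitsInDescriptor(inputs: Seq[(String, Boolean)], rows: Int, isStatic: Boolean): Boolean =
    totalSlots(inputs, rows, isStatic) <= 255
}
```

For example, 127 non-nullable `double` inputs in an instance method use exactly 255 units. Adding one `InternalRow` parameter makes the descriptor invalid.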
[GitHub] spark issue #19944: [SPARK-22756] [Build] [SparkR] Run SparkR tests if hive_...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19944 Hm, @gatorsmile, BTW, do you know how the CRAN check could fail because of the changes in the thrift server? I was just double-checking, but it seems orthogonal to me now. The test failure above seems to be due to the `e1071` package missing in your local environment.
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156288523
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
```diff
@@ -55,8 +55,45 @@ import org.apache.spark.util.{ParentClassLoader, Utils}
+      value.head match {
+        case v if v >= 'a' && v <= 'z' => false
```
--- End diff --
I only consider literals and normal Java variables for now. If array accesses like `arr[1]` always refer to global variables (I suppose they do, since we usually don't, and can't, use `arr[1]` as a variable name), maybe we should disallow passing any global variables.
[GitHub] spark pull request #19811: [SPARK-18016][SQL] Code Generation: Constant Pool...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19811#discussion_r156287876
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala ---
```diff
@@ -219,4 +219,55 @@ class GeneratedProjectionSuite extends SparkFunSuite {
     // - one is the mutableRow
     assert(globalVariables.length == 3)
   }
+
+  test("SPARK-18016: generated projections on wider table requiring state compaction") {
+    val N = 6000
+    val wideRow1 = new GenericInternalRow((0 until N).toArray[Any])
+    val schema1 = StructType((1 to N).map(i => StructField("", IntegerType)))
+    val wideRow2 = new GenericInternalRow(
+      (0 until N).map(i => UTF8String.fromString(i.toString)).toArray[Any])
+    val schema2 = StructType((1 to N).map(i => StructField("", StringType)))
+    val joined = new JoinedRow(wideRow1, wideRow2)
+    val joinedSchema = StructType(schema1 ++ schema2)
+    val nested = new JoinedRow(InternalRow(joined, joined), joined)
+    val nestedSchema = StructType(
+      Seq(StructField("", joinedSchema), StructField("", joinedSchema)) ++ joinedSchema)
+
+    // test generated UnsafeProjection
+    val unsafeProj = UnsafeProjection.create(nestedSchema)
+    val unsafe: UnsafeRow = unsafeProj(nested)
+    (0 until N).foreach { i =>
+      val s = UTF8String.fromString(i.toString)
+      assert(i === unsafe.getInt(i + 2))
+      assert(s === unsafe.getUTF8String(i + 2 + N))
+      assert(i === unsafe.getStruct(0, N * 2).getInt(i))
+      assert(s === unsafe.getStruct(0, N * 2).getUTF8String(i + N))
+      assert(i === unsafe.getStruct(1, N * 2).getInt(i))
+      assert(s === unsafe.getStruct(1, N * 2).getUTF8String(i + N))
+    }
+
+    // test generated SafeProjection
+    val safeProj = FromUnsafeProjection(nestedSchema)
```
--- End diff --
ditto
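The offsets in the test's assertions follow from the shape of `nestedSchema`. It has two struct fields, each holding the `2 * N` fields of `joinedSchema` (hence `getStruct(_, N * 2)`), followed by the `N` int fields and the `N` string fields of `joinedSchema` itself. Here is a small sketch of that index arithmetic. The helper names are hypothetical and do not appear in the test.

```scala
// Hypothetical helpers spelling out the field layout used by the test's assertions.
object NestedLayout {
  // Top level: [struct0, struct1, int_0 .. int_{N-1}, str_0 .. str_{N-1}]
  def intIndex(i: Int): Int = i + 2
  def strIndex(i: Int, n: Int): Int = i + 2 + n
  def totalTopLevelFields(n: Int): Int = 2 + 2 * n

  // Inside each nested struct (the joined row): [int_0 .. int_{N-1}, str_0 .. str_{N-1}]
  def nestedStrIndex(i: Int, n: Int): Int = i + n
}
```

With `N = 6000`, the projected row has 12002 top-level fields. That width is what the test relies on to exercise wide-table code generation.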
[GitHub] spark issue #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAndRead
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19941 **[Test build #84754 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84754/testReport)** for PR 19941 at commit [`1481163`](https://github.com/apache/spark/commit/14811636692810809033bc7caf03fcecb6939aa3).
[GitHub] spark pull request #19811: [SPARK-18016][SQL] Code Generation: Constant Pool...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19811#discussion_r156287813
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala ---
```diff
@@ -219,4 +219,55 @@ class GeneratedProjectionSuite extends SparkFunSuite {
+    // test generated UnsafeProjection
+    val unsafeProj = UnsafeProjection.create(nestedSchema)
```
--- End diff --
It seems that the state compaction never happens in this `UnsafeProjection`?
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156287780
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
```diff
@@ -55,8 +55,45 @@ import org.apache.spark.util.{ParentClassLoader, Utils}
+      value.head match {
+        case v if v >= 'a' && v <= 'z' => false
```
--- End diff --
also cc @kiszk @mgaido91 @maropu
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156287640
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
```diff
@@ -55,8 +55,45 @@ import org.apache.spark.util.{ParentClassLoader, Utils}
+      value.head match {
+        case v if v >= 'a' && v <= 'z' => false
```
--- End diff --
Hmm, this seems very hard to do. The code is already generated and uses the input names as-is, e.g. the Java variable `a`, the literal `123`, or the array access `arr[1]`. Ideally we would analyze what each input really refers to: `a` refers to the Java variable `a`, `123` refers to nothing, and `arr[1]` refers to the Java variable `arr`. This is nearly impossible in the current string-based framework. We need to think more about how to deal with it.
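The following rough heuristic shows why resolving what a term refers to is awkward with plain strings. It is a sketch only: `TermRefs.referencedNames` is a hypothetical helper, not part of Spark. It extracts identifier-like tokens from a term and skips tokens glued to a digit, letter, `_`, `$` or `.`, so numeric suffixes such as the `L` in `123L` are ignored.

```scala
// Hypothetical sketch (not Spark code): guesses which names a generated-code term references.
object TermRefs {
  // Identifier-like token not preceded by a digit, letter, '_', '$' or '.'.
  // ASCII-only by assumption.
  private val Identifier = "(?<![0-9A-Za-z_$.])[a-zA-Z_$][a-zA-Z0-9_$]*".r

  // Literal words that look like identifiers but reference nothing.
  private val literalWords = Set("true", "false", "null")

  def referencedNames(term: String): List[String] =
    Identifier.findAllIn(term).toList.filterNot(literalWords).distinct
}
```

This maps `a` to `a`, `123` to nothing and `arr[1]` to `arr`, as in the comment. However, it still misreads string literals (`"abc"` yields `abc`) and cannot tell local variables from fields. That is the kind of ambiguity a string-based framework runs into.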
[GitHub] spark issue #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAndRead
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19941 retest this please
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156287106
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
```diff
@@ -1028,12 +1078,17 @@ class CodegenContext {
       // 2. Less code.
       // Currently, we will do this for all non-leaf only expression trees (i.e. expr trees with
       // at least two nodes) as the cost of doing it is expected to be low.
-      addMutableState(JAVA_BOOLEAN, isNull, s"$isNull = false;")
-      addMutableState(javaType(expr.dataType), value,
-        s"$value = ${defaultValue(expr.dataType)};")
+      if (expr.nullable) {
+        addMutableState(JAVA_BOOLEAN, isNull)
+      }
+      addMutableState(javaType(expr.dataType), value)
       subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
-      val state = SubExprEliminationState(isNull, value)
+      val state = if (expr.nullable) {
+        SubExprEliminationState(isNull, value)
```
--- End diff --
Because here it is not `SubExprEliminationState(ev.isNull, value)`.
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156287015
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
```diff
@@ -1028,12 +1078,17 @@ class CodegenContext {
+      val state = if (expr.nullable) {
+        SubExprEliminationState(isNull, value)
```
--- End diff --
I think this is still needed.
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156286913
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---
```diff
@@ -236,4 +237,24 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-22551: Fix 64kb limit for deeply nested expressions under wholestage codegen") {
+    import testImplicits._
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = Seq(("abc", 1)).toDF("key", "int")
+      df.write.parquet(path)
+
+      var strExpr: Expression = col("key").expr
+      for (_ <- 1 to 150) {
+        strExpr = Decode(Encode(strExpr, Literal("utf-8")), Literal("utf-8"))
+      }
+      val expressions = Seq(If(EqualTo(strExpr, strExpr), strExpr, strExpr))
+
+      val df2 = spark.read.parquet(path).select(expressions.map(Column(_)): _*)
+      val plan = df2.queryExecution.executedPlan
+      assert(plan.find(_.isInstanceOf[WholeStageCodegenExec]).isDefined)
```
--- End diff --
Can you give some insight into how this test fails without your PR? In `WholeStageCodegen.doExec`, we have:
```
val (ctx, cleanedSource) = doCodeGen()
// try to compile and fallback if it failed
val (_, maxCodeSize) = try {
  CodeGenerator.compile(cleanedSource)
} catch {
  case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
    // We should already saw the error message
    logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
    return child.execute()
}
```
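A simplified model of the quoted `try`/`catch` helps show when a compilation failure becomes visible. This is a sketch with hypothetical names, not Spark's code. When `Utils.isTesting` is true the guard evaluates to false, so the exception propagates instead of falling back to `child.execute()`. The exception message in the usage below is made up.

```scala
// Hypothetical model of the compile-or-fallback guard quoted above (not Spark's code).
object CompileFallback {
  // Runs `compile`; falls back only if not testing AND fallback is enabled.
  // In every other case the exception escapes to the caller.
  def compileOrFallback[T](isTesting: Boolean, fallbackEnabled: Boolean)(
      compile: => T)(fallback: => T): T =
    try compile
    catch {
      case _: Exception if !isTesting && fallbackEnabled => fallback
    }
}
```

So with `isTesting = true`, a generated method that fails to compile would presumably surface as a test failure rather than a silent fallback.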
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156286336 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala --- @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types.DataType + +/** + * Defines util methods used in expression code generation. + */ +object ExpressionCodegen { + + /** + * Given an expression, returns the all necessary parameters to evaluate it, so the generated + * code of this expression can be split in a function. + * The 1st string in returned tuple is the parameter strings used to call the function. + * The 2nd string in returned tuple is the parameter strings used to declare the function. + * + * Returns `None` if it can't produce valid parameters. + * + * Params to include: + * 1. Evaluated columns referred by this, children or deferred expressions. + * 2. Rows referred by this, children or deferred expressions. + * 3. 
Eliminated subexpressions referred by children expressions. + */ + def getExpressionInputParams( + ctx: CodegenContext, + expr: Expression): Option[(Seq[String], Seq[String])] = { +val subExprs = getSubExprInChildren(ctx, expr) +val subExprCodes = getSubExprCodes(ctx, subExprs) +val subVars = subExprs.zip(subExprCodes).map { case (subExpr, subExprCode) => + ExprInputVar(subExprCode, subExpr.dataType, subExpr.nullable) +} +val paramsFromSubExprs = prepareFunctionParams(ctx, subVars) + +val inputVars = getInputVarsForChildren(ctx, expr) +val paramsFromColumns = prepareFunctionParams(ctx, inputVars) + +val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr) +val paramsFromRows = inputRows.distinct.filter(_ != null).map { row => + (row, s"InternalRow $row") +} + +val paramsLength = getParamLength(ctx, inputVars ++ subVars) + paramsFromRows.length +// Maximum allowed parameter number for Java's method descriptor. +if (paramsLength > 255) { + None +} else { + val allParams = (paramsFromRows ++ paramsFromColumns ++ paramsFromSubExprs).unzip + val callParams = allParams._1.distinct + val declParams = allParams._2.distinct + Some((callParams, declParams)) +} + } + + /** + * Returns the eliminated subexpressions in the children expressions. + */ + def getSubExprInChildren(ctx: CodegenContext, expr: Expression): Seq[Expression] = { +expr.children.flatMap { child => + child.collect { +case e if ctx.subExprEliminationExprs.contains(e) => e + } +}.distinct + } + + /** + * A small helper function to return `ExprCode`s that represent subexpressions. + */ + def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): Seq[ExprCode] = { +subExprs.map { subExpr => + val state = ctx.subExprEliminationExprs(subExpr) + ExprCode(code = "", value = state.value, isNull = state.isNull) +} + } + + /** + * Retrieves previous input rows referred by children and deferred expressions. 
+ */ + def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): Seq[String] = { +expr.children.flatMap(getInputRows(ctx, _)).distinct + } + + /** + * Given a child expression, retrieves previous input rows referred by it or deferred expressions + * which are needed to evaluate it. + */ + def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = { +child.flatMap { + // An expression directly evaluates on current input row. + case
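The `paramsLength > 255` check above reflects the JVM limit on method descriptors. Per JVMS §4.3.3, a descriptor may use at most 255 parameter slots: `long` and `double` take two slots, every other type takes one, and an instance method also spends one slot on `this`. The body of `getParamLength` is not shown in the diff, so the following is only a hypothetical sketch of how such a count could be done. `InputVar`, `slotsFor` and `paramLength` are illustrative names. The extra boolean slot for a nullable input is an assumption, based on `ExprCode` carrying a separate `isNull` term.

```scala
// Hypothetical sketch: count JVM parameter slots for a generated split function.
// Per JVMS §4.3.3, long/double use two slots, all other types use one, and an
// instance method also spends one slot on `this`. The limit is 255 slots.
object ParamSlots {
  // Illustrative description of one input passed to the split function.
  final case class InputVar(javaType: String, nullable: Boolean)

  val MaxParamSlots = 255

  def slotsFor(javaType: String): Int = javaType match {
    case "long" | "double" => 2
    case _                 => 1
  }

  // `this` + one slot per InputRow + each value's slots
  // + (assumption) one boolean isNull slot per nullable input.
  def paramLength(rows: Int, vars: Seq[InputVar]): Int =
    1 + rows + vars.map(v => slotsFor(v.javaType) + (if (v.nullable) 1 else 0)).sum

  def fitsInDescriptor(rows: Int, vars: Seq[InputVar]): Boolean =
    paramLength(rows, vars) <= MaxParamSlots
}
```

The sketch counts `this`, as the JVMS requires for instance methods. Whether `getParamLength` in the PR does so is not visible in the quoted diff.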
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156286015 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -55,8 +55,45 @@ import org.apache.spark.util.{ParentClassLoader, Utils} * to null. * @param value A term for a (possibly primitive) value of the result of the evaluation. Not * valid if `isNull` is set to `true`. + * @param inputRow A term that holds the input row name when generating this code. + * @param inputVars A list of [[ExprInputVar]] that holds input variables when generating this code. */ -case class ExprCode(var code: String, var isNull: String, var value: String) +case class ExprCode( +var code: String, +var isNull: String, +var value: String, +var inputRow: String = null, +var inputVars: Seq[ExprInputVar] = Seq.empty) { + + // Returns true if this value is a literal. + def isLiteral(): Boolean = { +assert(value.nonEmpty, "ExprCode.value can't be empty string.") + +if (value == "true" || value == "false" || value == "null") { + true +} else { + // The valid characters for the first character of a Java variable is [a-zA-Z_$]. + value.head match { +case v if v >= 'a' && v <= 'z' => false --- End diff -- Do array accesses like `arr[1]` always refer to global variables?
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156285866 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -979,7 +1016,11 @@ class CodegenContext { val expr = e.head // Generate the code for this expression tree. val eval = expr.genCode(this) - val state = SubExprEliminationState(eval.isNull, eval.value) + val state = if (expr.nullable) { --- End diff -- Yes, we can remove it.
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r15628 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -979,7 +1016,11 @@ class CodegenContext { val expr = e.head // Generate the code for this expression tree. val eval = expr.genCode(this) - val state = SubExprEliminationState(eval.isNull, eval.value) + val state = if (expr.nullable) { --- End diff -- Do we still need it?
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156285726 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -1028,12 +1078,17 @@ class CodegenContext { // 2. Less code. // Currently, we will do this for all non-leaf only expression trees (i.e. expr trees with // at least two nodes) as the cost of doing it is expected to be low. - addMutableState(JAVA_BOOLEAN, isNull, s"$isNull = false;") - addMutableState(javaType(expr.dataType), value, -s"$value = ${defaultValue(expr.dataType)};") + if (expr.nullable) { +addMutableState(JAVA_BOOLEAN, isNull) + } + addMutableState(javaType(expr.dataType), value) subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);" - val state = SubExprEliminationState(isNull, value) + val state = if (expr.nullable) { +SubExprEliminationState(isNull, value) --- End diff -- Do we still need it?
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156285447 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -55,8 +55,45 @@ import org.apache.spark.util.{ParentClassLoader, Utils} * to null. * @param value A term for a (possibly primitive) value of the result of the evaluation. Not * valid if `isNull` is set to `true`. + * @param inputRow A term that holds the input row name when generating this code. + * @param inputVars A list of [[ExprInputVar]] that holds input variables when generating this code. */ -case class ExprCode(var code: String, var isNull: String, var value: String) +case class ExprCode( +var code: String, +var isNull: String, +var value: String, +var inputRow: String = null, +var inputVars: Seq[ExprInputVar] = Seq.empty) { + + // Returns true if this value is a literal. + def isLiteral(): Boolean = { +assert(value.nonEmpty, "ExprCode.value can't be empty string.") + +if (value == "true" || value == "false" || value == "null") { + true +} else { + // The valid characters for the first character of a Java variable is [a-zA-Z_$]. + value.head match { +case v if v >= 'a' && v <= 'z' => false --- End diff -- This won't work for array accesses like `arr[1]`. I think we should pick a more general solution like https://github.com/apache/spark/pull/19938#issuecomment-350935431, i.e. generate parameter names instead of reusing the input names.
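To illustrate the concern: a first-character test like the one in `isLiteral` classifies `arr[1]` as a variable. However, `arr[1]` cannot be used as a parameter name in a Java method declaration. The suggested alternative is to declare fresh parameter names and pass the original terms only as call arguments.

The sketch below is a hypothetical illustration of that idea, not code from either PR. `InputTerm`, `makeParams` and the `param_N` naming scheme are assumptions. Rewriting the split function's body to use the new names is left out.

```scala
// Hypothetical sketch of "generate parameter names instead of reusing the input name".
object ParamNaming {
  // A term from the caller's generated Java code plus its Java type.
  final case class InputTerm(term: String, javaType: String)

  // Naive check in the spirit of isLiteral: a term whose first character can
  // start a Java identifier is assumed to be a variable name.
  def looksLikeVariable(term: String): Boolean =
    term.nonEmpty && Character.isJavaIdentifierStart(term.head)

  // Returns (callArgs, declParams, renaming). The original terms are only used
  // at the call site; the declaration always uses freshly generated names.
  def makeParams(inputs: Seq[InputTerm]): (Seq[String], Seq[String], Map[String, String]) = {
    val named = inputs.distinct.zipWithIndex.map { case (in, i) => (in, s"param_$i") }
    val callArgs = named.map(_._1.term)
    val declParams = named.map { case (in, name) => s"${in.javaType} $name" }
    val renaming = named.map { case (in, name) => in.term -> name }.toMap
    (callArgs, declParams, renaming)
  }
}
```

With this scheme, `arr[1]` is passed as an argument while the callee declares `int param_0`, so the shape of the caller's term no longer matters.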
[GitHub] spark issue #19948: [SPARK-19809][SQL][TEST] NullPointerException on zero-si...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19948 So if `SQLConf.ORC_IMPLEMENTATION` is `hive`, do we still hit this bug?
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156285248 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -105,6 +105,12 @@ abstract class Expression extends TreeNode[Expression] { val isNull = ctx.freshName("isNull") val value = ctx.freshName("value") val eval = doGenCode(ctx, ExprCode("", isNull, value)) + eval.isNull = if (this.nullable) eval.isNull else "false" --- End diff -- Yes, I plan to do that after this one.
[GitHub] spark pull request #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nest...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19813#discussion_r156284378 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -105,6 +105,12 @@ abstract class Expression extends TreeNode[Expression] { val isNull = ctx.freshName("isNull") val value = ctx.freshName("value") val eval = doGenCode(ctx, ExprCode("", isNull, value)) + eval.isNull = if (this.nullable) eval.isNull else "false" --- End diff -- At least we should do it in a new PR. Do you want to do it after this one gets merged?
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19783
[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19783 Thanks, merging to master!
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r156281819 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,99 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the first bin into which a column value falls. + */ + def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = 0 +while ((i < bins.length) && (value > bins(i).hi)) { + i += 1 +} +i + } + + /** + * Returns the number of the last bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the last bin into which a column value falls. + */ + def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = bins.length - 1 +while ((i >= 0) && (value < bins(i).lo)) { + i -= 1 +} +i + } + + /** + * Returns a percentage of a bin holding values for column value in the range of + * [lowerValue, higherValue] + * + * @param higherValue a given upper bound value of a specified column value range + * @param lowerValue a given lower bound value of a specified column value range + * @param bin a single histogram bin + * @return the percentage of a single bin holding values in [lowerValue, higherValue]. 
+ */ + private def getOccupation( + higherValue: Double, + lowerValue: Double, + bin: HistogramBin): Double = { +assert(bin.lo <= lowerValue && lowerValue <= higherValue && higherValue <= bin.hi) +if (bin.hi == bin.lo) { + // the entire bin is covered in the range + 1.0 +} else if (higherValue == lowerValue) { + // set percentage to 1/NDV + 1.0 / bin.ndv.toDouble +} else { + // Use proration since the range falls inside this bin. + math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0) +} + } + + /** + * Returns the number of bins for column values in [lowerValue, higherValue]. + * The column value distribution is saved in an equi-height histogram. The return values is a + * double value is because we may return a portion of a bin. For example, a predicate + * "column = 8" may return the number of bins 0.2 if the holding bin has 5 distinct values. + * + * @param higherId id of the high end bin holding the high end value of a column range + * @param lowerId id of the low end bin holding the low end value of a column range + * @param higherEnd a given upper bound value of a specified column value range + * @param lowerEnd a given lower bound value of a specified column value range + * @param histogram a numeric equi-height histogram + * @return the number of bins for column values in [lowerEnd, higherEnd]. + */ + def getOccupationBins( + higherId: Int, + lowerId: Int, + higherEnd: Double, + lowerEnd: Double, + histogram: Histogram): Double = { +assert(lowerId <= higherId) + +if (lowerId == higherId) { + val curBin = histogram.bins(lowerId) + getOccupation(higherEnd, lowerEnd, curBin) +} else { + // compute how much lowerEnd/higherEnd occupies its bin + val lowerCurBin = histogram.bins(lowerId) + val lowerPart = getOccupation(lowerCurBin.hi, lowerEnd, lowerCurBin) --- End diff -- Shall we assert that `lowerBin.lo <= lowerEnd`?
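For reference, here is a simplified, self-contained sketch of the two occupation helpers, with the bound the reviewer asks about checked explicitly. Several details are assumptions:

- `Bin` stands in for Spark's `HistogramBin`, with assumed fields `lo`, `hi` and `ndv`.
- `require` is used instead of `assert`.
- The multi-bin branch adds the two partial end bins to the fully covered bins in between. This is an assumption, since the quoted diff stops right after `lowerPart`.

```scala
// Simplified stand-in for Spark's HistogramBin (assumed fields: lo, hi, ndv).
final case class Bin(lo: Double, hi: Double, ndv: Long)

object Occupation {
  // Fraction of one bin covered by [lower, higher]; mirrors getOccupation in the diff.
  def occupation(higher: Double, lower: Double, bin: Bin): Double = {
    require(bin.lo <= lower && lower <= higher && higher <= bin.hi)
    if (bin.hi == bin.lo) 1.0                                  // single-value bin: fully covered
    else if (higher == lower) 1.0 / bin.ndv                    // point lookup: assume 1/NDV
    else math.min((higher - lower) / (bin.hi - bin.lo), 1.0)   // uniform proration
  }

  // Number of (possibly fractional) bins covered by [lowerEnd, higherEnd].
  def occupationBins(higherIdx: Int, lowerIdx: Int, higherEnd: Double, lowerEnd: Double,
                     bins: IndexedSeq[Bin]): Double = {
    require(lowerIdx <= higherIdx)
    if (lowerIdx == higherIdx) {
      occupation(higherEnd, lowerEnd, bins(lowerIdx))
    } else {
      val lowerBin = bins(lowerIdx)
      val higherBin = bins(higherIdx)
      // The bounds suggested in the review, checked up front.
      require(lowerBin.lo <= lowerEnd && higherEnd <= higherBin.hi)
      val lowerPart = occupation(lowerBin.hi, lowerEnd, lowerBin)
      val higherPart = occupation(higherEnd, higherBin.lo, higherBin)
      // Assumption: bins strictly between the two end bins are fully covered.
      lowerPart + higherPart + (higherIdx - lowerIdx - 1)
    }
  }
}
```

Take bins [0, 10], [10, 20] and [20, 30], each with 5 distinct values. The range [5, 25] covers half of the first bin, all of the middle bin and half of the last, i.e. 2.0 bins.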
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r156281132 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,99 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the first bin into which a column value falls. --- End diff -- This seems redundant; shall we remove it?
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r156281044 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,99 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column value falls for a specified --- End diff -- nit: `number` -> `index`?
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r156281155 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,99 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the first bin into which a column value falls. + */ + def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = 0 +while ((i < bins.length) && (value > bins(i).hi)) { + i += 1 +} +i + } + + /** + * Returns the number of the last bin into which a column value falls for a specified --- End diff -- ditto
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r156281583 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,99 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the first bin into which a column value falls. + */ + def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = 0 +while ((i < bins.length) && (value > bins(i).hi)) { + i += 1 +} +i + } + + /** + * Returns the number of the last bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the last bin into which a column value falls. + */ + def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = bins.length - 1 +while ((i >= 0) && (value < bins(i).lo)) { + i -= 1 +} +i + } + + /** + * Returns a percentage of a bin holding values for column value in the range of + * [lowerValue, higherValue] + * + * @param higherValue a given upper bound value of a specified column value range + * @param lowerValue a given lower bound value of a specified column value range + * @param bin a single histogram bin + * @return the percentage of a single bin holding values in [lowerValue, higherValue]. 
+ */ + private def getOccupation( + higherValue: Double, + lowerValue: Double, + bin: HistogramBin): Double = { +assert(bin.lo <= lowerValue && lowerValue <= higherValue && higherValue <= bin.hi) +if (bin.hi == bin.lo) { + // the entire bin is covered in the range + 1.0 +} else if (higherValue == lowerValue) { + // set percentage to 1/NDV + 1.0 / bin.ndv.toDouble +} else { + // Use proration since the range falls inside this bin. + math.min((higherValue - lowerValue) / (bin.hi - bin.lo), 1.0) +} + } + + /** + * Returns the number of bins for column values in [lowerValue, higherValue]. + * The column value distribution is saved in an equi-height histogram. The return values is a + * double value is because we may return a portion of a bin. For example, a predicate + * "column = 8" may return the number of bins 0.2 if the holding bin has 5 distinct values. + * + * @param higherId id of the high end bin holding the high end value of a column range --- End diff -- nit: `higherIndex`
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r156282033 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -332,8 +332,44 @@ case class FilterEstimation(plan: Filter) extends Logging { colStatsMap.update(attr, newStats) } - Some(1.0 / BigDecimal(ndv)) -} else { + if (colStat.histogram.isEmpty) { +// returns 1/ndv if there is no histogram +Some(1.0 / BigDecimal(ndv)) + } else { +// We compute filter selectivity using Histogram information. +val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble +val histogram = colStat.histogram.get +val hgmBins = histogram.bins + +// find bins where column's current min and max locate. Note that a column's [min, max] +// range may change due to another condition applied earlier. +val min = EstimationUtils.toDecimal(colStat.min.get, literal.dataType).toDouble +val max = EstimationUtils.toDecimal(colStat.max.get, literal.dataType).toDouble +val minBinId = EstimationUtils.findFirstBinForValue(min, hgmBins) --- End diff -- nit: `minBinIndex`
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r156281162 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,99 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the first bin into which a column value falls. + */ + def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = 0 +while ((i < bins.length) && (value > bins(i).hi)) { + i += 1 +} +i + } + + /** + * Returns the number of the last bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the last bin into which a column value falls. --- End diff -- ditto
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r156281918 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -332,8 +332,41 @@ case class FilterEstimation(plan: Filter) extends Logging { colStatsMap.update(attr, newStats) } - Some(1.0 / BigDecimal(ndv)) -} else { + if (colStat.histogram.isEmpty) { +// returns 1/ndv if there is no histogram +Some(1.0 / BigDecimal(ndv)) + } else { +// We compute filter selectivity using Histogram information. --- End diff -- Did you create a new method?
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r156282461 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala --- @@ -332,8 +332,44 @@ case class FilterEstimation(plan: Filter) extends Logging { colStatsMap.update(attr, newStats) } - Some(1.0 / BigDecimal(ndv)) -} else { + if (colStat.histogram.isEmpty) { +// returns 1/ndv if there is no histogram +Some(1.0 / BigDecimal(ndv)) + } else { +// We compute filter selectivity using Histogram information. +val datum = EstimationUtils.toDecimal(literal.value, literal.dataType).toDouble +val histogram = colStat.histogram.get +val hgmBins = histogram.bins + +// find bins where column's current min and max locate. Note that a column's [min, max] +// range may change due to another condition applied earlier. +val min = EstimationUtils.toDecimal(colStat.min.get, literal.dataType).toDouble +val max = EstimationUtils.toDecimal(colStat.max.get, literal.dataType).toDouble +val minBinId = EstimationUtils.findFirstBinForValue(min, hgmBins) +val maxBinId = EstimationUtils.findLastBinForValue(max, hgmBins) + +// compute how many bins the column's current valid range [min, max] occupies. +// Note that a column's [min, max] range may vary after we apply some filter conditions. +val validRangeBins = EstimationUtils.getOccupationBins(maxBinId, minBinId, max, + min, histogram) + +val lowerBinId = EstimationUtils.findFirstBinForValue(datum, hgmBins) +val higherBinId = EstimationUtils.findLastBinForValue(datum, hgmBins) +assert(lowerBinId <= higherBinId) +val lowerBinNdv = hgmBins(lowerBinId).ndv +val higherBinNdv = hgmBins(higherBinId).ndv +// assume uniform distribution in each bin +val occupiedBins = if (lowerBinId == higherBinId) { --- End diff -- is this just `EstimationUtils.getOccupationBins(higherBinId, lowerBinId, datum, datum, histogram)`? 
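If the reviewer's suggestion holds, an equality predicate becomes a degenerate range [datum, datum]. Its selectivity can then be estimated as the bins it occupies divided by the bins covered by the column's current [min, max]. The sketch below illustrates that reading under these assumptions:

- `Bin` is a stand-in for `HistogramBin`.
- The final division is an assumption, because the quoted diff is cut off before the selectivity is returned.
- `datum` already lies inside both [min, max] and the histogram's range.

```scala
// Hypothetical sketch: equality selectivity from an equi-height histogram, reusing
// the range helper with lowerEnd == higherEnd == datum as suggested in the review.
final case class Bin(lo: Double, hi: Double, ndv: Long)

object EqualitySelectivity {
  // Index of the first bin whose upper bound is >= v.
  def firstBin(v: Double, bins: IndexedSeq[Bin]): Int = {
    var i = 0
    while (i < bins.length && v > bins(i).hi) i += 1
    i
  }

  // Index of the last bin whose lower bound is <= v.
  def lastBin(v: Double, bins: IndexedSeq[Bin]): Int = {
    var i = bins.length - 1
    while (i >= 0 && v < bins(i).lo) i -= 1
    i
  }

  private def occupation(higher: Double, lower: Double, b: Bin): Double =
    if (b.hi == b.lo) 1.0
    else if (higher == lower) 1.0 / b.ndv
    else math.min((higher - lower) / (b.hi - b.lo), 1.0)

  def occupationBins(hiIdx: Int, loIdx: Int, hiEnd: Double, loEnd: Double,
                     bins: IndexedSeq[Bin]): Double =
    if (hiIdx == loIdx) occupation(hiEnd, loEnd, bins(loIdx))
    else occupation(bins(loIdx).hi, loEnd, bins(loIdx)) +
      occupation(hiEnd, bins(hiIdx).lo, bins(hiIdx)) + (hiIdx - loIdx - 1)

  // Selectivity of `col = datum`, given the column's current [min, max].
  def selectivity(datum: Double, min: Double, max: Double, bins: IndexedSeq[Bin]): Double = {
    val validRange = occupationBins(lastBin(max, bins), firstBin(min, bins), max, min, bins)
    val occupied = occupationBins(lastBin(datum, bins), firstBin(datum, bins), datum, datum, bins)
    occupied / validRange
  }
}
```

A value that sits exactly on a bin boundary, such as 10 with bins [0, 10], [10, 20] and [20, 30], is found in two bins. It is therefore charged 1/NDV of each, so its estimate is twice that of an interior value such as 15.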
[GitHub] spark pull request #19783: [SPARK-21322][SQL] support histogram in filter ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19783#discussion_r156281223 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala --- @@ -114,4 +114,99 @@ object EstimationUtils { } } + /** + * Returns the number of the first bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the first bin into which a column value falls. + */ + def findFirstBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = 0 +while ((i < bins.length) && (value > bins(i).hi)) { + i += 1 +} +i + } + + /** + * Returns the number of the last bin into which a column value falls for a specified + * numeric equi-height histogram. + * + * @param value a literal value of a column + * @param bins an array of bins for a given numeric equi-height histogram + * @return the id of the last bin into which a column value falls. + */ + def findLastBinForValue(value: Double, bins: Array[HistogramBin]): Int = { +var i = bins.length - 1 +while ((i >= 0) && (value < bins(i).lo)) { + i -= 1 +} +i + } + + /** + * Returns a percentage of a bin holding values for column value in the range of + * [lowerValue, higherValue] + * + * @param higherValue a given upper bound value of a specified column value range + * @param lowerValue a given lower bound value of a specified column value range + * @param bin a single histogram bin + * @return the percentage of a single bin holding values in [lowerValue, higherValue]. --- End diff -- redundant
[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19783 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84751/ Test PASSed.
[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19783 Merged build finished. Test PASSed.
[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19783 **[Test build #84751 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84751/testReport)** for PR 19783 at commit [`be1e7ba`](https://github.com/apache/spark/commit/be1e7ba1ae485da523232e2ddffce8ac911392dd). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19792: [SPARK-22566][PYTHON] Better error message for `_...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19792#discussion_r156279744 --- Diff: python/pyspark/sql/types.py --- @@ -1083,7 +1083,8 @@ def _infer_schema(row): elif hasattr(row, "_fields"): # namedtuple items = zip(row._fields, tuple(row)) else: -names = ['_%d' % i for i in range(1, len(row) + 1)] +if names is None: +names = ['_%d' % i for i in range(1, len(row) + 1)] --- End diff -- @gberger, let's revert this change too. It seems it would introduce a behaviour change:

**Before**

```
>>> spark.createDataFrame([["a", "b"]], ["col1"]).show()
17/12/12 15:29:45 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
+----+---+
|col1| _2|
+----+---+
|   a|  b|
+----+---+
```

**After**

```
>>> spark.createDataFrame([["a", "b"]], ["col1"]).show()
...
java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 1 fields are required while 2 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:148)
...
```
[GitHub] spark issue #19715: [SPARK-22397][ML]add multiple columns support to Quantil...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19715 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84753/ Test PASSed.
[GitHub] spark issue #19715: [SPARK-22397][ML]add multiple columns support to Quantil...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19715 Merged build finished. Test PASSed.
[GitHub] spark issue #19715: [SPARK-22397][ML]add multiple columns support to Quantil...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19715 **[Test build #84753 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84753/testReport)** for PR 19715 at commit [`0e5971b`](https://github.com/apache/spark/commit/0e5971b3a95ae6f105659f6361c0db5a1c9fb9d8). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19621: [SPARK-11215][ML] Add multiple columns support to String...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19621 @felixcheung "iris" is a built-in dataset in R that is used in many algorithm tests, so is it appropriate to change it?
[GitHub] spark pull request #19746: [SPARK-22346][ML] VectorSizeHint Transformer for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19746#discussion_r156279322 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.SparkException +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.attribute.AttributeGroup +import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCol} +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable} +import org.apache.spark.sql.{Column, DataFrame, Dataset} +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.types.StructType + +/** + * A feature transformer that adds vector size information to a vector column. 
+ */ +@Experimental +@Since("2.3.0") +class VectorSizeHint @Since("2.3.0") (@Since("2.3.0") override val uid: String) + extends Transformer with HasInputCol with HasHandleInvalid with DefaultParamsWritable { + + @Since("2.3.0") + def this() = this(Identifiable.randomUID("vectSizeHint")) + + @Since("2.3.0") + val size = new IntParam(this, "size", "Size of vectors in column.", {s: Int => s >= 0}) + + @Since("2.3.0") + def getSize: Int = getOrDefault(size) + + /** @group setParam */ + @Since("2.3.0") + def setSize(value: Int): this.type = set(size, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCol(value: String): this.type = set(inputCol, value) + + @Since("2.3.0") + override val handleInvalid: Param[String] = new Param[String]( +this, +"handleInvalid", +"How to handle invalid vectors in inputCol, (invalid vectors include nulls and vectors with " + + "the wrong size. The options are `skip` (filter out rows with invalid vectors), `error` " + + "(throw an error) and `optimistic` (don't check the vector size).", +ParamValidators.inArray(VectorSizeHint.supportedHandleInvalids)) + + /** @group setParam */ + @Since("2.3.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + setDefault(handleInvalid, VectorSizeHint.ERROR_INVALID) + + @Since("2.3.0") + override def transform(dataset: Dataset[_]): DataFrame = { +val localInputCol = getInputCol +val localSize = getSize +val localHandleInvalid = getHandleInvalid + +val group = AttributeGroup.fromStructField(dataset.schema(localInputCol)) +if (localHandleInvalid == VectorSizeHint.OPTIMISTIC_INVALID && group.size == localSize) { + dataset.toDF +} else { + val newGroup = group.size match { +case `localSize` => group +case -1 => new AttributeGroup(localInputCol, localSize) +case _ => + val msg = s"Trying to set size of vectors in `$localInputCol` to $localSize but size " + +s"already set to ${group.size}." 
+ throw new SparkException(msg) + } + + val newCol: Column = localHandleInvalid match { +case VectorSizeHint.OPTIMISTIC_INVALID => col(localInputCol) +case VectorSizeHint.ERROR_INVALID => + val checkVectorSizeUDF = udf { vector: Vector => +if (vector == null) { + throw new SparkException(s"Got null vector in VectorSizeHint, set `handleInvalid` " + +s"to 'skip' to filter invalid rows.") +} +if (vector.size != localSize) { + throw new SparkException(s"VectorSizeHint Expecting a vector of size $localSize but" + +s" got ${vector.size}") +} +
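The `handleInvalid` param documented in this diff has three modes: `skip`, `error` and `optimistic`. The plain-Scala sketch below models the per-row decision for each mode without using Spark. It makes a few assumptions:

- A vector is represented as `Option[Array[Double]]`, with `None` standing in for a null.
- `SizeHintSketch` and `keepRow` are hypothetical names, not Spark API.
- It throws `IllegalArgumentException`, whereas the transformer in the diff throws `SparkException` from a UDF.
- In the real transformer, `skip` filters rows out of the DataFrame.

```scala
object SizeHintSketch {
  // Hypothetical constants mirroring the handleInvalid options described above.
  val SkipMode = "skip"
  val ErrorMode = "error"
  val OptimisticMode = "optimistic"

  /**
   * Decides whether a row is kept; `None` models a null vector.
   * Returns true to keep the row, false to drop it, and throws in `error` mode.
   */
  def keepRow(vector: Option[Array[Double]], expectedSize: Int, mode: String): Boolean =
    mode match {
      case OptimisticMode =>
        // No size check at all: every row passes through unchanged.
        true
      case SkipMode =>
        // Drop nulls and vectors whose size differs from the hint.
        vector.exists(_.length == expectedSize)
      case ErrorMode =>
        vector match {
          case None =>
            throw new IllegalArgumentException(
              "Got null vector; set handleInvalid to 'skip' to filter invalid rows.")
          case Some(v) if v.length != expectedSize =>
            throw new IllegalArgumentException(
              s"Expected a vector of size $expectedSize but got ${v.length}.")
          case Some(_) =>
            true
        }
      case other =>
        throw new IllegalArgumentException(s"Unsupported handleInvalid mode: $other")
    }
}
```

Only per-row behavior is modeled here. The metadata conflict check in the diff (a size already recorded in the column's `AttributeGroup`) is a separate, schema-level failure.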
[GitHub] spark issue #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendForAll by...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18624 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84752/ Test PASSed.
[GitHub] spark issue #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendForAll by...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18624 Merged build finished. Test PASSed.
[GitHub] spark issue #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendForAll by...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18624 **[Test build #84752 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84752/testReport)** for PR 18624 at commit [`f36706a`](https://github.com/apache/spark/commit/f36706ae46f4b0562b1d85627a526fcf642e913c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on the issue: https://github.com/apache/spark/pull/16578 @abhaynahar I think the reviewers are already included...
[GitHub] spark issue #19944: [SPARK-22756] [Build] [SparkR] Run SparkR tests if hive_...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19944 cc @felixcheung Very weird... I can reproduce it in my local environment. Could you take a look at why SparkR failed?
```
Caused by: org.apache.spark.SparkException: R computation failed with Error : requireNamespace("e1071", quietly = TRUE) is not TRUE
    at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108)
    at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:51)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
```
[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user abhaynahar commented on the issue: https://github.com/apache/spark/pull/16578 @viirya, can you please help tag the people you think should review this?
[GitHub] spark issue #19715: [SPARK-22397][ML]add multiple columns support to Quantil...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19715 **[Test build #84753 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84753/testReport)** for PR 19715 at commit [`0e5971b`](https://github.com/apache/spark/commit/0e5971b3a95ae6f105659f6361c0db5a1c9fb9d8).
[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...
Github user caneGuy commented on a diff in the pull request: https://github.com/apache/spark/pull/19942#discussion_r156272847 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -564,6 +564,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED) require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") + +val executorTimeoutThreshold = Utils.timeStringAsSeconds(get("spark.network.timeout", "120s")) +val executorHeartbeatInterval = Utils.timeStringAsSeconds( + get("spark.executor.heartbeatInterval", "10s")) +require(executorHeartbeatInterval > executorTimeoutThreshold, s"The value of" + --- End diff -- Done
[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...
Github user caneGuy commented on a diff in the pull request: https://github.com/apache/spark/pull/19942#discussion_r156272829 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -564,6 +564,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED) require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") + +val executorTimeoutThreshold = Utils.timeStringAsSeconds(get("spark.network.timeout", "120s")) +val executorHeartbeatInterval = Utils.timeStringAsSeconds( + get("spark.executor.heartbeatInterval", "10s")) +require(executorHeartbeatInterval > executorTimeoutThreshold, s"The value of" + + s"spark.network.timeout' must be no less than the value of" + --- End diff -- Done
[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...
Github user caneGuy commented on a diff in the pull request: https://github.com/apache/spark/pull/19942#discussion_r156272812 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -564,6 +564,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED) require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") + +val executorTimeoutThreshold = Utils.timeStringAsSeconds(get("spark.network.timeout", "120s")) +val executorHeartbeatInterval = Utils.timeStringAsSeconds( + get("spark.executor.heartbeatInterval", "10s")) +require(executorHeartbeatInterval > executorTimeoutThreshold, s"The value of" + + s"spark.network.timeout' must be no less than the value of" + + s" 'spark.executor.heartbeatInterval'.") --- End diff -- Done
[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19811 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84748/ Test FAILed.
[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19811 Merged build finished. Test FAILed.
[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19811 **[Test build #84748 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84748/testReport)** for PR 19811 at commit [`b9adca3`](https://github.com/apache/spark/commit/b9adca38ee17cc4568f3fcef4ef2b0353dc6f260). * This patch **fails SparkR unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendForAll by...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18624 **[Test build #84752 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84752/testReport)** for PR 18624 at commit [`f36706a`](https://github.com/apache/spark/commit/f36706ae46f4b0562b1d85627a526fcf642e913c).
[GitHub] spark issue #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendForAll by...
Github user mpjlu commented on the issue: https://github.com/apache/spark/pull/18624 retest this please
[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...
Github user caneGuy commented on a diff in the pull request: https://github.com/apache/spark/pull/19942#discussion_r156270767 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -564,6 +564,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED) require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") + +val executorTimeoutThreshold = Utils.timeStringAsSeconds(get("spark.network.timeout", "120s")) +val executorHeartbeatInterval = Utils.timeStringAsSeconds( + get("spark.executor.heartbeatInterval", "10s")) +require(executorHeartbeatInterval > executorTimeoutThreshold, s"The value of" + + s"spark.network.timeout' must be no less than the value of" + + s" 'spark.executor.heartbeatInterval'.") --- End diff -- Sorry about that, I will check more carefully!
[GitHub] spark issue #19770: [SPARK-21571][WEB UI] Spark history server leaves incomp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19770 Merged build finished. Test FAILed.
[GitHub] spark issue #19770: [SPARK-21571][WEB UI] Spark history server leaves incomp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19770 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84746/ Test FAILed.
[GitHub] spark issue #19770: [SPARK-21571][WEB UI] Spark history server leaves incomp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19770 **[Test build #84746 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84746/testReport)** for PR 19770 at commit [`9194aab`](https://github.com/apache/spark/commit/9194aab0cbe80690bf104a9ffbf292b2ecb93f48). * This patch **fails SparkR unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendForAll by...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18624 Merged build finished. Test FAILed.
[GitHub] spark issue #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendForAll by...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18624 **[Test build #84750 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84750/testReport)** for PR 18624 at commit [`f36706a`](https://github.com/apache/spark/commit/f36706ae46f4b0562b1d85627a526fcf642e913c). * This patch **fails SparkR unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendForAll by...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18624 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84750/ Test FAILed.
[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19942#discussion_r156268681 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -564,6 +564,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED) require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") + +val executorTimeoutThreshold = Utils.timeStringAsSeconds(get("spark.network.timeout", "120s")) +val executorHeartbeatInterval = Utils.timeStringAsSeconds( + get("spark.executor.heartbeatInterval", "10s")) +require(executorHeartbeatInterval > executorTimeoutThreshold, s"The value of" + --- End diff -- You should also add a simple, explicit comment explaining why this check is needed.
[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19942#discussion_r156268623 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -564,6 +564,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED) require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") + +val executorTimeoutThreshold = Utils.timeStringAsSeconds(get("spark.network.timeout", "120s")) +val executorHeartbeatInterval = Utils.timeStringAsSeconds( + get("spark.executor.heartbeatInterval", "10s")) +require(executorHeartbeatInterval > executorTimeoutThreshold, s"The value of" + + s"spark.network.timeout' must be no less than the value of" + + s" 'spark.executor.heartbeatInterval'.") --- End diff -- nit: The whitespace should always go to the end of the previous line instead of the beginning.
[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19942#discussion_r156268550 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -564,6 +564,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED) require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") + +val executorTimeoutThreshold = Utils.timeStringAsSeconds(get("spark.network.timeout", "120s")) +val executorHeartbeatInterval = Utils.timeStringAsSeconds( + get("spark.executor.heartbeatInterval", "10s")) +require(executorHeartbeatInterval > executorTimeoutThreshold, s"The value of" + + s"spark.network.timeout' must be no less than the value of" + --- End diff -- nit: remove the `s` interpolator from this line and the following one.
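The two nits quoted here both concern the message literal:

- Drop the `s` prefix, since nothing is interpolated.
- Keep each separating space at the end of a fragment rather than at the start of the next one.

The quoted fragments also concatenate to `The value ofspark.network.timeout'...`. That string is missing a space after "of" and the opening quote before `spark.network.timeout`. Below is a small sketch of the corrected literal, using the hypothetical names `MessageSketch` and `heartbeatMessage`.

```scala
object MessageSketch {
  // Nothing is interpolated, so plain literals are used instead of s"...".
  // Each separating space ends a fragment; no fragment starts with a space.
  val heartbeatMessage: String =
    "The value of " +
      "'spark.network.timeout' must be no less than the value of " +
      "'spark.executor.heartbeatInterval'."
}
```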
[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...
Github user caneGuy commented on a diff in the pull request: https://github.com/apache/spark/pull/19942#discussion_r156268382 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -564,6 +564,18 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED) require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") + +val executorTimeoutThreshold = Utils.timeStringAsSeconds(get("spark.network.timeout", "120s")) +val executorHeartbeatInterval = Utils.timeStringAsSeconds( + get("spark.executor.heartbeatInterval", "10s")) +if (executorHeartbeatInterval > executorTimeoutThreshold) { --- End diff -- Done, @jiangxb1987. Thanks!
[GitHub] spark pull request #19746: [SPARK-22346][ML] VectorSizeHint Transformer for ...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19746#discussion_r156268308 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.SparkException +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.attribute.AttributeGroup +import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCol} +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable} +import org.apache.spark.sql.{Column, DataFrame, Dataset} +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.types.StructType + +/** + * A feature transformer that adds vector size information to a vector column. 
+ */ +@Experimental +@Since("2.3.0") +class VectorSizeHint @Since("2.3.0") (@Since("2.3.0") override val uid: String) + extends Transformer with HasInputCol with HasHandleInvalid with DefaultParamsWritable { + + @Since("2.3.0") + def this() = this(Identifiable.randomUID("vectSizeHint")) + + @Since("2.3.0") + val size = new IntParam(this, "size", "Size of vectors in column.", {s: Int => s >= 0}) + + @Since("2.3.0") + def getSize: Int = getOrDefault(size) + + /** @group setParam */ + @Since("2.3.0") + def setSize(value: Int): this.type = set(size, value) + + /** @group setParam */ + @Since("2.3.0") + def setInputCol(value: String): this.type = set(inputCol, value) + + @Since("2.3.0") + override val handleInvalid: Param[String] = new Param[String]( +this, +"handleInvalid", +"How to handle invalid vectors in inputCol, (invalid vectors include nulls and vectors with " + + "the wrong size. The options are `skip` (filter out rows with invalid vectors), `error` " + + "(throw an error) and `optimistic` (don't check the vector size).", +ParamValidators.inArray(VectorSizeHint.supportedHandleInvalids)) + + /** @group setParam */ + @Since("2.3.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + setDefault(handleInvalid, VectorSizeHint.ERROR_INVALID) + + @Since("2.3.0") + override def transform(dataset: Dataset[_]): DataFrame = { +val localInputCol = getInputCol +val localSize = getSize +val localHandleInvalid = getHandleInvalid + +val group = AttributeGroup.fromStructField(dataset.schema(localInputCol)) +if (localHandleInvalid == VectorSizeHint.OPTIMISTIC_INVALID && group.size == localSize) { + dataset.toDF +} else { + val newGroup = group.size match { +case `localSize` => group +case -1 => new AttributeGroup(localInputCol, localSize) +case _ => + val msg = s"Trying to set size of vectors in `$localInputCol` to $localSize but size " + +s"already set to ${group.size}." 
+ throw new SparkException(msg) + } + + val newCol: Column = localHandleInvalid match { +case VectorSizeHint.OPTIMISTIC_INVALID => col(localInputCol) +case VectorSizeHint.ERROR_INVALID => + val checkVectorSizeUDF = udf { vector: Vector => +if (vector == null) { + throw new SparkException(s"Got null vector in VectorSizeHint, set `handleInvalid` " + +s"to 'skip' to filter invalid rows.") +} +if (vector.size != localSize) { + throw new SparkException(s"VectorSizeHint Expecting a vector of size $localSize but" + +s" got ${vector.size}") +}
[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19942#discussion_r156267891 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -564,6 +564,18 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED) require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") + +val executorTimeoutThreshold = Utils.timeStringAsSeconds(get("spark.network.timeout", "120s")) +val executorHeartbeatInterval = Utils.timeStringAsSeconds( + get("spark.executor.heartbeatInterval", "10s")) +if (executorHeartbeatInterval > executorTimeoutThreshold) { --- End diff -- ```require(executorHeartbeatInterval > executorTimeoutThreshold, s"The value of 'spark.network.timeout' must be no less than the value of 'spark.executor.heartbeatInterval'.")```
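`require(condition, message)` throws an `IllegalArgumentException` when `condition` is false, so the condition must state the valid configuration. The condition quoted in these diffs and in the suggestion above, `executorHeartbeatInterval > executorTimeoutThreshold`, describes the invalid case instead. With the default values (10s heartbeat, 120s timeout) it would therefore reject a correct configuration.

Below is a minimal sketch of the intended check. It assumes both values have already been converted to seconds, which the diff does with `Utils.timeStringAsSeconds` and which is omitted here. `HeartbeatCheckSketch` and `validateHeartbeat` are hypothetical names. The doc comment is an example of the kind of explanation the reviewer asked for.

```scala
object HeartbeatCheckSketch {
  /**
   * Executors send heartbeats to the driver every `heartbeatIntervalSec` seconds, and by
   * default the driver treats an executor as lost when no heartbeat arrives within the
   * network timeout. An interval longer than the timeout would get healthy executors
   * reported as lost, so such a configuration is rejected up front.
   */
  def validateHeartbeat(networkTimeoutSec: Long, heartbeatIntervalSec: Long): Unit = {
    // require throws IllegalArgumentException when its condition is false,
    // so the condition describes the valid configuration.
    require(networkTimeoutSec >= heartbeatIntervalSec,
      s"The value of 'spark.network.timeout' (${networkTimeoutSec}s) must be no less than " +
        s"the value of 'spark.executor.heartbeatInterval' (${heartbeatIntervalSec}s).")
  }
}
```

With the defaults, `validateHeartbeat(120L, 10L)` returns normally, while `validateHeartbeat(10L, 120L)` fails with "requirement failed: ...".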
[GitHub] spark issue #19948: [SPARK-19809][SQL][TEST] NullPointerException on zero-si...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19948 Merged build finished. Test FAILed.
[GitHub] spark issue #19948: [SPARK-19809][SQL][TEST] NullPointerException on zero-si...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19948 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84747/ Test FAILed.
[GitHub] spark issue #19948: [SPARK-19809][SQL][TEST] NullPointerException on zero-si...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19948 **[Test build #84747 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84747/testReport)** for PR 19948 at commit [`492a30f`](https://github.com/apache/spark/commit/492a30ff673372b04455cd3bb454701961c65760). * This patch **fails SparkR unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19938: [SPARK-22747][SQL] Localize lifetime of mutable states i...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19938 Sure. In #19865, I will generate a new parameter name in `splitExpression` instead of inserting an assertion.
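Asserting that a split function's parameter does not clash with an existing variable only detects a collision. Generating a fresh name avoids the collision altogether. Below is a minimal, hypothetical sketch of a counter-based fresh-name generator. It is similar in spirit to Catalyst's `CodegenContext.freshName` but is not its implementation, and `FreshNames`/`fresh` are illustrative names.

```scala
import scala.collection.mutable

/** Hypothetical generator: returns a distinct identifier on every call for a given prefix. */
class FreshNames {
  // Next suffix to hand out for each prefix.
  private val counters = mutable.Map.empty[String, Int]

  def fresh(prefix: String): String = {
    val id = counters.getOrElse(prefix, 0)
    counters(prefix) = id + 1
    s"${prefix}_$id"
  }
}
```

Each call for the same prefix yields a new suffix (`value_0`, `value_1`, ...). Within one generator, a split function's parameters therefore cannot collide with names handed out earlier.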
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r156266325 --- Diff: external/kinesis-asl/src/main/java/org/apache/spark/streaming/kinesis/KinesisInitialPosition.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.kinesis; + +import java.util.Date; + +/** + * A java wrapper for org.apache.spark.streaming.kinesis.InitialPosition + * to expose the corresponding scala objects for InitialPositionInStream. + * The functions are intentionally Upper cased to appear like classes for + * usage in Java classes. + */ +public class KinesisInitialPosition { --- End diff -- please add `@InterfaceStability.Evolving` above
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r156265997

--- Diff: external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala ---
@@ -101,12 +102,60 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
       .build()
     assert(dstream.endpointUrl == customEndpointUrl)
     assert(dstream.regionName == customRegion)
-    assert(dstream.initialPositionInStream == customInitialPosition)
+    assert(dstream.initialPosition == customInitialPosition)
     assert(dstream.checkpointAppName == customAppName)
     assert(dstream.checkpointInterval == customCheckpointInterval)
     assert(dstream._storageLevel == customStorageLevel)
     assert(dstream.kinesisCreds == customKinesisCreds)
     assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
     assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
+
+    // Testing with AtTimestamp
+    val cal = Calendar.getInstance()
+    cal.add(Calendar.DATE, -1)
+    val timestamp = cal.getTime()
+    val initialPositionAtTimestamp = AtTimestamp(timestamp)
+
+    val dstreamAtTimestamp = builder
+      .endpointUrl(customEndpointUrl)
+      .regionName(customRegion)
+      .initialPosition(initialPositionAtTimestamp)
+      .checkpointAppName(customAppName)
+      .checkpointInterval(customCheckpointInterval)
+      .storageLevel(customStorageLevel)
+      .kinesisCredentials(customKinesisCreds)
+      .dynamoDBCredentials(customDynamoDBCreds)
+      .cloudWatchCredentials(customCloudWatchCreds)
+      .build()
+    assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl)
+    assert(dstreamAtTimestamp.regionName == customRegion)
+    assert(dstreamAtTimestamp.initialPosition.initialPositionInStream
+      == initialPositionAtTimestamp.initialPositionInStream)
+    assert(
+      dstreamAtTimestamp.initialPosition.asInstanceOf[AtTimestamp].timestamp.equals(timestamp))
+    assert(dstreamAtTimestamp.checkpointAppName == customAppName)
+    assert(dstreamAtTimestamp.checkpointInterval == customCheckpointInterval)
+    assert(dstreamAtTimestamp._storageLevel == customStorageLevel)
+    assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds)
+    assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds))
+    assert(dstreamAtTimestamp.cloudWatchCreds == Option(customCloudWatchCreds))
+
+    // Testing with AtTimestamp
+    val initialPositionAtTimestamp2 = AtTimestamp(timestamp)
--- End diff --

How are the following lines a different test?
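The reviewer's question turns on Scala case-class equality. Assuming `AtTimestamp` is a case class over a `java.util.Date`, as in the reviewer's own sketch in the next message, two instances built from the same timestamp compare equal, so a second `AtTimestamp(timestamp)` would exercise nothing new. The minimal sketch below shows this with a stand-in ADT. `Position`, `LatestPos`, `AtTs` and `AtTimestampEquality` are hypothetical names that only mirror the shape under review; they are not the Kinesis module's classes.

```scala
import java.util.{Calendar, Date}

// Hypothetical stand-ins that mirror the shape discussed in the review;
// they are not the classes from the Kinesis module.
sealed trait Position
case object LatestPos extends Position
final case class AtTs(timestamp: Date) extends Position

object AtTimestampEquality {
  // Same approach as the test above: a timestamp one day in the past.
  def yesterday(): Date = {
    val cal = Calendar.getInstance()
    cal.add(Calendar.DATE, -1)
    cal.getTime()
  }

  def main(args: Array[String]): Unit = {
    val ts = yesterday()
    val first = AtTs(ts)
    // A distinct Date with the same millisecond value: Date.equals compares
    // getTime(), so the generated case-class equals treats the two as equal.
    val second = AtTs(new Date(ts.getTime))
    println(first == second) // true: structural equality
    println(first eq second) // false: two separate objects
  }
}
```

A second builder run with an equal `AtTimestamp` only adds coverage if it changes something the first run did not check, for example a different timestamp or a different initial position.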
[GitHub] spark pull request #18029: [SPARK-20168] [DStream] Add changes to use kinesi...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/18029#discussion_r156266783

--- Diff: external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/InitialPosition.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.streaming.kinesis
+
+import java.util.Date
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+
+/**
+ * Trait for Kinesis's InitialPositionInStream.
+ * This will be overridden by more specific types.
+ */
+sealed trait InitialPosition {
+  val initialPositionInStream: InitialPositionInStream
+}
+
+/**
+ * Case object for Kinesis's InitialPositionInStream.LATEST.
+ */
+case object Latest extends InitialPosition {
--- End diff --

Can you wrap all of these in an object?

```scala
sealed trait InitialPosition {
  ...
}

object internal {
  case object Latest extends InitialPosition {
  }
  ...
  case class AtTimestamp(timestamp: Date) extends InitialPosition {
  }
}
```

Note how `InitialPosition` is outside and `internal` is lowercase, so that people go only through the Java interface (`org.apache.spark.streaming.kinesis.Latest()`, etc.). Your documentation and test cases go through the Scala interface, which makes it super weird to have two things corresponding to the same thing.
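One way to read the suggested layout is sketched below: the sealed supertype stays public at the top level, the concrete positions move into a lowercase `internal` object, and callers use a single set of factory methods. This is a sketch under assumptions, not the PR's code. `StreamStart` is a hypothetical stand-in for the KCL enum `InitialPositionInStream`, so the example compiles without the AWS dependency. `InitialPositions` and its `latest()`/`trimHorizon()`/`atTimestamp()` methods are hypothetical names for the Java-facing entry point the reviewer describes.

```scala
import java.util.Date

// Hypothetical stand-in for the KCL enum InitialPositionInStream,
// so the sketch compiles without the AWS dependency.
object StreamStart extends Enumeration {
  val LATEST, TRIM_HORIZON, AT_TIMESTAMP = Value
}

// The public, sealed supertype stays outside the wrapper object.
sealed trait InitialPosition {
  def initialPositionInStream: StreamStart.Value
}

// Lowercase wrapper: concrete types live here, which nudges callers
// toward the factory methods below instead of the case objects.
object internal {
  case object Latest extends InitialPosition {
    val initialPositionInStream = StreamStart.LATEST
  }
  case object TrimHorizon extends InitialPosition {
    val initialPositionInStream = StreamStart.TRIM_HORIZON
  }
  final case class AtTimestamp(timestamp: Date) extends InitialPosition {
    val initialPositionInStream = StreamStart.AT_TIMESTAMP
  }
}

// Hypothetical single entry point, callable the same way from Java and Scala.
object InitialPositions {
  def latest(): InitialPosition = internal.Latest
  def trimHorizon(): InitialPosition = internal.TrimHorizon
  def atTimestamp(timestamp: Date): InitialPosition = internal.AtTimestamp(timestamp)
}
```

Because `InitialPosition` is sealed and all subtypes live in the same file, a `match` over `internal.Latest`, `internal.TrimHorizon` and `internal.AtTimestamp(_)` can still be checked for exhaustiveness by the compiler, while documentation and tests only need to show the factory calls.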
[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19861 Merged build finished. Test FAILed.
[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19861 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84745/ Test FAILed.
[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19861 **[Test build #84745 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84745/testReport)** for PR 19861 at commit [`ec9a717`](https://github.com/apache/spark/commit/ec9a717d218e966f38068f1a407b749debea4f35).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #19947: [SPARK-22759] [SQL] Filters can be combined iff both are...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19947 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84744/ Test FAILed.
[GitHub] spark issue #19947: [SPARK-22759] [SQL] Filters can be combined iff both are...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19947 Merged build finished. Test FAILed.
[GitHub] spark issue #19947: [SPARK-22759] [SQL] Filters can be combined iff both are...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19947 **[Test build #84744 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84744/testReport)** for PR 19947 at commit [`e726f54`](https://github.com/apache/spark/commit/e726f548478c769c061a52ead12948239792a404).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19783 **[Test build #84751 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84751/testReport)** for PR 19783 at commit [`be1e7ba`](https://github.com/apache/spark/commit/be1e7ba1ae485da523232e2ddffce8ac911392dd).
[GitHub] spark pull request #17702: [SPARK-20408][SQL] Get the glob path in parallel ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/17702#discussion_r156265041

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -668,4 +672,31 @@ object DataSource extends Logging {
       }
       globPath
     }
+
+  /**
+   * Return all paths represented by the wildcard string.
+   * Follow [[InMemoryFileIndex]].bulkListLeafFile and reuse the conf.
+   */
+  private def getGlobbedPaths(
+      sparkSession: SparkSession,
+      fs: FileSystem,
+      hadoopConf: SerializableConfiguration,
+      qualified: Path): Seq[Path] = {
+    val paths = SparkHadoopUtil.get.expandGlobPath(fs, qualified)
+    if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+      SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+    } else {
+      val parallelPartitionDiscoveryParallelism =
+        sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
+      val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
+      val expanded = sparkSession.sparkContext
--- End diff --

Yep, I mean that YARN and HDFS are always deployed in the same region, but we can't control where the driver runs, because in client mode (e.g. spark-sql or spark-shell) it is our customer's machine. For example, we deploy YARN and HDFS in Beijing, CN, while a user runs spark-sql from Shanghai, CN. Maybe this scenario shouldn't be considered in this patch? What's your opinion, @vanzin?
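The branch in `getGlobbedPaths` is a size-based decision: expand the glob on the driver when the number of paths is at or below the discovery threshold, otherwise spread the work over `min(paths.size, parallelism)` slices. The sketch below models only that decision with plain strings, so it runs without Spark or Hadoop. `GlobPlan`, `ExpandOnDriver`, `ExpandInParallel` and `GlobPlanner.plan` are hypothetical names. The round-robin grouping is a choice made for this sketch, since the diff is cut off before the distributed step.

```scala
// Hypothetical model of the branch in getGlobbedPaths: small glob results
// are expanded on the driver; larger ones are split into at most
// `parallelism` slices for distributed listing.
sealed trait GlobPlan
case object ExpandOnDriver extends GlobPlan
final case class ExpandInParallel(numSlices: Int, slices: Seq[Seq[String]]) extends GlobPlan

object GlobPlanner {
  def plan(paths: Seq[String], threshold: Int, parallelism: Int): GlobPlan = {
    require(parallelism > 0, "parallelism must be positive")
    if (paths.size <= threshold) {
      ExpandOnDriver
    } else {
      // Same bound as the diff: never more slices than there are paths.
      val numSlices = math.min(paths.size, parallelism)
      // Round-robin assignment (an assumption of this sketch) keeps the
      // slices within one element of each other in size.
      val slices = paths.zipWithIndex
        .groupBy { case (_, i) => i % numSlices }
        .toSeq
        .sortBy(_._1)
        .map { case (_, group) => group.map(_._1) }
      ExpandInParallel(numSlices, slices)
    }
  }
}
```

For example, with a threshold of 32, three paths stay on the driver, while 100 paths with a parallelism of 8 become 8 slices of 12 or 13 paths each. Whether the driver-side expansion itself is slow (the cross-region client-mode case above) is a separate question that this size check does not address.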
[GitHub] spark issue #19783: [SPARK-21322][SQL] support histogram in filter cardinali...
Github user ron8hu commented on the issue: https://github.com/apache/spark/pull/19783 retest this please.
[GitHub] spark pull request #19942: [SPARK-22754][DEPLOY] Check whether spark.executo...
Github user caneGuy commented on a diff in the pull request: https://github.com/apache/spark/pull/19942#discussion_r156264833

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -291,6 +291,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     if (proxyUser != null && principal != null) {
       SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
     }
+
+    val executorTimeoutThreshold = Utils.timeStringAsSeconds(
+      sparkProperties.getOrElse("spark.network.timeout", "120s"))
+    val executorHeartbeatInterval = Utils.timeStringAsSeconds(
+      sparkProperties.getOrElse("spark.executor.heartbeatInterval", "10s"))
--- End diff --

Done, @vanzin. Thanks!
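The diff reads `spark.network.timeout` (default `"120s"`) and `spark.executor.heartbeatInterval` (default `"10s"`) as seconds before comparing them. The sketch below shows that kind of check, assuming the intended rule is that the heartbeat interval must be strictly smaller than the network timeout. The diff stops before the comparison, so the rule is an assumption. `HeartbeatCheck`, `toSeconds` and `validate` are hypothetical. `toSeconds` is a simplified parser in the spirit of `Utils.timeStringAsSeconds`, not Spark's implementation, and it treats a bare number as seconds.

```scala
// Hypothetical, simplified re-implementation for illustration only;
// it is not Spark's Utils.timeStringAsSeconds.
object HeartbeatCheck {
  private val TimePattern = """(-?\d+)([a-z]+)?""".r

  // Converts strings such as "120s", "2m" or "1500ms" to whole seconds.
  def toSeconds(str: String): Long = str.trim.toLowerCase match {
    case TimePattern(num, unit) =>
      val n = num.toLong
      // An unmatched optional group is null, so default a bare number to seconds.
      Option(unit).getOrElse("s") match {
        case "ms"        => n / 1000
        case "s"         => n
        case "m" | "min" => n * 60
        case "h"         => n * 3600
        case "d"         => n * 86400
        case other       => throw new IllegalArgumentException(s"Unknown time unit: $other")
      }
    case _ => throw new IllegalArgumentException(s"Invalid time string: $str")
  }

  // Returns an error message when the heartbeat would not fit inside the timeout.
  def validate(props: Map[String, String]): Option[String] = {
    val timeout = toSeconds(props.getOrElse("spark.network.timeout", "120s"))
    val heartbeat = toSeconds(props.getOrElse("spark.executor.heartbeatInterval", "10s"))
    if (heartbeat >= timeout) {
      Some(s"spark.executor.heartbeatInterval (${heartbeat}s) must be less than " +
        s"spark.network.timeout (${timeout}s)")
    } else {
      None
    }
  }
}
```

With the defaults from the diff (10 s heartbeat, 120 s timeout) `validate` returns `None`. Setting the heartbeat to `"200s"`, or lowering the timeout to `"10s"`, produces an error message instead.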
[GitHub] spark issue #19938: [SPARK-22747][SQL] Localize lifetime of mutable states i...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19938 I prefer generating new parameter names in `splitExpression` over localizing global variables. There is no contract that an `Expression` must output Java variables; we may inline some values. For example, we already output a `false` or `true` literal for `isNull`, and I roughly remember we do the same for `value`, e.g. emitting `a + 1` instead of a new Java variable `c` calculated by `c = a + 1`.
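The point about inlined values can be made concrete. If an expression's output may be a literal such as `false` or an inlined Java expression such as `a + 1`, a split-out method cannot reuse that text as a parameter name. Declaring fresh parameters (`arg0`, `arg1`, ...) and passing each input's code as the call argument works whether the input is a variable or an inlined expression. The sketch below builds only the Java source strings. `GenValue` and `SplitSketch.split` are hypothetical names, not Spark's `CodegenContext` API, and the body is assumed to already refer to the fresh parameter names.

```scala
// Hypothetical model of an expression's generated output: `code` may be a
// Java variable ("c"), a literal ("false"), or an inlined expression ("a + 1").
final case class GenValue(javaType: String, code: String)

object SplitSketch {
  // Declares fresh parameters arg0, arg1, ... for the split-out method and
  // passes each input's code, variable or not, as the call argument.
  // The caller supplies a body that already refers to the argN names.
  def split(funcName: String, inputs: Seq[GenValue], body: String): (String, String) = {
    val params = inputs.zipWithIndex.map { case (v, i) => s"${v.javaType} arg$i" }
    val args = inputs.map(_.code)
    val definition = s"private void $funcName(${params.mkString(", ")}) {\n  $body\n}"
    val call = s"$funcName(${args.mkString(", ")});"
    (definition, call)
  }
}
```

For instance, splitting with inputs `GenValue("int", "a + 1")` and `GenValue("boolean", "false")` yields a method declared as `private void apply_0(int arg0, boolean arg1)` and the call `apply_0(a + 1, false);`. No global variable is needed to carry either value across the method boundary.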
[GitHub] spark issue #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nested expr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19813 Merged build finished. Test FAILed.
[GitHub] spark issue #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nested expr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19813 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84742/ Test FAILed.
[GitHub] spark issue #18624: [SPARK-21389][ML][MLLIB] Optimize ALS recommendForAll by...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18624 **[Test build #84750 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84750/testReport)** for PR 18624 at commit [`f36706a`](https://github.com/apache/spark/commit/f36706ae46f4b0562b1d85627a526fcf642e913c).
[GitHub] spark issue #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nested expr...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19813 **[Test build #84742 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84742/testReport)** for PR 19813 at commit [`c083a79`](https://github.com/apache/spark/commit/c083a7955cd6fb54e0448176d9684496fae48e6f).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.