[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22219#discussion_r212664399

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -348,30 +350,30 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         // Otherwise, interpolate the number of partitions we need to try, but overestimate
         // it by 50%. We also cap the estimation in the end.
         val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
-        if (buf.isEmpty) {
+        if (scannedRowCount == 0) {
           numPartsToTry = partsScanned * limitScaleUpFactor
         } else {
-          val left = n - buf.size
+          val left = n - scannedRowCount
           // As left > 0, numPartsToTry is always >= 1
-          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
+          numPartsToTry = Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt
--- End diff --

nit: Is it better to define the variable as a `val`?
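For context, a minimal standalone sketch of the scale-up heuristic discussed above. `n`, `partsScanned`, `scannedRowCount`, and `limitScaleUpFactor` are the values visible in the diff, extracted here into a hypothetical helper purely for readability:

```scala
// If nothing was found yet, scan limitScaleUpFactor times more partitions;
// otherwise interpolate from the observed row density, overestimating by 50%.
def estimateNumPartsToTry(
    n: Int,                  // rows still wanted in total
    partsScanned: Int,       // partitions scanned so far
    scannedRowCount: Long,   // rows found so far
    limitScaleUpFactor: Int): Int = {
  if (scannedRowCount == 0) {
    partsScanned * limitScaleUpFactor
  } else {
    val left = n - scannedRowCount
    // left > 0 here, so the result is always >= 1
    Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt
  }
}
```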
[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22219#discussion_r212664064

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3237,6 +3237,20 @@ class Dataset[T] private[sql](
     files.toSet.toArray
   }

+  def collectCountAndIterator(): (Long, Iterator[T]) =
+    withAction("collectCountAndIterator", queryExecution) { plan =>
+      // This projection writes output to a `InternalRow`, which means applying this projection is
+      // not thread-safe. Here we create the projection inside this method to make `Dataset`
+      // thread-safe.
+      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
+      val (totalRowCount, iterInternalRows) = plan.executeCollectIterator()
+      (totalRowCount, iterInternalRows.map { row =>
+        // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
+        // parameter of its `get` method, so it's safe to use null here.
+        objProj(row).get(0, null).asInstanceOf[T]
--- End diff --

nit: two more spaces for indentation?
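A hedged usage sketch of the method under review — the name and return shape come from the diff; `sendToClient` is a hypothetical consumer, not a Spark API:

```scala
// The count is available up front while rows stream lazily — useful for the
// Thrift server, which wants the total before fetching result batches.
val (totalRowCount, rows) = ds.collectCountAndIterator()
println(s"about to fetch $totalRowCount rows")
rows.grouped(1000).foreach(batch => sendToClient(batch))
```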
[GitHub] spark issue #22219: [SPARK-25224][SQL] Improvement of Spark SQL ThriftServer...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22219

Did you verify this feature manually?
[GitHub] spark pull request #20637: [SPARK-23466][SQL] Remove redundant null checks i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20637#discussion_r212649063

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala ---
@@ -223,8 +223,9 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
       }
     } else {
       val lit = InternalRow(expected, expected)
+      val dtAsNullable = expression.dataType.asNullable
--- End diff --

Without #22126, `MapZipWith` produced an incorrect `DataType`, but no test case could detect the failure. This PR can detect that incorrect `DataType` in `MapZipWith` even without #22126. I would appreciate it if you could run the test case without #22126, both with and without this PR.
[GitHub] spark pull request #20637: [SPARK-23466][SQL] Remove redundant null checks i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20637#discussion_r212621229

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala ---
@@ -223,8 +223,9 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
       }
     } else {
       val lit = InternalRow(expected, expected)
+      val dtAsNullable = expression.dataType.asNullable
--- End diff --

Without this change (`asNullable`), a `NullPointerException` occurs if `expected` has `null` for a primitive type (e.g. `int`). If you run the `MapZipWith` test suite without this PR, you can see the `NullPointerException`. If you run `MapZipWith` with [the PR](https://github.com/apache/spark/pull/22126), you can see the failure (i.e. the hidden potential bug).
[GitHub] spark pull request #20637: [SPARK-23466][SQL] Remove redundant null checks i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20637#discussion_r212584773

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala ---
@@ -223,8 +223,9 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
       }
     } else {
       val lit = InternalRow(expected, expected)
+      val dtAsNullable = expression.dataType.asNullable
--- End diff --

Good question. This change is related to this part of the description:
```
On the other hand, expected will be converted to UnsafeArrayData(null) considering its value.
```
Since this PR removes null checks in the generated code based on the `DataType` of `expression`, a `null` in `expected` is never converted to `null`. To forcefully insert a null check into the generated code that converts `expected` to `UnsafeArrayData`, we need `asNullable`. Does that answer the question?
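A minimal sketch of what `asNullable` does here; note that `asNullable` is `private[spark]`, so this assumes code living inside the `org.apache.spark` namespace:

```scala
import org.apache.spark.sql.types._

// A non-nullable array of primitive ints: the generated writer may skip
// null checks for its elements entirely.
val dt = ArrayType(IntegerType, containsNull = false)

// asNullable marks every level of the type as nullable, forcing the
// generated code to keep the null checks this test needs to exercise.
val nullableDt = dt.asNullable.asInstanceOf[ArrayType]
assert(nullableDt.containsNull)
```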
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21968#discussion_r212569048

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
@@ -164,9 +164,8 @@ private[joins] class UnsafeHashedRelation(
   def getValue(key: InternalRow): InternalRow = {
     val unsafeKey = key.asInstanceOf[UnsafeRow]
     val map = binaryMap // avoid the compiler error
-    val loc = new map.Location // this could be allocated in stack
-    binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
-      unsafeKey.getSizeInBytes, loc, unsafeKey.hashCode())
+    val loc = map.lookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
--- End diff --

IIUC, this change makes this part thread-unsafe. Is it OK?
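An illustrative sketch of the thread-safety concern, using hypothetical `Location`/`lookupInto` stand-ins rather than the real `BytesToBytesMap` API:

```scala
// A Location is a mutable cursor that a lookup fills in with its result.
class Location { var value: AnyRef = _ }

class Relation {
  private val shared = new Location // one cursor for the whole relation

  // If lookup reuses an internal cursor like `shared`, two threads calling
  // getValue concurrently race on it and may read each other's results.
  def getValueShared(key: AnyRef): AnyRef = lookupInto(key, shared).value

  // The original code allocates a fresh Location per call — one small
  // allocation buys thread safety.
  def getValueFresh(key: AnyRef): AnyRef = lookupInto(key, new Location).value

  private def lookupInto(key: AnyRef, loc: Location): Location = {
    loc.value = key // placeholder for the real hash-map probe
    loc
  }
}
```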
[GitHub] spark issue #21859: [SPARK-24900][SQL]Speed up sort when the dataset is smal...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21859

retest this please
[GitHub] spark pull request #22048: [SPARK-25108][SQL] Fix the show method to display...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22048#discussion_r212557191

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -294,23 +294,24 @@ class Dataset[T] private[sql](
     // We set a minimum column width at '3'
     val minimumColWidth = 3
+    val regex = """[^\x00-\u2e39]""".r
--- End diff --

I agree with @srowen's comment. To make it clear, could you please add tests for more characters?
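For illustration only: a simplified width heuristic of the kind this PR computes. The exact character ranges are what the review is debating; this sketch treats just one common CJK block as double-width, which is far less complete than real East Asian Width handling:

```scala
// Count a character as 2 display columns if it is a CJK Unified Ideograph,
// else 1. A real implementation needs the full Unicode width tables.
def displayWidth(s: String): Int =
  s.map(c => if (c >= '\u4e00' && c <= '\u9fff') 2 else 1).sum

assert(displayWidth("abc") == 3)
assert(displayWidth("中文") == 4) // two wide characters, four columns
```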
[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21860#discussion_r212549322

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---
@@ -853,33 +853,47 @@ case class HashAggregateExec(
   val updateRowInHashMap: String = {
     if (isFastHashMapEnabled) {
-      ctx.INPUT_ROW = fastRowBuffer
-      val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
-      val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-      val effectiveCodes = subExprs.codes.mkString("\n")
-      val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
-        boundUpdateExpr.map(_.genCode(ctx))
-      }
-      val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
-        val dt = updateExpr(i).dataType
-        CodeGenerator.updateColumn(
-          fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
-      }
+      if (isVectorizedHashMapEnabled) {
+        ctx.INPUT_ROW = fastRowBuffer
+        val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+        val effectiveCodes = subExprs.codes.mkString("\n")
+        val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+          boundUpdateExpr.map(_.genCode(ctx))
+        }
+        val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+          val dt = updateExpr(i).dataType
+          CodeGenerator.updateColumn(
+            fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
+        }

-      // If fast hash map is on, we first generate code to update row in fast hash map, if the
-      // previous loop up hit fast hash map. Otherwise, update row in regular hash map.
-      s"""
-         |if ($fastRowBuffer != null) {
-         |  // common sub-expressions
-         |  $effectiveCodes
-         |  // evaluate aggregate function
-         |  ${evaluateVariables(fastRowEvals)}
-         |  // update fast row
-         |  ${updateFastRow.mkString("\n").trim}
-         |} else {
-         |  $updateRowInRegularHashMap
-         |}
-       """.stripMargin
+        // If vectorized fast hash map is on, we first generate code to update row
+        // in vectorized fast hash map, if the previous loop up hit vectorized fast hash map.
+        // Otherwise, update row in regular hash map.
+        s"""
+           |if ($fastRowBuffer != null) {
+           |  // common sub-expressions
+           |  $effectiveCodes
+           |  // evaluate aggregate function
+           |  ${evaluateVariables(fastRowEvals)}
+           |  // update fast row
+           |  ${updateFastRow.mkString("\n").trim}
+           |} else {
+           |  $updateRowInRegularHashMap
+           |}
+         """.stripMargin
+      } else {
+        // If fast hash map is on, we first reuse regular hash buffer to update row
+        // in fast hash map, if the previous loop up hit fast hash map.
--- End diff --

nit: it may not be easy to parse this statement `if the previous loop up hit fast hash map`.
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21537

Sure, let me create a JIRA.
[GitHub] spark pull request #22203: [SPARK-25029][BUILD][CORE] Janino "Two non-abstra...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22203#discussion_r212535154

--- Diff: dev/deps/spark-deps-hadoop-2.6 ---
@@ -98,7 +98,7 @@ jackson-module-jaxb-annotations-2.6.7.jar
 jackson-module-paranamer-2.7.9.jar
 jackson-module-scala_2.11-2.6.7.1.jar
 jackson-xc-1.9.13.jar
-janino-3.0.8.jar
+janino-3.0.9.jar
--- End diff --

Sorry for being late. I confirmed all of the changes in the changelog. Two issues, https://github.com/janino-compiler/janino/pull/54 and https://github.com/janino-compiler/janino/pull/46, are for Java 8 and later support. The others are bug fixes. LGTM.
[GitHub] spark issue #22187: [SPARK-25178][SQL] Directly ship the StructType objects ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22187

Thanks, updated both titles
[GitHub] spark pull request #22187: [SPARK-25178][SQL] change the generated code of t...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22187#discussion_r212075572

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -44,31 +44,19 @@ class RowBasedHashMapGenerator(
     groupingKeySchema, bufferSchema) {

   override protected def initializeAggregateHashMap(): String = {
-    val generatedKeySchema: String =
-      s"new org.apache.spark.sql.types.StructType()" +
-        groupingKeySchema.map { key =>
-          val keyName = ctx.addReferenceObj("keyName", key.name)
-          key.dataType match {
-            case d: DecimalType =>
-              s""".add($keyName, org.apache.spark.sql.types.DataTypes.createDecimalType(
-                 |${d.precision}, ${d.scale}))""".stripMargin
-            case _ =>
-              s""".add($keyName, org.apache.spark.sql.types.DataTypes.${key.dataType})"""
-          }
-        }.mkString("\n").concat(";")
+    val generatedKeyColTypes = groupingKeySchema
+      .zipWithIndex.map { case (t, i) => (s"_col$i", t.dataType) }
+    val generatedKeySchemaTypes = generatedKeyColTypes
+      .foldLeft(new StructType())((schema, colType) => schema.add(colType._1, colType._2))
--- End diff --

Ah, we do not need to recreate the struct type. I think we do not need `.asNullable`. Furthermore, I think we do not need `generatedAggBufferColTypes`; we only need the number of its fields.
[GitHub] spark pull request #22187: [SPARK-25178][SQL] change the generated code of t...
GitHub user kiszk opened a pull request: https://github.com/apache/spark/pull/22187

[SPARK-25178][SQL] change the generated code of the keySchema / valueSchema for xxxHashMapGenerator

## What changes were proposed in this pull request?

This PR generates code that refers to a `StructType` created in Scala code, instead of generating the `StructType` in Java code. This solves two issues:
1. Avoids using a field name such as `key.name`
1. Supports complicated schemas (e.g. nested `DataType`s)

Originally, [the JIRA entry](https://issues.apache.org/jira/browse/SPARK-25178) proposed to change the generated field name of the keySchema / valueSchema to a dummy name in `RowBasedHashMapGenerator` and `VectorizedHashMapGenerator.scala`. @Ueshin suggested referring to a `StructType` generated in the Scala code using `ctx.addReferenceObj()`.

## How was this patch tested?

Existing UTs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-25178

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22187.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22187

commit 0626de74f726622ac3eb251fc9f66aaa3de002d3
Author: Kazuaki Ishizaki
Date:   2018-08-22T18:10:24Z

    initial commit
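A sketch of the approach @Ueshin suggested: build the schema once in Scala with dummy field names and hand the generated Java code a reference to that object, instead of emitting Java code that reconstructs it field by field. `ctx.addReferenceObj` is the real `CodegenContext` API; the surrounding names follow the diff above:

```scala
// Build the key schema in Scala; field names are dummies, so odd
// characters in real column names can never break the generated code.
val keySchema = groupingKeySchema.zipWithIndex.foldLeft(new StructType()) {
  case (schema, (field, i)) => schema.add(s"_col$i", field.dataType)
}

// The generated Java code then just refers to this object by name.
val keySchemaTerm = ctx.addReferenceObj("keySchemaTerm", keySchema)
```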
[GitHub] spark issue #21912: [SPARK-24962][SQL] Refactor CodeGenerator.createUnsafeAr...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21912

cc @ueshin
[GitHub] spark issue #21859: [SPARK-24900][SQL]Speed up sort when the dataset is smal...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21859

retest this please
[GitHub] spark pull request #20637: [SPARK-23466][SQL] Remove redundant null checks i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20637#discussion_r211693601

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
@@ -43,25 +45,30 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
     case _ => false
   }

-  // TODO: if the nullability of field is correct, we can use it to save null check.
   private def writeStructToBuffer(
       ctx: CodegenContext,
       input: String,
       index: String,
-      fieldTypes: Seq[DataType],
+      fieldTypeAndNullables: Seq[Schema],
       rowWriter: String): String = {
     // Puts `input` in a local variable to avoid to re-evaluate it if it's a statement.
     val tmpInput = ctx.freshName("tmpInput")
-    val fieldEvals = fieldTypes.zipWithIndex.map { case (dt, i) =>
-      ExprCode(
-        JavaCode.isNullExpression(s"$tmpInput.isNullAt($i)"),
-        JavaCode.expression(CodeGenerator.getValue(tmpInput, dt, i.toString), dt))
+    val fieldEvals = fieldTypeAndNullables.zipWithIndex.map { case (dtNullable, i) =>
+      val isNull = if (dtNullable.nullable) {
+        JavaCode.isNullExpression(s"$tmpInput.isNullAt($i)")
+      } else {
+        FalseLiteral
+      }
+      ExprCode(isNull, JavaCode.expression(
+        CodeGenerator.getValue(tmpInput, dtNullable.dataType, i.toString), dtNullable.dataType))
     }

     val rowWriterClass = classOf[UnsafeRowWriter].getName
     val structRowWriter = ctx.addMutableState(rowWriterClass, "rowWriter",
       v => s"$v = new $rowWriterClass($rowWriter, ${fieldEvals.length});")
     val previousCursor = ctx.freshName("previousCursor")
+    val structExpressions = writeExpressionsToBuffer(
+      ctx, tmpInput, fieldEvals, fieldTypeAndNullables.map(_.dataType), structRowWriter)
--- End diff --

Ah, I see. `expression.genCode()` called in `ctx.generateExpressions()` looks conservative.
[GitHub] spark pull request #20637: [SPARK-23466][SQL] Remove redundant null checks i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20637#discussion_r211370785

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelperSuite.scala ---
@@ -35,6 +35,24 @@ class ExpressionEvalHelperSuite extends SparkFunSuite with ExpressionEvalHelper
     val e = intercept[RuntimeException] { checkEvaluation(BadCodegenExpression(), 10) }
     assert(e.getMessage.contains("some_variable"))
   }
+
+  test("SPARK-23466: checkEvaluationWithUnsafeProjection should fail if null is compared with " +
+      "primitive default value") {
+    val expected = Array(null, -1, 0, 1)
+    val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
+
+    val expression1 = CreateArray(
+      Seq(Literal(null, IntegerType), Literal(-1), Literal(0), Literal(1)))
+    assert(expression1.dataType.containsNull)
+    checkEvaluationWithUnsafeProjection(expression1, catalystValue)
+
+    val expression2 = CreateArray(Seq(Literal(0, IntegerType), Literal(-1), Literal(0), Literal(1)))
+    assert(!expression2.dataType.containsNull)
+    val e = intercept[RuntimeException] {
+      checkEvaluationWithUnsafeProjection(expression2, catalystValue)
--- End diff --

Unfortunately, `checkEvaluationWithUnsafeProjection()`'s return type is `Unit`...
[GitHub] spark pull request #20637: [SPARK-23466][SQL] Remove redundant null checks i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20637#discussion_r211365612

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
@@ -43,25 +45,30 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
     case _ => false
   }

-  // TODO: if the nullability of field is correct, we can use it to save null check.
   private def writeStructToBuffer(
       ctx: CodegenContext,
       input: String,
       index: String,
-      fieldTypes: Seq[DataType],
+      fieldTypeAndNullables: Seq[Schema],
       rowWriter: String): String = {
     // Puts `input` in a local variable to avoid to re-evaluate it if it's a statement.
     val tmpInput = ctx.freshName("tmpInput")
-    val fieldEvals = fieldTypes.zipWithIndex.map { case (dt, i) =>
-      ExprCode(
-        JavaCode.isNullExpression(s"$tmpInput.isNullAt($i)"),
-        JavaCode.expression(CodeGenerator.getValue(tmpInput, dt, i.toString), dt))
+    val fieldEvals = fieldTypeAndNullables.zipWithIndex.map { case (dtNullable, i) =>
+      val isNull = if (dtNullable.nullable) {
+        JavaCode.isNullExpression(s"$tmpInput.isNullAt($i)")
+      } else {
+        FalseLiteral
+      }
+      ExprCode(isNull, JavaCode.expression(
+        CodeGenerator.getValue(tmpInput, dtNullable.dataType, i.toString), dtNullable.dataType))
     }

     val rowWriterClass = classOf[UnsafeRowWriter].getName
     val structRowWriter = ctx.addMutableState(rowWriterClass, "rowWriter",
       v => s"$v = new $rowWriterClass($rowWriter, ${fieldEvals.length});")
     val previousCursor = ctx.freshName("previousCursor")
+    val structExpressions = writeExpressionsToBuffer(
+      ctx, tmpInput, fieldEvals, fieldTypeAndNullables.map(_.dataType), structRowWriter)
--- End diff --

I think `nullable` is passed through `fieldEvals`.
[GitHub] spark pull request #21912: [SPARK-24962][SQL] Refactor CodeGenerator.createU...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21912#discussion_r211126450

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -735,70 +735,98 @@ class CodegenContext {
   }

   /**
-   * Generates code creating a [[UnsafeArrayData]].
+   * Generates code creating a [[UnsafeArrayData]] or [[GenericArrayData]] based on
+   * given parameters.
    *
    * @param arrayName name of the array to create
+   * @param elementType data type of the elements in source array
    * @param numElements code representing the number of elements the array should contain
-   * @param elementType data type of the elements in the array
    * @param additionalErrorMessage string to include in the error message
+   * @param elementSize optional value which shows the size of an element of the allocated
+   *                    [[UnsafeArrayData]] or [[GenericArrayData]]
+   *
+   * @return code representing the allocation of [[ArrayData]]
+   *         code representing a setter of an assignment for the generated array
    */
-  def createUnsafeArray(
+  def createArrayData(
--- End diff --

I think it would be good to have this, since this utility method will be used in other places where `new UnsafeArrayData` or `new GenericArrayData` is generated as Java code.
[GitHub] spark issue #21912: [SPARK-24962][SQL] Refactor CodeGenerator.createUnsafeAr...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21912

cc @ueshin
[GitHub] spark pull request #21912: [SPARK-24962][SQL] Refactor CodeGenerator.createU...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21912#discussion_r211096868

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala ---
@@ -34,6 +36,32 @@ object ArrayData {
     case a: Array[Double] => UnsafeArrayData.fromPrimitiveArray(a)
     case other => new GenericArrayData(other)
   }
+
+
+  /**
+   * Allocate [[UnsafeArrayData]] or [[GenericArrayData]] based on given parameters.
+   *
+   * @param elementSize a size of an element in bytes
+   * @param numElements the number of elements the array should contain
+   * @param isPrimitiveType whether the type of an element is primitive type
+   * @param additionalErrorMessage string to include in the error message
+   */
+  def allocateArrayData(
+      elementSize: Int,
+      numElements : Long,
+      isPrimitiveType: Boolean,
+      additionalErrorMessage: String) : ArrayData = {
+    if (isPrimitiveType && !UnsafeArrayData.shouldUseGenericArrayData(elementSize, numElements)) {
+      UnsafeArrayData.forPrimitiveArray(Platform.BYTE_ARRAY_OFFSET, numElements.toInt, elementSize)
--- End diff --

Good catch
[GitHub] spark issue #21859: [SPARK-24900][SQL]Speed up sort when the dataset is smal...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21859

LGTM

cc @viirya @cloud-fan @gatorsmile
[GitHub] spark issue #21931: [SPARK-24978][SQL]Add spark.sql.fast.hash.aggregate.row....
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21931

LGTM, cc @cloud-fan @hvanhovell
[GitHub] spark issue #21859: [SPARK-24900][SQL]Speed up sort when the dataset is smal...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21859

retest this please.
[GitHub] spark pull request #21087: [SPARK-23997][SQL] Configurable maximum number of...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21087#discussion_r211080067

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---
@@ -164,9 +165,12 @@ case class BucketSpec(
     numBuckets: Int,
     bucketColumnNames: Seq[String],
     sortColumnNames: Seq[String]) {
-  if (numBuckets <= 0 || numBuckets >= 10) {
+  def conf: SQLConf = SQLConf.get
+
+  if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) {
--- End diff --

Since the condition is changed from `>=` to `>`, there is an inconsistency between the condition and the error message. When this condition is true, the message should read like `... but less than or equal to bucketing.maxBuckets ...`.
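A sketch of the validation with a message consistent with the new `>` bound — the wording is illustrative, not necessarily what the PR finally merged; `conf.bucketingMaxBuckets` is the config accessor from the diff:

```scala
if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) {
  throw new AnalysisException(
    s"Number of buckets should be greater than 0 but less than or equal to " +
    s"bucketing.maxBuckets (`${conf.bucketingMaxBuckets}`). Got `$numBuckets`")
}
```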
[GitHub] spark issue #21860: [SPARK-24901][SQL]Merge the codegen of RegularHashMap an...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21860

cc @hvanhovell
[GitHub] spark issue #20637: [SPARK-23466][SQL] Remove redundant null checks in gener...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20637

cc @ueshin @cloud-fan
[GitHub] spark issue #20637: [SPARK-23466][SQL] Remove redundant null checks in gener...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20637

retest this please
[GitHub] spark issue #22135: [SPARK-25093][SQL] Avoid recompiling regexp for comments...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22135

SGTM, but can we address the similar issues at once? Even under `src/main/...`, we can see this pattern in several places.
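For illustration, the pattern being fixed — recompiling a regex on every call versus compiling it once. Function names and the pattern are made up for the example; only the hoisting technique is the point:

```scala
object SqlCleanup {
  // Recompiles the pattern on every invocation — wasteful in a hot path,
  // because String.replaceAll compiles a fresh java.util.regex.Pattern.
  def stripCommentsSlow(sql: String): String =
    sql.replaceAll("""(?s)/\*.*?\*/""", "")

  // Compiled once when the object is initialized; every call reuses it.
  private val commentPattern = """(?s)/\*.*?\*/""".r
  def stripCommentsFast(sql: String): String =
    commentPattern.replaceAllIn(sql, "")
}
```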
[GitHub] spark issue #22125: [DOCS] Fix cloud-integration.md Typo
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22125

@KraFusion Sorry, I overlooked another PR.
[GitHub] spark issue #22048: [SPARK-25108][SQL] Fix the show method to display the wi...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22048

Thank you for creating a JIRA entry and for posting the result. The test case is not available yet, though.
[GitHub] spark issue #22126: [SPARK-23938][SQL][FOLLOW-UP][TEST] Nullabilities of val...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22126

LGTM
[GitHub] spark issue #22125: [DOCS] Fix cloud-integration.md Typo
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22125

Thanks, would it be possible to address similar issues? For example, in `configurations.md`.
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21537

@gatorsmile Thank you for your reply. Could you elaborate on your suggestion?

> A general suggestion. To avoid introducing the regressions, how about implementing a new one without changing the existing one? We can easily switch to the new one when it becomes stable.

Does it mean working in a particular branch, or working in a fork repository, until the implementation has been completed?
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21537

For 2. and 3., it is harder to state my opinion in a comment. Let me make short comments first.

For 2., if I remember correctly, @viirya once wrote the API document in a JIRA entry. It would be good to update it and add some thoughts about the design as a first step. I understand that it is hard to keep a design document up to date, in particular during open-source development, because we get a lot of excellent comments in a PR.

For 3., at the time of the early implementation of SQL codegen (i.e. using `s""` to represent code), I thought there were two problems: 1. lack of the type of an expression (in other words, `ExprCode` did not have the type of `value`), and 2. lack of a structure for statements. Now, we face the problem that it is hard to rewrite a method argument due to problem 1. In my understanding, the current effort led by @viirya is trying to resolve problem 1. It is hard to rewrite a set of statements due to problem 2; resolving problem 2 needs more effort. In my opinion, we are addressing them step by step. Of course, I would be happy to co-lead a discussion of the IR design for 2.
[GitHub] spark issue #21537: [SPARK-24505][SQL] Convert strings in codegen to blocks:...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21537

Thanks for involving me in an important thread. I was busy this morning in Japan. I think there are three topics in the thread: 1. Merge or revert this PR 2. Design document 3. IR design

For 1., in short, my opinion leans toward reverting this PR, from a release manager's point of view. As we know, it is time to cut a release branch, and this PR partially introduces a new representation. If there were a bug in code generation in Spark 2.4, it could complicate debugging and fixing. As @mgaido91 pointed out, #20043 and #21026 have been merged. I think they are a kind of refactoring (e.g. changing the representation of literals, classes, and so on), so those two PRs can stay. However, this PR introduces a mixture of the `s""` and `code""` representations. WDYT?
[GitHub] spark issue #21912: [SPARK-24962][SQL] Refactor CodeGenerator.createUnsafeAr...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21912

cc @ueshin
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22110#discussion_r210282476

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala ---
@@ -115,6 +115,8 @@ protected[sql] abstract class AtomicType extends DataType {
   private[sql] type InternalType
   private[sql] val tag: TypeTag[InternalType]
   private[sql] val ordering: Ordering[InternalType]
+
+  private[spark] override def supportsEquals: Boolean = true
--- End diff --

+1
[GitHub] spark pull request #22103: [SPARK-25113][SQL] Add logging to CodeGenerator w...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22103#discussion_r210152479

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -1385,9 +1386,15 @@ object CodeGenerator extends Logging {
     try {
       val cf = new ClassFile(new ByteArrayInputStream(classBytes))
       val stats = cf.methodInfos.asScala.flatMap { method =>
-        method.getAttributes().filter(_.getClass.getName == codeAttr.getName).map { a =>
+        method.getAttributes().filter(_.getClass eq codeAttr).map { a =>
--- End diff --

Yeah, I know. The current comparison is stricter: the previous comparison was only by name, while the current one compares the pair of class loader and name. I worried whether the added strictness may change behavior.
[GitHub] spark pull request #22105: [SPARK-25115] [Core] Eliminate extra memory copy ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22105#discussion_r210056504

--- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java ---
@@ -140,8 +140,24 @@ private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOExcept
     // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
     // for the case that the passed-in buffer has too many components.
     int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
-    ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
-    int written = target.write(buffer);
+    // If the ByteBuf holds more then one ByteBuffer we should better call nioBuffers(...)
+    // to eliminate extra memory copies.
+    int written = 0;
+    if (buf.nioBufferCount() == 1) {
+      ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
+      written = target.write(buffer);
+    } else {
+      ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), length);
+      for (ByteBuffer buffer: buffers) {
+        int remaining = buffer.remaining();
+        int w = target.write(buffer);
+        written += w;
--- End diff --

Can we guarantee `written` does not overflow while we accumulate `int` values?
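A sketch of the overflow concern, in Scala for brevity (the real code is Java); `perBufferWrites` stands in for the per-buffer return values of `WritableByteChannel.write`:

```scala
// Each write returns at most Int.MaxValue; summing several of them into
// an Int can overflow. Widening the accumulator to Long avoids the wrap.
def totalWritten(perBufferWrites: Seq[Int]): Long =
  perBufferWrites.foldLeft(0L)(_ + _)

// Two near-maximal writes: the Int sum wraps negative, the Long sum doesn't.
val writes = Seq(Int.MaxValue, Int.MaxValue)
assert(writes.sum < 0)            // Int arithmetic wraps
assert(totalWritten(writes) > 0L) // Long accumulator is safe here
```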
[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22101

LGTM
[GitHub] spark pull request #22103: [SPARK-25113][SQL] Add logging to CodeGenerator w...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22103#discussion_r209995159

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -1385,9 +1386,15 @@ object CodeGenerator extends Logging {
     try {
       val cf = new ClassFile(new ByteArrayInputStream(classBytes))
       val stats = cf.methodInfos.asScala.flatMap { method =>
-        method.getAttributes().filter(_.getClass.getName == codeAttr.getName).map { a =>
+        method.getAttributes().filter(_.getClass eq codeAttr).map { a =>
--- End diff --

Why do we need to change this condition?
[GitHub] spark pull request #22103: [SPARK-25113][SQL] Add logging to CodeGenerator w...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22103#discussion_r209993118

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -1385,9 +1386,15 @@ object CodeGenerator extends Logging {
     try {
       val cf = new ClassFile(new ByteArrayInputStream(classBytes))
       val stats = cf.methodInfos.asScala.flatMap { method =>
-        method.getAttributes().filter(_.getClass.getName == codeAttr.getName).map { a =>
+        method.getAttributes().filter(_.getClass eq codeAttr).map { a =>
           val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
           CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
+
+          if (byteCodeSize > DEFAULT_JVM_HUGE_METHOD_LIMIT) {
+            logInfo("Generated method too long to be JIT compiled: " +
--- End diff --

nit: `Generated method is too long ...`?
[GitHub] spark issue #19222: [SPARK-10399][SPARK-23879][CORE][SQL] Introduce multiple...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19222

ping @rednaxelafx
[GitHub] spark pull request #21912: [SPARK-24962][SQL] Refactor CodeGenerator.createU...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21912#discussion_r209878827

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -735,70 +735,98 @@ class CodegenContext {
   }

   /**
-   * Generates code creating a [[UnsafeArrayData]].
+   * Generates code creating a [[UnsafeArrayData]] or [[GenericArrayData]] based on
+   * given parameters.
    *
    * @param arrayName name of the array to create
+   * @param elementType data type of the elements in source array
    * @param numElements code representing the number of elements the array should contain
-   * @param elementType data type of the elements in the array
    * @param additionalErrorMessage string to include in the error message
+   * @param elementSize optional value which shows the size of an element of the allocated
+   *                    [[UnsafeArrayData]] or [[GenericArrayData]]
+   *
+   * @return code representing the allocation of [[ArrayData]]
+   *         code representing a setter of an assignment for the generated array
    */
-  def createUnsafeArray(
+  def createArrayData(
--- End diff --

I think it would be good to have a utility method to generate this call. WDYT? On the other hand, I agree with having a method such as `setArrayElementFunc` or `setArrayElement`. Thus, we can split this method into two methods that each have only one return value.
[GitHub] spark pull request #21912: [SPARK-24962][SQL] Refactor CodeGenerator.createU...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21912#discussion_r209877426

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayData.scala ---
@@ -34,6 +36,37 @@ object ArrayData {
     case a: Array[Double] => UnsafeArrayData.fromPrimitiveArray(a)
     case other => new GenericArrayData(other)
   }
+
+
+  /**
+   * Allocate [[UnsafeArrayData]] or [[GenericArrayData]] based on given parameters.
+   *
+   * @param elementSize a size of an element in bytes
+   * @param numElements the number of elements the array should contain
+   * @param isPrimitiveType whether the type of an element is primitive type
+   * @param additionalErrorMessage string to include in the error message
+   */
+  def allocateArrayData(
+      elementSize: Int,
+      numElements : Long,
+      isPrimitiveType: Boolean,
+      additionalErrorMessage: String) : ArrayData = {
+    val arraySize = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(numElements, elementSize)
+    if (isPrimitiveType && !UnsafeArrayData.shouldUseGenericArrayData(elementSize, numElements)) {
--- End diff --

When `UnsafeArrayData` can be used, `GenericArrayData` could also be used. However, when the element size is large, only `GenericArrayData` can be used; `UnsafeArrayData` cannot. Thus, I think it is good to keep the current name `shouldUseGenericArrayData`.
[GitHub] spark pull request #22101: [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryCom...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22101#discussion_r209863964

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java ---
@@ -42,16 +42,16 @@ public int compare(
       while ((leftOff + i) % 8 != 0 && i < leftLen) {
         res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
             (Platform.getByte(rightObj, rightOff + i) & 0xff);
-        if (res != 0) return res;
+        if (res != 0) return (int) res;
         i += 1;
       }
     }
     // for architectures that support unaligned accesses, chew it up 8 bytes at a time
     if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
       while (i <= leftLen - 8) {
-        res = (int) ((Platform.getLong(leftObj, leftOff + i) -
-            Platform.getLong(rightObj, rightOff + i)) % Integer.MAX_VALUE);
-        if (res != 0) return res;
+        res = Platform.getLong(leftObj, leftOff + i) -
+            Platform.getLong(rightObj, rightOff + i);
+        if (res != 0) return res > 0 ? 1 : -1;
--- End diff --

+1 for no subtraction.
[GitHub] spark pull request #22101: [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryCom...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22101#discussion_r209855589

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java ---
@@ -60,7 +60,7 @@ public int compare(
       while (i < leftLen) {
         res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
             (Platform.getByte(rightObj, rightOff + i) & 0xff);
-        if (res != 0) return res;
+        if (res != 0) return (int) res;
--- End diff --

ditto
[GitHub] spark pull request #22101: [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryCom...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22101#discussion_r209855528

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java ---
@@ -42,16 +42,16 @@ public int compare(
       while ((leftOff + i) % 8 != 0 && i < leftLen) {
         res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
             (Platform.getByte(rightObj, rightOff + i) & 0xff);
-        if (res != 0) return res;
+        if (res != 0) return (int) res;
--- End diff --

How about the following change to minimize and localize the change?
```
int res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
    (Platform.getByte(rightObj, rightOff + i) & 0xff);
if (res != 0) return res;
```
[GitHub] spark pull request #22101: [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryCom...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22101#discussion_r209855135

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java ---
@@ -42,16 +42,16 @@ public int compare(
       while ((leftOff + i) % 8 != 0 && i < leftLen) {
         res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
             (Platform.getByte(rightObj, rightOff + i) & 0xff);
-        if (res != 0) return res;
+        if (res != 0) return (int) res;
         i += 1;
       }
     }
     // for architectures that support unaligned accesses, chew it up 8 bytes at a time
     if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
       while (i <= leftLen - 8) {
-        res = (int) ((Platform.getLong(leftObj, leftOff + i) -
-            Platform.getLong(rightObj, rightOff + i)) % Integer.MAX_VALUE);
-        if (res != 0) return res;
+        res = Platform.getLong(leftObj, leftOff + i) -
+            Platform.getLong(rightObj, rightOff + i);
+        if (res != 0) return res > 0 ? 1 : -1;
--- End diff --

How about the following change to minimize and localize the change?
```
long res = Platform.getLong(leftObj, leftOff + i) -
    Platform.getLong(rightObj, rightOff + i);
if (res != 0) return res > 0 ? 1 : -1;
```
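For context on why a plain comparison beats a subtraction here: the sign of a 64-bit difference is wrong whenever the subtraction overflows, which is exactly the bug this PR fixes. A minimal Scala demonstration:

```scala
// a < b, yet a - b wraps around to a positive value:
val a = Long.MinValue
val b = 1L
assert(a < b)
assert((a - b) > 0) // Long.MinValue - 1 wraps to Long.MaxValue

// An explicit comparison never overflows:
assert(java.lang.Long.compare(a, b) < 0)
```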
[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22001

Just curious. It is very interesting to me that the recent three tries consistently caused a timeout failure in the same test.
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94687
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94705
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94716
In addition, other PRs look successful, without timeouts.
```
[info] - abort the job if total size of results is too large (1 second, 122 milliseconds)
Exception in thread "task-result-getter-3" java.lang.Error: java.lang.InterruptedException
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:206)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)
	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:115)
	at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:701)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1932)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:62)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	... 2 more
```
[GitHub] spark issue #21912: [SPARK-24962][SQL] Refactor CodeGenerator.createUnsafeAr...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21912

cc @ueshin @cloud-fan
[GitHub] spark issue #22048: Fix the show method to display the wide character alignm...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22048

@xuejianbest Could you please create a JIRA entry and add test cases to this PR?
[GitHub] spark pull request #20637: [SPARK-23466][SQL] Remove redundant null checks i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20637#discussion_r209692496

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
@@ -43,25 +43,29 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
     case _ => false
   }

-  // TODO: if the nullability of field is correct, we can use it to save null check.
   private def writeStructToBuffer(
       ctx: CodegenContext,
       input: String,
       index: String,
-      fieldTypes: Seq[DataType],
+      fieldTypeAndNullables: Seq[(DataType, Boolean)],
--- End diff --

@cloud-fan I found this case class `case class Schema(dataType: DataType, nullable: Boolean)` in two places:
1. `ScalaReflection.Schema`
1. `SchemaConverters.SchemaType`

Do we use one of them? Or do we define `org.apache.spark.sql.types.Schema`?
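The two existing shapes kiszk references, shown side by side. Both pair a `DataType` with its nullability, which is exactly the information `writeStructToBuffer` needs (paraphrased from the Spark sources of that era; verify the exact locations before relying on them):

```scala
// org.apache.spark.sql.catalyst.ScalaReflection
case class Schema(dataType: DataType, nullable: Boolean)

// org.apache.spark.sql.avro.SchemaConverters
case class SchemaType(dataType: DataType, nullable: Boolean)
```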
[GitHub] spark pull request #21912: [SPARK-24962][SQL] Refactor CodeGenerator.createU...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21912#discussion_r209643599

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -735,70 +735,100 @@ class CodegenContext {
   }

   /**
-   * Generates code creating a [[UnsafeArrayData]].
+   * Generates code creating a [[UnsafeArrayData]] or [[GenericArrayData]] based on
+   * given parameters.
    *
    * @param arrayName name of the array to create
+   * @param elementType data type of the elements in source array
    * @param numElements code representing the number of elements the array should contain
-   * @param elementType data type of the elements in the array
    * @param additionalErrorMessage string to include in the error message
+   * @param elementSize optional value which shows the size of an element of the allocated
+   *                    [[UnsafeArrayData]] or [[GenericArrayData]]
+   *
+   * @return code representing the allocation of [[ArrayData]]
+   *         code representing a setter of an assignment for the generated array
    */
-  def createUnsafeArray(
+  def createArrayData(
       arrayName: String,
-      numElements: String,
       elementType: DataType,
-      additionalErrorMessage: String): String = {
-    val arraySize = freshName("size")
-    val arrayBytes = freshName("arrayBytes")
+      numElements: String,
+      additionalErrorMessage: String,
+      elementSize: Option[Int] = None): (String, String) = {
--- End diff --

Yeah, they are exclusive. However, when `elementType` is not used, an integer value is required: `MapEntries` stores `UnsafeRow`s instead of primitive data, and [this](https://github.com/apache/spark/blob/c15630014c7ef850fa21c1247fa5edd2b1bac81b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L405) requires a non-fixed size, unlike `elementType.defaultSize`.
[GitHub] spark issue #22053: [SPARK-25069][CORE]Using UnsafeAlignedOffset to make the...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22053

retest this please
[GitHub] spark pull request #22083: [SQL][Test][Minor] Add missing codes to ParquetCo...
GitHub user kiszk opened a pull request: https://github.com/apache/spark/pull/22083

[SQL][Test][Minor] Add missing codes to ParquetCompressionCodecPrecedenceSuite

## What changes were proposed in this pull request?

This PR adds missing code to the test ``"Test `spark.sql.parquet.compression.codec` config"`` in `ParquetCompressionCodecPrecedenceSuite`.

## How was this patch tested?

Existing UTs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark ParquetCompressionCodecPrecedenceSuite

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22083.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22083

commit 974224b495fc8155b9a6a73673ea478ccdb164d6
Author: Kazuaki Ishizaki
Date:   2018-08-12T07:08:42Z

    update
[GitHub] spark issue #22082: [SPARK-24420][Build][FOLLOW-UP] Upgrade ASM6 APIs
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22082

LGTM
[GitHub] spark issue #22007: [SPARK-25033] Bump Apache commons.{httpclient, httpcore}
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22007

LGTM
[GitHub] spark issue #22007: [SPARK-25033] Bump Apache commons.{httpclient, httpcore}
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22007

retest this please
[GitHub] spark pull request #20637: [SPARK-23466][SQL] Remove redundant null checks i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20637#discussion_r209416053

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
@@ -43,25 +43,29 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
     case _ => false
   }

-  // TODO: if the nullability of field is correct, we can use it to save null check.
   private def writeStructToBuffer(
       ctx: CodegenContext,
       input: String,
       index: String,
-      fieldTypes: Seq[DataType],
+      fieldTypeAndNullables: Seq[(DataType, Boolean)],
--- End diff --

@cloud-fan What name do you suggest for the case class? `DataTypeNullable`, or something else?
[GitHub] spark issue #22068: [MINOR][DOC]Add missing compression codec .
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22068

Thanks. BTW, I found another instance in a test, not in the docs. Do we address it in this PR, or in another PR? @HyukjinKwon WDYT?
```
class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSQLContext {
  test("Test `spark.sql.parquet.compression.codec` config") {
    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO").foreach { c =>
      withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
        val expected = if (c == "NONE") "UNCOMPRESSED" else c
        val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
        assert(option.compressionCodecClassName == expected)
      }
    }
  }
```
[GitHub] spark issue #22016: Fix typos
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22016

#22070 addresses more typos.
[GitHub] spark issue #22070: Fix typos detected by github.com/client9/misspell
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22070 LGTM
[GitHub] spark issue #22067: [SPARK-25084][SQL] distribute by on multiple columns may...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22067 retest this please
[GitHub] spark issue #22069: [MINOR][DOC] Fix Java example code in Column's comments
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22069

Do we need to update the following similar examples, too?

Column.scala
```scala
 * {{{
 *   // Example: encoding gender string column into integer.
 *
 *   // Scala:
 *   people.select(when(people("gender") === "male", 0)
 *     .when(people("gender") === "female", 1)
 *     .otherwise(2))
 *
 *   // Java:
 *   people.select(when(col("gender").equalTo("male"), 0)
 *     .when(col("gender").equalTo("female"), 1)
 *     .otherwise(2))
 * }}}
```

functions.scala
```scala
 * {{{
 *   // Example: encoding gender string column into integer.
 *
 *   // Scala:
 *   people.select(when(people("gender") === "male", 0)
 *     .when(people("gender") === "female", 1)
 *     .otherwise(2))
 *
 *   // Java:
 *   people.select(when(col("gender").equalTo("male"), 0)
 *     .when(col("gender").equalTo("female"), 1)
 *     .otherwise(2))
 * }}}
```
[GitHub] spark issue #22068: [MINOR][DOC]Add missing compression codec .
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22068

Would it be better to update a comment in `DataFrameWriter.scala`, too?

```scala
 * `compression` (default is the value specified in `spark.sql.parquet.compression.codec`):
 * compression codec to use when saving to file. This can be one of the known case-insensitive
 * shorten names(`none`, `snappy`, `gzip`, and `lzo`). This will override
 * `spark.sql.parquet.compression.codec`.
```
[GitHub] spark issue #22053: [SPARK-25069][CORE]Using UnsafeAlignedOffset to make the...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22053 retest this please
[GitHub] spark pull request #20637: [SPARK-23466][SQL] Remove redundant null checks i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20637#discussion_r209180525

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
```scala
@@ -43,25 +43,29 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
     case _ => false
   }

-  // TODO: if the nullability of field is correct, we can use it to save null check.
   private def writeStructToBuffer(
       ctx: CodegenContext,
       input: String,
       index: String,
-      fieldTypes: Seq[DataType],
+      fieldTypeAndNullables: Seq[(DataType, Boolean)],
```
--- End diff --

I think that it would be good, since it is used in `JavaTypeInference` and `higherOrderFunctions`. cc @ueshin
[GitHub] spark pull request #20637: [SPARK-23466][SQL] Remove redundant null checks i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20637#discussion_r209178573

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala ---
```scala
@@ -170,6 +174,23 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
     val element = CodeGenerator.getValue(tmpInput, et, index)

+    val primitiveTypeName = if (CodeGenerator.isPrimitiveType(jt)) {
```
--- End diff --

Good catch.
[GitHub] spark issue #22053: [SPARK-25069][CORE]Using UnsafeAlignedOffset to make the...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22053 retest this please
[GitHub] spark issue #22053: [SPARK-25069][CORE]Using UnsafeAlignedOffset to make the...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22053 retest this please
[GitHub] spark pull request #21994: [SPARK-24529][Build][test-maven][follow-up] Add s...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21994#discussion_r209147728

--- Diff: pom.xml ---
```xml
@@ -2609,6 +2609,28 @@
+
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
```
--- End diff --

Let me check the elapsed time on my environment. +1 for holding off for now.
[GitHub] spark issue #22053: [SPARK-25069][CORE]Using UnsafeAlignedOffset to make the...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22053

I think that this is not a data correctness issue. It may, however, cause an unexpected program abort due to a hardware memory access error. BTW, it would be good to backport this to increase stability.
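To make the alignment concern concrete, here is a minimal sketch of the mechanism `UnsafeAlignedOffset` provides; the `getUaoSize` name follows the Spark source, but treat the exact API as an assumption:

```scala
// Minimal sketch, assuming org.apache.spark.unsafe.UnsafeAlignedOffset from
// Spark's unsafe module. On platforms that cannot perform unaligned loads,
// record lengths are read and written in an 8-byte slot instead of a 4-byte
// int, which avoids the hardware memory access error mentioned above.
import org.apache.spark.unsafe.UnsafeAlignedOffset

val uaoSize: Int = UnsafeAlignedOffset.getUaoSize // 4 or 8, depending on the platform
```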
[GitHub] spark issue #20637: [SPARK-23466][SQL] Remove redundant null checks in gener...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20637

The failure of `org.apache.spark.sql.catalyst.expressions.JsonExpressionsSuite.from_json missing fields` is due to passing `null` while the schema has `nullable = false`. This inconsistency was agreed upon in the discussion at [SPARK-23173](https://issues.apache.org/jira/browse/SPARK-23173): `Assume that each field in schema passed to from_json is nullable, and ignore the nullability information set in the passed schema.`

When `spark.sql.fromJsonForceNullableSchema=false`, I think the test is invalid because it passes `nullable = false` in the schema for the missing field.
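A minimal sketch of the behavior under discussion, assuming a running `SparkSession` named `spark`; this is illustrative only, not the failing test itself:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

// `b` is declared non-nullable but is missing from the input JSON.
// Per the SPARK-23173 agreement, from_json treats every field as nullable,
// so the parsed row carries null for `b` instead of failing.
val schema = new StructType()
  .add("a", IntegerType, nullable = false)
  .add("b", IntegerType, nullable = false)
val parsed = Seq("""{"a": 1}""").toDF("json")
  .select(from_json($"json", schema).as("s"))
parsed.show() // s.b is null even though the schema says nullable = false
```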
[GitHub] spark issue #22053: [SPARK-25069][CORE]Using UnsafeAlignedOffset to make the...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22053 cc @hvanhovell
[GitHub] spark issue #22059: [SPARK-25036][SQL] Avoid discarding unmoored doc comment...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22059 cc @srowen @ueshin @HyukjinKwon
[GitHub] spark issue #22053: [SPARK-25069][CORE]Using UnsafeAlignedOffset to make the...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22053 retest this please
[GitHub] spark issue #22059: [SPARK-25036][SQL] Avoid discarding unmoored doc comment...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22059 retest this please
[GitHub] spark issue #22053: [SPARK-25069][CORE]Using UnsafeAlignedOffset to make the...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22053 cc @cloud-fan
[GitHub] spark issue #22058: [SPARK-25036][SQL][FOLLOW-UP] Avoid match may not be exh...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22058

I think that this is the last one when building with the following command, but I would like to confirm this with @ueshin:

```
build/sbt -Pscala-2.12 -Phadoop-2.6 -Pkubernetes -Phive -Pmesos -Phive-thriftserver -Pflume -Pkinesis-asl -Pyarn package
```
[GitHub] spark issue #22053: [SPARK-25069][CORE]Using UnsafeAlignedOffset to make the...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22053

Good catch. LGTM with one comment: would it be better to update the comments that mention `4 bytes` to say `4 or 8 bytes` in `UnsafeExternalSorter.java`?
[GitHub] spark issue #22055: [MINOR][BUILD] Update Jetty to 9.3.24.v20180605
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22055

LGTM. These changes are not huge, and they look low-risk.
[GitHub] spark pull request #22059: [SPARK-25036][SQL] Avoid discarding unmoored doc ...
GitHub user kiszk opened a pull request: https://github.com/apache/spark/pull/22059

[SPARK-25036][SQL] Avoid discarding unmoored doc comment in Scala-2.12.

## What changes were proposed in this pull request?

This PR avoids the following compilation errors when using sbt with Scala-2.12:

```
[error] [warn] /home/ishizaki/Spark/PR/scala212/spark/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala:410: discarding unmoored doc comment
[error] [warn] /**
[error] [warn]
[error] [warn] /home/ishizaki/Spark/PR/scala212/spark/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala:441: discarding unmoored doc comment
[error] [warn] /**
[error] [warn]
...
[error] [warn] /home/ishizaki/Spark/PR/scala212/spark/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:440: discarding unmoored doc comment
[error] [warn] /**
[error] [warn]
```

## How was this patch tested?

Existing UTs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-25036d

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22059.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22059

commit 70ec11e5b85100b0b10257f7c0846b63af1ba494
Author: Kazuaki Ishizaki
Date: 2018-08-09T17:11:30Z

    initial commit
[GitHub] spark issue #22058: [SPARK-25036][SQL][FOLLOW-UP] Avoid match may not be exh...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22058 cc @ueshin @srowen @HyukjinKwon
[GitHub] spark pull request #22058: [SPARK-25036][SQL][FOLLOW-UP] Avoid match may not...
GitHub user kiszk opened a pull request: https://github.com/apache/spark/pull/22058

[SPARK-25036][SQL][FOLLOW-UP] Avoid match may not be exhaustive in Scala-2.12.

## What changes were proposed in this pull request?

This is a follow-up PR of #22014 and #22039. We still have some more compilation errors in mllib with Scala-2.12 under sbt:

```
[error] [warn] /home/ishizaki/Spark/PR/scala212/spark/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala:116: match may not be exhaustive.
[error] It would fail on the following inputs: ("silhouette", _), (_, "cosine"), (_, "squaredEuclidean"), (_, String()), (_, _)
[error] [warn]     ($(metricName), $(distanceMeasure)) match {
[error] [warn]
```

## How was this patch tested?

Existing UTs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kiszk/spark SPARK-25036c

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22058.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22058

commit 6a0ee38c53d8a53d219bfec8cad9953bc9571e0c
Author: Kazuaki Ishizaki
Date: 2018-08-09T16:29:40Z

    initial commit

commit 5655d83bd2c9d6ca872c371bff421f69409b6d0b
Author: Kazuaki Ishizaki
Date: 2018-08-09T16:45:23Z

    make the change one-liner
[GitHub] spark pull request #20184: [SPARK-22987][Core] UnsafeExternalSorter cases OO...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20184#discussion_r208993136

--- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java ---
```java
@@ -116,13 +138,18 @@ public void loadNext() throws IOException {
     if (taskContext != null) {
       taskContext.killTaskIfInterrupted();
     }
-    recordLength = din.readInt();
-    keyPrefix = din.readLong();
-    if (recordLength > arr.length) {
-      arr = new byte[recordLength];
+    // check if the reader is closed to prevent reopen the in and din.
+    if (!hasNext()) {
+      throw new IndexOutOfBoundsException("Can not load next item when UnsafeSorterSpillReader is closed.");
+    }
+    recordLength = getDin().readInt();
+    keyPrefix = getDin().readLong();
+    int arrLength = Math.max(1024 * 1024, recordLength);
+    if (arrLength > arr.length) {
+      arr = new byte[arrLength];
       baseObject = arr;
     }
-    ByteStreams.readFully(in, arr, 0, recordLength);
+    ByteStreams.readFully(getIn(), arr, 0, recordLength);
```
--- End diff --

Is it fine if `recordLength` is greater than `1024 * 1024`?
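For what it is worth, extracting the sizing logic from the diff into a standalone sketch suggests records above 1 MB are still covered, since `Math.max` then returns `recordLength` itself; that said, this is exactly what the question above asks the author to confirm:

```scala
// Standalone sketch of the buffer-growth logic from the diff above,
// not the actual reader class.
def nextBufferSize(currentLength: Int, recordLength: Int): Int = {
  val arrLength = math.max(1024 * 1024, recordLength)
  if (arrLength > currentLength) arrLength else currentLength
}

// A 4 MB record still gets a buffer at least as large as itself.
assert(nextBufferSize(currentLength = 1024 * 1024, recordLength = 4 * 1024 * 1024) == 4 * 1024 * 1024)
```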
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r208950122

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```scala
@@ -929,6 +955,28 @@ class DAGScheduler(
       // HadoopRDD whose underlying HDFS files have been deleted.
       finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     } catch {
+      case e: Exception if e.getMessage ==
+          DAGScheduler.ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =>
+        logWarning("The job requires to run a barrier stage that requires more slots than the " +
+          "total number of slots in the cluster currently.")
+        jobIdToNumTasksCheckFailures.putIfAbsent(jobId, 0)
+        val numCheckFailures = jobIdToNumTasksCheckFailures.get(jobId) + 1
```
--- End diff --

Is it OK that this increment is not atomic? In the following scenario, the value may not be correct:

1. Assume `jobIdToNumTasksCheckFailures(jobId) = 1`.
2. Thread A executes L963, so `numCheckFailures = 2`.
3. Thread B executes L963, so `numCheckFailures = 2`.
4. Thread B executes L964 and L965, so `jobIdToNumTasksCheckFailures(jobId)` holds 2.
5. Thread A executes L964 and L965, so `jobIdToNumTasksCheckFailures(jobId)` holds 2.

Since two threads detected a failure, we expect `3`, but the value is `2`.
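One possible atomic alternative, sketched here for illustration only and not necessarily the fix the PR adopted, is to let `ConcurrentHashMap` perform the read-modify-write in a single step:

```scala
import java.util.concurrent.ConcurrentHashMap

// Sketch only: merge() applies the remapping function atomically per key,
// so two threads hitting the same jobId cannot both observe the same stale
// count the way the scenario above describes.
val jobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]()

def recordCheckFailure(jobId: Int): Int =
  jobIdToNumTasksCheckFailures.merge(jobId, 1, (a: Int, b: Int) => a + b)
```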
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r208947201

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```scala
@@ -203,6 +203,17 @@ class DAGScheduler(
     sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
       DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)

+  /**
+   * Number of max concurrent tasks check failures for each job.
+   */
+  private[scheduler] val jobIdToNumTasksCheckFailures = new ConcurrentHashMap[Int, Int]
+
+  /**
+   * Time in seconds to wait between a max concurrent tasks check failure and the next check.
```
--- End diff --

nit: `a max` -> `max`?
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r208946523

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
```scala
@@ -577,4 +577,17 @@ package object config {
     .timeConf(TimeUnit.SECONDS)
     .checkValue(v => v > 0, "The value should be a positive time value.")
     .createWithDefaultString("365d")
+
+  private[spark] val BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL =
+    ConfigBuilder("spark.scheduler.barrier.maxConcurrentTasksCheck.interval")
+      .doc("Time in seconds to wait between a max concurrent tasks check failure and the next " +
```
--- End diff --

nit: `a max` -> `max`?
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r208945843

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
```scala
@@ -1602,6 +1602,15 @@ class SparkContext(config: SparkConf) extends Logging {
     }
   }

+  /**
+   * Get the max number of tasks that can be concurrent launched currently.
```
--- End diff --

How about something like this?

```scala
 * Get the max number of tasks that can be concurrently launched when the method is called.
 * Note that please don't cache the value returned by this method, because the number can be
 * changed due to adding/removing executors.
```
[GitHub] spark issue #21859: [SPARK-24900][SQL]Speed up sort when the dataset is smal...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21859 retest this please
[GitHub] spark issue #22055: [MINOR][BUILD] Update Jetty to 9.3.24.v20180605
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22055

Release notes:
- [9.3.21](https://github.com/eclipse/jetty.project/releases/tag/jetty-9.3.21.v20170918)
- [9.3.22](https://github.com/eclipse/jetty.project/releases/tag/jetty-9.3.22.v20171030)
- [9.3.23](https://github.com/eclipse/jetty.project/releases/tag/jetty-9.3.23.v20180228)
- [9.3.24](https://github.com/eclipse/jetty.project/releases/tag/jetty-9.3.24.v20180605)
[GitHub] spark issue #21505: [SPARK-24457][SQL] Improving performance of stringToTime...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21505 gentle ping @ssonker
[GitHub] spark pull request #22044: [SPARK-23912][SQL][Followup] Refactor ArrayDistin...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22044#discussion_r208862917

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
```scala
@@ -3410,6 +3410,28 @@ case class ArrayDistinct(child: Expression)
     case _ => false
   }

+  @transient protected lazy val canUseSpecializedHashSet = elementType match {
```
--- End diff --

We can do so. To minimize the changes, given the remaining time before the branch cut, I would like to do this in another PR: #21912.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 retest this please
[GitHub] spark issue #22044: [SPARK-23912][SQL][Followup] Refactor ArrayDistinct
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/22044 retest this please
[GitHub] spark pull request #21993: [SPARK-24983][Catalyst] Add configuration for max...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21993#discussion_r208822706

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```scala
@@ -631,19 +631,26 @@ object ColumnPruning extends Rule[LogicalPlan] {
 object CollapseProject extends Rule[LogicalPlan] {

   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case p1 @ Project(_, p2: Project) =>
-      if (haveCommonNonDeterministicOutput(p1.projectList, p2.projectList)) {
-        p1
-      } else {
-        p2.copy(projectList = buildCleanedProjectList(p1.projectList, p2.projectList))
-      }
+    case p1@Project(_, p2: Project) =>
```
--- End diff --

nit: Do we need to change this line? We can keep it as is.