[GitHub] spark issue #21299: [SPARK-24250][SQL] support accessing SQLConf inside task...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21299

Merged build finished. Test PASSed.
[GitHub] spark issue #21299: [SPARK-24250][SQL] support accessing SQLConf inside task...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21299

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3188/
[GitHub] spark pull request #21290: [SPARK-24241][Submit]Do not fail fast when dynami...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21290#discussion_r187847736

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
```diff
@@ -76,6 +75,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var proxyUser: String = null
   var principal: String = null
   var keytab: String = null
+  var dynamicAllocationEnabled: String = null
```
--- End diff --

Add `private`, since this variable is never used outside this class.
[GitHub] spark issue #21290: [SPARK-24241][Submit]Do not fail fast when dynamic resou...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/21290

LGTM, just some minor comments.
[GitHub] spark pull request #21290: [SPARK-24241][Submit]Do not fail fast when dynami...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21290#discussion_r187847656

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
```diff
@@ -180,6 +180,25 @@ class SparkSubmitSuite
     appArgs.toString should include ("thequeue")
   }
 
+  test("SPARK-24241: not fail fast when executor num is 0 and dynamic allocation enabled") {
```
--- End diff --

nit: "do not fail fast when dynamic allocation is enabled"
[GitHub] spark pull request #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate th...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21311#discussion_r187861317

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
```diff
@@ -568,13 +568,16 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }
     // There is 8 bytes for the pointer to next value
-    if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
+    val needSize = cursor + 8 + row.getSizeInBytes
+    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+    if (needSize > nowSize) {
       val used = page.length
       if (used >= (1 << 30)) {
         sys.error("Can not build a HashedRelation that is larger than 8G")
       }
-      ensureAcquireMemory(used * 8L * 2)
-      val newPage = new Array[Long](used * 2)
+      val multiples = math.max(math.ceil(needSize.toDouble / (used * 8L)).toInt, 2)
+      ensureAcquireMemory(used * 8L * multiples)
```
--- End diff --

How about shaping up this logic along with the other similar ones (splitting this function into two parts: `grow`/`append`)? e.g., `UTF8StringBuilder`: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java#L43
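For illustration, a self-contained sketch of the suggested `grow`/`append` split (all names here are hypothetical stand-ins; `UTF8StringBuilder` applies the same shape to a byte buffer):

```scala
// grow() only ensures capacity; append() only writes. The sizing logic then
// lives in exactly one place instead of being inlined into the append path.
final class LongPageBuilder(initialWords: Int = 1024) {
  private var page = new Array[Long](initialWords)
  private var cursor = 0 // position in words

  private def grow(neededWords: Int): Unit =
    if (cursor + neededWords > page.length) {
      // At least double, but grow further if one append needs more than that.
      val newLength = math.max(page.length * 2, cursor + neededWords)
      val newPage = new Array[Long](newLength)
      System.arraycopy(page, 0, newPage, 0, cursor)
      page = newPage
    }

  def append(values: Array[Long]): Unit = {
    grow(values.length)
    System.arraycopy(values, 0, page, cursor, values.length)
    cursor += values.length
  }
}
```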
[GitHub] spark issue #21319: [SPARK-24267][SQL] explicitly keep DataSourceReader in D...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21319

cc @rdblue @gatorsmile @gengliangwang
[GitHub] spark issue #21319: [SPARK-24267][SQL] explicitly keep DataSourceReader in D...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21319

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3193/
[GitHub] spark issue #21319: [SPARK-24267][SQL] explicitly keep DataSourceReader in D...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21319

Merged build finished. Test PASSed.
[GitHub] spark issue #19293: [SPARK-22079][SQL] Serializer in HiveOutputWriter miss l...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19293

retest this please
[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187900467

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
```diff
@@ -621,6 +621,25 @@ class PlannerSuite extends SharedSQLContext {
       requiredOrdering = Seq(orderingA, orderingB),
       shouldHaveSort = true)
   }
+
+  test("SPARK-24242: RangeExec should have correct output ordering") {
```
--- End diff --

ordering and partitioning
[GitHub] spark issue #21299: [SPARK-24250][SQL] support accessing SQLConf inside task...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21299

**[Test build #90568 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90568/testReport)** for PR 21299 at commit [`a100dea`](https://github.com/apache/spark/commit/a100dea9573e9b43b993516c817e306d80f72d29).
[GitHub] spark issue #21318: [minor] Update docs for functions.scala to make it clear...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21318

@rxin, should we maybe mention that SQL functions are usually added to match other DBMSs (unlike functions.scala)?
[GitHub] spark issue #21312: [SPARK-24259][SQL] ArrayWriter for Arrow produces wrong ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/21312

Thanks for catching this @viirya! Looks good at first glance, but my only concern is that `clear()` will release the vector buffers, whereas `reset()` just zeros them out. Let me look into that a little further tomorrow and make sure it won't cause a problem.
[GitHub] spark issue #21299: [SPARK-24250][SQL] support accessing SQLConf inside task...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21299

**[Test build #90568 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90568/testReport)** for PR 21299 at commit [`a100dea`](https://github.com/apache/spark/commit/a100dea9573e9b43b993516c817e306d80f72d29).

* This patch **fails due to an unknown error code, -9**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #21290: [SPARK-24241][Submit]Do not fail fast when dynami...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21290#discussion_r187852160

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
```diff
@@ -76,6 +75,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var proxyUser: String = null
   var principal: String = null
   var keytab: String = null
+  var dynamicAllocationEnabled: String = null
```
--- End diff --

Yes, I think you can remove that variable check in the UT; it seems unnecessary.
[GitHub] spark issue #21299: [SPARK-24250][SQL] support accessing SQLConf inside task...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21299

Merged build finished. Test FAILed.
[GitHub] spark issue #21299: [SPARK-24250][SQL] support accessing SQLConf inside task...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21299

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90568/
[GitHub] spark pull request #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate th...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21311#discussion_r187857950

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
```diff
@@ -568,13 +568,16 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }
     // There is 8 bytes for the pointer to next value
-    if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
+    val needSize = cursor + 8 + row.getSizeInBytes
+    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+    if (needSize > nowSize) {
       val used = page.length
       if (used >= (1 << 30)) {
         sys.error("Can not build a HashedRelation that is larger than 8G")
       }
-      ensureAcquireMemory(used * 8L * 2)
-      val newPage = new Array[Long](used * 2)
+      val multiples = math.max(math.ceil(needSize.toDouble / (used * 8L)).toInt, 2)
+      ensureAcquireMemory(used * 8L * multiples)
```
--- End diff --

Should we move the size check to before the allocation? IIUC, we have to check `used * multiples` <= `ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH` now.
[GitHub] spark pull request #21208: [SPARK-23925][SQL] Add array_repeat collection fu...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21208#discussion_r187858372

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
```scala
case class ArrayRepeat(left: Expression, right: Expression)
  extends BinaryExpression with ExpectsInputTypes {

  // ... (ArrayRepeat eval/doGenCode body elided; quoted in full in the later
  // comments on this file) ...

  private def genCodeForPrimitiveElement(ctx: CodegenContext,
      elementType: DataType,
      element: String,
      count: String,
      leftIsNull: String,
      arrayDataName: String): String = {
```
--- End diff --

ditto.
[GitHub] spark pull request #21208: [SPARK-23925][SQL] Add array_repeat collection fu...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21208#discussion_r187858935

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
```diff
@@ -843,6 +843,82 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("array_repeat function") {
+    val dummyFilter = (c: Column) => c.isNull || c.isNotNull // to switch codeGen on
+    val strDF = Seq(
+    ("hi", 2),
+    (null, 2)
```
--- End diff --

nit: indent
[GitHub] spark pull request #21208: [SPARK-23925][SQL] Add array_repeat collection fu...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21208#discussion_r187858398

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
```scala
  // ... (ArrayRepeat preamble elided; same region as quoted in the previous
  // comment on this file) ...

  private def genCodeForPrimitiveElement(ctx: CodegenContext,
      elementType: DataType,
      element: String,
      count: String,
      leftIsNull: String,
      arrayDataName: String): String = {

    val tempArrayDataName = ctx.freshName("tempArrayData")
    val primitiveValueTypeName = CodeGenerator.primitiveTypeName(elementType)
    val (numElemName, numElemCode) = genCodeForNumberOfElements(ctx, count)

    s"""
       |$numElemCode
       |${ctx.createUnsafeArray(tempArrayDataName, numElemName, elementType, s" $prettyName failed.")}
       |if (!$leftIsNull) {
       |  for (int k = 0; k < $tempArrayDataName.numElements(); k++) {
       |    $tempArrayDataName.set$primitiveValueTypeName(k, $element);
       |  }
       |} else {
       |  for (int k = 0; k < $tempArrayDataName.numElements(); k++) {
```
[message truncated in the archive]
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21266

**[Test build #90572 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90572/testReport)** for PR 21266 at commit [`fc96adb`](https://github.com/apache/spark/commit/fc96adb099380ffac09d445461435b613e08f9f3).
[GitHub] spark issue #21288: [SPARK-24206][SQL] Improve FilterPushdownBenchmark bench...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21288

**[Test build #90571 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90571/testReport)** for PR 21288 at commit [`4520044`](https://github.com/apache/spark/commit/4520044d3be40ba8bf963a151db2dd9769c0f59a).
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user mallman commented on the issue: https://github.com/apache/spark/pull/21320

@gatorsmile I believe this is the PR you requested for review.
[GitHub] spark pull request #21290: [SPARK-24241][Submit]Do not fail fast when dynami...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21290#discussion_r187852287

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
```diff
@@ -180,6 +180,25 @@ class SparkSubmitSuite
     appArgs.toString should include ("thequeue")
   }
 
+  test("SPARK-24241: not fail fast when executor num is 0 and dynamic allocation enabled") {
+    val clArgs1 = Seq(
+      "--name", "myApp",
+      "--class", "Foo",
+      "--num-executors", "0",
+      "--conf", "spark.dynamicAllocation.enabled=true",
+      "thejar.jar")
+    val appArgs = new SparkSubmitArguments(clArgs1)
+    appArgs.dynamicAllocationEnabled should be ("true")
+
+    val clArgs2 = Seq(
+      "--name", "myApp",
+      "--class", "Foo",
+      "--num-executors", "0",
+      "--conf", "spark.dynamicAllocation.enabled=false",
+      "thejar.jar")
+    intercept[SparkException](new SparkSubmitArguments(clArgs2))
```
--- End diff --

Also please add an exception message check for `intercept`.
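A sketch of the requested assertion as it would read inside the test — the expected message text here is hypothetical and should match whatever `SparkSubmitArguments` actually throws:

```scala
// Capture the exception and assert on its message, not just its type.
val e = intercept[SparkException](new SparkSubmitArguments(clArgs2))
// Hypothetical message fragment; use the real validation error string.
e.getMessage should include ("Number of executors")
```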
[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21311

cc @cloud-fan
[GitHub] spark issue #21290: [SPARK-24241][Submit]Do not fail fast when dynamic resou...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21290

Merged build finished. Test PASSed.
[GitHub] spark pull request #21208: [SPARK-23925][SQL] Add array_repeat collection fu...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21208#discussion_r187856418

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
```scala
  // ... (ArrayRepeat preamble elided) ...

  override def eval(input: InternalRow): Any = {
    val count = right.eval(input)
    if (count == null) {
      null
    } else {
      if (count.asInstanceOf[Int] > MAX_ARRAY_LENGTH) {
        throw new RuntimeException(s"Unsuccessful try to create array with $count elements" +
          s"due to exceeding the array size limit $MAX_ARRAY_LENGTH.");
      }
```
--- End diff --

nit: need a space after `elements`.
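To see the nit concretely — a tiny, self-contained illustration (values hypothetical) of the fused message the missing space produces:

```scala
// Without a trailing space, the two interpolated literals fuse together:
val count = 3
val limit = 10
val msg = s"Unsuccessful try to create array with $count elements" +
  s"due to exceeding the array size limit $limit."
println(msg) // ...with 3 elementsdue to exceeding the array size limit 10.
```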
[GitHub] spark pull request #21208: [SPARK-23925][SQL] Add array_repeat collection fu...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21208#discussion_r187856635

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
```scala
  // ... (ArrayRepeat preamble elided) ...

    ev.copy(code =
      s"""
      |boolean ${ev.isNull} = false;
      |${leftGen.code}
      |${rightGen.code}
      |${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
      |$resultCode
```
--- End diff --

nit: usually we use the following format for codegen:
```scala
s"""
  |
 """.stripMargin
```
[GitHub] spark pull request #21208: [SPARK-23925][SQL] Add array_repeat collection fu...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21208#discussion_r187856067

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
```scala
/**
 * Returns the array containing the given input value (left) count (right) times.
 */
@ExpressionDescription(
  usage = "_FUNC_(element, count) - Returns the array containing element count times.",
  examples = """
    Examples:
      > SELECT _FUNC_('123', 2);
       ['123', '123']
  """)
```
--- End diff --

`since = "2.4.0"`?
[GitHub] spark pull request #21208: [SPARK-23925][SQL] Add array_repeat collection fu...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21208#discussion_r187857608

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
```scala
  // ... (ArrayRepeat preamble elided) ...

  private def nullElementsProtection(ev: ExprCode,
      rightIsNull: String,
      coreLogic: String): String = {
```
--- End diff --

nit: style.
```scala
private def nullElementsProtection(
    ev: ExprCode,
    rightIsNull: String,
    coreLogic: String): String = {
  ...
```
[GitHub] spark pull request #20800: [SPARK-23627][SQL] Provide isEmpty in Dataset
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20800#discussion_r187864972

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
```diff
@@ -511,6 +511,14 @@ class Dataset[T] private[sql](
    */
   def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]
 
+  /**
+   * Returns true if the `Dataset` is empty.
+   *
+   * @group basic
+   * @since 2.4.0
+   */
+  def isEmpty: Boolean = rdd.isEmpty()
```
--- End diff --

+1 for the limit
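A minimal sketch of the limit-based variant being endorsed, as it would sit inside `Dataset[T]` (the exact final form is up to the PR):

```scala
// Pushing limit(1) into the plan lets the optimizer bound the scan to a
// single row before the emptiness check, instead of relying on RDD-level
// take semantics over the full query plan.
def isEmpty: Boolean = limit(1).rdd.isEmpty()
```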
[GitHub] spark pull request #21319: [SPARK-24267][SQL] explicitly keep DataSourceRead...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/21319

[SPARK-24267][SQL] explicitly keep DataSourceReader in DataSourceV2Relation

## What changes were proposed in this pull request?

To keep `DataSourceV2Relation` immutable, we don't put the `DataSourceReader` in the constructor, but make it a `lazy val` instead. If we think about how `lazy val` is implemented in Scala, we actually keep a `DataSourceReader` instance in `DataSourceV2Relation`, and exclude it when defining equality of `DataSourceV2Relation`. This works, but has two problems:

1. After the pushdown rule, if `DataSourceV2Relation` gets transformed and returns a new copy, we will re-do the pushdown and re-create the `DataSourceReader`.
2. The pushdown logic is defined in `DataSourceV2Relation` instead of the pushdown rule, which is a little counter-intuitive.

This PR proposes to implement the `lazy val` ourselves: keep `DataSourceReader` as an optional parameter in the constructor of `DataSourceV2Relation`, exclude it in the equality definition, but include it when copying.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark refactor

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21319.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21319

commit ca6ccb24e8f2910a3ffc07a790f2ba7f57e79056
Author: Wenchen Fan
Date: 2018-04-24T17:26:19Z

    do not create DataSourceReader many times
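A minimal, self-contained sketch of the described pattern — all names are stand-ins, not the real `DataSourceV2Relation` fields:

```scala
// The cached reader is a constructor parameter, so the compiler-generated
// copy() carries it along (no recomputation), but it is excluded from
// equality, so two relations differing only in the cache compare equal.
case class Relation(
    source: String,
    options: Map[String, String],
    cachedReader: Option[AnyRef] = None) { // stand-in for DataSourceReader

  // Defining equals/hashCode suppresses the case-class versions, which
  // would otherwise include cachedReader.
  override def equals(other: Any): Boolean = other match {
    case r: Relation => source == r.source && options == r.options
    case _ => false
  }

  override def hashCode(): Int = (source, options).hashCode()
}
```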
[GitHub] spark pull request #21108: [SPARK-24027][SQL] Support MapType with StringTyp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21108#discussion_r187888913

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala ---
```diff
@@ -326,4 +326,61 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(errMsg4.getMessage.startsWith(
       "A type of keys and values in map() must be string, but got"))
   }
+
+  test("SPARK-24027: from_json - map") {
+    val in = Seq("""{"a": 1, "b": 2, "c": 3}""").toDS()
+    val schema =
+      """
+        |{
+        |  "type" : "map",
+        |  "keyType" : "string",
+        |  "valueType" : "integer",
+        |  "valueContainsNull" : true
+        |}
+      """.stripMargin
+    val out = in.select(from_json($"value", schema, Map[String, String]()))
+
+    assert(out.columns.head == "entries")
+    checkAnswer(out, Row(Map("a" -> 1, "b" -> 2, "c" -> 3)))
+  }
+
+  test("SPARK-24027: from_json - map ") {
```
--- End diff --

I added a negative test. Please take a look at the PR again.
[GitHub] spark pull request #21114: [SPARK-22371][CORE] Return None instead of throwi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21114#discussion_r187891772

--- Diff: core/src/test/scala/org/apache/spark/AccumulatorSuite.scala ---
```diff
@@ -237,6 +236,65 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
     acc.merge("kindness")
     assert(acc.value === "kindness")
   }
+
+  test("updating garbage collected accumulators") {
```
--- End diff --

What does this test do? Prove an accumulator can be GCed even while it's still valid? The map in `AccumulatorContext` is fixed-size, so this can definitely happen and we don't need to prove it.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
GitHub user mallman opened a pull request: https://github.com/apache/spark/pull/21320

[SPARK-4502][SQL] Parquet nested column pruning - foundation

(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

_N.B. This is a restart of PR #16578 which includes everything in that PR except the aggregation and join schema pruning rules. Relevant review comments from that PR should be considered incorporated by reference. Please avoid duplication in review by reviewing that PR first. The summary below is an edited copy of the summary of the previous PR._

## What changes were proposed in this pull request?

One of the hallmarks of a column-oriented data storage format is the ability to read data from a subset of columns, efficiently skipping reads from other columns. Spark has long had support for pruning unneeded top-level schema fields from the scan of a parquet file. For example, consider a table, `contacts`, backed by parquet with the following Spark SQL schema:

```
root
 |-- name: struct
 |    |-- first: string
 |    |-- last: string
 |-- address: string
```

Parquet stores this table's data in three physical columns: `name.first`, `name.last` and `address`. To answer the query

```SQL
select address from contacts
```

Spark will read only from the `address` column of parquet data. However, to answer the query

```SQL
select name.first from contacts
```

Spark will read `name.first` and `name.last` from parquet. This PR modifies Spark SQL to support a finer grain of schema pruning. With this patch, Spark reads only the `name.first` column to answer the previous query.

### Implementation

There are two main components of this patch. First, there is a `ParquetSchemaPruning` optimizer rule for gathering the required schema fields of a `PhysicalOperation` over a parquet file, constructing a new schema based on those required fields and rewriting the plan in terms of that pruned schema. The pruned schema fields are pushed down to the parquet requested read schema. `ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for rewriting a catalyst expression in terms of a pruned schema.

Second, the `ParquetRowConverter` has been patched to ensure the ordinals of the parquet columns read are correct for the pruned schema. `ParquetReadSupport` has been patched to address a compatibility mismatch between Spark's built-in vectorized reader and the parquet-mr library's reader.

### Limitation

Among the complex Spark SQL data types, this patch supports parquet column pruning of nested sequences of struct fields only.

## How was this patch tested?

Care has been taken to ensure correctness and prevent regressions. A more advanced version of this patch incorporating optimizations for rewriting queries involving aggregations and joins has been running on a production Spark cluster at VideoAmp for several years. In that time, one bug was found and fixed early on, and we added a regression test for that bug. We forward-ported this patch to Spark master in June 2016 and have been running this patch against Spark 2.x branches on ad-hoc clusters since then.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/VideoAmp/spark-public spark-4502-parquet_column_pruning-foundation

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21320.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21320

commit 9e301b37bb5863ef5fad8ec15f1d8f3d4b5e6a6f
Author: Michael Allman
Date: 2016-06-24T17:21:24Z

    [SPARK-4502][SQL] Parquet nested column pruning
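As an illustration of the pruning the PR describes — a sketch using Spark's public `StructType` API, with the field layout taken from the example schema above:

```scala
import org.apache.spark.sql.types._

// Full schema of the `contacts` table from the example.
val fullSchema = StructType(Seq(
  StructField("name", StructType(Seq(
    StructField("first", StringType),
    StructField("last", StringType)))),
  StructField("address", StringType)))

// For `SELECT name.first FROM contacts`, the requested Parquet read schema
// keeps only the accessed leaf field rather than the whole `name` struct.
val prunedSchema = StructType(Seq(
  StructField("name", StructType(Seq(
    StructField("first", StringType))))))
```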
[GitHub] spark pull request #21106: [SPARK-23711][SQL][WIP] Add fallback logic for Un...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21106#discussion_r187897086

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala ---
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. ... (standard Apache license header elided) ...
 */

package org.apache.spark.sql.catalyst.expressions

import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.InternalCompilerException

import org.apache.spark.TaskContext
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/**
 * Catches compile error during code generation.
 */
object CodegenError {
  def unapply(throwable: Throwable): Option[Exception] = throwable match {
    case e: InternalCompilerException => Some(e)
    case e: CompileException => Some(e)
    case _ => None
  }
}

/**
 * Defines values for `SQLConf` config of fallback mode. Use for test only.
 */
object CodegenObjectFactoryMode extends Enumeration {
  val AUTO, CODEGEN_ONLY, NO_CODEGEN = Value

  def currentMode: CodegenObjectFactoryMode.Value = {
    // If we weren't on task execution, accesses that config.
    if (TaskContext.get == null) {
      val config = SQLConf.get.getConf(SQLConf.CODEGEN_FACTORY_MODE)
      CodegenObjectFactoryMode.withName(config)
    } else {
      CodegenObjectFactoryMode.AUTO
    }
  }
}

/**
 * A factory which can be used to create objects that have both codegen and interpreted
```
--- End diff --

The comment also needs an update.
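For context, a minimal, self-contained sketch (all names hypothetical) of the codegen-with-interpreted-fallback pattern that an extractor like `CodegenError` enables:

```scala
// Hypothetical demo: attempt the "compiled" path, and fall back to an
// interpreted implementation when compilation fails. The stand-in compile()
// always throws so the fallback path is exercised.
object FallbackDemo {
  sealed trait Projection
  case object Compiled extends Projection
  case object Interpreted extends Projection

  private def compile(): Projection =
    throw new RuntimeException("simulated janino compile failure")

  def createProjection(): Projection =
    try compile()
    catch { case _: RuntimeException => Interpreted }

  def main(args: Array[String]): Unit =
    println(createProjection()) // prints Interpreted
}
```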
[GitHub] spark issue #21199: [SPARK-24127][SS] Continuous text socket source
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/21199

I was wondering if it is overkill to receive data on the driver side and publish it to the executors via RPC. This might give users the wrong impression that data should be received on the driver and then published to the executors again. Just my two cents.
[GitHub] spark pull request #21290: [SPARK-24241][Submit]Do not fail fast when dynami...
Github user yaooqinn commented on a diff in the pull request: https://github.com/apache/spark/pull/21290#discussion_r187851318

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
```diff
@@ -76,6 +75,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var proxyUser: String = null
   var principal: String = null
   var keytab: String = null
+  var dynamicAllocationEnabled: String = null
```
--- End diff --

It's used in a UT, so should I modify the test or keep it `public`?
[GitHub] spark pull request #21318: [minor] Update docs for functions.scala to make i...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21318#discussion_r187857692

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
```diff
@@ -39,7 +39,21 @@ import org.apache.spark.util.Utils
 
 /**
- * Functions available for DataFrame operations.
+ * Commonly used functions available for DataFrame operations. Using functions defined here provides
+ * a little bit more compile-time safety to make sure the function exists.
+ *
+ * Spark also includes more built-in functions that are less common and are not defined here.
+ * You can still access them (and all the functions defined here) using the [[functions.expr()]] API
+ * and calling them through a SQL expression string. You can find the entire list of functions for
+ * the latest version of Spark at [[https://spark.apache.org/docs/latest/api/sql/index.html]].
+ *
+ * As an example, `isnan` is a function that is defined here. You can use `isnan(col("myCol"))`
+ * to invoke the isnan function. This way the programming language's compiler ensures isnan exists
+ * and is of the proper form. You can also use `expr("isnan(myCol)")` function to invoke the same
+ * function. In this case, Spark itself will ensure isnan exists when it analyzes the query.
```
--- End diff --

nit: `isnan`
[GitHub] spark pull request #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate th...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21311#discussion_r187857852

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
```diff
@@ -568,13 +568,16 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }
     // There is 8 bytes for the pointer to next value
-    if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
+    val needSize = cursor + 8 + row.getSizeInBytes
+    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+    if (needSize > nowSize) {
       val used = page.length
       if (used >= (1 << 30)) {
         sys.error("Can not build a HashedRelation that is larger than 8G")
       }
-      ensureAcquireMemory(used * 8L * 2)
-      val newPage = new Array[Long](used * 2)
+      val multiples = math.max(math.ceil(needSize.toDouble / (used * 8L)).toInt, 2)
+      ensureAcquireMemory(used * 8L * multiples)
+      val newPage = new Array[Long](used * multiples)
```
--- End diff --

Should we move the size check to before the allocation? IIUC, we have to check `used * multiples` <= `ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH` now.
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19789

cc @marmbrus @zsxwing
[GitHub] spark pull request #21291: [SPARK-24242][SQL] RangeExec should have correct ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21291#discussion_r187900810

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---
```diff
@@ -621,6 +621,25 @@ class PlannerSuite extends SharedSQLContext {
       requiredOrdering = Seq(orderingA, orderingB),
       shouldHaveSort = true)
   }
+
+  test("SPARK-24242: RangeExec should have correct output ordering") {
+    val df = spark.range(10).orderBy("id")
```
--- End diff --

why do we put an `orderBy` in the query?
[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21311

ok to test
[GitHub] spark pull request #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate th...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21311#discussion_r187883931

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
```diff
@@ -568,13 +568,16 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }
     // There is 8 bytes for the pointer to next value
-    if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
+    val needSize = cursor + 8 + row.getSizeInBytes
+    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+    if (needSize > nowSize) {
       val used = page.length
       if (used >= (1 << 30)) {
         sys.error("Can not build a HashedRelation that is larger than 8G")
       }
-      ensureAcquireMemory(used * 8L * 2)
-      val newPage = new Array[Long](used * 2)
+      val multiples = math.max(math.ceil(needSize.toDouble / (used * 8L)).toInt, 2)
+      ensureAcquireMemory(used * 8L * multiples)
```
--- End diff --

+1 on grow/append
[GitHub] spark pull request #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate th...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21311#discussion_r187884949

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
```diff
@@ -568,13 +568,16 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }
     // There is 8 bytes for the pointer to next value
-    if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
+    val needSize = cursor + 8 + row.getSizeInBytes
+    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+    if (needSize > nowSize) {
       val used = page.length
       if (used >= (1 << 30)) {
         sys.error("Can not build a HashedRelation that is larger than 8G")
       }
-      ensureAcquireMemory(used * 8L * 2)
```
--- End diff --

Doubling the size when growing is very typical; it seems what you want to address is the case where memory is enough for the requested size but not enough for doubling. I'd suggest we double the size most of the time, as long as there is enough memory.
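A sketch of the sizing policy the thread is converging on — the array-length ceiling below is an assumed stand-in for `ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH`:

```scala
// Double by default; grow further only when a single oversized row needs it,
// and validate the new length before acquiring memory.
def newPageLength(usedWords: Int, neededBytes: Long): Int = {
  val maxWords = Integer.MAX_VALUE - 15 // assumed array-length ceiling
  val factor = math.max(math.ceil(neededBytes.toDouble / (usedWords * 8L)).toInt, 2)
  val newLength = usedWords.toLong * factor
  require(newLength <= maxWords, s"cannot grow page to $newLength words")
  newLength.toInt
}
```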
[GitHub] spark issue #21114: [SPARK-22371][CORE] Return None instead of throwing an e...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21114

ok to test
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman closed the pull request at: https://github.com/apache/spark/pull/16578
[GitHub] spark issue #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on the issue: https://github.com/apache/spark/pull/16578

I'm closing this PR in favor of #21320. That PR deals with simple projection and filter queries only. I will submit subsequent PRs for aggregation and join queries following the acceptance of #21320.
[GitHub] spark pull request #21106: [SPARK-23711][SQL][WIP] Add fallback logic for Un...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21106#discussion_r187897203

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---
```diff
@@ -87,12 +87,11 @@ class InterpretedUnsafeProjection(expressions: Array[Expression]) extends Unsafe
 /**
  * Helper functions for creating an [[InterpretedUnsafeProjection]].
  */
-object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
-
+object InterpretedUnsafeProjection {
   /**
    * Returns an [[UnsafeProjection]] for given sequence of bound Expressions.
    */
-  override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+  protected[sql] def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
```
--- End diff --

Did you change it?
[GitHub] spark pull request #21318: [minor] Update docs for functions.scala to make i...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21318#discussion_r187843889

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
```diff
@@ -39,7 +39,21 @@ import org.apache.spark.util.Utils
 
 /**
- * Functions available for DataFrame operations.
+ * Commonly used functions available for DataFrame operations. Using functions defined here provides
+ * a little bit more compile-time safety to make sure the function exists.
+ *
+ * Spark also includes more built-in functions that are less common and are not defined here.
+ * You can still access them (and all the functions defined here) using the [[functions.expr()]] API
+ * and calling them through a SQL expression string. You can find the entire list of functions for
+ * the latest version of Spark at [[https://spark.apache.org/docs/latest/api/sql/index.html]].
+ *
+ * As an example, `isnan` is a function that is defined here. You can use `isnan(col("myCol"))`
+ * to invoke the isnan function. This way the programming language's compiler ensures isnan exists
```
--- End diff --

nit: `isnan` -> `` `isnan` ``
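The two invocation styles the doc contrasts, as a short self-contained sketch (both `isnan` and `expr` are standard Spark SQL APIs):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, isnan}

val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

val df = Seq(1.0, Double.NaN).toDF("myCol")
df.select(isnan(col("myCol"))).show()  // checked at compile time: isnan must exist in functions
df.select(expr("isnan(myCol)")).show() // checked when Spark analyzes the expression string
```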
[GitHub] spark pull request #21236: [SPARK-23935][SQL] Adding map_entries function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21236#discussion_r187853879

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
```scala
  // ... (MapEntries class body elided; quoted in full in the next comment on
  // this file) ...

  private def genCodeForPrimitiveElements(
      ctx: CodegenContext,
      keys: String,
      values: String,
      arrayData: String,
      numElements: String): String = {
    // ... (freshName declarations and size constants elided) ...
    val keyTypeName = CodeGenerator.primitiveTypeName(childDataType.keyType)
    val valueTypeName = CodeGenerator.primitiveTypeName(childDataType.keyType)

    val valueAssignment = s"$unsafeRow.set$valueTypeName(1, ${getValue(values)});"
    val valueAssignmentChecked = if (childDataType.valueContainsNull) {
      s"""
         |if ($values.isNullAt(z)) {
         |  $unsafeRow.setNullAt(1);
         |} else {
         |  $valueAssignment
         |}
       """.stripMargin
    } else {
      valueAssignment
    }

    s"""
       |final long $byteArraySize = $calculateArraySize($numElements, ${longSize + structSize});
       |if ($byteArraySize > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
       |  ${genCodeForAnyElements(ctx, keys, values, arrayData, numElements)}
       |} else {
```
[message truncated in the archive]
[GitHub] spark pull request #21236: [SPARK-23935][SQL] Adding map_entries function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21236#discussion_r187852992

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -118,6 +119,161 @@ case class MapValues(child: Expression)

   override def prettyName: String = "map_values"
 }

+/**
+ * Returns an unordered array of all entries in the given map.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(map) - Returns an unordered array of all entries in the given map.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(map(1, 'a', 2, 'b'));
+       [(1,"a"),(2,"b")]
+  """,
+  since = "2.4.0")
+case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType)
+
+  lazy val childDataType: MapType = child.dataType.asInstanceOf[MapType]
+
+  override def dataType: DataType = {
+    ArrayType(
+      StructType(
+        StructField("key", childDataType.keyType, false) ::
+        StructField("value", childDataType.valueType, childDataType.valueContainsNull) ::
+        Nil),
+      false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val childMap = input.asInstanceOf[MapData]
+    val keys = childMap.keyArray()
+    val values = childMap.valueArray()
+    val length = childMap.numElements()
+    val resultData = new Array[AnyRef](length)
+    var i = 0;
+    while (i < length) {
+      val key = keys.get(i, childDataType.keyType)
+      val value = values.get(i, childDataType.valueType)
+      val row = new GenericInternalRow(Array[Any](key, value))
+      resultData.update(i, row)
+      i += 1
+    }
+    new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      val numElements = ctx.freshName("numElements")
+      val keys = ctx.freshName("keys")
+      val values = ctx.freshName("values")
+      val isKeyPrimitive = CodeGenerator.isPrimitiveType(childDataType.keyType)
+      val isValuePrimitive = CodeGenerator.isPrimitiveType(childDataType.valueType)
+      val code = if (isKeyPrimitive && isValuePrimitive) {
+        genCodeForPrimitiveElements(ctx, keys, values, ev.value, numElements)
+      } else {
+        genCodeForAnyElements(ctx, keys, values, ev.value, numElements)
+      }
+      s"""
+         |final int $numElements = $c.numElements();
+         |final ArrayData $keys = $c.keyArray();
+         |final ArrayData $values = $c.valueArray();
+         |$code
+       """.stripMargin
+    })
+  }
+
+  private def getKey(varName: String) = CodeGenerator.getValue(varName, childDataType.keyType, "z")
+
+  private def getValue(varName: String) = {
+    CodeGenerator.getValue(varName, childDataType.valueType, "z")
+  }
+
+  private def genCodeForPrimitiveElements(
+      ctx: CodegenContext,
+      keys: String,
+      values: String,
+      arrayData: String,
+      numElements: String): String = {
+    val byteArraySize = ctx.freshName("byteArraySize")
+    val data = ctx.freshName("byteArray")
+    val unsafeRow = ctx.freshName("unsafeRow")
+    val unsafeArrayData = ctx.freshName("unsafeArrayData")
+    val structsOffset = ctx.freshName("structsOffset")
+    val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+    val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
+
+    val baseOffset = Platform.BYTE_ARRAY_OFFSET
+    val longSize = LongType.defaultSize
+    val structSize = UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2

--- End diff --

This is not for the element size of arrays. I agree with @kiszk to use `8`. Maybe we need to add a constant to represent the word size in `UnsafeRow`, or somewhere else, in a future PR.
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
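To make the word-size point above concrete, here is a minimal Scala sketch of the suggested constant; the name `WORD_SIZE` and where it lives are illustrative assumptions, not part of the PR:

    // UnsafeRow lays fields out in 8-byte words; the stride used here is the
    // word size, which only coincidentally equals LongType.defaultSize.
    val WORD_SIZE = 8  // hypothetical named constant, to be hoisted into UnsafeRow later
    val structSize = UnsafeRow.calculateBitSetWidthInBytes(2) + WORD_SIZE * 2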
[GitHub] spark pull request #21236: [SPARK-23935][SQL] Adding map_entries function
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21236#discussion_r187854341

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -118,6 +119,162 @@ case class MapValues(child: Expression)

   override def prettyName: String = "map_values"
 }

+/**
+ * Returns an unordered array of all entries in the given map.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(map) - Returns an unordered array of all entries in the given map.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(map(1, 'a', 2, 'b'));
+       [(1,"a"),(2,"b")]
+  """,
+  since = "2.4.0")
+case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType)
+
+  lazy val childDataType: MapType = child.dataType.asInstanceOf[MapType]
+
+  override def dataType: DataType = {
+    ArrayType(
+      StructType(
+        StructField("key", childDataType.keyType, false) ::
+        StructField("value", childDataType.valueType, childDataType.valueContainsNull) ::
+        Nil),
+      false)
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+    val childMap = input.asInstanceOf[MapData]
+    val keys = childMap.keyArray()
+    val values = childMap.valueArray()
+    val length = childMap.numElements()
+    val resultData = new Array[AnyRef](length)
+    var i = 0;
+    while (i < length) {
+      val key = keys.get(i, childDataType.keyType)
+      val value = values.get(i, childDataType.valueType)
+      val row = new GenericInternalRow(Array[Any](key, value))
+      resultData.update(i, row)
+      i += 1
+    }
+    new GenericArrayData(resultData)
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => {
+      val numElements = ctx.freshName("numElements")
+      val keys = ctx.freshName("keys")
+      val values = ctx.freshName("values")
+      val isKeyPrimitive = CodeGenerator.isPrimitiveType(childDataType.keyType)
+      val isValuePrimitive = CodeGenerator.isPrimitiveType(childDataType.valueType)
+      val code = if (isKeyPrimitive && isValuePrimitive) {
+        genCodeForPrimitiveElements(ctx, keys, values, ev.value, numElements)
+      } else {
+        genCodeForAnyElements(ctx, keys, values, ev.value, numElements)
+      }
+      s"""
+         |final int $numElements = $c.numElements();
+         |final ArrayData $keys = $c.keyArray();
+         |final ArrayData $values = $c.valueArray();
+         |$code
+       """.stripMargin
+    })
+  }
+
+  private def getKey(varName: String) = CodeGenerator.getValue(varName, childDataType.keyType, "z")
+
+  private def getValue(varName: String) = {
+    CodeGenerator.getValue(varName, childDataType.valueType, "z")
+  }
+
+  private def genCodeForPrimitiveElements(
+      ctx: CodegenContext,
+      keys: String,
+      values: String,
+      arrayData: String,
+      numElements: String): String = {
+    val byteArraySize = ctx.freshName("byteArraySize")
+    val data = ctx.freshName("byteArray")
+    val unsafeRow = ctx.freshName("unsafeRow")
+    val unsafeArrayData = ctx.freshName("unsafeArrayData")
+    val structsOffset = ctx.freshName("structsOffset")
+    val calculateArraySize = "UnsafeArrayData.calculateSizeOfUnderlyingByteArray"
+    val calculateHeader = "UnsafeArrayData.calculateHeaderPortionInBytes"
+
+    val baseOffset = Platform.BYTE_ARRAY_OFFSET
+    val longSize = LongType.defaultSize
+    val structSize = UnsafeRow.calculateBitSetWidthInBytes(2) + longSize * 2
+    val structSizeAsLong = structSize + "L"
+    val keyTypeName = CodeGenerator.primitiveTypeName(childDataType.keyType)
+    val valueTypeName = CodeGenerator.primitiveTypeName(childDataType.keyType)
+
+    val valueAssignment = s"$unsafeRow.set$valueTypeName(1, ${getValue(values)});"
+    val valueAssignmentChecked = if (childDataType.valueContainsNull) {
+      s"""
+         |if ($values.isNullAt(z)) {
+         |  $unsafeRow.setNullAt(1);
+         |} else {
+         |  $valueAssignment
+         |}
+       """.stripMargin
+    } else {
+      valueAssignment
+    }
+
+    s"""
+       |final long $byteArraySize = $calculateArraySize($numElements, ${longSize + structSize});
+       |if ($byteArraySize > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
+       |  ${genCodeForAnyElements(ctx, keys, values, arrayData, numElements)}

--- End diff --
[GitHub] spark issue #21208: [SPARK-23925][SQL] Add array_repeat collection function
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21208 Btw, can you clean up your commit history? Including unrelated commits might cause the merge script to fail or behave unexpectedly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21299: [SPARK-24250][SQL] support accessing SQLConf inside task...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/21299 Retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21266 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21290: [SPARK-24241][Submit]Do not fail fast when dynamic resou...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21290 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3189/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21299: [SPARK-24250][SQL] support accessing SQLConf inside task...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21299 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21299: [SPARK-24250][SQL] support accessing SQLConf inside task...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21299 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3190/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21288: [SPARK-24206][SQL] Improve FilterPushdownBenchmark bench...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21288 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3191/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21288: [SPARK-24206][SQL] Improve FilterPushdownBenchmark bench...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21288 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21266 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3192/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21266 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21290: [SPARK-24241][Submit]Do not fail fast when dynamic resou...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21290 **[Test build #90569 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90569/testReport)** for PR 21290 at commit [`17fa3bc`](https://github.com/apache/spark/commit/17fa3bc1011db45c346dac4c5930258810fc28d1). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate th...
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21311#discussion_r187861750

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
@@ -568,13 +568,16 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }

     // There is 8 bytes for the pointer to next value
-    if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
+    val needSize = cursor + 8 + row.getSizeInBytes
+    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+    if (needSize > nowSize) {
       val used = page.length
       if (used >= (1 << 30)) {
         sys.error("Can not build a HashedRelation that is larger than 8G")

--- End diff --

This is not related to this PR, but why `sys.error` instead of `UnsupportedOperationException`? See:
https://github.com/apache/spark/blob/b6c50d7820aafab172835633fb0b35899e93146b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UTF8StringBuilder.java#L45

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
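For reference, the `UTF8StringBuilder.grow()` linked above throws `UnsupportedOperationException` when the buffer would exceed the limit; a sketch of the analogous change here (illustrative, not the merged patch):

    if (used >= (1 << 30)) {
      throw new UnsupportedOperationException(
        "Cannot build a HashedRelation that is larger than 8G")
    }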
[GitHub] spark pull request #21318: [minor] Update docs for functions.scala to make i...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21318#discussion_r187843125

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -39,7 +39,21 @@ import org.apache.spark.util.Utils

 /**
- * Functions available for DataFrame operations.
+ * Commonly used functions available for DataFrame operations. Using functions defined here provides
+ * a little bit more compile-time safety to make sure the function exists.
+ *
+ * Spark also includes more built-in functions that are less common and are not defined here.
+ * You can still access them (and all the functions defined here) using the [[functions.expr()]] API
+ * and calling them through a SQL expression string. You can find the entire list of functions for
+ * the latest version of Spark at [[https://spark.apache.org/docs/latest/api/sql/index.html]].

--- End diff --

@rxin, it's rather a nit, but shouldn't we always update the link for each release, since it always points to the latest?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21318: [minor] Update docs for functions.scala to make i...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21318#discussion_r187843283

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -39,7 +39,21 @@ import org.apache.spark.util.Utils

 /**
- * Functions available for DataFrame operations.
+ * Commonly used functions available for DataFrame operations. Using functions defined here provides

--- End diff --

Maybe I'm caring about this too much, but I hope we won't end up arguing over whether a given function is common or not ...

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21208: [SPARK-23925][SQL] Add array_repeat collection fu...
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21208#discussion_r187872304

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -1468,3 +1468,149 @@ case class Flatten(child: Expression)

   override def prettyName: String = "flatten"
 }

+/**
+ * Returns the array containing the given input value (left) count (right) times.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(element, count) - Returns the array containing element count times.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_('123', 2);
+       ['123', '123']
+  """)
+case class ArrayRepeat(left: Expression, right: Expression)
+  extends BinaryExpression with ExpectsInputTypes {
+
+  private val MAX_ARRAY_LENGTH = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  override def dataType: ArrayType = ArrayType(left.dataType, left.nullable)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType)
+
+  override def nullable: Boolean = right.nullable
+
+  override def eval(input: InternalRow): Any = {
+    val count = right.eval(input)
+    if (count == null) {
+      null
+    } else {
+      if (count.asInstanceOf[Int] > MAX_ARRAY_LENGTH) {
+        throw new RuntimeException(s"Unsuccessful try to create array with $count elements" +
+          s"due to exceeding the array size limit $MAX_ARRAY_LENGTH.");
+      }
+      val element = left.eval(input)
+      new GenericArrayData(Array.fill(count.asInstanceOf[Int])(element))
+    }
+  }
+
+  override def prettyName: String = "array_repeat"
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+
+    val leftGen = left.genCode(ctx)
+    val rightGen = right.genCode(ctx)
+    val element = leftGen.value
+    val count = rightGen.value
+    val et = dataType.elementType
+
+    val coreLogic = if (CodeGenerator.isPrimitiveType(et)) {
+      genCodeForPrimitiveElement(ctx, et, element, count, leftGen.isNull, ev.value)
+    } else {
+      genCodeForNonPrimitiveElement(ctx, element, count, leftGen.isNull, ev.value)
+    }
+    val resultCode = nullElementsProtection(ev, rightGen.isNull, coreLogic)
+
+    ev.copy(code =
+      s"""
+         |boolean ${ev.isNull} = false;
+         |${leftGen.code}
+         |${rightGen.code}
+         |${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
+         |$resultCode
+       """.stripMargin)
+  }
+
+  private def nullElementsProtection(ev: ExprCode,
+                                     rightIsNull: String,

--- End diff --

nit: indents

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
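The nit refers to Spark's Scala style for multi-line method signatures: one parameter per line, indented four spaces from the `def`, rather than aligned under the first parameter. A layout sketch, with the body elided:

    private def nullElementsProtection(
        ev: ExprCode,
        rightIsNull: String,
        coreLogic: String): String = {
      // ... generate the null check around coreLogic ...
    }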
[GitHub] spark issue #21173: [SPARK-23856][SQL] Add an option `queryTimeout` in JDBCO...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21173 ping --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21319: [SPARK-24267][SQL] explicitly keep DataSourceRead...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21319#discussion_r187878368

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
@@ -150,111 +127,57 @@ case class StreamingDataSourceV2Relation(
 }

 object DataSourceV2Relation {
+  private implicit class SourceHelpers(source: DataSourceV2) {
-    def asReadSupport: ReadSupport = {
-      source match {
-        case support: ReadSupport =>
-          support
-        case _: ReadSupportWithSchema =>
-          // this method is only called if there is no user-supplied schema. if there is no
-          // user-supplied schema and ReadSupport was not implemented, throw a helpful exception.
-          throw new AnalysisException(s"Data source requires a user-supplied schema: $name")
-        case _ =>
-          throw new AnalysisException(s"Data source is not readable: $name")
-      }
-    }
-    def asReadSupportWithSchema: ReadSupportWithSchema = {
-      source match {
-        case support: ReadSupportWithSchema =>
-          support
-        case _: ReadSupport =>
-          throw new AnalysisException(
-            s"Data source does not support user-supplied schema: $name")
-        case _ =>
-          throw new AnalysisException(s"Data source is not readable: $name")
-      }
+    private def asReadSupport: ReadSupport = source match {

--- End diff --

Since I'm touching the code around here, I removed the outermost `{ }` and fixed the indentation. I can revert it if people think this isn't worthwhile.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
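A simplified before/after of the brace cleanup being described, based on the first case in the quoted diff:

    // Before: the match sits inside an extra block.
    def asReadSupport: ReadSupport = {
      source match {
        case support: ReadSupport => support
        case _ => throw new AnalysisException(s"Data source is not readable: $name")
      }
    }

    // After: the match is the method body; one level of braces and indentation goes away.
    def asReadSupport: ReadSupport = source match {
      case support: ReadSupport => support
      case _ => throw new AnalysisException(s"Data source is not readable: $name")
    }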
[GitHub] spark issue #21114: [SPARK-22371][CORE] Return None instead of throwing an e...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21114 This behavior change LGTM, but the test is overcomplicated and seems to have limited value. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21222 Kindly ping. I guess debugging the last batch might not be all that attractive, but printing the generated code would be helpful to anyone who wants to investigate or debug in detail. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
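For batch queries the debug package already exposes this; a usage sketch, assuming a running `SparkSession` named `spark`:

    import org.apache.spark.sql.execution.debug._

    val df = spark.range(100).selectExpr("id % 10 AS key").groupBy("key").count()
    df.debugCodegen()  // prints the Java source generated for each whole-stage-codegen subtree
    df.debug()         // runs the query once with instrumentation and prints per-operator stats

The PR discussed above extends the same kind of inspection to structured streaming queries.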
[GitHub] spark issue #21299: [SPARK-24250][SQL] support accessing SQLConf inside task...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21299 **[Test build #90570 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90570/testReport)** for PR 21299 at commit [`a100dea`](https://github.com/apache/spark/commit/a100dea9573e9b43b993516c817e306d80f72d29). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21266 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90572/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21266 **[Test build #90572 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90572/testReport)** for PR 21266 at commit [`fc96adb`](https://github.com/apache/spark/commit/fc96adb099380ffac09d445461435b613e08f9f3). * This patch **fails to generate documentation**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21319: [SPARK-24267][SQL] explicitly keep DataSourceReader in D...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21319 **[Test build #90573 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90573/testReport)** for PR 21319 at commit [`ca6ccb2`](https://github.com/apache/spark/commit/ca6ccb24e8f2910a3ffc07a790f2ba7f57e79056). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21266 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21317: [SPARK-24232][k8s] Add support for secret env vars
Github user skonto commented on the issue: https://github.com/apache/spark/pull/21317 Tests fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21317: [SPARK-24232][k8s] Add support for secret env vars
Github user skonto commented on the issue: https://github.com/apache/spark/pull/21317 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21106: [SPARK-23711][SQL][WIP] Add fallback logic for UnsafePro...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21106 mostly LGTM, though people may have better ideas about naming. cc @hvanhovell --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21316: [SPARK-20538][SQL] Wrap Dataset.reduce with withN...
Github user jaceklaskowski commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21316#discussion_r187899299

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1607,7 +1607,9 @@ class Dataset[T] private[sql](
    */
   @Experimental
   @InterfaceStability.Evolving
-  def reduce(func: (T, T) => T): T = rdd.reduce(func)
+  def reduce(func: (T, T) => T): T = withNewExecutionId {

--- End diff --

@maropu When you asked about this API, did you refer to `reduce` or `withNewExecutionId`?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
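The quoted diff is cut off at the opening brace; presumably the body still delegates to the RDD, now inside the execution-id scope so the triggered job is tracked in the SQL tab. A sketch of the complete pattern (the body line is inferred from the removed line, not quoted from the PR):

    def reduce(func: (T, T) => T): T = withNewExecutionId {
      rdd.reduce(func)
    }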
[GitHub] spark issue #21291: [SPARK-24242][SQL] RangeExec should have correct outputO...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21291 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate th...
Github user cxzl25 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21311#discussion_r187907410

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
@@ -568,13 +568,16 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }

     // There is 8 bytes for the pointer to next value
-    if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
+    val needSize = cursor + 8 + row.getSizeInBytes
+    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+    if (needSize > nowSize) {
       val used = page.length
       if (used >= (1 << 30)) {
         sys.error("Can not build a HashedRelation that is larger than 8G")

--- End diff --

OK, I'll throw `UnsupportedOperationException` instead of calling `sys.error`.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate th...
Github user cxzl25 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21311#discussion_r187907559

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
@@ -568,13 +568,16 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }

     // There is 8 bytes for the pointer to next value
-    if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
+    val needSize = cursor + 8 + row.getSizeInBytes
+    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+    if (needSize > nowSize) {
       val used = page.length
       if (used >= (1 << 30)) {
         sys.error("Can not build a HashedRelation that is larger than 8G")
       }
-      ensureAcquireMemory(used * 8L * 2)
-      val newPage = new Array[Long](used * 2)
+      val multiples = math.max(math.ceil(needSize.toDouble / (used * 8L)).toInt, 2)
+      ensureAcquireMemory(used * 8L * multiples)

--- End diff --

OK, I'll split the append function into two parts: grow/append.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
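A sketch of what the grow/append split could look like, modeled on `UTF8StringBuilder.grow()`; the helper name and the copy logic are illustrative, not the final patch:

    // Ensure the page can hold `neededSize` bytes, growing at least 2x when it cannot.
    private def grow(neededSize: Long): Unit = {
      val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
      if (neededSize > nowSize) {
        val used = page.length
        if (used >= (1 << 30)) {
          sys.error("Can not build a HashedRelation that is larger than 8G")
        }
        // Grow at least 2x, and enough to fit the requested size in one step.
        val multiples = math.max(math.ceil(neededSize.toDouble / (used * 8L)).toInt, 2)
        ensureAcquireMemory(used * 8L * multiples)
        val newPage = new Array[Long](used * multiples)
        System.arraycopy(page, 0, newPage, 0, used)
        page = newPage
        freeMemory(used * 8L)
      }
    }

    // append() then only writes the row, after a single call to
    // grow(cursor + 8 + row.getSizeInBytes).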
[GitHub] spark pull request #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate th...
Github user cxzl25 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21311#discussion_r187907473

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
@@ -568,13 +568,16 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     }

     // There is 8 bytes for the pointer to next value
-    if (cursor + 8 + row.getSizeInBytes > page.length * 8L + Platform.LONG_ARRAY_OFFSET) {
+    val needSize = cursor + 8 + row.getSizeInBytes
+    val nowSize = page.length * 8L + Platform.LONG_ARRAY_OFFSET
+    if (needSize > nowSize) {
       val used = page.length
       if (used >= (1 << 30)) {
         sys.error("Can not build a HashedRelation that is larger than 8G")
       }
-      ensureAcquireMemory(used * 8L * 2)

--- End diff --

OK, I'll keep doubling the size when growing.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21288: [SPARK-24206][SQL] Improve FilterPushdownBenchmark bench...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21288 **[Test build #90571 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90571/testReport)** for PR 21288 at commit [`4520044`](https://github.com/apache/spark/commit/4520044d3be40ba8bf963a151db2dd9769c0f59a). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21114: [SPARK-22371][CORE] Return None instead of throwing an e...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21114 **[Test build #90577 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90577/testReport)** for PR 21114 at commit [`8b30733`](https://github.com/apache/spark/commit/8b30733dba85d9881d0171414616bd0b0893f419). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21320: [SPARK-4502][SQL] Parquet nested column pruning - founda...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21320 **[Test build #90582 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90582/testReport)** for PR 21320 at commit [`9e301b3`](https://github.com/apache/spark/commit/9e301b37bb5863ef5fad8ec15f1d8f3d4b5e6a6f). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20894: [SPARK-23786][SQL] Checking column names of csv headers
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20894 **[Test build #90581 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90581/testReport)** for PR 20894 at commit [`2bd2713`](https://github.com/apache/spark/commit/2bd27136ae9095beec429ff15a6a5f1be0464419). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21266 **[Test build #90579 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90579/testReport)** for PR 21266 at commit [`fc96adb`](https://github.com/apache/spark/commit/fc96adb099380ffac09d445461435b613e08f9f3). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21311 **[Test build #90574 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90574/testReport)** for PR 21311 at commit [`22a2767`](https://github.com/apache/spark/commit/22a2767b98185edf32be3c36bb255f5837ad7466). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21165: [Spark-20087][CORE] Attach accumulators / metrics to 'Ta...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21165 **[Test build #90585 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90585/testReport)** for PR 21165 at commit [`05d1d9c`](https://github.com/apache/spark/commit/05d1d9cad761bb09e1131162458fecd5e34f02d2). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21317: [SPARK-24232][k8s] Add support for secret env vars
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21317 **[Test build #90576 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90576/testReport)** for PR 21317 at commit [`aa0ccc0`](https://github.com/apache/spark/commit/aa0ccc02bc080b067519834ebb2657dd54208c37). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21311 **[Test build #90575 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90575/testReport)** for PR 21311 at commit [`d9d8e62`](https://github.com/apache/spark/commit/d9d8e62c2de7d9d04534396ab3bbf984ab16c7f5). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21317: [SPARK-24232][k8s] Add support for secret env vars
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21317 **[Test build #90578 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90578/testReport)** for PR 21317 at commit [`aa0ccc0`](https://github.com/apache/spark/commit/aa0ccc02bc080b067519834ebb2657dd54208c37). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20933: [SPARK-23817][SQL]Migrate ORC file format read path to d...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20933 **[Test build #90584 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90584/testReport)** for PR 20933 at commit [`67b1748`](https://github.com/apache/spark/commit/67b1748c8b939a6b484bfc868fd311e381d7f8e0). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21291: [SPARK-24242][SQL] RangeExec should have correct outputO...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21291 **[Test build #90583 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90583/testReport)** for PR 21291 at commit [`3a14bd6`](https://github.com/apache/spark/commit/3a14bd6eeb390dfe0940eb17b5c9988e12aba1bc). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19293: [SPARK-22079][SQL] Serializer in HiveOutputWriter miss l...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19293 **[Test build #90580 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90580/testReport)** for PR 19293 at commit [`45477fb`](https://github.com/apache/spark/commit/45477fbf00558066e3733a34e1d59ce22c192ee2). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org