[GitHub] spark pull request #21622: [SPARK-24637][SS] Add metrics regarding state and...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21622#discussion_r200554917

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala ---
@@ -39,6 +42,23 @@ class MetricsReporter(
   registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
   registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L)

+  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+  timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+
+  registerGauge("eventTime-watermark",
+    progress => convertStringDateToMillis(progress.eventTime.get("watermark")), 0L)
+
+  registerGauge("states-rowsTotal", _.stateOperators.map(_.numRowsTotal).sum, 0L)
+  registerGauge("states-usedBytes", _.stateOperators.map(_.memoryUsedBytes).sum, 0L)
+
--- End diff --

We can add more metrics like "providerLoadedMapSizeBytes" after adopting SPARK-24441, so that the actual memory usage of the state store provider can be tracked in a time-series manner.

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
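The watermark gauge above relies on a `convertStringDateToMillis` helper. A self-contained sketch of what such a helper might look like (the object name and the 0L fallback for a missing watermark are assumptions for illustration, not the PR's actual code):

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

// Hypothetical stand-in for the PR's convertStringDateToMillis helper:
// parses an ISO8601 UTC timestamp string into epoch milliseconds,
// falling back to 0L when no watermark has been reported yet.
object TimestampMillis {
  private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
  timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))

  def convertStringDateToMillis(isoUtcDateStr: String): Long =
    if (isoUtcDateStr != null) timestampFormat.parse(isoUtcDateStr).getTime
    else 0L
}

// The Unix epoch itself parses to 0 milliseconds.
println(TimestampMillis.convertStringDateToMillis("1970-01-01T00:00:00.000Z"))
```

Note that the `'Z'` in the pattern is a quoted literal, so the format only accepts UTC timestamps, which matches the explicit `setTimeZone("UTC")` in the diff.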
[GitHub] spark issue #21693: [SPARK-24673][SQL] scala sql function from_utc_timestamp...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21693 @gatorsmile btw, on a personal note, I got a new (& first) baby in my family hours ago ;))
[GitHub] spark issue #21260: [SPARK-23529][K8s] Support mounting volumes
Github user liyinan926 commented on the issue: https://github.com/apache/spark/pull/21260 @felixcheung @mccheah Can you take a look and merge this?
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21684 Merged build finished. Test PASSed.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21684 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/716/ Test PASSed.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21684 **[Test build #92665 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92665/testReport)** for PR 21684 at commit [`b187b15`](https://github.com/apache/spark/commit/b187b15a58b01bc3f96accbfe9f92a48535c6df8).
[GitHub] spark issue #21693: [SPARK-24673][SQL] scala sql function from_utc_timestamp...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21693 Thanks, @maropu.
[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 @tedyu Thanks for the suggestion. Published the result to the mail thread. https://lists.apache.org/thread.html/323ab22fea87c14a2f92e58e7a810aa37cbdf00b9ab81448ee967976@%3Cdev.spark.apache.org%3E I've only written a short summary of the result (since mail may not be as good a format as markdown for describing detailed numbers) and spent more time explaining the rationale for my recent issues, so that all of them are covered together. I'll wait a couple more days, and try to put up the detailed numbers if reviews haven't started by then.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21684 Merged build finished. Test PASSed.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21684 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/715/ Test PASSed.
[GitHub] spark issue #21693: [SPARK-24673][SQL] scala sql function from_utc_timestamp...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21693 ok, I have time now, so I'll take it ;)
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21684 Merged build finished. Test FAILed.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21684 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92664/ Test FAILed.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21684

**[Test build #92664 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92664/testReport)** for PR 21684 at commit [`a3a0d2f`](https://github.com/apache/spark/commit/a3a0d2fb661747ae1740295c2543e19976fd1045).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21684 **[Test build #92664 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92664/testReport)** for PR 21684 at commit [`a3a0d2f`](https://github.com/apache/spark/commit/a3a0d2fb661747ae1740295c2543e19976fd1045).
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user rekhajoshm commented on the issue: https://github.com/apache/spark/pull/21684
```
Discovery starting.
Discovery completed in 5 seconds, 946 milliseconds.
Run starting. Expected test count is: 16
StandaloneRestSubmitSuite:
- construct submit request
- create submission
- create submission from main method
- kill submission
- request submission status
- create then kill
- create then request status
- create then kill then request status
- kill or request status before create
- good request paths
- good request paths, bad requests
- bad request paths
- server returns unknown fields
- client handles faulty server
- client does not send 'SPARK_ENV_LOADED' env var by default
- client includes mesos env vars
Run completed in 8 seconds, 480 milliseconds.
Total number of tests run: 16
Suites: completed 2, aborted 0
Tests: succeeded 16, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21073 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92662/ Test FAILed.
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21073 Merged build finished. Test FAILed.
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21073

**[Test build #92662 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92662/testReport)** for PR 21073 at commit [`03328a4`](https://github.com/apache/spark/commit/03328a417ea04722c1497cf09583dff909afe979).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21709: [SPARK-5152][CORE] Read metrics config file from Hadoop ...
Github user jzhuge commented on the issue: https://github.com/apache/spark/pull/21709 Not quite there with SparkConf. We already use the option `--properties-file` for a different purpose, and spark-submit does not allow multiple occurrences of `--properties-file`. How about enhancing spark-submit to allow multiple occurrences of `--properties-file`? That would solve this issue and serve other use cases as well.
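If spark-submit were extended to accept `--properties-file` more than once, the natural semantics would be last-one-wins per key. A small self-contained sketch of that merge (this is a hypothetical illustration of the proposal, not an actual spark-submit feature; the property names are made up):

```scala
import java.io.StringReader
import java.util.Properties

// Hypothetical merge of several --properties-file sources:
// files are applied in order, so a key in a later file overrides
// the same key from an earlier file.
def mergeProperties(sources: Seq[String]): Map[String, String] =
  sources.foldLeft(Map.empty[String, String]) { (acc, text) =>
    val props = new Properties()
    props.load(new StringReader(text))
    var merged = acc
    val names = props.stringPropertyNames().iterator()
    while (names.hasNext) {
      val key = names.next()
      merged += (key -> props.getProperty(key)) // later file wins on collision
    }
    merged
  }

val merged = mergeProperties(Seq(
  "spark.master=local\nspark.metrics.conf=/etc/spark/metrics.properties",
  "spark.master=yarn"))
println(merged("spark.master"))
```

With these semantics a user-supplied properties file could override a site-wide one (such as a metrics config) without replacing it wholesale.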
[GitHub] spark issue #21722: SPARK-24742: Fix NullPointerException in Field Metadata
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21722 Can one of the admins verify this patch?
[GitHub] spark pull request #21722: SPARK-24742: Fix NullPointerException in Field Me...
GitHub user kupferk opened a pull request: https://github.com/apache/spark/pull/21722

SPARK-24742: Fix NullPointerException in Field Metadata

## What changes were proposed in this pull request?

This pull request provides a fix for SPARK-24742: SQL Field Metadata was throwing an exception in the hashCode method when a null value was added via "putNull".

## How was this patch tested?

A new unit test is provided in org/apache/spark/sql/types/MetadataSuite.scala.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kupferk/spark SPARK-24742

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21722.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21722

commit 08ad908728f6def2c07368a5cdd89df00f32ba2d
Author: Kaya Kupferschmidt
Date: 2018-07-05T06:01:28Z
SPARK-24742: Fixed NullPointerException in Metadata when storing null values

commit 088e2d789dad707bd657a72afa8933e957641536
Author: Kaya Kupferschmidt
Date: 2018-07-05T06:03:33Z
SPARK-24742: Improved TestSuite for Metadata
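The bug pattern behind SPARK-24742 is easy to reproduce outside Spark: hashing map values with `value.hashCode` throws as soon as one value is null. A minimal illustration (this is not Spark's actual Metadata class; it just models the failure mode and a null-safe hash):

```scala
// Hashing that calls v.hashCode directly throws an NPE for null values,
// which is the failure mode putNull triggered in Metadata.hashCode.
def unsafeHash(m: Map[String, Any]): Int =
  m.map { case (k, v) => k.hashCode ^ v.hashCode }.sum

// Null-safe variant: hash null to a fixed constant instead of dereferencing it.
def safeHash(m: Map[String, Any]): Int =
  m.map { case (k, v) => k.hashCode ^ (if (v == null) 0 else v.hashCode) }.sum

val meta = Map[String, Any]("comment" -> null) // analogous to putNull("comment")
try {
  unsafeHash(meta)
} catch {
  case _: NullPointerException => println("unsafeHash throws NPE on null values")
}
println(safeHash(meta) == "comment".hashCode)
```

The essential point of the fix is simply that the hash function must treat null as a legal, hashable value rather than dereferencing it.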
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21684 Merged build finished. Test PASSed.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21684 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/714/ Test PASSed.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21684 **[Test build #92663 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92663/testReport)** for PR 21684 at commit [`bac8eef`](https://github.com/apache/spark/commit/bac8eefc935514ef2b1ea03364033251c2e624bf).
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21721 Though I haven't taken a look yet, I would like to see this feature (mentioned in https://github.com/apache/spark/pull/21622#issuecomment-399677099) and I'm happy to see it being implemented! While I love the feature, I agree with @jose-torres that it is going to be a new public API (part of Data Source V2), so it is worth discussing the API itself before settling on a specific implementation.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21684 Merged build finished. Test FAILed.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21684

**[Test build #92657 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92657/testReport)** for PR 21684 at commit [`e043901`](https://github.com/apache/spark/commit/e043901029a25b5e0d4276c3e4c2aa07ce13716a).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r200544127

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---
@@ -47,15 +48,26 @@ object CommandUtils extends Logging {
     }
   }

-  def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
+  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
+    val sessionState = spark.sessionState
+    val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
     if (catalogTable.partitionColumnNames.isEmpty) {
-      calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
+      calculateLocationSize(sessionState, catalogTable.identifier,
+        catalogTable.storage.locationUri)
     } else {
       // Calculate table size as a sum of the visible partitions. See SPARK-21079
       val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
-      partitions.map { p =>
-        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
-      }.sum
+      val paths = partitions.map(x => new Path(x.storage.locationUri.get.getPath))
+      val pathFilter = new PathFilter {
+        override def accept(path: Path): Boolean = {
+          !path.getName.startsWith(stagingDir)
+        }
+      }
+      val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
+        sessionState.newHadoopConf(), pathFilter, spark).flatMap(x => x._2)
--- End diff --

The tests are passing but this line is incorrect. @gatorsmile @maropu, PathFilter is not serializable; how do we pass a PathFilter in a serializable manner? I checked the code, and one other way is to use `FileInputFormat.getInputPathFilter`/`FileInputFormat.setInputPathFilter`, but I couldn't get it to work. Also, is it okay if we filter the `Seq[(Path, Seq[FileStatus])]` returned by `bulkListLeafFiles` and remove the `stagingDir` files?
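The comment's fallback idea, filtering the listing result locally instead of shipping a PathFilter to the executors, can be sketched without Spark or Hadoop types (the `FakeFileStatus` stand-in and the file names are hypothetical; Hadoop's real `FileStatus` would play this role):

```scala
// Instead of passing a non-serializable PathFilter into the parallel
// listing, list everything and drop staging-dir entries afterwards.
case class FakeFileStatus(name: String, len: Long) // stand-in for Hadoop's FileStatus

val stagingDir = ".hive-staging"

// Shape mirrors the Seq[(Path, Seq[FileStatus])] returned by bulkListLeafFiles.
val listed: Seq[(String, Seq[FakeFileStatus])] = Seq(
  ("part-date=2018-07-05", Seq(FakeFileStatus("part-00000", 100L))),
  (".hive-staging_hive_2018", Seq(FakeFileStatus("tmp-0", 50L))))

// Filter the (path, files) pairs locally, then sum the surviving file sizes.
val totalSize = listed
  .filterNot { case (path, _) => path.startsWith(stagingDir) }
  .flatMap { case (_, files) => files }
  .map(_.len)
  .sum
println(totalSize)
```

Because the filter runs on the driver over the already-collected listing, nothing needs to be serialized, at the cost of listing the staging directories before discarding them.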
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200541043

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3261,3 +3261,339 @@ case class ArrayDistinct(child: Expression)
   override def prettyName: String = "array_distinct"
 }
+
+object ArraySetLike {
+  def throwUnionLengthOverflowException(length: Int): Unit = {
+    throw new RuntimeException(s"Unsuccessful try to union arrays with $length " +
+      s"elements due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+}
+
+
+abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
+  override def dataType: DataType = left.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val typeCheckResult = super.checkInputDataTypes()
+    if (typeCheckResult.isSuccess) {
+      TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType,
+        s"function $prettyName")
+    } else {
+      typeCheckResult
+    }
+  }
+
+  @transient protected lazy val ordering: Ordering[Any] =
+    TypeUtils.getInterpretedOrdering(elementType)
+
+  @transient protected lazy val elementTypeSupportEquals = elementType match {
+    case BinaryType => false
+    case _: AtomicType => true
+    case _ => false
+  }
+}
+
+/**
+ * Returns an array of the elements in the union of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2,
+      without duplicates.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+       array(1, 2, 3, 5)
+  """,
+  since = "2.4.0")
+case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike {
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+    val elem = array.getInt(idx)
+    if (!hsInt.contains(elem)) {
+      resultArray.setInt(pos, elem)
+      hsInt.add(elem)
+      true
+    } else {
+      false
+    }
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+    val elem = array.getLong(idx)
+    if (!hsLong.contains(elem)) {
+      resultArray.setLong(pos, elem)
+      hsLong.add(elem)
+      true
+    } else {
+      false
+    }
+  }
+
+  def evalIntLongPrimitiveType(
+      array1: ArrayData,
+      array2: ArrayData,
+      size: Int,
+      resultArray: ArrayData,
+      isLongType: Boolean): ArrayData = {
+    // store elements into resultArray
+    var foundNullElement = false
+    var pos = 0
+    Seq(array1, array2).foreach(array => {
+      var i = 0
+      while (i < array.numElements()) {
+        if (array.isNullAt(i)) {
+          if (!foundNullElement) {
+            resultArray.setNullAt(pos)
+            pos += 1
+            foundNullElement = true
+          }
+        } else {
+          val assigned = if (!isLongType) {
+            assignInt(array, i, resultArray, pos)
+          } else {
+            assignLong(array, i, resultArray, pos)
+          }
+          if (assigned) {
+            pos += 1
+          }
+        }
+        i += 1
+      }
+    })
+    resultArray
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+    val array1 = input1.asInstanceOf[ArrayData]
+    val array2 = input2.asInstanceOf[ArrayData]
+
+    if (elementTypeSupportEquals) {
+      elementType match {
+        case IntegerType =>
+          // avoid boxing of primitive int array elements
+          // calculate result array size
+          val hsSize = new OpenHashSet[Int]
+          var nullElementSize = 0
+          Seq(array1, array2).foreach(array => {
+            var i = 0
+            while (i < array.numElements()) {
+              if (hsSize.size + nullElementSize > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+                ArraySetLike.throwUnionLengthOverflowException(hsSize.size)
+              }
+              if (array.isNullAt(i)) {
+                if (nullElementSize == 0) {
+                  nullElementSize = 1
+                }
+              } else {
+                hsSize.add(array.getInt(i))
+              }
+              i += 1
+            }
+
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200543679

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3261,3 +3261,339 @@ case class ArrayDistinct(child: Expression)
   override def prettyName: String = "array_distinct"
 }
+
+object ArraySetLike {
+  def throwUnionLengthOverflowException(length: Int): Unit = {
+    throw new RuntimeException(s"Unsuccessful try to union arrays with $length " +
+      s"elements due to exceeding the array size limit " +
+      s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+  }
+}
+
+
+abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
+  override def dataType: DataType = left.dataType
--- End diff --

We have to consider `containsNull` for both children?
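The reviewer's point: taking `left.dataType` verbatim loses nullability information from the right child. A toy model of the suggested rule (`ArrayType` here is a minimal stand-in for illustration, not Spark's class):

```scala
// Minimal stand-in for Spark's ArrayType, just to show the nullability rule.
case class ArrayType(elementType: String, containsNull: Boolean)

// The union may contain a null element if EITHER input may, so
// containsNull must be OR-ed across both children rather than
// inherited from the left child alone.
def unionDataType(left: ArrayType, right: ArrayType): ArrayType =
  ArrayType(left.elementType, left.containsNull || right.containsNull)

val result = unionDataType(
  ArrayType("int", containsNull = false),
  ArrayType("int", containsNull = true))
println(result.containsNull)
```

Using only the left child's `containsNull` would mark the result non-nullable even when the right array can contribute nulls, which downstream operators could then mishandle.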
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200541138
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200540949
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200540913 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3261,3 +3261,339 @@ case class ArrayDistinct(child: Expression) override def prettyName: String = "array_distinct" } + +object ArraySetLike { + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + var nullElementSize = 0 + Seq(array1, array2).foreach(array => { --- End diff -- nit: `foreach { array =>`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
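The quoted `ArrayUnion` diff walks both input arrays once, using a hash set to drop duplicates and a flag to keep at most one null. A minimal plain-Scala sketch of that idea — using `mutable.HashSet` and `Option` in place of Spark's `OpenHashSet` and `ArrayData`, so the names here are illustrative rather than Spark's API:

```scala
import scala.collection.mutable

// Union of two arrays, keeping at most one null and dropping duplicates,
// in first-seen order — mirroring the loop structure in the quoted diff.
def arrayUnion(a1: Array[Option[Int]], a2: Array[Option[Int]]): Array[Option[Int]] = {
  val seen = mutable.HashSet.empty[Int]
  var foundNull = false
  val result = mutable.ArrayBuffer.empty[Option[Int]]
  Seq(a1, a2).foreach { array =>   // the `foreach { array => ... }` shape the nit above suggests
    array.foreach {
      case None =>
        if (!foundNull) { result += None; foundNull = true }
      case Some(v) =>
        if (seen.add(v)) result += Some(v)   // add returns false for duplicates
    }
  }
  result.toArray
}
```

Calling it with `Array(Some(1), Some(2), Some(3))` and `Array(Some(1), Some(3), Some(5))` keeps the elements 1, 2, 3, 5 in order of first appearance, matching the `_FUNC_` example in the docstring.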
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r200541020 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3261,3 +3261,339 @@ case class ArrayDistinct(child: Expression) override def prettyName: String = "array_distinct" } + +object ArraySetLike { + def throwUnionLengthOverflowException(length: Int): Unit = { +throw new RuntimeException(s"Unsuccessful try to union arrays with $length " + + s"elements due to exceeding the array size limit " + + s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") + } +} + + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + @transient protected lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient protected lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } +} + +/** + * Returns an array of the elements in the union of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ +_FUNC_(array1, array2) - Returns an array of the elements in the union of array1 and array2, + without duplicates. 
+ """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 2, 3, 5) + """, + since = "2.4.0") +case class ArrayUnion(left: Expression, right: Expression) extends ArraySetLike { + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + resultArray.setInt(pos, elem) + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + resultArray.setLong(pos, elem) + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + size: Int, + resultArray: ArrayData, + isLongType: Boolean): ArrayData = { +// store elements into resultArray +var foundNullElement = false +var pos = 0 +Seq(array1, array2).foreach(array => { + var i = 0 + while (i < array.numElements()) { +if (array.isNullAt(i)) { + if (!foundNullElement) { +resultArray.setNullAt(pos) +pos += 1 +foundNullElement = true + } +} else { + val assigned = if (!isLongType) { +assignInt(array, i, resultArray, pos) + } else { +assignLong(array, i, resultArray, pos) + } + if (assigned) { +pos += 1 + } +} +i += 1 + } +}) +resultArray + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + val hsSize = new OpenHashSet[Int] + var nullElementSize = 0 + Seq(array1, array2).foreach(array => { +var i = 0 +while (i < array.numElements()) { + if (hsSize.size + nullElementSize > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { 
+ArraySetLike.throwUnionLengthOverflowException(hsSize.size) + } + if (array.isNullAt(i)) { +if (nullElementSize == 0) { + nullElementSize = 1 +} + } else { +hsSize.add(array.getInt(i)) + } + i += 1 +} +
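The hunk quoted here checks the running set size before each insertion so the union cannot exceed the maximum array length. A standalone sketch of that guard — the limit value mirrors Spark's `ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH` (`Integer.MAX_VALUE - 15`), and the helper name is illustrative:

```scala
// Fail fast once the deduplicated size would exceed the platform array limit.
val MaxRoundedArrayLength: Int = Int.MaxValue - 15 // value of ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH

def checkUnionLength(hsSize: Int, nullElementSize: Int): Unit = {
  if (hsSize + nullElementSize > MaxRoundedArrayLength) {
    throw new RuntimeException(
      s"Unsuccessful try to union arrays with $hsSize elements due to exceeding " +
      s"the array size limit $MaxRoundedArrayLength.")
  }
}
```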
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21073 **[Test build #92662 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92662/testReport)** for PR 21073 at commit [`03328a4`](https://github.com/apache/spark/commit/03328a417ea04722c1497cf09583dff909afe979). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][SPARK-23879][CORE][SQL] Introduce multiple...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19222 ping @rednaxelafx --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21611: [SPARK-24569][SQL] Aggregator with output type Option sh...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21611 **[Test build #92661 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92661/testReport)** for PR 21611 at commit [`e67594a`](https://github.com/apache/spark/commit/e67594ab9caa2856bd0ba8cf9398ea9dd1244738). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21611: [SPARK-24569][SQL] Aggregator with output type Option sh...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21611 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21611: [SPARK-24569][SQL] Aggregator with output type Option sh...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21611 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/713/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...
Github user tedyu commented on the issue: https://github.com/apache/spark/pull/21700 Please publish the above results to the thread where you requested review from committers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21611: [SPARK-24569][SQL] Aggregator with output type Option sh...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/21611 Based on the comment, this null check is for preventing encoding a Product type to a null top-level row. For `Option[Int]`, it is encoded to an int column in a top-level row. An example looks like:

```scala
val df = Seq(Some(1), Some(2), null, None).toDS()
df.show()
```

```
+-----+
|value|
+-----+
|    1|
|    2|
| null|
| null|
+-----+
```
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21023: [SPARK-23949] makes && supports the function of predicat...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21023 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21677: [SPARK-24692][TESTS] Improvement FilterPushdownBe...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21677 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21713: [SPARK-24737][SQL] Type coercion between StructTy...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21713 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21677: [SPARK-24692][TESTS] Improvement FilterPushdownBenchmark
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21677 Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21677: [SPARK-24692][TESTS] Improvement FilterPushdownBenchmark
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21677 I am merging this to show up other benchmark results in @wangyum's PRs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21693: [SPARK-24673][SQL] scala sql function from_utc_timestamp...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21693 Will go ahead on this over the weekend. If you find some time, please go ahead before then. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21713: [SPARK-24737][SQL] Type coercion between StructTypes.
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21713 Merged to master --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21690: [SPARK-24713]AppMatser of spark streaming kafka OOM if t...
Github user yuanboliu commented on the issue: https://github.com/apache/spark/pull/21690 @koeninger Thanks for the details. Sorry, I've been quite busy this week. I will delete the last pause, test the patch on my own cluster this weekend, and give feedback asap. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21073: [SPARK-23936][SQL] Implement map_concat
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21073 **[Test build #92660 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92660/testReport)** for PR 21073 at commit [`3c0da03`](https://github.com/apache/spark/commit/3c0da039a0f11e0bc4f18342c96cf1dc100d2060). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200538008 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala --- @@ -95,4 +95,25 @@ private object JsonUtils { } Serialization.write(result) } + + /** + * Write per-topic partition lag as json string --- End diff -- Is "lag" here just the difference (at the time a batch ends) between the last offset Spark knows about and the last offset Spark has processed? I'm not sure this is super useful to know. If maxOffsets isn't set it's always going to be 0, no matter how far Spark gets behind the Kafka cluster. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
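The "lag" jose-torres describes is, per topic-partition, the latest offset Spark knows about minus the last offset Spark has processed. A hedged sketch of that computation (the case class and function names are illustrative, not the PR's API):

```scala
// Per-partition "lag" as discussed: latest offset known to Spark minus
// the last offset Spark has processed for that partition.
case class TopicPartitionKey(topic: String, partition: Int)

def computeLag(
    latestOffsets: Map[TopicPartitionKey, Long],
    processedOffsets: Map[TopicPartitionKey, Long]): Map[TopicPartitionKey, Long] = {
  latestOffsets.map { case (tp, latest) =>
    tp -> math.max(0L, latest - processedOffsets.getOrElse(tp, 0L))
  }
}
```

This also illustrates the objection: if each batch reads up to the latest known offset (no `maxOffsetsPerTrigger` cap), `latest - processed` is zero at batch end regardless of how far Spark trails new data arriving in the Kafka cluster.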
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200537533 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -379,3 +384,16 @@ private[kafka010] case class KafkaMicroBatchInputPartitionReader( } } } + +// Currently reports per topic-partition lag. --- End diff -- nit: javadoc style for top level comments --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200537276 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsCustomMetrics.java --- @@ -0,0 +1,30 @@ +/* --- End diff -- This should probably be in v2/reader/streaming. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21721#discussion_r200537454 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -178,12 +180,18 @@ class SourceProgress protected[sql]( if (value.isNaN || value.isInfinity) JNothing else JDouble(value) } -("description" -> JString(description)) ~ +val jsonVal = ("description" -> JString(description)) ~ ("startOffset" -> tryParse(startOffset)) ~ ("endOffset" -> tryParse(endOffset)) ~ ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) + +if (customMetrics != null) { + jsonVal ~ ("customMetrics" -> tryParse(customMetrics.json())) --- End diff -- Is there any way to get an error to the user if their custom metrics fail to parse? I'm not entirely sure that's the right thing to do, but I worry that it'll be hard to develop against this API if we just silently drop malformed metrics. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
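One way to address the concern about silently dropped metrics is to parse eagerly and surface a warning on failure instead of mapping malformed input straight to `JNothing`. A sketch using json4s, which the quoted `progress.scala` code already uses — the `onError` hook here is an assumption for illustration:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Try to parse the source's custom metrics JSON; report a warning instead of
// silently discarding malformed metrics.
def tryParseCustomMetrics(json: String, onError: String => Unit): JValue = {
  try {
    parse(json)
  } catch {
    case e: Exception =>
      onError(s"Invalid custom metrics JSON: ${e.getMessage}")
      JNothing
  }
}
```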
[GitHub] spark pull request #21620: [SPARK-24636][SQL] Type coercion of arrays for ar...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21620#discussion_r200537707 --- Diff: sql/core/src/test/resources/sql-tests/results/typeCoercion/native/arrayJoin.sql.out --- @@ -0,0 +1,90 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 11 + + +-- !query 0 +SELECT array_join(array(true, false), ', ') +-- !query 0 schema +struct +-- !query 0 output +true, false + + +-- !query 1 +SELECT array_join(array(2Y, 1Y), ', ') +-- !query 1 schema +struct +-- !query 1 output +2, 1 + + +-- !query 2 +SELECT array_join(array(2S, 1S), ', ') +-- !query 2 schema +struct +-- !query 2 output +2, 1 + + +-- !query 3 +SELECT array_join(array(2, 1), ', ') +-- !query 3 schema +struct +-- !query 3 output +2, 1 + + +-- !query 4 +SELECT array_join(array(2L, 1L), ', ') +-- !query 4 schema +struct +-- !query 4 output +2, 1 + + +-- !query 5 +SELECT array_join(array(9223372036854775809, 9223372036854775808), ', ') +-- !query 5 schema +struct +-- !query 5 output +9223372036854775809, 9223372036854775808 + + +-- !query 6 +SELECT array_join(array(2.0D, 1.0D), ', ') +-- !query 6 schema +struct +-- !query 6 output +2.0, 1.0 + + +-- !query 7 +SELECT array_join(array(float(2.0), float(1.0)), ', ') +-- !query 7 schema +struct +-- !query 7 output +2.0, 1.0 + + +-- !query 8 +SELECT array_join(array(date '2016-03-14', date '2016-03-13'), ', ') +-- !query 8 schema +struct +-- !query 8 output +2016-03-14, 2016-03-13 + + +-- !query 9 +SELECT array_join(array(timestamp '2016-11-15 20:54:00.000', timestamp '2016-11-12 20:54:00.000'), ', ') +-- !query 9 schema +struct --- End diff -- Hm, yes, it will be. In general, if an expression has `children: Seq[Expression]` as its argument, the automatically generated column name will be long for now? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user bjkonglu commented on the issue: https://github.com/apache/spark/pull/21721 I think that SS metrics should be reported to the Spark UI. Then users can learn about their application's operation! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21658: [SPARK-24678][Spark-Streaming] Give priority in u...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21658#discussion_r200536325 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -1569,7 +1569,7 @@ private[spark] object BlockManager { val blockManagers = new HashMap[BlockId, Seq[String]] for (i <- 0 until blockIds.length) { - blockManagers(blockIds(i)) = blockLocations(i).map(_.host) + blockManagers(blockIds(i)) = blockLocations(i).map(b => s"executor_${b.host}_${b.executorId}") --- End diff -- Also you'd better using `ExecutorCacheTaskLocation#toString` here instead of manually writing the location hint, which will be more robust. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
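The point of the suggestion is to keep the location-string format in one place. A small sketch of the contrast, assuming the `executor_<host>_<executorId>` format the diff hand-writes:

```scala
// Hand-written location hint (the diff's approach) — duplicates the format:
def manualHint(host: String, executorId: String): String =
  s"executor_${host}_$executorId"

// Preferred per the review: delegate to the scheduler's own type, e.g. in Spark
//   ExecutorCacheTaskLocation(b.host, b.executorId).toString
// so any future format change happens in exactly one place.
```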
[GitHub] spark issue #21710: [SPARK-24207][R]add R API for PrefixSpan
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21710 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21710: [SPARK-24207][R]add R API for PrefixSpan
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21710 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92658/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21710: [SPARK-24207][R]add R API for PrefixSpan
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21710 **[Test build #92658 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92658/testReport)** for PR 21710 at commit [`d2e85f5`](https://github.com/apache/spark/commit/d2e85f5f30a89dbd12d5bcd4f4c5187aac956b87). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user bjkonglu commented on the issue: https://github.com/apache/spark/pull/21721 I think this proposal is necessary. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21666: [SPARK-24535][SPARKR] fix tests on java check error
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/21666 Any update on this issue @felixcheung? This blocks the 2.3.2 release; just want to make sure it is still in progress. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21658: [SPARK-24678][Spark-Streaming] Give priority in u...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/21658#discussion_r200535022 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -1569,7 +1569,7 @@ private[spark] object BlockManager { val blockManagers = new HashMap[BlockId, Seq[String]] for (i <- 0 until blockIds.length) { - blockManagers(blockIds(i)) = blockLocations(i).map(_.host) + blockManagers(blockIds(i)) = blockLocations(i).map(b => s"executor_${b.host}_${b.executorId}") --- End diff -- Yeah, it's OK. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21721 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92659/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21721 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21721 **[Test build #92659 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92659/testReport)** for PR 21721 at commit [`b7b2c3b`](https://github.com/apache/spark/commit/b7b2c3b1c9242fe205869f108548248f71ff8203). * This patch **fails MiMa tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21701: [SPARK-24730][SS] Add policy to choose max as global wat...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21701 @zsxwing @brkyvz --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21718: [SPARK-24744][STRUCTRURED STREAMING] Set the SparkSessio...
Github user bjkonglu commented on the issue: https://github.com/apache/spark/pull/21718 Thx @HeartSaVioR. I am looking forward to your solution. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21721 **[Test build #92659 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92659/testReport)** for PR 21721 at commit [`b7b2c3b`](https://github.com/apache/spark/commit/b7b2c3b1c9242fe205869f108548248f71ff8203). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21721 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21721 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21721: [SPARK-24748][SS] Support for reporting custom metrics v...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21721 @tdas @jose-torres @HeartSaVioR --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21721: [SPARK-24748][SS] Support for reporting custom me...
GitHub user arunmahadevan opened a pull request: https://github.com/apache/spark/pull/21721 [SPARK-24748][SS] Support for reporting custom metrics via StreamingQuery Progress

## What changes were proposed in this pull request?

Currently the Structured Streaming sources and sinks do not have a way to report custom metrics. Providing an option to report custom metrics and making them available via Streaming Query progress can enable sources and sinks to report custom progress information (e.g. the lag metrics for the Kafka source). Similar metrics can be reported for sinks as well, but I would like to get initial feedback before proceeding further.

## How was this patch tested?

New and existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/arunmahadevan/spark SPARK-24748 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21721.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21721 commit b7b2c3b1c9242fe205869f108548248f71ff8203 Author: Arun Mahadevan Date: 2018-07-06T01:51:50Z [SPARK-24748][SS] Support for reporting custom metrics via Streaming Query Progress --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
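The rough shape of the hook this PR proposes can be inferred from the diff hunks quoted earlier in this digest (a source exposes its metrics as a JSON string via a `json()` method, which the progress reporter embeds). A hedged sketch — the trait and class names here are illustrative, not necessarily the final API:

```scala
// Sketch of a custom-metrics hook: a source exposes its metrics as JSON
// that the StreamingQueryProgress reporter can embed under "customMetrics".
trait CustomMetrics {
  def json(): String
}

// Hypothetical Kafka example: per topic-partition lag, serialized by hand
// for illustration (a real implementation would use a JSON library).
class KafkaLagMetrics(lagPerPartition: Map[String, Long]) extends CustomMetrics {
  override def json(): String =
    lagPerPartition
      .map { case (tp, lag) => s""""$tp":$lag""" }
      .mkString("""{"lag":{""", ",", "}}")
}
```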
[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21700 I would like to add numbers to persuade how much this patch helps end users of Apache Spark. I crafted and published a project which implements some stateful use cases with an IoT trucking example. https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming While running the apps I can see that the cache (loadedMaps) in HDFSBackedStateStoreProvider consumes much more memory than one version of state. It's not like 10~30%, but more than 1500%, and even more than 8000% in a specific case, depending on the update ratio of the state. (Capturing the overall map size of the provider requires applying the patch #21469) The tables below are the result of the query, publishing query status to a Kafka topic and querying the data via Spark SQL. https://gist.github.com/HeartSaVioR/9d53b39052d4779a4c77e71ff7e989a3

> Before applying the patch (`spark.sql.streaming.minBatchesToRetain` set to default value 100)

* stream-stream join (IotTruckingAppJoinedAbnormalEvents.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
319 | 765456 | 2632 | 185499903 | 3307747279 | 17.8315310439811928

* window aggregation (IotTruckingAppMovingAggregationsOnSpeed.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
142 | 184 | 138 | 72103 | 6214927 | 86.1951236425668835

* deduplication (IotTruckingAppDistinctPairDriverAndTruck.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
634 | 598 | 0 | 136279 | 6587839 | 48.3408228707284321

> After applying this patch (`spark.sql.streaming.maxBatchesToRetainInMemory` set to default value 2)

* stream-stream join (IotTruckingAppJoinedAbnormalEvents.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
127 | 298452 | 4170 | 71023679 | 84454399 | 1.1891020035726395

* window aggregation (IotTruckingAppMovingAggregationsOnSpeed.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
132 | 184 | 138 | 72319 | 162647 | 2.2490216955433565

* deduplication (IotTruckingAppDistinctPairDriverAndTruck.scala)

batchId | numRowsTotal | numRowsUpdated | memoryUsedBytes | providerLoadedMapSize | stateExcessLoadingOverheadPercentage
-- | -- | -- | -- | -- | --
133 | 598 | 0 | 136079 | 227863 | 1.6744905532815497

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
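The overhead percentage in these tables appears to be the provider's cached-map footprint relative to a single version of state, which the column values bear out (e.g. 3307747279 / 185499903 ≈ 17.83 for the first stream-stream join row). A sketch of that derivation — the ratio interpretation is inferred from the column names, not stated in the message:

```scala
// stateExcessLoadingOverheadPercentage as the tables suggest: how many times
// larger the provider's cached maps (all retained versions) are than the
// memory used by one version of state.
def overheadRatio(providerLoadedMapSize: Long, memoryUsedBytes: Long): Double =
  providerLoadedMapSize.toDouble / memoryUsedBytes
```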
[GitHub] spark issue #21608: [SPARK-24626] [SQL] Improve location size calculation in...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21608 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92655/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21608: [SPARK-24626] [SQL] Improve location size calculation in...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21608 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21608: [SPARK-24626] [SQL] Improve location size calculation in...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21608 **[Test build #92655 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92655/testReport)** for PR 21608 at commit [`1e1db66`](https://github.com/apache/spark/commit/1e1db660345b0d9f94b59301d38a84bc506a791c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21710: [SPARK-24207][R]add R API for PrefixSpan
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21710 **[Test build #92658 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92658/testReport)** for PR 21710 at commit [`d2e85f5`](https://github.com/apache/spark/commit/d2e85f5f30a89dbd12d5bcd4f4c5187aac956b87). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21710: [SPARK-24207][R]add R API for PrefixSpan
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21710 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/712/ Test PASSed.
[GitHub] spark issue #21710: [SPARK-24207][R]add R API for PrefixSpan
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21710 Merged build finished. Test PASSed.
[GitHub] spark issue #21720: [SPARK-24163][SPARK-24164][SQL] Support column list as t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21720 Merged build finished. Test FAILed.
[GitHub] spark issue #21720: [SPARK-24163][SPARK-24164][SQL] Support column list as t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21720 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92656/ Test FAILed.
[GitHub] spark issue #21720: [SPARK-24163][SPARK-24164][SQL] Support column list as t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21720 **[Test build #92656 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92656/testReport)** for PR 21720 at commit [`942a30d`](https://github.com/apache/spark/commit/942a30dfc0fd070c067aa8d157075909610d3aaa). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21673: [SPARK-24697][SS] Fix the reported start offsets in stre...
Github user arunmahadevan commented on the issue: https://github.com/apache/spark/pull/21673 @tdas, thanks for your comments. Yes, there's a problem with the current abstraction, and I didn't consider refactoring it since there have been multiple changes to this class without changing the underlying structure, and the fields of ExecutionStats are accessed from multiple places within StreamExecution already. I did not think adding an extra field would increase the code complexity; however, if you plan to do a major refactoring to simplify the logic and address the issues, I am happy to discard this PR and help review your changes.
[GitHub] spark pull request #21556: [SPARK-24549][SQL] Support Decimal type push down...
Github user wangyum commented on a diff in the pull request: https://github.com/apache/spark/pull/21556#discussion_r200526464 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -82,6 +120,30 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: (n: String, v: Any) => FilterApi.eq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) + +case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal => --- End diff -- I see. I will do it.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21684 **[Test build #92657 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92657/testReport)** for PR 21684 at commit [`e043901`](https://github.com/apache/spark/commit/e043901029a25b5e0d4276c3e4c2aa07ce13716a).
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21684 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/711/ Test PASSed.
[GitHub] spark issue #21684: [SPARK-24470][Core] RestSubmissionClient to be robust ag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21684 Merged build finished. Test PASSed.
[GitHub] spark pull request #21720: [SPARK-24163][SPARK-24164][SQL] Support column li...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21720#discussion_r200518399 --- Diff: sql/core/src/test/resources/sql-tests/inputs/pivot.sql --- @@ -88,12 +93,12 @@ PIVOT ( ); -- pivot with aliases and projection -SELECT 2012_s, 2013_s, 2012_a, 2013_a, c FROM ( +SELECT firstYear_s, secondYear_s, firstYear_a, secondYear_a, c FROM ( SELECT year y, course c, earnings e FROM courseSales ) PIVOT ( sum(e) s, avg(e) a - FOR y IN (2012, 2013) + FOR y IN (2012 as firstYear, 2013 secondYear) --- End diff -- Can we keep the original query and add a new one for this?
[GitHub] spark issue #21541: [SPARK-20168][Streaming Kinesis] Setting the timestamp d...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21541 **[Test build #4209 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4209/testReport)** for PR 21541 at commit [`133ba87`](https://github.com/apache/spark/commit/133ba8721319efa9bfa64fbb53a690b5140c957b). * This patch **fails Java style tests**. * This patch **does not merge cleanly**. * This patch adds no public classes.
[GitHub] spark issue #21541: [SPARK-20168][Streaming Kinesis] Setting the timestamp d...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21541 **[Test build #4209 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4209/testReport)** for PR 21541 at commit [`133ba87`](https://github.com/apache/spark/commit/133ba8721319efa9bfa64fbb53a690b5140c957b).
[GitHub] spark pull request #21652: [SPARK-24551][K8S] Add integration tests for secr...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21652#discussion_r200515051 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala --- @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{Pod, Secret, SecretBuilder} +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.io.output.ByteArrayOutputStream +import org.scalatest.concurrent.Eventually + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ + +private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite => + + import SecretsTestsSuite._ + + private def createTestSecret(): Secret = { +val sb = new SecretBuilder() +sb.withNewMetadata() + .withName(ENV_SECRET_NAME) + .endMetadata() +val secUsername = Base64.encodeBase64String(ENV_SECRET_VALUE_1.getBytes()) +val secPassword = Base64.encodeBase64String(ENV_SECRET_VALUE_2.getBytes()) +val envSecretData = Map(ENV_SECRET_KEY_1 -> secUsername, ENV_SECRET_KEY_2 -> secPassword) +sb.addToData(envSecretData.asJava) +val envSecret = sb.build() +val sec = kubernetesTestComponents + .kubernetesClient + .secrets() + .createOrReplace(envSecret) +sec --- End diff -- `sec` is redundant?
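A minimal sketch of the cleanup being suggested, using the names from the quoted diff (`kubernetesTestComponents`, `ENV_SECRET_NAME`, `envSecretData`); this is illustrative only, not the final patch. Since `createOrReplace(...)` can be the method's last expression, its result is returned directly and both the mutable builder variable and the intermediate `sec` val can be dropped:

```scala
// Sketch only: assumes the fields and constants from the quoted diff are in scope.
private def createTestSecret(): Secret = {
  val envSecret = new SecretBuilder()
    .withNewMetadata()
      .withName(ENV_SECRET_NAME)
    .endMetadata()
    .addToData(envSecretData.asJava)
    .build()
  // The last expression of a Scala method is its return value,
  // so no intermediate `sec` val is needed.
  kubernetesTestComponents
    .kubernetesClient
    .secrets()
    .createOrReplace(envSecret)
}
```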
[GitHub] spark pull request #21652: [SPARK-24551][K8S] Add integration tests for secr...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21652#discussion_r200514990 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala --- @@ -31,19 +31,18 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} import org.apache.spark.deploy.k8s.integrationtest.config._ -import org.apache.spark.launcher.SparkLauncher private[spark] class KubernetesSuite extends SparkFunSuite - with BeforeAndAfterAll with BeforeAndAfter { + with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite { import KubernetesSuite._ private var testBackend: IntegrationTestBackend = _ private var sparkHomeDir: Path = _ - private var kubernetesTestComponents: KubernetesTestComponents = _ - private var sparkAppConf: SparkAppConf = _ + protected var kubernetesTestComponents: KubernetesTestComponents = _ --- End diff -- Do you really need protected vs `private[k8s]` or something? I don't know enough to have much of an opinion though. The `protected` just looked a little unusual for member variables.
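For reference, the two visibility options under discussion differ as follows in Scala. This is a toy sketch, not the actual suite code; the stand-in class exists only to make the snippet self-contained:

```scala
package org.apache.spark.deploy.k8s.integrationtest

// Stand-in for the real KubernetesTestComponents type.
class KubernetesTestComponents

class VisibilitySketch {
  // `protected`: accessible from this class and any subclass, in any package.
  protected var testComponents: KubernetesTestComponents = _

  // `private[k8s]`: accessible from anywhere inside the enclosing
  // org.apache.spark.deploy.k8s package (including its subpackages),
  // independent of inheritance.
  private[k8s] var scopedComponents: KubernetesTestComponents = _
}
```

Here `protected` widens access along the inheritance axis (which is what the mixed-in test traits need), while `private[k8s]` widens it along the package axis; which is appropriate depends on whether the traits live in the same package.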
[GitHub] spark pull request #21652: [SPARK-24551][K8S] Add integration tests for secr...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21652#discussion_r200514722 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import io.fabric8.kubernetes.api.model.Pod + +private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite => + + test("Run SparkPi with no resources.") { +runSparkPiAndVerifyCompletion() + } + + test("Run SparkPi with a very long application name.") { +sparkAppConf.set("spark.app.name", "long" * 40) +runSparkPiAndVerifyCompletion() + } + + test("Run SparkPi with a master URL without a scheme.") { +val url = kubernetesTestComponents.kubernetesClient.getMasterUrl +val k8sMasterUrl = if (url.getPort < 0) { + s"k8s://${url.getHost}" +} else { + s"k8s://${url.getHost}:${url.getPort}" +} +sparkAppConf.set("spark.master", k8sMasterUrl) +runSparkPiAndVerifyCompletion() + } + + test("Run SparkPi with an argument.") { +runSparkPiAndVerifyCompletion(appArgs = Array("5")) + } + + test("Run SparkPi with custom labels, annotations, and environment variables.") { +sparkAppConf + .set("spark.kubernetes.driver.label.label1", "label1-value") + .set("spark.kubernetes.driver.label.label2", "label2-value") + .set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value") + .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value") + .set("spark.kubernetes.driverEnv.ENV1", "VALUE1") + .set("spark.kubernetes.driverEnv.ENV2", "VALUE2") + .set("spark.kubernetes.executor.label.label1", "label1-value") + .set("spark.kubernetes.executor.label.label2", "label2-value") + .set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value") + .set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value") + .set("spark.executorEnv.ENV1", "VALUE1") + .set("spark.executorEnv.ENV2", "VALUE2") + +runSparkPiAndVerifyCompletion( + driverPodChecker = (driverPod: Pod) => { +doBasicDriverPodCheck(driverPod) +checkCustomSettings(driverPod) + }, + executorPodChecker = (executorPod: Pod) => { +doBasicExecutorPodCheck(executorPod) +checkCustomSettings(executorPod) + }) + } + + // 
TODO(ssuchter): Enable the below after debugging + // test("Run PageRank using remote data file") { --- End diff -- You can also declare this test with `ignore` rather than `test`. It would make sure it's at least compiled, if that's desirable, but not run. Doesn't matter much. I'd block-comment this much code to disable it though.
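In ScalaTest `FunSuite`-style suites, `ignore` has the same signature as `test`, so disabling a test is a one-word change that keeps the body compiled but skips it at run time. Applied to the commented-out test from the quoted diff, the sketch would look like this (all config keys and helper names come from that diff):

```scala
// Sketch: `ignore` replaces `test` to disable the case while still
// compiling its body, so it won't silently rot.
ignore("Run PageRank using remote data file") {
  sparkAppConf
    .set("spark.kubernetes.mountDependencies.filesDownloadDir",
      CONTAINER_LOCAL_FILE_DOWNLOAD_PATH)
    .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
  runSparkPageRankAndVerifyCompletion(
    appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE))
}
```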
[GitHub] spark pull request #21652: [SPARK-24551][K8S] Add integration tests for secr...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21652#discussion_r200514767 --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala --- @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import io.fabric8.kubernetes.api.model.Pod + +private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite => + + test("Run SparkPi with no resources.") { +runSparkPiAndVerifyCompletion() + } + + test("Run SparkPi with a very long application name.") { +sparkAppConf.set("spark.app.name", "long" * 40) +runSparkPiAndVerifyCompletion() + } + + test("Run SparkPi with a master URL without a scheme.") { +val url = kubernetesTestComponents.kubernetesClient.getMasterUrl +val k8sMasterUrl = if (url.getPort < 0) { + s"k8s://${url.getHost}" +} else { + s"k8s://${url.getHost}:${url.getPort}" +} +sparkAppConf.set("spark.master", k8sMasterUrl) +runSparkPiAndVerifyCompletion() + } + + test("Run SparkPi with an argument.") { +runSparkPiAndVerifyCompletion(appArgs = Array("5")) + } + + test("Run SparkPi with custom labels, annotations, and environment variables.") { +sparkAppConf + .set("spark.kubernetes.driver.label.label1", "label1-value") + .set("spark.kubernetes.driver.label.label2", "label2-value") + .set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value") + .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value") + .set("spark.kubernetes.driverEnv.ENV1", "VALUE1") + .set("spark.kubernetes.driverEnv.ENV2", "VALUE2") + .set("spark.kubernetes.executor.label.label1", "label1-value") + .set("spark.kubernetes.executor.label.label2", "label2-value") + .set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value") + .set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value") + .set("spark.executorEnv.ENV1", "VALUE1") + .set("spark.executorEnv.ENV2", "VALUE2") + +runSparkPiAndVerifyCompletion( + driverPodChecker = (driverPod: Pod) => { +doBasicDriverPodCheck(driverPod) +checkCustomSettings(driverPod) + }, + executorPodChecker = (executorPod: Pod) => { +doBasicExecutorPodCheck(executorPod) +checkCustomSettings(executorPod) + }) + } + + // 
TODO(ssuchter): Enable the below after debugging + // test("Run PageRank using remote data file") { + // sparkAppConf + // .set("spark.kubernetes.mountDependencies.filesDownloadDir", + // CONTAINER_LOCAL_FILE_DOWNLOAD_PATH) + // .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE) + // runSparkPageRankAndVerifyCompletion( + // appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE)) + // } + +// private def runSparkPageRankAndVerifyCompletion( +// appResource: String = containerLocalSparkDistroExamplesJar, +// driverPodChecker: Pod => Unit = doBasicDriverPodCheck, +// executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, +// appArgs: Array[String], +// appLocator: String = appLocator): Unit = { +//runSparkApplicationAndVerifyCompletion( +// appResource, +// SPARK_PAGE_RANK_MAIN_CLASS, +// Seq("1 has rank", "2 has rank", "3 has rank", "4 has rank"), +// appArgs, +// driverPodChecker, +// executorPodChecker, +// appLocator) +// } +} + +private[spark] object BasicTestsSuite { + val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank" + // val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files" + + // val REMOTE_PAGE_RANK_DATA_FILE
[GitHub] spark issue #21673: [SPARK-24697][SS] Fix the reported start offsets in stre...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21673 Thanks @arunmahadevan for making this PR. However, I don't like the solution of adding another field as a workaround, since it makes the control flow harder to reason about. I think the fundamental problem is the original design of the ProgressReport, which sees all the internal details of StreamExecution (e.g. availableOffsets and committedOffsets) and makes it very hard to reason about what information is read when. I want to refactor this a little bit to improve this underlying problem. I am working on a PR myself for that.
[GitHub] spark pull request #21620: [SPARK-24636][SQL] Type coercion of arrays for ar...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21620#discussion_r200514583 --- Diff: sql/core/src/test/resources/sql-tests/results/typeCoercion/native/arrayJoin.sql.out --- @@ -0,0 +1,90 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 11 + + +-- !query 0 +SELECT array_join(array(true, false), ', ') +-- !query 0 schema +struct +-- !query 0 output +true, false + + +-- !query 1 +SELECT array_join(array(2Y, 1Y), ', ') +-- !query 1 schema +struct +-- !query 1 output +2, 1 + + +-- !query 2 +SELECT array_join(array(2S, 1S), ', ') +-- !query 2 schema +struct +-- !query 2 output +2, 1 + + +-- !query 3 +SELECT array_join(array(2, 1), ', ') +-- !query 3 schema +struct +-- !query 3 output +2, 1 + + +-- !query 4 +SELECT array_join(array(2L, 1L), ', ') +-- !query 4 schema +struct +-- !query 4 output +2, 1 + + +-- !query 5 +SELECT array_join(array(9223372036854775809, 9223372036854775808), ', ') +-- !query 5 schema +struct +-- !query 5 output +9223372036854775809, 9223372036854775808 + + +-- !query 6 +SELECT array_join(array(2.0D, 1.0D), ', ') +-- !query 6 schema +struct +-- !query 6 output +2.0, 1.0 + + +-- !query 7 +SELECT array_join(array(float(2.0), float(1.0)), ', ') +-- !query 7 schema +struct +-- !query 7 output +2.0, 1.0 + + +-- !query 8 +SELECT array_join(array(date '2016-03-14', date '2016-03-13'), ', ') +-- !query 8 schema +struct +-- !query 8 output +2016-03-14, 2016-03-13 + + +-- !query 9 +SELECT array_join(array(timestamp '2016-11-15 20:54:00.000', timestamp '2016-11-12 20:54:00.000'), ', ') +-- !query 9 schema +struct --- End diff -- If the input array is very long, will the automatically generated column name also be super long?