[GitHub] spark pull request #20931: [SPARK-23815][Core]Spark writer dynamic partition...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20931 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20931: [SPARK-23815][Core]Spark writer dynamic partition overwr...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20931 thanks, merging to master/2.3!
[GitHub] spark issue #21052: [SPARK-23799] FilterEstimation.evaluateInSet produces de...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21052 **[Test build #89316 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89316/testReport)** for PR 21052 at commit [`74b6ebd`](https://github.com/apache/spark/commit/74b6ebdc2cd8a91944cc6159946f560ba7212a6a).
[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21004#discussion_r181289458

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---

```diff
@@ -95,6 +95,14 @@ case class DataSource(
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
+  // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
+  // in streaming mode, we have already inferred and registered partition columns, we will
+  // never have to materialize the lazy val below
+  private lazy val tempFileIndex = {
```

--- End diff --

let's just inline it. People can still create a new index in the future, technically this can't prevent users from doing that.
[GitHub] spark issue #21024: [SPARK-23917][SQL] Add array_max function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21024 Merged build finished. Test FAILed.
[GitHub] spark issue #21024: [SPARK-23917][SQL] Add array_max function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21024 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89311/ Test FAILed.
[GitHub] spark issue #21052: [SPARK-23799] FilterEstimation.evaluateInSet produces de...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21052 cc @wzhfy Please review this.
[GitHub] spark pull request #21058: [SPARK-23971] Should not leak Spark sessions acro...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21058
[GitHub] spark issue #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIndex
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21004 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2297/ Test PASSed.
[GitHub] spark issue #21024: [SPARK-23917][SQL] Add array_max function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21024 **[Test build #89311 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89311/testReport)** for PR 21024 at commit [`e739a0a`](https://github.com/apache/spark/commit/e739a0a247bc3782ee4348246eff921c86f83e13).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21052: [SPARK-23799] FilterEstimation.evaluateInSet produces de...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21052 ok to test
[GitHub] spark issue #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIndex
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21004 Merged build finished. Test PASSed.
[GitHub] spark issue #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIndex
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21004 **[Test build #89315 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89315/testReport)** for PR 21004 at commit [`12ac191`](https://github.com/apache/spark/commit/12ac191cb29f4ba1f817abffc8c7422efe837b38).
[GitHub] spark issue #21043: [SPARK-23963] [SQL] Properly handle large number of colu...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21043 Could you show some perf numbers in the PR?
[GitHub] spark pull request #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, a...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/21044#discussion_r181288716

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala ---

```diff
@@ -146,6 +152,10 @@ final class OneVsRestModel private[ml] (
   @Since("2.1.0")
   def setPredictionCol(value: String): this.type = set(predictionCol, value)

+  /** @group setParam */
+  @Since("2.4.0")
+  def setRawPredictionCol(value: String): this.type = set(rawPredictionCol, value)
```

--- End diff --

You'll need to add this to the Estimator too.
[GitHub] spark pull request #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, a...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/21044#discussion_r181288736

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala ---

```diff
@@ -195,15 +206,32 @@ final class OneVsRestModel private[ml] (
       newDataset.unpersist()
     }

-    // output the index of the classifier with highest confidence as prediction
-    val labelUDF = udf { (predictions: Map[Int, Double]) =>
-      predictions.maxBy(_._2)._1.toDouble
-    }
+    // output the RawPrediction as vector
+    if (getRawPredictionCol != "") {
+      val rawPredictionUDF = udf { (predictions: Map[Int, Double]) =>
+        val predArray = Array.fill[Double](numClasses)(0.0)
+        predictions.foreach { case (idx, value) => predArray(idx) = value }
+        Vectors.dense(predArray)
+      }
+
+      // output the index of the classifier with highest confidence as prediction
+      val labelUDF = udf { (predictions: Vector) => predictions.argmax.toDouble }

-    // output label and label metadata as prediction
-    aggregatedDataset
-      .withColumn($(predictionCol), labelUDF(col(accColName)), labelMetadata)
-      .drop(accColName)
+      aggregatedDataset
+        .withColumn(getRawPredictionCol, rawPredictionUDF(col(accColName)))
+        .withColumn(getPredictionCol, labelUDF(col(getRawPredictionCol)), labelMetadata)
+        .drop(accColName)
+    }
+    else {
+      // output the index of the classifier with highest confidence as prediction
+      val labelUDF = udf { (predictions: Map[Int, Double]) =>
+        predictions.maxBy(_._2)._1.toDouble
+      }
+      // output confidence as rwa prediction, label and label metadata as prediction
```

--- End diff --

This comment seems to be in the wrong part of the code. Also there's a typo.
[GitHub] spark pull request #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, a...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/21044#discussion_r181288725

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala ---

```diff
@@ -195,15 +206,32 @@ final class OneVsRestModel private[ml] (
+      aggregatedDataset
+        .withColumn(getRawPredictionCol, rawPredictionUDF(col(accColName)))
+        .withColumn(getPredictionCol, labelUDF(col(getRawPredictionCol)), labelMetadata)
+        .drop(accColName)
+    }
+    else {
```

--- End diff --

Scala style: This should go on the previous line: ```} else {```
[GitHub] spark issue #21058: [SPARK-23971] Should not leak Spark sessions across test...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21058 Thanks! Merged to master.
[GitHub] spark pull request #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, a...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/21044#discussion_r181288721

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala ---

```diff
@@ -195,15 +206,32 @@ final class OneVsRestModel private[ml] (
+    // output the RawPrediction as vector
+    if (getRawPredictionCol != "") {
+      val rawPredictionUDF = udf { (predictions: Map[Int, Double]) =>
+        val predArray = Array.fill[Double](numClasses)(0.0)
```

--- End diff --

This causes a subtle ContextCleaner bug: `numClasses` refers to a field of the class OneVsRestModel, so when Spark's closure capture serializes this UDF to send to executors, it will end up sending the entire OneVsRestModel object, rather than just the value for numClasses. Make a local copy of the value numClasses within the transform() method to avoid this issue.
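The capture pitfall described above is not Spark-specific: any closure that reads a field through `self`/`this` captures the whole enclosing object, while copying the value to a local first captures only the value. A minimal Python sketch (the `Model` class here is a hypothetical stand-in for `OneVsRestModel`, not code from the patch) makes the difference inspectable:

```python
class Model:
    """Hypothetical stand-in for a heavyweight model object."""

    def __init__(self, num_classes):
        self.num_classes = num_classes

    def make_udf_bad(self):
        # Reads the field through `self`: the closure's free variable is the
        # whole Model, so serializing the function drags the model along.
        return lambda: [0.0] * self.num_classes

    def make_udf_good(self):
        n = self.num_classes  # local copy: only the int is captured
        return lambda: [0.0] * n


m = Model(3)
bad, good = m.make_udf_bad(), m.make_udf_good()
print(type(bad.__closure__[0].cell_contents).__name__)  # prints "Model"
print(good.__closure__[0].cell_contents)                # prints 3
```

Inspecting `__closure__` shows exactly what would be serialized: the bad variant closes over the entire `Model` instance, the good one over a single integer, which is the local-copy fix the review asks for.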
[GitHub] spark pull request #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, a...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/21044#discussion_r181288710

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala ---

```diff
@@ -138,6 +138,12 @@ final class OneVsRestModel private[ml] (
   @Since("1.4.0")
   val models: Array[_ <: ClassificationModel[_, _]])
   extends Model[OneVsRestModel] with OneVsRestParams with MLWritable {
```

--- End diff --

Let's add a require() statement here which checks that models.nonEmpty is true (to throw an exception upon construction, rather than when numFeatures calls models.head below). Just to be safe...
[GitHub] spark pull request #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, a...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/21044#discussion_r181286908

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala ---

```diff
@@ -195,15 +206,32 @@ final class OneVsRestModel private[ml] (
+    }
+    else {
+      // output the index of the classifier with highest confidence as prediction
+      val labelUDF = udf { (predictions: Map[Int, Double]) =>
+        predictions.maxBy(_._2)._1.toDouble
+      }
+      // output confidence as rwa prediction, label and label metadata as prediction
```

--- End diff --

rwa -> raw
[GitHub] spark pull request #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, a...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/21044#discussion_r181287383

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala ---

```diff
@@ -195,15 +206,32 @@ final class OneVsRestModel private[ml] (
+    // output the RawPrediction as vector
+    if (getRawPredictionCol != "") {
+      val rawPredictionUDF = udf { (predictions: Map[Int, Double]) =>
+        val predArray = Array.fill[Double](numClasses)(0.0)
+        predictions.foreach { case (idx, value) => predArray(idx) = value }
+        Vectors.dense(predArray)
+      }
+
+      // output the index of the classifier with highest confidence as prediction
+      val labelUDF = udf { (predictions: Vector) => predictions.argmax.toDouble }
```

--- End diff --

==> `udf { (rawPredictions: Vector) => ... }`
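The logic under review here, densify the per-class confidence map into a score vector, then take the argmax as the predicted label, can be sketched outside Spark in a few lines of plain Python (function names are mine for illustration, not from the patch):

```python
def raw_prediction(confidences, num_classes):
    """Turn a {class_index: confidence} map into a dense score vector."""
    scores = [0.0] * num_classes
    for idx, value in confidences.items():
        scores[idx] = value
    return scores


def predicted_label(scores):
    """Index of the highest score, returned as a float label."""
    return float(max(range(len(scores)), key=scores.__getitem__))


scores = raw_prediction({0: 0.1, 2: 0.9}, num_classes=3)
print(scores)                   # [0.1, 0.0, 0.9]
print(predicted_label(scores))  # 2.0
```

Classes with no trained confidence keep a score of 0.0, matching the `Array.fill[Double](numClasses)(0.0)` initialization in the quoted Scala.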
[GitHub] spark issue #21037: [SPARK-23919][SQL] Add array_position function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21037 Merged build finished. Test PASSed.
[GitHub] spark issue #21037: [SPARK-23919][SQL] Add array_position function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21037 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2296/ Test PASSed.
[GitHub] spark issue #21053: [SPARK-23924][SQL] Add element_at function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21053 Merged build finished. Test PASSed.
[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/21004#discussion_r181283769

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---

```diff
@@ -552,6 +523,40 @@ case class DataSource(
       sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
   }
+
+  /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
+  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
```

--- End diff --

No, we can't. In some cases we need to check the glob files, while we don't need to create `InMemoryFileIndex`
[GitHub] spark issue #21053: [SPARK-23924][SQL] Add element_at function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21053 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2295/ Test PASSed.
[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/21004#discussion_r181283665

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---

```diff
@@ -95,6 +95,14 @@ case class DataSource(
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
+  // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
+  // in streaming mode, we have already inferred and registered partition columns, we will
+  // never have to materialize the lazy val below
+  private lazy val tempFileIndex = {
```

--- End diff --

I moved it here on purpose. So it may avoid being created twice in the future. I am OK to inline it.
[GitHub] spark issue #21037: [SPARK-23919][SQL] Add array_position function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21037 **[Test build #89314 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89314/testReport)** for PR 21037 at commit [`16ae59c`](https://github.com/apache/spark/commit/16ae59cf02da2cf0cd2e9a311b348bd82b452bff).
[GitHub] spark pull request #21037: [SPARK-23919][SQL] Add array_position function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21037#discussion_r181283242

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---

```diff
@@ -287,3 +287,61 @@ case class ArrayContains(left: Expression, right: Expression)
   override def prettyName: String = "array_contains"
 }
+
+/**
+ * A function that returns the position of the first occurrence of element in the given array
+ * as long. Returns 0 if substr could not be found in str.
+ * Returns null if either of the arguments are null and
```

--- End diff --

Good catch, thanks
[GitHub] spark issue #21053: [SPARK-23924][SQL] Add element_at function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21053 **[Test build #89313 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89313/testReport)** for PR 21053 at commit [`bb0ab45`](https://github.com/apache/spark/commit/bb0ab45b4a9bbf1155dbb9513508bbef3685b3f6).
[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21048 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89310/ Test PASSed.
[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21048 Merged build finished. Test PASSed.
[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21048 **[Test build #89310 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89310/testReport)** for PR 21048 at commit [`ef05009`](https://github.com/apache/spark/commit/ef05009e491d1ffdca2a37ba0441ea8507756e3d).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21058: [SPARK-23971] Should not leak Spark sessions across test...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21058 Merged build finished. Test PASSed.
[GitHub] spark issue #21058: [SPARK-23971] Should not leak Spark sessions across test...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21058 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89307/ Test PASSed.
[GitHub] spark issue #21058: [SPARK-23971] Should not leak Spark sessions across test...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21058 **[Test build #89307 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89307/testReport)** for PR 21058 at commit [`92afcc2`](https://github.com/apache/spark/commit/92afcc2f7a5dfb2bc5aa94e009ef1787f42a83ab).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21060 Merged build finished. Test PASSed.
[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21060 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2294/ Test PASSed.
[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21060 **[Test build #89312 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89312/testReport)** for PR 21060 at commit [`4656724`](https://github.com/apache/spark/commit/4656724d27c208d794f99691cfbf93b4bb118d93).
[GitHub] spark pull request #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes coll...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21060#discussion_r181279443

--- Diff: python/pyspark/sql/tests.py ---

```diff
@@ -185,22 +185,12 @@ def __init__(self, key, value):
         self.value = value

-class ReusedSQLTestCase(ReusedPySparkTestCase):
-    @classmethod
-    def setUpClass(cls):
-        ReusedPySparkTestCase.setUpClass()
-        cls.spark = SparkSession(cls.sc)
-
-    @classmethod
-    def tearDownClass(cls):
-        ReusedPySparkTestCase.tearDownClass()
-        cls.spark.stop()
-
-    def assertPandasEqual(self, expected, result):
```

--- End diff --

This method causes a conflict which I don't really understand why. I compared line by line, character by character and they look identical.
[GitHub] spark pull request #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes coll...
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/21060 [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in PySpark as action for a query executor listener

## What changes were proposed in this pull request?

This PR proposes to add `collect` to a query executor as an action. Seems `collect` / `collect` with Arrow are not recognised via `QueryExecutionListener` as an action. For example, if we have a custom listener as below:

```scala
package org.apache.spark.sql

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

class TestQueryExecutionListener extends QueryExecutionListener with Logging {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    logError("Look at me! I'm 'onSuccess'")
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { }
}
```

and set `spark.sql.queryExecutionListeners` to `org.apache.spark.sql.TestQueryExecutionListener`

Other operations in PySpark or Scala side seems fine:

```python
>>> sql("SELECT * FROM range(1)").show()
```
```
18/04/09 17:02:04 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
+---+
| id|
+---+
|  0|
+---+
```

```scala
scala> sql("SELECT * FROM range(1)").collect()
```
```
18/04/09 16:58:41 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
res1: Array[org.apache.spark.sql.Row] = Array([0])
```

but ..

**Before**

```python
>>> sql("SELECT * FROM range(1)").collect()
```
```
[Row(id=0)]
```

```python
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> sql("SELECT * FROM range(1)").toPandas()
```
```
   id
0   0
```

**After**

```python
>>> sql("SELECT * FROM range(1)").collect()
```
```
18/04/09 16:57:58 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
[Row(id=0)]
```

```python
>>> spark.conf.set("spark.sql.execution.arrow.enabled", "true")
>>> sql("SELECT * FROM range(1)").toPandas()
```
```
18/04/09 17:53:26 ERROR TestQueryExecutionListener: Look at me! I'm 'onSuccess'
   id
0   0
```

## How was this patch tested?

I have manually tested as described above and unit test was added.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark PR_TOOL_PICK_PR_21007_BRANCH-2.3

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21060.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21060

commit 4656724d27c208d794f99691cfbf93b4bb118d93
Author: hyukjinkwon
Date: 2018-04-13T03:28:13Z

    [SPARK-23942][PYTHON][SQL] Makes collect in PySpark as action for a query executor listener
[GitHub] spark issue #21060: [SPARK-23942][PYTHON][SQL][BRANCH-2.3] Makes collect in ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21060 cc @BryanCutler
[GitHub] spark pull request #21007: [SPARK-23942][PYTHON][SQL] Makes collect in PySpa...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21007
[GitHub] spark issue #21007: [SPARK-23942][PYTHON][SQL] Makes collect in PySpark as a...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21007 Merged to master. Thanks for reviewing this @felixcheung, @viirya and @BryanCutler.
[GitHub] spark issue #21025: [SPARK-23918][SQL] Add array_min function
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21025 LGTM.
[GitHub] spark issue #21024: [SPARK-23917][SQL] Add array_max function
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21024 LGTM pending Jenkins.
[GitHub] spark pull request #21017: [SPARK-23748][SS] Fix SS continuous process doesn...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21017
[GitHub] spark issue #21059: [SPARK-23974][CORE] fix when numExecutorsTarget equals m...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21059 cc @jiangxb1987
[GitHub] spark issue #21059: [SPARK-23974][CORE] fix when numExecutorsTarget equals m...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21059 Can one of the admins verify this patch?
[GitHub] spark pull request #21059: fix when numExecutorsTarget equals maxNumExecutor...
GitHub user sadhen opened a pull request: https://github.com/apache/spark/pull/21059

fix when numExecutorsTarget equals maxNumExecutors

## What changes were proposed in this pull request?

In dynamic allocation, there are cases where `numExecutorsTarget` has reached `maxNumExecutors` but, for some reason (`client.requestTotalExecutors` did not work as expected, or threw an exception due to an RPC failure), the method `addExecutors` always returns 0 without calling `client.requestTotalExecutors`. When there are too many tasks to handle, `maxNeeded < numExecutorsTarget` is false, so `updateAndSyncNumExecutorsTarget` always runs into `addExecutors` with `numExecutorsTarget == maxNumExecutors`. Since `numExecutorsTarget` is hard to decrease, the result is that we keep using only a few executors to handle the heavy tasks without ever dynamically increasing the number of executors.

Online logs:

```
$ grep "Not adding executors because our current target total" spark-job-server.log.9 | tail
[2018-04-12 16:07:19,070] DEBUG .ExecutorAllocationManager [] [akka://JobServer/user/jobManager] - Not adding executors because our current target total is already 600 (limit 600)
[... nine more identical entries, one per second, from 16:07:20 through 16:07:28 ...]

$ grep "Not adding executors because our current target total" spark-job-server.log.9 | head
[2018-04-12 13:52:18,067] DEBUG .ExecutorAllocationManager [] [akka://JobServer/user/jobManager] - Not adding executors because our current target total is already 600 (limit 600)
[... nine more identical entries, one per second, from 13:52:19 through 13:52:27 ...]

$ grep "Not adding executors because our current target total" spark-job-server.log.9 | wc -l
8111
```

The logs mean that we
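To make the failure mode concrete, here is a minimal, self-contained Python model of the loop described in this report. It is illustrative only — `add_executors` and `request_total` mirror, but are not, Spark's actual `ExecutorAllocationManager` code:

```python
class AllocationManager:
    """Toy model of updateAndSyncNumExecutorsTarget / addExecutors."""

    def __init__(self, max_executors):
        self.max_executors = max_executors
        self.target = 0          # numExecutorsTarget
        self.cluster_total = 0   # what the cluster manager last accepted

    def request_total(self, n, fail=False):
        # models client.requestTotalExecutors, which can fail over RPC
        if fail:
            return False
        self.cluster_total = n
        return True

    def add_executors(self, max_needed, fail=False):
        if self.target >= self.max_executors:
            # "Not adding executors because our current target total
            # is already N (limit N)" -- returns 0 without any RPC
            return 0
        self.target = min(max_needed, self.max_executors)
        self.request_total(self.target, fail=fail)
        return self.target


mgr = AllocationManager(max_executors=600)
# first attempt: the RPC fails, but target is already bumped to the max
mgr.add_executors(max_needed=1000, fail=True)
# every later tick: target == max, so we bail out and never re-sync,
# even though the cluster manager never actually received the request
for _ in range(5):
    mgr.add_executors(max_needed=1000)
print(mgr.target, mgr.cluster_total)  # 600 0
```

The model shows the stuck state: `target` sits at the max forever, while the cluster-side total was never updated, matching the endless "already 600 (limit 600)" log entries above.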
[GitHub] spark issue #21017: [SPARK-23748][SS] Fix SS continuous process doesn't supp...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21017 LGTM. Merging this to master and 2.3
[GitHub] spark issue #21024: [SPARK-23917][SQL] Add array_max function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21024 Merged build finished. Test PASSed.
[GitHub] spark issue #21024: [SPARK-23917][SQL] Add array_max function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21024 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2293/ Test PASSed.
[GitHub] spark issue #21024: [SPARK-23917][SQL] Add array_max function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21024 **[Test build #89311 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89311/testReport)** for PR 21024 at commit [`e739a0a`](https://github.com/apache/spark/commit/e739a0a247bc3782ee4348246eff921c86f83e13).
[GitHub] spark issue #21024: [SPARK-23917][SQL] Add array_max function
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21024 Jenkins, retest this please.
[GitHub] spark issue #20828: [SPARK-23687][SS] Add a memory source for continuous pro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20828 Merged build finished. Test PASSed.
[GitHub] spark issue #20828: [SPARK-23687][SS] Add a memory source for continuous pro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20828 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89305/ Test PASSed.
[GitHub] spark issue #20828: [SPARK-23687][SS] Add a memory source for continuous pro...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20828

**[Test build #89305 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89305/testReport)** for PR 20828 at commit [`55e37a9`](https://github.com/apache/spark/commit/55e37a95069a66df668afb57ed009b7c64ff3543).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * ` logInfo(s\"Failed to load main class $childMainClass.\")`
  * ` error(s\"Cannot load main class from JAR $primaryResource\")`
  * ` error(\"No main class set in JAR; please specify one with --class\")`
  * ` checkArgument(mainClass != null, \"Missing example class name.\");`
  * ` instr.logWarning(s\"All labels belong to a single class and fitIntercept=false. It's a \" +`
  * `sealed trait Node extends Serializable `
  * `sealed trait ClassificationNode extends Node `
  * `sealed trait RegressionNode extends Node `
  * `sealed trait LeafNode extends Node `
  * `sealed trait InternalNode extends Node `
  * `class _StringIndexerParams(JavaParams, HasHandleInvalid, HasInputCol, HasOutputCol):`
  * `class StringIndexer(JavaEstimator, _StringIndexerParams, JavaMLReadable, JavaMLWritable):`
  * `class StringIndexerModel(JavaModel, _StringIndexerParams, JavaMLReadable, JavaMLWritable):`
  * `class VectorAssembler(JavaTransformer, HasInputCols, HasOutputCol, HasHandleInvalid, JavaMLReadable,`
  * `class KolmogorovSmirnovTest(object):`
  * `case class ExprCode(var code: String, var isNull: ExprValue, var value: ExprValue)`
  * `case class SubExprEliminationState(isNull: ExprValue, value: ExprValue)`
  * ` |class SpecificUnsafeProjection extends $`
  * `trait JavaCode `
  * `trait ExprValue extends JavaCode `
  * `case class SimpleExprValue(expr: String, javaType: Class[_]) extends ExprValue `
  * `case class VariableValue(variableName: String, javaType: Class[_]) extends ExprValue `
  * `case class GlobalValue(value: String, javaType: Class[_]) extends ExprValue `
  * `class LiteralValue(val value: String, val javaType: Class[_]) extends ExprValue with Serializable `
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r181273485

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -119,26 +119,60 @@ class CacheManager extends Logging {
    while (it.hasNext) {
      val cd = it.next()
      if (cd.plan.find(_.sameResult(plan)).isDefined) {
-       cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+       cd.cachedRepresentation.clearCache(blocking)
        it.remove()
      }
    }
  }
+ /**
+  * Materialize the cache that refers to the given physical plan.
--- End diff --

once it's materialized, it's still materialized after copy
[GitHub] spark pull request #21018: [SPARK-23880][SQL] Do not trigger any jobs for ca...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21018#discussion_r181273417

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -119,26 +119,60 @@ class CacheManager extends Logging {
    while (it.hasNext) {
      val cd = it.next()
      if (cd.plan.find(_.sameResult(plan)).isDefined) {
-       cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
+       cd.cachedRepresentation.clearCache(blocking)
        it.remove()
      }
    }
  }
+ /**
+  * Materialize the cache that refers to the given physical plan.
--- End diff --

something like
```
class InMemoryRelation(private var _cachedColumnBuffers: RDD[CachedBatch] = null) {
  def cachedColumnBuffers = {
    if (_cachedColumnBuffers == null) {
      synchronized {
        if (_cachedColumnBuffers == null) {
          _cachedColumnBuffers = buildBuffer()
        }
      }
    }
    _cachedColumnBuffers
  }
}
```
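The snippet suggested above is the classic double-checked locking idiom for lazily materializing an expensive value at most once across threads. A small Python sketch of the same pattern (illustrative only, not Spark code):

```python
import threading

class LazyBuffer:
    """Build an expensive value at most once, safely across threads."""

    def __init__(self, build):
        self._build = build
        self._value = None
        self._lock = threading.Lock()

    @property
    def value(self):
        # first check without the lock (cheap fast path)
        if self._value is None:
            with self._lock:
                # second check under the lock: another thread may have
                # built the value while we were waiting
                if self._value is None:
                    self._value = self._build()
        return self._value


calls = []
buf = LazyBuffer(lambda: calls.append(1) or "buffers")

threads = [threading.Thread(target=lambda: buf.value) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(buf.value, len(calls))  # built exactly once despite 8 readers
```

In Scala a plain `lazy val` gives equivalent synchronization for free; the manual guard in the suggestion is presumably there because `_cachedColumnBuffers` is a constructor-assignable `var` rather than a `lazy val`.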
[GitHub] spark pull request #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Opt...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20560#discussion_r181272920

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -733,6 +735,17 @@ object EliminateSorts extends Rule[LogicalPlan] {
  }
}
+/**
+ * Removes Sort operation if the child is already sorted
+ */
+object RemoveRedundantSorts extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Sort(orders, true, child) if child.outputOrdering.nonEmpty
--- End diff --

`child.outputOrdering.nonEmpty` looks unnecessary
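For intuition, the rule under review removes a `Sort` whose child already produces a satisfying ordering. A toy version of that rewrite over a tuple-based plan tree (illustrative only, not Catalyst code) — note how the empty-ordering case falls out of the prefix check naturally, which is why the `nonEmpty` guard looks redundant:

```python
def output_ordering(plan):
    """Ordering a node's output is known to satisfy (toy model)."""
    kind = plan[0]
    if kind == "sort":
        _, orders, _child = plan
        return orders
    if kind == "filter":
        # filters preserve their child's ordering
        return output_ordering(plan[2])
    return []  # e.g. an unsorted scan

def remove_redundant_sorts(plan):
    if plan[0] == "sort":
        _, orders, child = plan
        child = remove_redundant_sorts(child)
        # prefix check in the spirit of SortOrder.orderingSatisfies;
        # an empty child ordering can never satisfy a non-empty sort
        if output_ordering(child)[:len(orders)] == orders:
            return child          # the sort is redundant -- drop it
        return ("sort", orders, child)
    if plan[0] == "filter":
        return ("filter", plan[1], remove_redundant_sorts(plan[2]))
    return plan

plan = ("sort", ["a ASC"], ("filter", "a > 0", ("sort", ["a ASC"], ("scan",))))
print(remove_redundant_sorts(plan))
# ('filter', 'a > 0', ('sort', ['a ASC'], ('scan',)))
```

The outer sort disappears because the filter passes its child's `["a ASC"]` ordering through, while the inner sort over the unordered scan is kept.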
[GitHub] spark pull request #19868: [SPARK-22676] Avoid iterating all partition paths...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19868#discussion_r181272430

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -176,12 +176,13 @@ class HadoopTableReader(
      val matches = fs.globStatus(pathPattern)
      matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
    }
-   // convert /demo/data/year/month/day to /demo/data/*/*/*/
+   // convert /demo/data/year/month/day to /demo/data/year/month/*/
--- End diff --

Actually, do we still need this check? Now we have something like `spark.sql.files.ignoreMissingFiles`.
[GitHub] spark issue #21007: [SPARK-23942][PYTHON][SQL] Makes collect in PySpark as a...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21007 Merged build finished. Test PASSed.
[GitHub] spark issue #21007: [SPARK-23942][PYTHON][SQL] Makes collect in PySpark as a...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21007 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89304/ Test PASSed.
[GitHub] spark issue #21007: [SPARK-23942][PYTHON][SQL] Makes collect in PySpark as a...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21007

**[Test build #89304 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89304/testReport)** for PR 21007 at commit [`7c1b3c6`](https://github.com/apache/spark/commit/7c1b3c606d9b90dbffbaa7d7442bbd609940b98f).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #20904: [SPARK-23751][ML][PySpark] Kolmogorov-Smirnoff te...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/20904#discussion_r181270142

--- Diff: mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala ---
@@ -81,32 +81,37 @@ object KolmogorovSmirnovTest {
   * Java-friendly version of `test(dataset: DataFrame, sampleCol: String, cdf: Double => Double)`
   */
  @Since("2.4.0")
- def test(dataset: DataFrame, sampleCol: String,
-     cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = {
-   val f: Double => Double = x => cdf.call(x)
-   test(dataset, sampleCol, f)
+ def test(
+     dataset: Dataset[_],
+     sampleCol: String,
+     cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = {
+   test(dataset, sampleCol, (x: Double) => cdf.call(x))
--- End diff --

I can build locally against scala-2.12 to check it. :)
[GitHub] spark issue #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIndex
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21004 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89306/ Test FAILed.
[GitHub] spark issue #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIndex
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21004 Merged build finished. Test FAILed.
[GitHub] spark issue #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIndex
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21004

**[Test build #89306 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89306/testReport)** for PR 21004 at commit [`60d5b6b`](https://github.com/apache/spark/commit/60d5b6b6f6f99166f31740dda93dfc0be43e0d41).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21004#discussion_r181269578

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -552,6 +523,40 @@ case class DataSource(
      sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
    }
  }
+
+ /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
+ private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
--- End diff --

and we can merge `checkAndGlobPathIfNecessary` and `createInMemoryFileIndex`
[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21004#discussion_r181269464

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -552,6 +523,40 @@ case class DataSource(
      sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
    }
  }
+
+ /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
+ private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
--- End diff --

this can be `def createInMemoryFileIndex(checkEmptyGlobPath: Boolean)`
[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21004#discussion_r181269313

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -384,24 +356,23 @@ case class DataSource(
      // This is a non-streaming file based datasource.
      case (format: FileFormat, _) =>
-       val allPaths = caseInsensitiveOptions.get("path") ++ paths
-       val hadoopConf = sparkSession.sessionState.newHadoopConf()
-       val globbedPaths = allPaths.flatMap(
-         DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray
-
-       val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-       val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
-
-       val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
-           catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
+       val globbedPaths =
+         checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
+       val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+         catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
+         catalogTable.get.partitionSchema.nonEmpty
--- End diff --

use `partitionColumnNames` over `partitionSchema`, since `partitionColumnNames` is a val and `partitionSchema` is a def
[GitHub] spark pull request #21004: [SPARK-23896][SQL]Improve PartitioningAwareFileIn...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21004#discussion_r181269206

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -95,6 +95,14 @@ case class DataSource(
  lazy val sourceInfo: SourceInfo = sourceSchema()
  private val caseInsensitiveOptions = CaseInsensitiveMap(options)
  private val equality = sparkSession.sessionState.conf.resolver
+ // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
+ // in streaming mode, we have already inferred and registered partition columns, we will
+ // never have to materialize the lazy val below
+ private lazy val tempFileIndex = {
--- End diff --

it's only used once, no need to be a lazy val, we can just inline it.
[GitHub] spark pull request #20953: [SPARK-23822][SQL] Improve error message for Parq...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20953#discussion_r181268257

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---
@@ -179,7 +182,23 @@ class FileScanRDD(
            currentIterator = readCurrentFile()
          }

-         hasNext
+         try {
+           hasNext
+         } catch {
+           case e: SchemaColumnConvertNotSupportedException =>
+             val message = "Parquet column cannot be converted in " +
+               s"file ${currentFile.filePath}. Column: ${e.getColumn}, " +
+               s"Expected: ${e.getLogicalType}, Found: ${e.getPhysicalType}"
+             throw new QueryExecutionException(message, e)
--- End diff --

I don't think that's captured since it's thrown in execution. Can you double check if that's true?
[GitHub] spark pull request #20977: [SPARK-23867][Scheduler] use droppedCount in logW...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20977
[GitHub] spark pull request #21045: [WIP][SPARK-23931][SQL] Adds zip function to spar...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21045#discussion_r181267751

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -87,6 +87,62 @@ case class MapKeys(child: Expression)
  override def prettyName: String = "map_keys"
}
+@ExpressionDescription(
+  usage = """_FUNC_(a1, a2) - Returns a merged array matching N-th element of first
+    array with the N-th element of second.""",
+  examples = """
+    Examples
+      > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4));
+       [[1, 2], [2, 3], [3, 4]]
+  """,
+  since = "2.4.0")
+case class Zip(left: Expression, right: Expression)
+  extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType)
+
+  override def dataType: DataType = ArrayType(left.dataType.asInstanceOf[ArrayType].elementType)
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, (arr1, arr2) => {
+      val i = ctx.freshName("i")
+      s"""
+      for (int $i = 0; $i < $arr1.numElements(); $i ++) {
+        if ($arr1.isNullAt($i)) {
+          ${ev.isNull} = true;
+        } else {
+          ${ev.value}[$i] = ($arr1[$i], $arr2[$i]);
--- End diff --

One high-level question: do we have performance or other advantages when we generate Java code?
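Behaviourally, the `zip` described in the `@ExpressionDescription` above pairs the N-th elements of the two input arrays. That semantics can be written in a few lines of plain Python (this sidesteps the codegen question; toy semantics only — the null handling here is an assumption, not what the PR implements):

```python
def zip_arrays(a, b):
    """Pair N-th elements of two arrays.

    A null (None) input array yields null; the result is as long as
    the shorter input (toy semantics, assumed for illustration).
    """
    if a is None or b is None:
        return None
    return [[x, y] for x, y in zip(a, b)]

print(zip_arrays([1, 2, 3], [2, 3, 4]))  # [[1, 2], [2, 3], [3, 4]]
```

The codegen path in the diff exists so that, per row, Spark runs a flat Java loop instead of interpreting the expression tree; the reviewer's question is whether that buys anything measurable here.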
[GitHub] spark pull request #21048: [SPARK-23966][SS] Refactoring all checkpoint file...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21048#discussion_r181267497 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala --- @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import java.io.{FileNotFoundException, IOException, OutputStream} +import java.util.{EnumSet, UUID} + +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs} +import org.apache.hadoop.fs.permission.FsPermission + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils + +/** + * An interface to abstract out all operation related to streaming checkpoints. Most importantly, + * the key operation this interface provides is `createAtomic(path, overwrite)` which returns a + * `CancellableFSDataOutputStream`. 
This method is used by [[HDFSMetadataLog]] and + * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations + * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or + * without overwrite. + * + * This higher-level interface above the Hadoop FileSystem is necessary because + * different implementation of FileSystem/FileContext may have different combination of operations + * to provide the desired atomic guarantees (e.g. write-to-temp-file-and-rename, + * direct-write-and-cancel-on-failure) and this abstraction allow different implementations while + * keeping the usage simple (`createAtomic` -> `close` or `cancel`). + */ +trait CheckpointFileManager { + + import org.apache.spark.sql.execution.streaming.CheckpointFileManager._ + + /** + * Create a file and make its contents available atomically after the output stream is closed. + * + * @param pathPath to create + * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to + *overwrite the file if it already exists. It should not throw + *any exception if the file exists. However, if false, then the + *implementation must not overwrite if the file alraedy exists and + *must throw `FileAlreadyExistsException` in that case. + */ + def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream + + /** Open a file for reading, or throw exception if it does not exist. */ + def open(path: Path): FSDataInputStream + + /** List the files in a path that match a filter. */ + def list(path: Path, filter: PathFilter): Array[FileStatus] + + /** List all the files in a path. */ + def list(path: Path): Array[FileStatus] = { +list(path, new PathFilter { override def accept(path: Path): Boolean = true }) + } + + /** Make directory at the give path and all its parent directories as needed. 
*/ + def mkdirs(path: Path): Unit + + /** Whether path exists */ + def exists(path: Path): Boolean + + /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */ + def delete(path: Path): Unit + + /** Is the default file system this implementation is operating on the local file system. */ + def isLocal: Boolean +} + +object CheckpointFileManager extends Logging { + + /** + * Additional methods in CheckpointFileManager implementations that allows + * [[RenameBasedFSDataOutputStream]] get atomicity by write-to-temp-file-and-rename + */ + sealed trait RenameHelperMethods {
[GitHub] spark issue #20977: [SPARK-23867][Scheduler] use droppedCount in logWarning
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20977 thanks, merging to master/2.3!
[GitHub] spark pull request #20695: [SPARK-21741][ML][PySpark] Python API for DataFra...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20695#discussion_r181263309 --- Diff: python/pyspark/ml/stat.py ---
@@ -195,6 +197,185 @@ def test(dataset, sampleCol, distName, *params):
                                            _jvm().PythonUtils.toSeq(params)))
 
 
+class Summarizer(object):
+    """
+    .. note:: Experimental
+
+    Tools for vectorized statistics on MLlib Vectors.
+
+    The methods in this package provide various statistics for Vectors contained inside
+    DataFrames. This class lets users pick the statistics they would like to extract for
+    a given column.
+
+    >>> from pyspark.ml.stat import Summarizer
+    >>> from pyspark.sql import Row
+    >>> from pyspark.ml.linalg import Vectors
+    >>> summarizer = Summarizer.metrics("mean", "count")
+    >>> df = sc.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
+    ...                      Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()
+    >>> df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)
+    +-----------------------------------+
+    |aggregate_metrics(features, weight)|
+    +-----------------------------------+
+    |[[1.0,1.0,1.0], 1]                 |
+    +-----------------------------------+
+
+    >>> df.select(summarizer.summary(df.features)).show(truncate=False)
+    +--------------------------------+
+    |aggregate_metrics(features, 1.0)|
+    +--------------------------------+
+    |[[1.0,1.5,2.0], 2]              |
+    +--------------------------------+
+
+    >>> df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)
+    +--------------+
+    |mean(features)|
+    +--------------+
+    |[1.0,1.0,1.0] |
+    +--------------+
+
+    >>> df.select(Summarizer.mean(df.features)).show(truncate=False)
+    +--------------+
+    |mean(features)|
+    +--------------+
+    |[1.0,1.5,2.0] |
+    +--------------+
+
+    .. versionadded:: 2.4.0
+    """
+
+    @staticmethod
+    @since("2.4.0")
+    def mean(col, weightCol=None):
+        """
+        return a column of mean summary
+        """
+        return Summarizer._get_single_metric(col, weightCol, "mean")
+
+    @staticmethod
+    @since("2.4.0")
+    def variance(col, weightCol=None):
+        """
+        return a column of variance summary
+        """
+        return Summarizer._get_single_metric(col, weightCol, "variance")
+
+    @staticmethod
+    @since("2.4.0")
+    def count(col, weightCol=None):
+        """
+        return a column of count summary
+        """
+        return Summarizer._get_single_metric(col, weightCol, "count")
+
+    @staticmethod
+    @since("2.4.0")
+    def numNonZeros(col, weightCol=None):
+        """
+        return a column of numNonZero summary
+        """
+        return Summarizer._get_single_metric(col, weightCol, "numNonZeros")
+
+    @staticmethod
+    @since("2.4.0")
+    def max(col, weightCol=None):
+        """
+        return a column of max summary
+        """
+        return Summarizer._get_single_metric(col, weightCol, "max")
+
+    @staticmethod
+    @since("2.4.0")
+    def min(col, weightCol=None):
+        """
+        return a column of min summary
+        """
+        return Summarizer._get_single_metric(col, weightCol, "min")
+
+    @staticmethod
+    @since("2.4.0")
+    def normL1(col, weightCol=None):
+        """
+        return a column of normL1 summary
+        """
+        return Summarizer._get_single_metric(col, weightCol, "normL1")
+
+    @staticmethod
+    @since("2.4.0")
+    def normL2(col, weightCol=None):
+        """
+        return a column of normL2 summary
+        """
+        return Summarizer._get_single_metric(col, weightCol, "normL2")
+
+    @staticmethod
+    def _check_param(featureCol, weightCol):
+        if weightCol is None:
+            weightCol = lit(1.0)
+        if not isinstance(featureCol, Column) or not isinstance(weightCol, Column):
+            raise TypeError("featureCol and weightCol should be a Column")
+        return featureCol, weightCol
+
+    @staticmethod
+    def _get_single_metric(col, weightCol, metric):
+        col, weightCol = Summarizer._check_param(col, weightCol)
+        return Column(JavaWrapper._new_java_obj("org.apache.spark.ml.stat.Summarizer." + metric,
+                                                col._jc, weightCol._jc))
+
+    @staticmethod
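The doctest output in the diff above follows from ordinary weighted statistics: with weights 1.0 and 0.0 only the first vector contributes to the mean, and with the default weight of 1.0 both rows are averaged. A pure-Python sketch of the same computation (a hypothetical helper, not part of PySpark; `count` here mirrors the doctest by counting rows with non-zero weight):

```python
def weighted_summary(rows, weights=None):
    """Compute the weighted mean vector and the count of non-zero-weight rows."""
    if weights is None:
        weights = [1.0] * len(rows)  # default weight, like lit(1.0) in Summarizer
    total = sum(weights)
    dim = len(rows[0])
    # Weighted mean, computed component-wise across the vectors.
    mean = [sum(w * v[i] for v, w in zip(rows, weights)) / total
            for i in range(dim)]
    count = sum(1 for w in weights if w != 0.0)
    return mean, count

rows = [[1.0, 1.0, 1.0], [1.0, 2.0, 3.0]]
print(weighted_summary(rows, [1.0, 0.0]))  # ([1.0, 1.0, 1.0], 1)
print(weighted_summary(rows))              # ([1.0, 1.5, 2.0], 2)
```

The two printed results match the `[[1.0,1.0,1.0], 1]` and `[[1.0,1.5,2.0], 2]` rows shown in the doctest tables.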
[GitHub] spark pull request #20695: [SPARK-21741][ML][PySpark] Python API for DataFra...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20695#discussion_r181259361 --- Diff: python/pyspark/ml/stat.py --- (diff context identical to the preceding message)
[GitHub] spark pull request #20695: [SPARK-21741][ML][PySpark] Python API for DataFra...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20695#discussion_r181259181 --- Diff: python/pyspark/ml/stat.py --- (diff context identical to the preceding message)
[GitHub] spark pull request #20695: [SPARK-21741][ML][PySpark] Python API for DataFra...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20695#discussion_r181259536 --- Diff: python/pyspark/ml/stat.py --- (diff context identical to the preceding message)
[GitHub] spark issue #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, and numF...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21044 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89309/ Test PASSed.
[GitHub] spark issue #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, and numF...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21044 **[Test build #89309 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89309/testReport)** for PR 21044 at commit [`0c32fca`](https://github.com/apache/spark/commit/0c32fcaaf87f1922170e4ce7e60381ccd23ab6e8).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, and numF...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21044 Merged build finished. Test PASSed.
[GitHub] spark issue #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, and numF...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21044 Merged build finished. Test PASSed.
[GitHub] spark issue #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, and numF...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21044 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89308/ Test PASSed.
[GitHub] spark issue #21044: [SPARK-9312][ML] Add RawPrediction, numClasses, and numF...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21044 **[Test build #89308 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89308/testReport)** for PR 21044 at commit [`2a47e2b`](https://github.com/apache/spark/commit/2a47e2be30d52e3fbea7e1eeeaa5048a6ac97116).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21017: [SPARK-23748][SS] Fix SS continuous process doesn't supp...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/21017 Thanks @jose-torres for your review. @tdas would you please take a look at this PR?
[GitHub] spark issue #20938: [SPARK-23821][SQL] Collection function: flatten
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20938 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89303/ Test PASSed.
[GitHub] spark issue #20938: [SPARK-23821][SQL] Collection function: flatten
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20938 Merged build finished. Test PASSed.
[GitHub] spark issue #20938: [SPARK-23821][SQL] Collection function: flatten
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20938 **[Test build #89303 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89303/testReport)** for PR 20938 at commit [`0e0def4`](https://github.com/apache/spark/commit/0e0def48709fd04efeee5c51cdaea641e47eae15).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21034: [SPARK-23926][SQL] Extending reverse function to support...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21034 Merged build finished. Test FAILed.
[GitHub] spark issue #21034: [SPARK-23926][SQL] Extending reverse function to support...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21034 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89301/ Test FAILed.
[GitHub] spark issue #21034: [SPARK-23926][SQL] Extending reverse function to support...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21034 **[Test build #89301 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89301/testReport)** for PR 21034 at commit [`ad17d49`](https://github.com/apache/spark/commit/ad17d49b79d8d80318d8e9cdb01551975bccadac).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #20772: [SPARK-23628][SQL] calculateParamLength should not retur...
Github user rednaxelafx commented on the issue: https://github.com/apache/spark/pull/20772 Just wanna leave a note here that the Janino side of the bug has been fixed: https://github.com/janino-compiler/janino/pull/46#issuecomment-380799160 https://github.com/janino-compiler/janino/commit/a373ac9bb6d28d51dccb7421ea046d73809d7ce2 With this fix, Janino will correctly throw an exception upon invalid parameter size instead of silently allowing it to pass compilation (which leads to ClassFormatError later when the generated class is loaded by the JVM).
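For context on why "invalid parameter size" arises at all: the JVM limits a method to 255 parameter slots, and `long`/`double` parameters occupy two slots each, which is what `calculateParamLength` has to account for when Spark's codegen splits generated code into methods. A rough Python sketch of the slot-counting rule (illustrative only, not Spark's actual code):

```python
# JVM limit: a method descriptor may use at most 255 parameter slots
# (JVM spec section 4.3.3); long and double each occupy two slots.
MAX_PARAM_SLOTS = 255

def param_slots(java_types):
    """Count JVM parameter slots for a list of Java type names."""
    return sum(2 if t in ("long", "double") else 1 for t in java_types)

def fits_in_one_method(java_types, is_static=True):
    # An instance method implicitly consumes one extra slot for `this`.
    used = param_slots(java_types) + (0 if is_static else 1)
    return used <= MAX_PARAM_SLOTS

print(param_slots(["int", "long", "double"]))  # 5
print(fits_in_one_method(["long"] * 128))      # False: 256 slots
```

Counting "1 + number of expressions" instead of actual slots, as the PR title describes, under-counts whenever wide types are involved, producing descriptors the JVM rejects at class-load time.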
[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21048 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2292/ Test PASSed.
[GitHub] spark issue #21048: [SPARK-23966][SS] Refactoring all checkpoint file writin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21048 Merged build finished. Test PASSed.