[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89267570

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala ---

```
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.{lang => jl}
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+import org.apache.spark.sql.test.SQLTestData.ArrayData
+import org.apache.spark.sql.types._
+
+
+/**
+ * End-to-end suite testing statistics collection and use on both entire table and columns.
+ */
+class StatisticsCollectionSuite extends StatisticsCollectionTestBase with SharedSQLContext {
+  import testImplicits._
+
+  private def checkTableStats(tableName: String, expectedRowCount: Option[Int])
+    : Option[Statistics] = {
+    val df = spark.table(tableName)
+    val stats = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
+      assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
+      rel.catalogTable.get.stats
+    }
+    assert(stats.size == 1)
+    stats.head
+  }
+
+  test("estimates the size of a limit 0 on outer join") {
+    withTempView("test") {
+      Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
+        .createOrReplaceTempView("test")
+      val df1 = spark.table("test")
+      val df2 = spark.table("test").limit(0)
+      val df = df1.join(df2, Seq("k"), "left")
+
+      val sizes = df.queryExecution.analyzed.collect { case g: Join =>
+        g.statistics.sizeInBytes
+      }
+
+      assert(sizes.size === 1, s"number of Join nodes is wrong:\n ${df.queryExecution}")
+      assert(sizes.head === BigInt(96),
```

--- End diff --

This is just code moved around - it was the old test.
[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15986

Merged build finished. Test PASSed.
[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15986

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69053/
[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/15986

I managed to come up with a standalone end-to-end reproduction of the shuffle file leak, allowing me to validate this patch's fix. Run

```
./bin/spark-shell --master=local-cluster[2,5,1024] --conf spark.task.maxFailures=1 --conf spark.local.dir=/tmp
```

to bring up a Spark shell with two executor JVMs. Then, execute the following:

```scala
sc.parallelize(1 to 10, 10).map { x =>
  Thread.sleep(1000); (x, x)
}.groupByKey(10).map { case _ =>
  Thread.sleep(120 * 1000); 1
}.union(sc.parallelize(1 to 10)).count()
```

(Note that the `.union()` here is critical for the reproduction; I explain this below.)

The `Thread.sleep()` calls were strategically chosen so that we'll get the executor JVMs into a state where both executors have run shuffle map tasks and both are in the middle of running reduce / result tasks.

Next, kill one of the executor JVMs abruptly with `kill -9`. The worker JVM will immediately detect its executor JVM's death and will send messages to the master causing that executor's tasks to be marked as failed. Because of `spark.task.maxFailures=1` this will cause the job to immediately fail, but there will still be five running zombie tasks on the executor that we didn't kill.

Wait until those zombie tasks have finished (which will happen within two minutes), then run `System.gc()`, then check the non-killed executor's block manager directories and observe that shuffle files have been leaked. This is due to the leak of the `ShuffleDependency`, which can be validated with `jmap -histo`:

```bash
$ jmap -histo 72081 | grep ShuffleDependency
2037:             1             56  org.apache.spark.ShuffleDependency
```

This is because the `TaskSetManager` was leaked:

```bash
$ jmap -histo 72081 | grep 'org.apache.spark.scheduler.TaskSetManager$'
1252:             1            224  org.apache.spark.scheduler.TaskSetManager
```

Note that while executor death seems to always leak a `TaskSetManager`, this doesn't always result in a leaked `ShuffleDependency`; the reasons for this are slightly subtle and I can expand on them later, but to summarize in a nutshell: a `Task` whose partition is a `ShuffleRddPartition` won't actually contain a reference to the parent RDD; the parent RDD and `ShuffleDependency` will be kept alive in the scheduler via the parent stage and via inter-stage relationships, but there won't be a direct reference chain from the `Task` itself. On the other hand, some partition types such as `UnionRDDPartition` may have transient references to parent RDD objects, causing the driver-side `Task` to keep the whole RDD and `ShuffleDependency` lineage chain alive. This usually isn't a problem, since `Task`s typically don't get leaked like this and the `@transient` fields prevent us from over-capturing during serialization, but it exacerbates the `TaskSetManager` leaks here.

After applying this PR's changes, you can re-run the same experiment and see that both the `TaskSetManager` and `ShuffleDependency` are properly cleaned up after the zombie tasks finish and GC has run to trigger the `ContextCleaner`.
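To make the reference-chain explanation above concrete, here is a minimal, self-contained sketch (class and field names are illustrative, not Spark's actual classes) of why a partition type that holds a direct parent reference can pin the whole lineage on the driver even when the field is `@transient`:

```scala
// Minimal sketch, assuming nothing beyond the JVM: @transient only affects
// serialization, so it prevents over-capture when a task is shipped to an
// executor, but on the driver the field remains an ordinary strong reference.
class Lineage(val parent: Option[Lineage]) // stand-in for an RDD plus its dependencies

// Like a shuffle reduce partition: stores only an index, no parent reference.
class IndexOnlyPartition(val index: Int) extends Serializable

// Like a union-style partition: keeps a direct (transient) parent reference.
class ParentRefPartition(@transient val parent: Lineage, val index: Int)
  extends Serializable

object LeakDemo extends App {
  val lineage = new Lineage(Some(new Lineage(None)))
  // If a leaked scheduler structure retains this partition, the entire
  // lineage chain stays strongly reachable on the driver...
  val pinning = new ParentRefPartition(lineage, 0)
  // ...whereas retaining an index-only partition pins nothing extra.
  val harmless = new IndexOnlyPartition(0)
  println(s"parent still reachable via partition: ${pinning.parent ne null}")
}
```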
[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15986

**[Test build #69053 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69053/consoleFull)** for PR 15986 at commit [`e99cc8f`](https://github.com/apache/spark/commit/e99cc8ffad9c47976d5743502852cc66f59452d3).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #15977: [SPARK-18436][SQL] isin causing SQL syntax error ...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/15977#discussion_r89266916

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---

```
@@ -114,10 +114,14 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
  * 1. Removes literal repetitions.
  * 2. Replaces [[In (value, seq[Literal])]] with optimized version
  *    [[InSet (value, HashSet[Literal])]] which is much faster.
+ * 3. Replaces [[In (value, Seq.empty)]] with false literal.
  */
 case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
+      case expr @ In(v, list) if list.isEmpty =>
```

--- End diff --

How about we add a new case to handle a null literal in the value? Like the following:

```scala
case expr @ In(v @ Literal(null, _), list) => v
case expr @ In(v, list) if list.isEmpty => FalseLiteral
```
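For readers following along, here is a hedged sketch of the two rewrites under discussion, assuming the Spark 2.x Catalyst APIs (`FalseLiteral` above is shorthand for `Literal(false, BooleanType)`; treating a null value expression as a null boolean is one reading of the suggested case, not necessarily the PR's final behavior):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, In, Literal}
import org.apache.spark.sql.types.BooleanType

// Sketch only, not the merged rule: simplify In() for two special shapes.
def simplifyIn(e: Expression): Expression = e match {
  // null IN (a, b, ...) is unknown under SQL three-valued logic
  case In(Literal(null, _), _) => Literal(null, BooleanType)
  // x IN () can never be true, so fold it to false
  case In(_, Nil) => Literal(false, BooleanType)
  case other => other
}
```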
[GitHub] spark issue #15959: [SPARK-18522][SQL] Explicit contract for column stats se...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15959

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69051/
[GitHub] spark issue #15959: [SPARK-18522][SQL] Explicit contract for column stats se...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15959

Merged build finished. Test FAILed.
[GitHub] spark issue #15959: [SPARK-18522][SQL] Explicit contract for column stats se...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15959

**[Test build #69051 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69051/consoleFull)** for PR 15959 at commit [`0c07165`](https://github.com/apache/spark/commit/0c0716576fbb4362575be0dff685501ad600c870).

* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #15959: [SPARK-18522][SQL] Explicit contract for column stats se...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/15959

LGTM
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89265835

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---

```
@@ -310,6 +270,110 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
     }
   }
 
+  test("verify serialized column stats after analyzing columns") {
+    import testImplicits._
+
+    val tableName = "column_stats_test2"
+    // (data.head.productArity - 1) because the last column does not support stats collection.
+    assert(stats.size == data.head.productArity - 1)
+    val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+
+    withTable(tableName) {
+      df.write.saveAsTable(tableName)
+
+      // Collect statistics
+      sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
+
+      // Validate statistics
+      val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+      val table = hiveClient.getTable("default", tableName)
+
+      val props = table.properties.filterKeys(_.startsWith("spark.sql.statistics.colStats"))
```

--- End diff --

use `HiveExternalCatalog.STATISTICS_COL_STATS_PREFIX`?
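Assuming the constant referenced above exists on the `HiveExternalCatalog` companion (as the review comment implies), the suggested change would look roughly like this:

```scala
// Sketch of the reviewer's suggestion: use the named prefix constant instead
// of a repeated string literal, so the test can't drift from the
// serialization code it verifies.
val props = table.properties.filterKeys(
  _.startsWith(HiveExternalCatalog.STATISTICS_COL_STATS_PREFIX))
```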
[GitHub] spark issue #15988: [SPARK-18519][SQL][BRANCH-2.0] map type can not be used ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15988

**[Test build #69056 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69056/consoleFull)** for PR 15988 at commit [`acfa6ba`](https://github.com/apache/spark/commit/acfa6ba6af5a1c484607d0be3b105c3d1829536c).
[GitHub] spark issue #15959: [SPARK-18522][SQL] Explicit contract for column stats se...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15959

**[Test build #69057 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69057/consoleFull)** for PR 15959 at commit [`b341cc6`](https://github.com/apache/spark/commit/b341cc655c415a5af9473fbce4deb925b1f77660).
[GitHub] spark issue #15988: [SPARK-18519][SQL][BRANCH-2.0] map type can not be used ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/15988

cc @hvanhovell
[GitHub] spark pull request #15988: [SPARK-18519][SQL][BRANCH-2.0] map type can not b...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/15988

[SPARK-18519][SQL][BRANCH-2.0] map type can not be used in EqualTo

## What changes were proposed in this pull request?

Technically, map type is not orderable but can be used in equality comparison. However, due to a limitation of the current implementation, equality comparison on map type does not work correctly, so map type can't be used as a join key or grouping key. This PR makes this limitation explicit, to avoid wrong results.

backport https://github.com/apache/spark/pull/15956 to 2.0

## How was this patch tested?

updated tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark map-type

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15988.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #15988

commit acfa6ba6af5a1c484607d0be3b105c3d1829536c
Author: Wenchen Fan
Date: 2016-11-23T07:05:36Z

    map type can not be used in EqualTo
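A hedged illustration of the limitation this PR makes explicit (the data and column names below are examples, not taken from the PR): after this change, using a map-typed column as a grouping or join key should fail analysis up front instead of silently producing wrong results.

```scala
// Inside a spark-shell session, where spark.implicits._ is already imported.
val df = Seq((Map(1 -> "a"), 1), (Map(1 -> "a"), 2)).toDF("m", "v")

// Expected after this patch (assumption based on the PR description):
// AnalysisException, because map type cannot be used in EqualTo and hence
// cannot serve as a grouping key.
df.groupBy($"m").count()
```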
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89264465

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---

```
@@ -310,6 +270,110 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
     }
   }
 
+  test("verify serialized column stats after analyzing columns") {
+    import testImplicits._
+
+    val tableName = "column_stats_test2"
+    // (data.head.productArity - 1) because the last column does not support stats collection.
+    assert(stats.size == data.head.productArity - 1)
+    val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+
+    withTable(tableName) {
+      df.write.saveAsTable(tableName)
+
+      // Collect statistics
+      sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
+
+      // Validate statistics
+      val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+      val table = hiveClient.getTable("default", tableName)
+
+      val props = table.properties.filterKeys(_.startsWith("spark.sql.statistics.colStats"))
+      assert(props == Map(
```

--- End diff --

cc @cloud-fan / @wzhfy This is a very explicit test for the serialization protocol.
[GitHub] spark pull request #15951: [SPARK-18510] Fix data corruption from inferred p...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/15951#discussion_r89263076

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---

```
@@ -84,30 +88,106 @@ case class DataSource(
   private val caseInsensitiveOptions = new CaseInsensitiveMap(options)
 
   /**
-   * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
+   * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
+   * it. In the read path, only managed tables by Hive provide the partition columns properly when
+   * initializing this class. All other file based data sources will try to infer the partitioning,
+   * and then cast the inferred types to user specified dataTypes if the partition columns exist
+   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510.
+   * This method will try to skip file scanning whether `userSpecifiedSchema` and
+   * `partitionColumns` are provided. Here are some code paths that use this method:
+   *   1. `spark.read` (no schema): Most amount of work. Infer both schema and partitioning columns
+   *   2. `spark.read.schema(userSpecifiedSchema)`: Parse partitioning columns, cast them to the
+   *      dataTypes provided in `userSpecifiedSchema` if they exist or fallback to inferred
+   *      dataType if they don't.
+   *   3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users have to
+   *      provide the schema. Here, we also perform partition inference like 2, and try to use
+   *      dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
+   *      this information, therefore calls to this method should be very cheap, i.e. there won't
+   *      be any further inference in any triggers.
+   *   4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
+   *      existing table's partitioning scheme. This is achieved by not providing
+   *      `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
+   *      exit, if we don't care about the schema of the original table.
+   *
+   * @param format the file format object for this DataSource
+   * @param justPartitioning Whether to exit early and provide just the schema partitioning.
+   * @return A pair of the data schema (excluding partition columns) and the schema of the partition
+   *         columns. If `justPartitioning` is `true`, then the dataSchema will be provided as
+   *         `null`.
    */
-  private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
-    userSpecifiedSchema.map(_ -> partitionColumns).orElse {
-      val allPaths = caseInsensitiveOptions.get("path")
+  private def getOrInferFileFormatSchema(
```

--- End diff --

Well, just realized that it might be hard to split because of the temporary `InMemoryFileIndex`.
[GitHub] spark pull request #15951: [SPARK-18510] Fix data corruption from inferred p...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/15951#discussion_r89242805

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---

```
@@ -84,30 +88,106 @@ case class DataSource(
   private val caseInsensitiveOptions = new CaseInsensitiveMap(options)
 
   /**
-   * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
+   * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
+   * it. In the read path, only managed tables by Hive provide the partition columns properly when
+   * initializing this class. All other file based data sources will try to infer the partitioning,
+   * and then cast the inferred types to user specified dataTypes if the partition columns exist
+   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510.
+   * This method will try to skip file scanning whether `userSpecifiedSchema` and
+   * `partitionColumns` are provided. Here are some code paths that use this method:
+   *   1. `spark.read` (no schema): Most amount of work. Infer both schema and partitioning columns
+   *   2. `spark.read.schema(userSpecifiedSchema)`: Parse partitioning columns, cast them to the
+   *      dataTypes provided in `userSpecifiedSchema` if they exist or fallback to inferred
+   *      dataType if they don't.
+   *   3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users have to
+   *      provide the schema. Here, we also perform partition inference like 2, and try to use
+   *      dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
+   *      this information, therefore calls to this method should be very cheap, i.e. there won't
+   *      be any further inference in any triggers.
+   *   4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
+   *      existing table's partitioning scheme. This is achieved by not providing
+   *      `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
+   *      exit, if we don't care about the schema of the original table.
+   *
+   * @param format the file format object for this DataSource
+   * @param justPartitioning Whether to exit early and provide just the schema partitioning.
+   * @return A pair of the data schema (excluding partition columns) and the schema of the partition
+   *         columns. If `justPartitioning` is `true`, then the dataSchema will be provided as
+   *         `null`.
    */
-  private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
-    userSpecifiedSchema.map(_ -> partitionColumns).orElse {
-      val allPaths = caseInsensitiveOptions.get("path")
+  private def getOrInferFileFormatSchema(
+      format: FileFormat,
+      justPartitioning: Boolean = false): (StructType, StructType) = {
+    // the operations below are expensive therefore try not to do them if we don't need to
+    lazy val tempFileCatalog = {
```

--- End diff --

Nit: `tempFileIndex`
[GitHub] spark pull request #15951: [SPARK-18510] Fix data corruption from inferred p...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/15951#discussion_r89249380

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---

```
@@ -84,30 +88,106 @@ case class DataSource(
   private val caseInsensitiveOptions = new CaseInsensitiveMap(options)
 
   /**
-   * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
+   * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
+   * it. In the read path, only managed tables by Hive provide the partition columns properly when
+   * initializing this class. All other file based data sources will try to infer the partitioning,
+   * and then cast the inferred types to user specified dataTypes if the partition columns exist
+   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510.
+   * This method will try to skip file scanning whether `userSpecifiedSchema` and
+   * `partitionColumns` are provided. Here are some code paths that use this method:
+   *   1. `spark.read` (no schema): Most amount of work. Infer both schema and partitioning columns
+   *   2. `spark.read.schema(userSpecifiedSchema)`: Parse partitioning columns, cast them to the
+   *      dataTypes provided in `userSpecifiedSchema` if they exist or fallback to inferred
+   *      dataType if they don't.
+   *   3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users have to
+   *      provide the schema. Here, we also perform partition inference like 2, and try to use
+   *      dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
+   *      this information, therefore calls to this method should be very cheap, i.e. there won't
+   *      be any further inference in any triggers.
+   *   4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
+   *      existing table's partitioning scheme. This is achieved by not providing
+   *      `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
+   *      exit, if we don't care about the schema of the original table.
+   *
+   * @param format the file format object for this DataSource
+   * @param justPartitioning Whether to exit early and provide just the schema partitioning.
+   * @return A pair of the data schema (excluding partition columns) and the schema of the partition
+   *         columns. If `justPartitioning` is `true`, then the dataSchema will be provided as
+   *         `null`.
    */
-  private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
-    userSpecifiedSchema.map(_ -> partitionColumns).orElse {
-      val allPaths = caseInsensitiveOptions.get("path")
+  private def getOrInferFileFormatSchema(
+      format: FileFormat,
+      justPartitioning: Boolean = false): (StructType, StructType) = {
+    // the operations below are expensive therefore try not to do them if we don't need to
+    lazy val tempFileCatalog = {
+      val allPaths = caseInsensitiveOptions.get("path") ++ paths
+      val hadoopConf = sparkSession.sessionState.newHadoopConf()
       val globbedPaths = allPaths.toSeq.flatMap { path =>
         val hdfsPath = new Path(path)
-        val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+        val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
-      val fileCatalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
-      val partitionSchema = fileCatalog.partitionSpec().partitionColumns
-      val inferred = format.inferSchema(
+      new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
+    }
+    val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
+      // Try to infer partitioning, because no DataSource in the read path provides the partitioning
+      // columns properly unless it is a Hive DataSource
+      val resolved = tempFileCatalog.partitionSchema.map { partitionField =>
+        val equality = sparkSession.sessionState.conf.resolver
+        // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
+        userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
+          partitionField)
+      }
+      StructType(resolved)
+    } else {
+      // in streaming mode, we have already inferred and registered partition columns, we will
+      // never
```
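As a hedged illustration of code path 2 described in the scaladoc above (the path and column names are examples, not from the PR): the user-specified type for a partition column is meant to win over the inferred one, which is the essence of the SPARK-18510 fix.

```scala
import org.apache.spark.sql.types._

// Hypothetical layout: /tmp/events/part=0/..., where 'part' would otherwise
// be inferred as an integer type. With a user-specified schema, its declared
// StringType should be honored instead of the inferred type.
val df = spark.read
  .schema(new StructType()
    .add("part", StringType)
    .add("data", StringType))
  .parquet("/tmp/events")
```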
[GitHub] spark pull request #15951: [SPARK-18510] Fix data corruption from inferred p...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/15951#discussion_r89252556

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---

```
@@ -84,30 +88,106 @@ case class DataSource(
   private val caseInsensitiveOptions = new CaseInsensitiveMap(options)
 
   /**
-   * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
+   * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
+   * it. In the read path, only managed tables by Hive provide the partition columns properly when
+   * initializing this class. All other file based data sources will try to infer the partitioning,
+   * and then cast the inferred types to user specified dataTypes if the partition columns exist
+   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510.
+   * This method will try to skip file scanning whether `userSpecifiedSchema` and
+   * `partitionColumns` are provided. Here are some code paths that use this method:
+   *   1. `spark.read` (no schema): Most amount of work. Infer both schema and partitioning columns
+   *   2. `spark.read.schema(userSpecifiedSchema)`: Parse partitioning columns, cast them to the
+   *      dataTypes provided in `userSpecifiedSchema` if they exist or fallback to inferred
+   *      dataType if they don't.
+   *   3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users have to
+   *      provide the schema. Here, we also perform partition inference like 2, and try to use
+   *      dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
+   *      this information, therefore calls to this method should be very cheap, i.e. there won't
+   *      be any further inference in any triggers.
+   *   4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
+   *      existing table's partitioning scheme. This is achieved by not providing
+   *      `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
+   *      exit, if we don't care about the schema of the original table.
+   *
+   * @param format the file format object for this DataSource
+   * @param justPartitioning Whether to exit early and provide just the schema partitioning.
+   * @return A pair of the data schema (excluding partition columns) and the schema of the partition
+   *         columns. If `justPartitioning` is `true`, then the dataSchema will be provided as
+   *         `null`.
    */
-  private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
-    userSpecifiedSchema.map(_ -> partitionColumns).orElse {
-      val allPaths = caseInsensitiveOptions.get("path")
+  private def getOrInferFileFormatSchema(
```

--- End diff --

I think it would be clearer if we can split this method into two: one for partition schema and the other for data schema. In this way, we can also remove the `justPartitioning` argument by calling the method you need at the right place.
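A rough sketch of the split being proposed, with hypothetical signatures of my own (as the reviewer's follow-up elsewhere in this digest notes, the temporary `InMemoryFileIndex` would need to be shared between the two halves, which is what makes the split awkward):

```scala
// Sketch only: split the combined method so each call site asks for exactly
// what it needs, which would remove the justPartitioning flag entirely.
// The by-name fileIndex parameter preserves the laziness of the original
// `lazy val tempFileCatalog`.
private def getOrInferPartitionSchema(
    format: FileFormat,
    fileIndex: => InMemoryFileIndex): StructType = ???

private def getOrInferDataSchema(
    format: FileFormat,
    fileIndex: => InMemoryFileIndex,
    partitionSchema: StructType): StructType = ???
```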
[GitHub] spark pull request #15951: [SPARK-18510] Fix data corruption from inferred p...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/15951#discussion_r89262935

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---

```
@@ -274,7 +274,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       pathToPartitionedTable,
       userSpecifiedSchema = Option("num int, str string"),
       userSpecifiedPartitionCols = partitionCols,
-      expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+      expectedSchema = new StructType().add("str", StringType).add("num", IntegerType),
```

--- End diff --

I believe the original test case was incorrect. Although the schema check passes, if you really read rows out of the Dataset, you'll hit an exception, as shown in the following Spark shell session:

```scala
import org.apache.spark.sql.types._

val df0 = spark.range(10).select(
  ('id % 4) cast StringType as "part",
  'id cast StringType as "data"
)

val path = "/tmp/part.parquet"
df0.write.mode("overwrite").partitionBy("part").parquet(path)

val df1 = spark.read.schema(
  new StructType()
    .add("part", StringType, nullable = true)
    .add("data", StringType, nullable = true)
).parquet(path)

df1.printSchema()
// root
//  |-- part: string (nullable = true)
//  |-- data: string (nullable = true)

df1.show()
// 16/11/22 22:52:21 ERROR Executor: Exception in task 0.0 in stage 10.0 (TID 34)
// java.lang.NullPointerException
//   at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getArrayLength(OnHeapColumnVector.java:375)
//   at org.apache.spark.sql.execution.vectorized.ColumnVector.getArray(ColumnVector.java:554)
//   at org.apache.spark.sql.execution.vectorized.ColumnVector.getByteArray(ColumnVector.java:576)
//   [...]
```
[GitHub] spark pull request #15951: [SPARK-18510] Fix data corruption from inferred p...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/15951#discussion_r89249078

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---

```
@@ -84,30 +88,106 @@ case class DataSource(
   private val caseInsensitiveOptions = new CaseInsensitiveMap(options)
 
   /**
-   * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
+   * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
+   * it. In the read path, only managed tables by Hive provide the partition columns properly when
+   * initializing this class. All other file based data sources will try to infer the partitioning,
+   * and then cast the inferred types to user specified dataTypes if the partition columns exist
+   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510.
+   * This method will try to skip file scanning whether `userSpecifiedSchema` and
+   * `partitionColumns` are provided. Here are some code paths that use this method:
+   *   1. `spark.read` (no schema): Most amount of work. Infer both schema and partitioning columns
+   *   2. `spark.read.schema(userSpecifiedSchema)`: Parse partitioning columns, cast them to the
+   *      dataTypes provided in `userSpecifiedSchema` if they exist or fallback to inferred
+   *      dataType if they don't.
+   *   3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users have to
+   *      provide the schema. Here, we also perform partition inference like 2, and try to use
+   *      dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
+   *      this information, therefore calls to this method should be very cheap, i.e. there won't
+   *      be any further inference in any triggers.
+   *   4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
+   *      existing table's partitioning scheme. This is achieved by not providing
+   *      `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
+   *      exit, if we don't care about the schema of the original table.
+   *
+   * @param format the file format object for this DataSource
+   * @param justPartitioning Whether to exit early and provide just the schema partitioning.
+   * @return A pair of the data schema (excluding partition columns) and the schema of the partition
+   *         columns. If `justPartitioning` is `true`, then the dataSchema will be provided as
+   *         `null`.
    */
-  private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
-    userSpecifiedSchema.map(_ -> partitionColumns).orElse {
-      val allPaths = caseInsensitiveOptions.get("path")
+  private def getOrInferFileFormatSchema(
+      format: FileFormat,
+      justPartitioning: Boolean = false): (StructType, StructType) = {
+    // the operations below are expensive therefore try not to do them if we don't need to
+    lazy val tempFileCatalog = {
+      val allPaths = caseInsensitiveOptions.get("path") ++ paths
+      val hadoopConf = sparkSession.sessionState.newHadoopConf()
       val globbedPaths = allPaths.toSeq.flatMap { path =>
         val hdfsPath = new Path(path)
-        val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+        val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
-      val fileCatalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
-      val partitionSchema = fileCatalog.partitionSpec().partitionColumns
-      val inferred = format.inferSchema(
+      new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
+    }
+    val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
+      // Try to infer partitioning, because no DataSource in the read path provides the partitioning
+      // columns properly unless it is a Hive DataSource
+      val resolved = tempFileCatalog.partitionSchema.map { partitionField =>
+        val equality = sparkSession.sessionState.conf.resolver
+        // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
+        userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
+          partitionField)
+      }
+      StructType(resolved)
+    } else {
+      // in streaming mode, we have already inferred and registered partition columns, we will
+      // never
```
[GitHub] spark pull request #15951: [SPARK-18510] Fix data corruption from inferred p...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/15951#discussion_r89248592

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---

```
@@ -84,30 +88,106 @@ case class DataSource(
   private val caseInsensitiveOptions = new CaseInsensitiveMap(options)
 
   /**
-   * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
+   * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
+   * it. In the read path, only managed tables by Hive provide the partition columns properly when
+   * initializing this class. All other file based data sources will try to infer the partitioning,
+   * and then cast the inferred types to user specified dataTypes if the partition columns exist
+   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510.
+   * This method will try to skip file scanning whether `userSpecifiedSchema` and
+   * `partitionColumns` are provided. Here are some code paths that use this method:
+   *   1. `spark.read` (no schema): Most amount of work. Infer both schema and partitioning columns
+   *   2. `spark.read.schema(userSpecifiedSchema)`: Parse partitioning columns, cast them to the
+   *      dataTypes provided in `userSpecifiedSchema` if they exist or fallback to inferred
+   *      dataType if they don't.
+   *   3. `spark.readStream.schema(userSpecifiedSchema)`: For streaming use cases, users have to
+   *      provide the schema. Here, we also perform partition inference like 2, and try to use
+   *      dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
+   *      this information, therefore calls to this method should be very cheap, i.e. there won't
+   *      be any further inference in any triggers.
+   *   4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
+   *      existing table's partitioning scheme. This is achieved by not providing
+   *      `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
+   *      exit, if we don't care about the schema of the original table.
+   *
+   * @param format the file format object for this DataSource
+   * @param justPartitioning Whether to exit early and provide just the schema partitioning.
+   * @return A pair of the data schema (excluding partition columns) and the schema of the partition
+   *         columns. If `justPartitioning` is `true`, then the dataSchema will be provided as
+   *         `null`.
    */
-  private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
-    userSpecifiedSchema.map(_ -> partitionColumns).orElse {
-      val allPaths = caseInsensitiveOptions.get("path")
+  private def getOrInferFileFormatSchema(
+      format: FileFormat,
+      justPartitioning: Boolean = false): (StructType, StructType) = {
+    // the operations below are expensive therefore try not to do them if we don't need to
+    lazy val tempFileCatalog = {
+      val allPaths = caseInsensitiveOptions.get("path") ++ paths
+      val hadoopConf = sparkSession.sessionState.newHadoopConf()
       val globbedPaths = allPaths.toSeq.flatMap { path =>
         val hdfsPath = new Path(path)
-        val fs = hdfsPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+        val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
-      val fileCatalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
-      val partitionSchema = fileCatalog.partitionSpec().partitionColumns
-      val inferred = format.inferSchema(
+      new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
+    }
+    val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
+      // Try to infer partitioning, because no DataSource in the read path provides the partitioning
+      // columns properly unless it is a Hive DataSource
+      val resolved = tempFileCatalog.partitionSchema.map { partitionField =>
+        val equality = sparkSession.sessionState.conf.resolver
+        // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
+        userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
+          partitionField)
+      }
+      StructType(resolved)
+    } else {
+      // in streaming mode, we have already inferred and registered partition columns, we will
+      // never
```
[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/15986

cc @tejasapatil
[GitHub] spark pull request #15694: [SPARK-18179][SQL] Throws analysis exception with...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/15694
[GitHub] spark issue #15694: [SPARK-18179][SQL] Throws analysis exception with a prop...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/15694

Merging in master/branch-2.1.
[GitHub] spark issue #15987: [SPARK-18515][SQL] AlterTableDropPartitions fails for no...
Github user rxin commented on the issue: https://github.com/apache/spark/pull/15987

Why does it cast it to double? The fix looks pretty weird.
[GitHub] spark issue #15985: [SPARK-18545] [SQL] Verify number of hive client RPCs in...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15985

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69048/
[GitHub] spark issue #15985: [SPARK-18545] [SQL] Verify number of hive client RPCs in...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15985

Merged build finished. Test PASSed.
[GitHub] spark issue #15985: [SPARK-18545] [SQL] Verify number of hive client RPCs in...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15985

**[Test build #69048 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69048/consoleFull)** for PR 15985 at commit [`e6d835f`](https://github.com/apache/spark/commit/e6d835f6516256ea31be158fca5b8cbfdd7dc7e8).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #15959: [SPARK-18522][SQL] Explicit contract for column stats se...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15959

**[Test build #69055 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69055/consoleFull)** for PR 15959 at commit [`b1af024`](https://github.com/apache/spark/commit/b1af024f931ae2ac0edc1e2cf94cb73689d6ab3e).
[GitHub] spark issue #15877: [SPARK-18429] [SQL] implement a new Aggregate for CountM...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15877

Merged build finished. Test PASSed.
[GitHub] spark issue #15877: [SPARK-18429] [SQL] implement a new Aggregate for CountM...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15877

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69047/
[GitHub] spark issue #15877: [SPARK-18429] [SQL] implement a new Aggregate for CountM...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15877

**[Test build #69047 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69047/consoleFull)** for PR 15877 at commit [`a6bbefc`](https://github.com/apache/spark/commit/a6bbefcb8c0b8c4b14c47a3ff031be064f576ab2).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #15956: [SPARK-18519][SQL] map type can not be used in EqualTo
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/15956

Yea, I'll backport
[GitHub] spark issue #15983: [SPARK-18544] [SQL] Append with df.saveAsTable writes da...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15983

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69045/
[GitHub] spark issue #15983: [SPARK-18544] [SQL] Append with df.saveAsTable writes da...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15983 Merged build finished. Test PASSed.
[GitHub] spark issue #15983: [SPARK-18544] [SQL] Append with df.saveAsTable writes da...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15983 **[Test build #69045 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69045/consoleFull)** for PR 15983 at commit [`ca75331`](https://github.com/apache/spark/commit/ca753311a6d61452d7c29a349b8c34e66998f5ee). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #15959: [SPARK-18522][SQL] Explicit contract for column stats se...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15959 **[Test build #69054 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69054/consoleFull)** for PR 15959 at commit [`e407035`](https://github.com/apache/spark/commit/e407035433b9c1477b0b3c8f6551ef9f7100289b).
[GitHub] spark issue #15877: [SPARK-18429] [SQL] implement a new Aggregate for CountM...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15877 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69049/
[GitHub] spark issue #15877: [SPARK-18429] [SQL] implement a new Aggregate for CountM...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15877 Merged build finished. Test FAILed.
[GitHub] spark issue #15959: [SPARK-18522][SQL] Explicit contract for column stats se...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/15959 LGTM except one minor comment
[GitHub] spark issue #15877: [SPARK-18429] [SQL] implement a new Aggregate for CountM...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15877 **[Test build #69049 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69049/consoleFull)** for PR 15877 at commit [`1bfb6fd`](https://github.com/apache/spark/commit/1bfb6fd7810c7db9218bcdb65aed80102406b211). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89258891

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -58,60 +61,176 @@ case class Statistics(
   }
 }

+
 /**
- * Statistics for a column.
+ * Statistics collected for a column.
+ *
+ * 1. Supported data types are defined in `ColumnStat.supportsType`.
+ * 2. The JVM data type stored in min/max is the external data type (used in Row) for the
+ *    corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for
+ *    TimestampType we store java.sql.Timestamp.
+ * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs.
+ * 4. There is no guarantee that the statistics collected are accurate. Approximation algorithms
+ *    (sketches) might have been used, and the data collected can also be stale.
+ *
+ * @param distinctCount number of distinct values
+ * @param min minimum value
+ * @param max maximum value
+ * @param nullCount number of nulls
+ * @param avgLen average length of the values. For fixed-length types, this should be a constant.
+ * @param maxLen maximum length of the values. For fixed-length types, this should be a constant.
 */
-case class ColumnStat(statRow: InternalRow) {
+case class ColumnStat(
+    distinctCount: BigInt,
+    min: Option[Any],
+    max: Option[Any],
+    nullCount: BigInt,
+    avgLen: Long,
+    maxLen: Long) {

-  def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = {
-    NumericColumnStat(statRow, dataType)
-  }
-  def forString: StringColumnStat = StringColumnStat(statRow)
-  def forBinary: BinaryColumnStat = BinaryColumnStat(statRow)
-  def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow)
+
+  // We currently don't store min/max for byte arrays. This can change in the future and then
+  // we need to remove this require.
+  require(min.isEmpty || !min.get.isInstanceOf[Array[Byte]])
+  require(max.isEmpty || !max.get.isInstanceOf[Array[Byte]])

-  override def toString: String = {
-    // use Base64 for encoding
-    Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes)
+  /**
+   * Returns a map from string to string that can be used to serialize the column stats.
+   * The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
+   * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]].
+   *
+   * As part of the protocol, the returned map always contains a key called "version".
+   * In the case min/max values are null (None), they won't appear in the map.
+   */
+  def toMap: Map[String, String] = {
+    val map = new scala.collection.mutable.HashMap[String, String]
+    map.put(ColumnStat.KEY_VERSION, "1")
+    map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
+    map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
+    map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
+    map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
+    min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, v.toString) }
+    max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, v.toString) }
+    map.toMap
   }
 }

-object ColumnStat {
-  def apply(numFields: Int, str: String): ColumnStat = {
-    // use Base64 for decoding
-    val bytes = Base64.decodeBase64(str)
-    val unsafeRow = new UnsafeRow(numFields)
-    unsafeRow.pointTo(bytes, bytes.length)
-    ColumnStat(unsafeRow)
+
+object ColumnStat extends Logging {
+
+  // List of string keys used to serialize ColumnStat
+  val KEY_VERSION = "version"
+  private val KEY_DISTINCT_COUNT = "distinctCount"
+  private val KEY_MIN_VALUE = "min"
+  private val KEY_MAX_VALUE = "max"
+  private val KEY_NULL_COUNT = "nullCount"
+  private val KEY_AVG_LEN = "avgLen"
+  private val KEY_MAX_LEN = "maxLen"
+
+  /** Returns true iff the we support gathering column statistics on column of the given type. */
+  def supportsType(dataType: DataType): Boolean = dataType match {
+    case _: IntegralType => true
+    case _: DecimalType => true
+    case DoubleType | FloatType => true
+    case BooleanType => true
+    case DateType => true
+    case TimestampType => true
+    case BinaryType | StringType => true
+    case _ => false
   }
-}

-case class NumericColumnStat[T <: AtomicType](statRow: InternalRow, dataType: T) {
-  // The indices here must be consistent with `ColumnStatStruct.numericColumnStat`.
-  val numNulls: Long = statRow.getLong(0)
-  val max:
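For readers following the serialization discussion, here is a minimal, self-contained sketch of the string-map protocol the hunk above describes. The field and key names mirror the diff, but this is illustrative only: `SimpleColumnStat` is a hypothetical stand-in for Spark's `ColumnStat`, and `fromMap` assumes long-typed min/max for brevity.

```scala
// Hypothetical stand-in for the protocol above; not Spark's ColumnStat.
case class SimpleColumnStat(
    distinctCount: BigInt,
    min: Option[Any],
    max: Option[Any],
    nullCount: BigInt,
    avgLen: Long,
    maxLen: Long) {

  // min/max are simply absent from the map when None, so a missing key
  // means "no value recorded" on the way back in.
  def toMap: Map[String, String] = {
    val base = Map(
      "version" -> "1",
      "distinctCount" -> distinctCount.toString,
      "nullCount" -> nullCount.toString,
      "avgLen" -> avgLen.toString,
      "maxLen" -> maxLen.toString)
    base ++ min.map(v => "min" -> v.toString) ++ max.map(v => "max" -> v.toString)
  }
}

object SimpleColumnStat {
  // Inverse of toMap; assumes long-typed min/max for brevity.
  def fromMap(map: Map[String, String]): SimpleColumnStat =
    SimpleColumnStat(
      distinctCount = BigInt(map("distinctCount")),
      min = map.get("min").map(_.toLong),
      max = map.get("max").map(_.toLong),
      nullCount = BigInt(map("nullCount")),
      avgLen = map("avgLen").toLong,
      maxLen = map("maxLen").toLong)
}
```

A round trip such as `SimpleColumnStat.fromMap(stat.toMap)` recovers the original stats, and a stat with `min = None` serializes to a map with no "min" key at all, matching the updated javadoc.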
[GitHub] spark issue #15480: [SPARK-16845][SQL] `GeneratedClass$SpecificOrdering` gro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15480 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69044/
[GitHub] spark issue #15480: [SPARK-16845][SQL] `GeneratedClass$SpecificOrdering` gro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15480 Merged build finished. Test PASSed.
[GitHub] spark issue #15480: [SPARK-16845][SQL] `GeneratedClass$SpecificOrdering` gro...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15480 **[Test build #69044 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69044/consoleFull)** for PR 15480 at commit [`33b5fd8`](https://github.com/apache/spark/commit/33b5fd86b2933ca359cb943ddf21f9bc714d38bf). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #15987: [SPARK-18515][SQL] AlterTableDropPartitions fails for no...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15987 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69050/
[GitHub] spark issue #15987: [SPARK-18515][SQL] AlterTableDropPartitions fails for no...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15987 Merged build finished. Test FAILed.
[GitHub] spark issue #15987: [SPARK-18515][SQL] AlterTableDropPartitions fails for no...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15987 **[Test build #69050 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69050/consoleFull)** for PR 15987 at commit [`970a904`](https://github.com/apache/spark/commit/970a904c5efc1ff2089162c0ca0acbef3f2ca9db). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `trait CommandWithExpression extends LeafNode`
[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15986 **[Test build #69053 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69053/consoleFull)** for PR 15986 at commit [`e99cc8f`](https://github.com/apache/spark/commit/e99cc8ffad9c47976d5743502852cc66f59452d3).
[GitHub] spark issue #15959: [SPARK-18522][SQL] Explicit contract for column stats se...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15959 **[Test build #69052 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69052/consoleFull)** for PR 15959 at commit [`07bea22`](https://github.com/apache/spark/commit/07bea22b18f4eaaffccddf7d2d0a70f6ce0be5f2).
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89258401

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -58,60 +61,170 @@ case class Statistics(
   }
 }

+
 /**
- * Statistics for a column.
+ * Statistics collected for a column.
+ *
+ * 1. Supported data types are defined in `ColumnStat.supportsType`.
+ * 2. The JVM data type stored in min/max is the external data type (used in Row) for the
+ *    corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for
+ *    TimestampType we store java.sql.Timestamp.
+ * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs.
+ * 4. There is no guarantee that the statistics collected are accurate. Approximation algorithms
+ *    (sketches) might have been used, and the data collected can also be stale.
+ *
+ * @param distinctCount number of distinct values
+ * @param min minimum value
+ * @param max maximum value
+ * @param nullCount number of nulls
+ * @param avgLen average length of the values. For fixed-length types, this should be a constant.
+ * @param maxLen maximum length of the values. For fixed-length types, this should be a constant.
 */
-case class ColumnStat(statRow: InternalRow) {
+case class ColumnStat(
+    distinctCount: BigInt,
+    min: Option[Any],
+    max: Option[Any],
+    nullCount: BigInt,
+    avgLen: Long,
+    maxLen: Long) {

-  def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = {
-    NumericColumnStat(statRow, dataType)
-  }
-  def forString: StringColumnStat = StringColumnStat(statRow)
-  def forBinary: BinaryColumnStat = BinaryColumnStat(statRow)
-  def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow)
+  /**
+   * Returns a map from string to string that can be used to serialize the column stats.
+   * The key is the name of the field (e.g. "ndv" or "min"), and the value is the string
+   * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]].
+   *
+   * As part of the protocol, the returned map always contains a key called "version".
+   * In the case min/max values are null (None), they will be stored as "".
+   */
+  def toMap: Map[String, String] = Map(
+    ColumnStat.KEY_VERSION -> "1",
+    ColumnStat.KEY_DISTINCT_COUNT -> distinctCount.toString,
+    ColumnStat.KEY_MIN_VALUE -> min.map(_.toString).getOrElse(ColumnStat.NULL_STRING),
+    ColumnStat.KEY_MAX_VALUE -> max.map(_.toString).getOrElse(ColumnStat.NULL_STRING),
+    ColumnStat.KEY_NULL_COUNT -> nullCount.toString,
+    ColumnStat.KEY_AVG_LEN -> avgLen.toString,
+    ColumnStat.KEY_MAX_LEN -> maxLen.toString
+  )
+}
+
+
+object ColumnStat extends Logging {
+
+  /** String representation for null in serialization. */
+  private val NULL_STRING: String = ""
+
+  // List of string keys used to serialize ColumnStat
+  val KEY_VERSION = "version"
+  private val KEY_DISTINCT_COUNT = "distinctCount"
+  private val KEY_MIN_VALUE = "min"
+  private val KEY_MAX_VALUE = "max"
+  private val KEY_NULL_COUNT = "nullCount"
+  private val KEY_AVG_LEN = "avgLen"
+  private val KEY_MAX_LEN = "maxLen"

-  override def toString: String = {
-    // use Base64 for encoding
-    Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes)
+  /** Returns true iff the we support gathering column statistics on column of the given type. */
+  def supportsType(dataType: DataType): Boolean = dataType match {
+    case _: IntegralType => true
+    case _: DecimalType => true
+    case DoubleType | FloatType => true
+    case BooleanType => true
+    case DateType => true
+    case TimestampType => true
+    case BinaryType | StringType => true
+    case _ => false
   }
-}

-object ColumnStat {
-  def apply(numFields: Int, str: String): ColumnStat = {
-    // use Base64 for decoding
-    val bytes = Base64.decodeBase64(str)
-    val unsafeRow = new UnsafeRow(numFields)
-    unsafeRow.pointTo(bytes, bytes.length)
-    ColumnStat(unsafeRow)
+  /**
+   * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
+   * from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
+   */
+  def fromMap(table: String, field: StructField, map: Map[String, String])
+    : Option[ColumnStat] = {
+    val str2val: (String => Any) = field.dataType match {
+      case _: IntegralType => _.toLong
+      case _: DecimalType => new java.math.BigDecimal(_)
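The tail of the hunk introduces a per-type `String => Any` decoder for the serialized min/max values. As a rough illustration of that dispatch pattern, here is a self-contained sketch with hypothetical stand-in types rather than Catalyst's `DataType` hierarchy:

```scala
import java.sql.{Date, Timestamp}

object DecoderSketch {
  // Hypothetical stand-ins for the Catalyst types matched in fromMap above.
  sealed trait SimpleType
  case object LongLike extends SimpleType      // integral types, upcast to Long
  case object DecimalLike extends SimpleType
  case object DateLike extends SimpleType
  case object TimestampLike extends SimpleType

  // Pick a String => Any conversion based on the column's type, then apply it
  // to the serialized "min"/"max" strings.
  def decoderFor(tpe: SimpleType): String => Any = tpe match {
    case LongLike      => s => s.toLong
    case DecimalLike   => s => new java.math.BigDecimal(s)
    case DateLike      => s => Date.valueOf(s)
    case TimestampLike => s => Timestamp.valueOf(s)
  }

  def main(args: Array[String]): Unit = {
    // Matches point 2 of the contract: min/max carry external (Row-level)
    // JVM types, e.g. java.sql.Date for DateType.
    println(decoderFor(DateLike)("2016-11-23"))  // a java.sql.Date
    println(decoderFor(LongLike)("42"))          // a Long
  }
}
```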
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89258367 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -58,60 +61,170 @@
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89258326 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -58,60 +61,170 @@
[GitHub] spark issue #15959: [SPARK-18522][SQL] Explicit contract for column stats se...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15959 **[Test build #69051 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69051/consoleFull)** for PR 15959 at commit [`0c07165`](https://github.com/apache/spark/commit/0c0716576fbb4362575be0dff685501ad600c870).
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89258177 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -58,60 +61,170 @@
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89258106

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -58,60 +61,170 @@
+ * As part of the protocol, the returned map always contains a key called "version".
+ * In the case min/max values are null (None), they will be stored as "".
--- End diff --

yea good point.
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89258088

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -319,7 +319,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
       TableIdentifier(parquetTable))
     assert(DDLUtils.isDatasourceTable(catalogTable))

-    sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src")
+    // Add a filter to avoid creating too many partitions
+    sql(s"INSERT INTO TABLE $parquetTable SELECT * FROM src WHERE key < 10")
--- End diff --

yea good point.
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89258070 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -58,60 +61,170 @@
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89257869 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala --- @@ -58,60 +61,170 @@
[GitHub] spark pull request #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskS...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/15986#discussion_r89257822

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -339,7 +341,7 @@ private[spark] class TaskSchedulerImpl(
             // We lost this entire executor, so remember that it's gone
             val execId = taskIdToExecutorId(tid)

-            if (executorIdToTaskCount.contains(execId)) {
+            if (executorIdToRunningTaskIds.contains(execId)) {
               reason = Some(
                 SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
               removeExecutor(execId, reason.get)
--- End diff --

To look at this another way, note that `TaskSchedulerImpl` is only called from three places:

- LocalSchedulerBackend, which won't use `TaskState.LOST`
- MesosFineGrainedSchedulerBackend, where `TaskState.LOST` means the total loss of an executor that corresponded to a single task (due to fine-grained mode)
- CoarseGrainedSchedulerBackend, where this is only called with a state that comes from a `StatusUpdate` message sent by an executor. This task state will never be `TaskState.LOST`.

Given all of this, I think that the right course of action here is to update the comments to clarify that `TaskState.LOST` is only relevant to fine-grained Mesos mode and to refactor this block to call

```
taskSet.removeRunningTask(tid)
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
```

after calling `removeExecutor` for the fine-grained task, then skipping the rest of the logic which only applies to local mode or coarse-grained schedulers.
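To make the leak this patch targets concrete, here is a self-contained toy model of the bookkeeping, not Spark code: the map names echo the diff, but the types and logic are invented for illustration. A LOST task is treated as the loss of its fine-grained executor, and both maps are cleaned up in one step so no entry is left behind:

```scala
import scala.collection.mutable

object TaskLossSketch {
  sealed trait TaskState
  case object Running extends TaskState
  case object Lost extends TaskState

  // Bookkeeping named after the maps in the diff; toy types for illustration.
  val taskIdToExecutorId = mutable.Map[Long, String]()
  val executorIdToRunningTaskIds = mutable.Map[String, mutable.Set[Long]]()

  def statusUpdate(tid: Long, state: TaskState): Unit = state match {
    case Lost =>
      // Mesos fine-grained semantics: the "executor" hosted exactly this
      // task, so a lost task means the executor is gone. Remove it and every
      // task registered against it, so nothing leaks.
      taskIdToExecutorId.get(tid).foreach { execId =>
        executorIdToRunningTaskIds.remove(execId)
          .foreach(_.foreach(taskIdToExecutorId.remove))
      }
    case Running => () // coarse-grained/local updates take a different path
  }

  def main(args: Array[String]): Unit = {
    taskIdToExecutorId(7L) = "exec-1"
    executorIdToRunningTaskIds("exec-1") = mutable.Set(7L)
    statusUpdate(7L, Lost)
    assert(!taskIdToExecutorId.contains(7L))
    assert(!executorIdToRunningTaskIds.contains("exec-1"))
    println("no stale task or executor entries left behind")
  }
}
```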
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89257721

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -58,60 +61,170 @@
+ * As part of the protocol, the returned map always contains a key called "version".
+ * In the case min/max values are null (None), they will be stored as "".
--- End diff --

I may have missed the discussion, but why not just remove the `max` and `min` entries from the map when they are null?
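The ambiguity behind this question is easy to demonstrate: for StringType columns the empty string is itself a legal min/max value, so encoding None as "" cannot be told apart from a genuine empty-string minimum, while omitting the key keeps the two cases distinct. A small sketch in plain Scala (not Spark code):

```scala
object MinMaxEncoding extends App {
  // A real minimum that happens to be the empty string (legal for StringType).
  val minValue: Option[String] = Some("")

  // Encoding None as "" is indistinguishable from the value above:
  val encoded = minValue.getOrElse("")  // "" for both Some("") and None

  // Omitting the key keeps "no value recorded" distinct from "value is empty":
  val asMap = Map("version" -> "1") ++ minValue.map(v => "min" -> v)
  asMap.get("min") match {
    case Some(v) => println(s"min recorded: '$v'")  // this branch: min = ""
    case None    => println("min not recorded")
  }
}
```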
[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15959#discussion_r89257647

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -58,60 +61,170 @@
+ * The key is the name of the field (e.g. "ndv" or "min"), and the value is the string
--- End diff --

nit: `ndv` -> `distinctCount`
[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15986 Merged build finished. Test FAILed.
[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15986 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69046/
[GitHub] spark issue #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskSetManag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15986 **[Test build #69046 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69046/consoleFull)** for PR 15986 at commit [`69feae3`](https://github.com/apache/spark/commit/69feae3591adc9fe88aff8c190d0d95f14cb0ced).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #15974: [SPARK-18537] [Web UI]Add a REST api to spark streaming
Github user ChorPangChan commented on the issue: https://github.com/apache/spark/pull/15974 I still think adding a new package to streaming is a better structure than modifying spark-core. Can we first decide which implementation to use?
[GitHub] spark issue #15985: [SPARK-18545] [SQL] Verify number of hive client RPCs in...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15985 Merged build finished. Test FAILed.
[GitHub] spark issue #15985: [SPARK-18545] [SQL] Verify number of hive client RPCs in...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15985 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69043/ Test FAILed.
[GitHub] spark pull request #15976: [SPARK-18403][SQL] Fix unsafe data false sharing ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15976#discussion_r89257213
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala ---
@@ -262,7 +262,9 @@ class SortBasedAggregator(
   // Firstly, update the aggregation buffer with input rows.
   while (hasNextInput && groupingKeyOrdering.compare(inputIterator.getKey, groupingKey) == 0) {
-    processRow(result.aggregationBuffer, inputIterator.getValue)
+    // Since `inputIterator.getValue` is an `UnsafeRow` whose underlying buffer will be
+    // overwritten when `inputIterator` steps forward, we need to do a deep copy here.
+    processRow(result.aggregationBuffer, inputIterator.getValue.copy())
--- End diff --

So the problem is, during `processRow` we cache the input row somehow?
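To make the buffer-reuse hazard concrete, here is a small self-contained sketch with a toy `MutableRow` standing in for `UnsafeRow` (all names here are illustrative, not Spark code), showing why caching the iterator's row without `copy()` goes wrong:

```scala
// Toy mutable row: the iterator below returns this same object on every
// step and only rewrites its field, mimicking an UnsafeRow over a reused buffer.
class MutableRow(var value: Int) {
  def copy(): MutableRow = new MutableRow(value)
}

val backing = new MutableRow(0)
val iter = (1 to 3).iterator.map { i => backing.value = i; backing }

val cached = iter.next()        // value is 1 at this moment
iter.next()                     // iterator steps forward, buffer rewritten
assert(cached.value == 2)       // the cached row silently changed under us

val snapshot = iter.next().copy()  // a deep copy detaches from the shared buffer
assert(snapshot.value == 3)        // further iteration could not affect it
```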
[GitHub] spark issue #15985: [SPARK-18545] [SQL] Verify number of hive client RPCs in...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15985 **[Test build #69043 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69043/consoleFull)** for PR 15985 at commit [`92b22f5`](https://github.com/apache/spark/commit/92b22f5751953b4405b40c4d456392bd481c50e6).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #15974: [SPARK-18537] [Web UI]Add a REST api to spark streaming
Github user uncleGen commented on the issue: https://github.com/apache/spark/pull/15974 I think the base solutions are the same, except for some other information which I am working to add.
[GitHub] spark pull request #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskS...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/15986#discussion_r89256962
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -274,4 +274,30 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
   assert("executor1" === taskDescriptions3(0).executorId)
 }

+  test("if an executor is lost then state for tasks running on that executor is cleaned up") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+
+    val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
+    val attempt1 = FakeTask.createTaskSet(1)
+
+    // submit attempt 1, offer resources, task gets scheduled
+    taskScheduler.submitTasks(attempt1)
+    val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
+    assert(1 === taskDescriptions.length)
+
+    // mark executor0 as dead
+    taskScheduler.executorLost("executor0", SlaveLost())
+
+    // Check that state associated with the lost task attempt is cleaned up:
+    assert(taskScheduler.taskIdToExecutorId.isEmpty)
--- End diff --

I suppose that we should also strengthen the assertions in the existing tests to check that these maps are updated following task successes, but this may be tricky given that the existing tests aren't exercising the `statusUpdate` path. Rather, we may have to test this more end-to-end by asserting that these always become empty once all jobs and tasks are done.
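A hedged sketch of the end-to-end check proposed above: assert that the bookkeeping maps drain once everything is done. The helper and object names are assumptions for illustration; the field names come from the diff, and the sketch assumes it lives in `org.apache.spark.scheduler` so the scheduler's internal maps are visible:

```scala
package org.apache.spark.scheduler

// Illustrative helper, not actual Spark test code: after all jobs and
// tasks have finished, none of the per-task maps should retain entries.
object SchedulerLeakChecks {
  def assertNoLeakedTaskState(taskScheduler: TaskSchedulerImpl): Unit = {
    assert(taskScheduler.taskIdToExecutorId.isEmpty,
      "leaked taskId -> executorId entries")
    assert(taskScheduler.taskIdToTaskSetManager.isEmpty,
      "leaked taskId -> TaskSetManager entries")
  }
}
```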
[GitHub] spark issue #15974: [SPARK-18537] [Web UI]Add a REST api to spark streaming
Github user uncleGen commented on the issue: https://github.com/apache/spark/pull/15974 I think there is no need to open another duplicate PR. Would you mind closing this PR so we can work on #15904?
[GitHub] spark pull request #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskS...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/15986#discussion_r89256608
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -339,7 +341,7 @@ private[spark] class TaskSchedulerImpl(
   // We lost this entire executor, so remember that it's gone
   val execId = taskIdToExecutorId(tid)
-  if (executorIdToTaskCount.contains(execId)) {
+  if (executorIdToRunningTaskIds.contains(execId)) {
     reason = Some(
       SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
     removeExecutor(execId, reason.get)
--- End diff --

More archaeology: It looks like `TaskState.LOST` was introduced in Spark 0.6.0 as part of a refactoring to make the cluster scheduler pluggable: https://github.com/apache/spark/commit/e72afdb817bcc8388aeb8b8d31628fd5fd67acf1. That commit is from July 2012. At the time, standalone mode didn't even exist; the schedulers were Mesos and local mode, and only Mesos fine-grained mode was supported. The only way to get a `TaskState.LOST` state was to convert the Mesos task-loss state to it.
[GitHub] spark issue #15904: [SPARK-18470][STREAMING][WIP] Provide Spark Streaming Mo...
Github user uncleGen commented on the issue: https://github.com/apache/spark/pull/15904 @ajbozarth Thank you for reminding me, I will take a look at it later.
[GitHub] spark pull request #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskS...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15986#discussion_r89256275
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -89,9 +89,11 @@ private[spark] class TaskSchedulerImpl(
   val nextTaskId = new AtomicLong(0)

   // Number of tasks running on each executor
--- End diff --

update this comment
[GitHub] spark pull request #15983: [SPARK-18544] [SQL] Append with df.saveAsTable wr...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15983#discussion_r89256222
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -373,8 +373,19 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   throw new AnalysisException(s"Table $tableIdent already exists.")

 case _ =>
-  val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-  val tableType = if (storage.locationUri.isDefined) {
+  val existingTable = if (tableExists) {
--- End diff --

shall we move this logic into `CreateDataSourceTableAsSelectCommand`?
[GitHub] spark pull request #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskS...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15986#discussion_r89256137
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -339,7 +341,7 @@ private[spark] class TaskSchedulerImpl(
   // We lost this entire executor, so remember that it's gone
   val execId = taskIdToExecutorId(tid)
-  if (executorIdToTaskCount.contains(execId)) {
+  if (executorIdToRunningTaskIds.contains(execId)) {
     reason = Some(
       SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
     removeExecutor(execId, reason.get)
--- End diff --

cc @mgummelt
[GitHub] spark pull request #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskS...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/15986#discussion_r89255724
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -339,7 +341,7 @@ private[spark] class TaskSchedulerImpl(
   // We lost this entire executor, so remember that it's gone
   val execId = taskIdToExecutorId(tid)
-  if (executorIdToTaskCount.contains(execId)) {
+  if (executorIdToRunningTaskIds.contains(execId)) {
     reason = Some(
       SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
     removeExecutor(execId, reason.get)
--- End diff --

I added some logging in this `TaskState.LOST` branch and it looks like this case isn't hit at all in our existing scheduler tests.
[GitHub] spark issue #15974: [SPARK-18537] [Web UI]Add a REST api to spark streaming
Github user ChorPangChan commented on the issue: https://github.com/apache/spark/pull/15974 It requires a manual merge if I do it against master. Should I just open the PR with conflicts, or rebase onto master before the PR? By the way, those conflicts are just version numbers from the pom files.
[GitHub] spark issue #15987: [SPARK-18515][SQL] AlterTableDropPartitions fails for no...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/15987 Hi, @hvanhovell. Could you review this PR? This is the first attempt to use `UnresolvedAttribute` and an Analyzer rule. There are two debatable issues.
- The Catalyst Analyzer doesn't know about `AlterTableDropPartitions`, so I need to introduce a `CommandWithExpression` trait here.
- I added a test case for atomic types, but we need to change `ADD PARTITION`, too. `ADD PARTITION` touches several more parts. If possible, I want to make that a separate PR.
[GitHub] spark issue #15987: [SPARK-18515][SQL] AlterTableDropPartitions fails for no...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15987 **[Test build #69050 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69050/consoleFull)** for PR 15987 at commit [`970a904`](https://github.com/apache/spark/commit/970a904c5efc1ff2089162c0ca0acbef3f2ca9db).
[GitHub] spark pull request #15987: [SPARK-18515][SQL] AlterTableDropPartitions fails...
GitHub user dongjoon-hyun opened a pull request: https://github.com/apache/spark/pull/15987 [SPARK-18515][SQL] AlterTableDropPartitions fails for non-string columns

## What changes were proposed in this pull request?

While [SPARK-17732](https://issues.apache.org/jira/browse/SPARK-17732) improved the `PARTITION` specification as an expression, it introduced a regression where `AlterTableDropPartitions` fails for non-string partitioning columns. This PR fixes that by using correct type casting.

```scala
scala> sql("create table tbl_x (a int) partitioned by (p int)")
scala> sql("alter table tbl_x add partition (p=10)")
scala> sql("alter table tbl_x drop partition (p=10)")
scala> sql("alter table tbl_x drop partition (p=10)")
scala.MatchError: (cast(p#8 as double) = 10.0) (of class org.apache.spark.sql.catalyst.expressions.EqualTo)
```

## How was this patch tested?

Pass the Jenkins tests with a new test case.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dongjoon-hyun/spark SPARK-18515

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15987.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15987

commit 970a904c5efc1ff2089162c0ca0acbef3f2ca9db
Author: Dongjoon Hyun
Date: 2016-11-23T01:34:52Z

    [SPARK-18515][SQL] AlterTableDropPartitions fails for non-string columns
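The root cause is visible in the `MatchError` above: type coercion rewrites `p = 10` as `cast(p as double) = 10.0`, which no longer has the `EqualTo(attribute, literal)` shape the drop-partition code expects. A hedged sketch of the idea behind the fix — casting the value to the partition column's own type so no widening occurs; this is an illustrative construction, not the exact patch:

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// Partition column `p int`, as in the repro above.
val p = AttributeReference("p", IntegerType)()

// Cast the user-supplied value to the column's declared type up front,
// so analysis has no reason to widen both sides to double.
val spec = EqualTo(p, Cast(Literal("10"), IntegerType))

spec match {
  case EqualTo(attr: AttributeReference, value) =>
    println(s"dropping partition where ${attr.name} = $value")  // matches cleanly
}
```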
[GitHub] spark issue #15982: [SPARK-18546][core] Fix merging shuffle spills when usin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15982 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69040/ Test PASSed.
[GitHub] spark issue #15982: [SPARK-18546][core] Fix merging shuffle spills when usin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15982 Merged build finished. Test PASSed.
[GitHub] spark issue #15982: [SPARK-18546][core] Fix merging shuffle spills when usin...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15982 **[Test build #69040 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69040/consoleFull)** for PR 15982 at commit [`3b879f4`](https://github.com/apache/spark/commit/3b879f4045edc458de723d0c2fb2f489391f999d).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #15971: [SPARK-18535][UI][YARN] Redact sensitive information fro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15971 Merged build finished. Test FAILed.
[GitHub] spark issue #15971: [SPARK-18535][UI][YARN] Redact sensitive information fro...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15971 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69042/ Test FAILed.
[GitHub] spark issue #15971: [SPARK-18535][UI][YARN] Redact sensitive information fro...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15971 **[Test build #69042 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69042/consoleFull)** for PR 15971 at commit [`eed33db`](https://github.com/apache/spark/commit/eed33db29d09b4730617fe1502da69a99ea9df42).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #15974: [SPARK-18537] [Web UI]Add a REST api to spark streaming
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/15974 This will never be merged into 1.6, so you'll have to send a new PR against master. Please close this one.
[GitHub] spark issue #15984: [SPARK-18551] [Web UI] [Core] [WIP] Add functionality to...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15984 Merged build finished. Test PASSed.
[GitHub] spark issue #15984: [SPARK-18551] [Web UI] [Core] [WIP] Add functionality to...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/15984 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/69041/ Test PASSed.
[GitHub] spark pull request #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskS...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/15986#discussion_r89253541
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -525,7 +525,12 @@ private[spark] class TaskSchedulerImpl(
   * of any running tasks, since the loss reason defines whether we'll fail those tasks.
--- End diff --

Based on the ["tasks are not re-scheduled while executor loss reason is pending" test](https://github.com/apache/spark/blob/072f4c518cdc57d705beec6bcc3113d9a6740819/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala#L268) in `TaskSchedulerImplSuite`, it looks like the API contract here is that if `executorLost` is called with `LossReasonPending` then it will eventually be called with some other reason. This will cause it to [call](https://github.com/apache/spark/pull/15986/files#diff-d4000438827afe3a185ae75b24987a61R550) `rootPool.executorLost()`, which, in turn, will call `executorLost` for all TaskSetManagers, which will use their own internal executorId-to-task-id mapping to mark tasks as failed and inform the DAGScheduler. The `TaskSetManager` doesn't call back into the `TaskScheduler` to access any of the data in these mappings, so I think it's safe to clean them up immediately at the top of `removeExecutor` rather than putting them behind the `reason != LossReasonPending` check.

Note that it's also not as simple as just putting those behind `reason != LossReasonPending` as a defensive measure, because then we'd be changing the contract on when `runningTasksByExecutors()` is updated: previously, it would set a failed executor's running task count to zero as soon as the executor failed, whereas it would do so only after the reason was known should we move this update behind that check.

I think that these subtleties / distinctions are only relevant to YARN mode, so I'll loop in @vanzin to comment on them.
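For concreteness, a sketch of the ordering being argued for: per-task bookkeeping cleared unconditionally at the top of `removeExecutor`, with only the TaskSetManager/DAGScheduler notification gated on a concrete loss reason. This is an illustrative shape under the map names from the diff, not the actual patch:

```scala
// Illustrative shape only; map names follow the discussion above.
private def removeExecutor(execId: String, reason: ExecutorLossReason): Unit = {
  // Drop per-task state immediately, even for LossReasonPending, so that
  // runningTasksByExecutors() reports zero for the failed executor.
  executorIdToRunningTaskIds.remove(execId).foreach { taskIds =>
    taskIds.foreach { tid =>
      taskIdToExecutorId.remove(tid)
      taskIdToTaskSetManager.remove(tid)
    }
  }
  // Failing the tasks themselves waits for the real reason: the contract is
  // that a LossReasonPending call is eventually followed by a concrete one,
  // which then reaches every TaskSetManager via rootPool.executorLost().
  if (reason != LossReasonPending) {
    // ... notify TaskSetManagers / DAGScheduler (elided in this sketch) ...
  }
}
```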
[GitHub] spark issue #15984: [SPARK-18551] [Web UI] [Core] [WIP] Add functionality to...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15984 **[Test build #69041 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69041/consoleFull)** for PR 15984 at commit [`d243293`](https://github.com/apache/spark/commit/d243293ede2a6c2345daafba5b5cc6259cacd94f).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #15877: [SPARK-18429] [SQL] implement a new Aggregate for CountM...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15877 **[Test build #69049 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69049/consoleFull)** for PR 15877 at commit [`1bfb6fd`](https://github.com/apache/spark/commit/1bfb6fd7810c7db9218bcdb65aed80102406b211).
[GitHub] spark issue #15877: [SPARK-18429] [SQL] implement a new Aggregate for CountM...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/15877 cc @rxin @hvanhovell
[GitHub] spark issue #15985: [SPARK-18545] [SQL] Verify number of hive client RPCs in...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/15985 **[Test build #69048 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69048/consoleFull)** for PR 15985 at commit [`e6d835f`](https://github.com/apache/spark/commit/e6d835f6516256ea31be158fca5b8cbfdd7dc7e8).
[GitHub] spark pull request #15986: [SPARK-18553][CORE][branch-2.0] Fix leak of TaskS...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/15986#discussion_r89252852
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -525,7 +525,12 @@ private[spark] class TaskSchedulerImpl(
   * of any running tasks, since the loss reason defines whether we'll fail those tasks.
--- End diff --

I'm a bit confused about this comment, since it seems to suggest that we'll do some sort of per-task cleanup at some later time. If anyone knows which cleanup this is referring to, then maybe we should consider not updating `executorIdToRunningTaskIds` at all in here and instead perform the `taskIdToExecutorId` and `taskIdToTaskSetManager` updates somewhere else. On the other hand, the only place where we currently remove entries from `taskIdToExecutorId` and `taskIdToTaskSetManager` is in `statusUpdate`, so my hunch is that the eventual cleanup alluded to here isn't happening in standalone mode.
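A self-contained sketch of the `statusUpdate`-style cleanup referenced above, with plain mutable maps standing in for the scheduler's fields (names follow the discussion; the value type of the TaskSetManager map is simplified to `String`). It shows why entries linger when no terminal status ever arrives for tasks on a lost executor:

```scala
import scala.collection.mutable

// Stand-ins for the scheduler maps named in the discussion.
val taskIdToExecutorId = mutable.HashMap[Long, String]()
val taskIdToTaskSetManager = mutable.HashMap[Long, String]()
val executorIdToRunningTaskIds = mutable.HashMap[String, mutable.HashSet[Long]]()

// Cleanup only fires when a task reaches a finished state; if the executor
// dies and no terminal status is ever delivered, these entries leak.
def cleanupFinishedTask(tid: Long): Unit = {
  taskIdToExecutorId.remove(tid).foreach { execId =>
    executorIdToRunningTaskIds.get(execId).foreach(_ -= tid)
  }
  taskIdToTaskSetManager.remove(tid)
}

// Example: task 7 runs on executor0, then completes normally.
taskIdToExecutorId(7L) = "executor0"
taskIdToTaskSetManager(7L) = "TaskSet_0"
executorIdToRunningTaskIds.getOrElseUpdate("executor0", mutable.HashSet()) += 7L
cleanupFinishedTask(7L)
assert(taskIdToExecutorId.isEmpty && taskIdToTaskSetManager.isEmpty)
```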