[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203618471

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1386,29 +1418,90 @@ class DAGScheduler(
             )
           }
         }
-        // Mark the map whose fetch failed as broken in the map stage
-        if (mapId != -1) {
-          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
-        }
+      }

-        // TODO: mark the executor as failed only if there were lots of fetch failures on it
-        if (bmAddress != null) {
-          val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
-              unRegisterOutputOnHostOnFetchFailure) {
-            // We had a fetch failure with the external shuffle service, so we
-            // assume all shuffle data on the node is bad.
-            Some(bmAddress.host)
-          } else {
-            // Unregister shuffle data just for one executor (we don't have any
-            // reason to believe shuffle data has been lost for the entire host).
-            None
+      case failure: TaskFailedReason if task.isBarrier =>
+        // Also handle the task failed reasons here.
+        failure match {
+          case Resubmitted =>
+            logInfo("Resubmitted " + task + ", so marking it as still running")
+            stage match {
+              case sms: ShuffleMapStage =>
+                sms.pendingPartitions += task.partitionId
+
+              case _ =>
+                assert(false, "TaskSetManagers should only send Resubmitted task statuses for " +
--- End diff --

`assert(false ...` is weird, please throw an exception directly.
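For reference, a minimal sketch of the suggested fix, reusing the names from the diff above and Spark's existing `SparkException`; the exact message wording is illustrative, not the PR's final code:

```scala
import org.apache.spark.SparkException

// Fail fast with an explicit exception instead of assert(false, ...),
// which can be compiled out with -Xdisable-assertions and reads oddly
// as a "this code is reachable" assertion.
stage match {
  case sms: ShuffleMapStage =>
    sms.pendingPartitions += task.partitionId

  case _ =>
    throw new SparkException("TaskSetManagers should only send Resubmitted task " +
      s"statuses for tasks in ShuffleMapStages, but got ${stage.getClass.getSimpleName}")
}
```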
[GitHub] spark pull request #21814: [SPARK-24858][SQL] Avoid unnecessary parquet foot...
GitHub user gengliangwang opened a pull request:

https://github.com/apache/spark/pull/21814

[SPARK-24858][SQL] Avoid unnecessary parquet footer reads

## What changes were proposed in this pull request?

Currently the same Parquet footer is read twice in the function `buildReaderWithPartitionValues` of ParquetFileFormat if filter push down is enabled. Fix it with simple changes.

## How was this patch tested?

Unit test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gengliangwang/spark parquetFooter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21814.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21814

commit 5667cc57022d840aecb6c7d0c967e2a3448a4928
Author: Gengliang Wang
Date: 2018-07-19T06:43:45Z

    Avoid unnecessary parquet footer reading
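For context, a minimal sketch of the single-read idea (not the PR's actual diff), assuming parquet-hadoop's `ParquetFileReader.readFooter` API of that era; `conf` and `filePath` stand in for values the reader builder already has in scope:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

val conf = new Configuration()
val filePath = new Path("/tmp/example.parquet")

// Read the footer a single time and reuse it, instead of one read to build
// the pushed-down filters and a second read inside the record reader.
val footer = ParquetFileReader.readFooter(conf, filePath, ParquetMetadataConverter.NO_FILTER)
val fileSchema = footer.getFileMetaData.getSchema // reuse for filter pushdown
val blocks = footer.getBlocks                     // reuse for row-group metadata
```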
[GitHub] spark issue #21803: [SPARK-24849][SQL] Converting a value of StructType to a...
Github user MaxGekk commented on the issue:

https://github.com/apache/spark/pull/21803

@hvanhovell Could you look at the PR, please?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203618106

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1349,6 +1339,48 @@ class DAGScheduler(
             s"longer running")
         }

+        if (mapStage.rdd.isBarrier()) {
+          // Mark all the map as broken in the map stage, to ensure retry all the tasks on
+          // resubmitted stage attempt.
+          mapOutputTracker.unregisterAllMapOutput(shuffleId)
+        } else if (mapId != -1) {
+          // Mark the map whose fetch failed as broken in the map stage
+          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+        }
+
+        if (failedStage.rdd.isBarrier()) {
+          failedStage match {
+            case mapStage: ShuffleMapStage =>
+              // Mark all the map as broken in the map stage, to ensure retry all the tasks on
+              // resubmitted stage attempt.
+              mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+
+            case resultStage: ResultStage =>
+              // Mark all the partitions of the result stage to be not finished, to ensure retry
+              // all the tasks on resubmitted stage attempt.
+              resultStage.activeJob.map(_.markAllPartitionsAsUnfinished())
+          }
+        }
+
+        // TODO: mark the executor as failed only if there were lots of fetch failures on it
+        if (bmAddress != null) {
--- End diff --

why move this before the `if (shouldAbortStage) { ...`?
[GitHub] spark issue #21732: [SPARK-24762][SQL] Aggregator should be able to use Opti...
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/21732

At the end of encoder creation? You mean at the end of calling `ExpressionEncoder.apply()`? But it is used both for top-level encoders, e.g. `Dataset[Option[Product]]`, and non-top-level encoders, e.g. an `Aggregator`'s encoder. If we flatten it, doesn't that mean that at the top level it is encoded as a row, not a struct column?
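To make the distinction concrete, a plain-Scala sketch of what "flattening" an `Option[Product]` means here; this is a hypothetical helper for illustration, not Spark's encoder machinery:

```scala
case class Person(name: String, age: Int)

// Flattened: the fields become two top-level values (the "row" shape),
// and None becomes all-null fields. Unflattened, the whole Option stays
// a single nullable struct column. The question above is which shape a
// top-level Dataset[Option[Person]] should get.
def flatten(p: Option[Person]): (String, Integer) = p match {
  case Some(Person(n, a)) => (n, a)
  case None               => (null, null)
}
```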
[GitHub] spark issue #21533: [SPARK-24195][Core] Bug fix for local:/ path in SparkCon...
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21533

Merged build finished. Test PASSed.
[GitHub] spark issue #21533: [SPARK-24195][Core] Bug fix for local:/ path in SparkCon...
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21533

Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93259/
Test PASSed.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203617306

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1349,6 +1339,48 @@ class DAGScheduler(
             s"longer running")
         }

+        if (mapStage.rdd.isBarrier()) {
+          // Mark all the map as broken in the map stage, to ensure retry all the tasks on
+          // resubmitted stage attempt.
+          mapOutputTracker.unregisterAllMapOutput(shuffleId)
+        } else if (mapId != -1) {
+          // Mark the map whose fetch failed as broken in the map stage
+          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+        }
+
+        if (failedStage.rdd.isBarrier()) {
+          failedStage match {
+            case mapStage: ShuffleMapStage =>
--- End diff --

please pick a different name. `mapStage` is already used before.
[GitHub] spark issue #21533: [SPARK-24195][Core] Bug fix for local:/ path in SparkCon...
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21533

**[Test build #93259 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93259/testReport)** for PR 21533 at commit [`eb46ccf`](https://github.com/apache/spark/commit/eb46ccfec084c2439a26eee38015381f091fe164).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203616623

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1311,17 +1312,6 @@ class DAGScheduler(
           }
         }

-      case Resubmitted =>
--- End diff --

why move the handling of `Resubmitted` after `FetchFailure`?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203616384

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala ---
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
   val finished = Array.fill[Boolean](numPartitions)(false)

   var numFinished = 0
+
+  // Mark all the partitions of the stage to be not finished.
+  def markAllPartitionsAsUnfinished(): Unit = {
+    (0 until numPartitions).map(finished.update(_, false))
--- End diff --

is `reset` a better name?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203616328

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala ---
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
   val finished = Array.fill[Boolean](numPartitions)(false)

   var numFinished = 0
+
+  // Mark all the partitions of the stage to be not finished.
+  def markAllPartitionsAsUnfinished(): Unit = {
+    (0 until numPartitions).map(finished.update(_, false))
--- End diff --

`map` -> `foreach`
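A minimal standalone sketch combining both review suggestions above (the name and surrounding fields mirror the diff, but this is illustration, not the PR's final code):

```scala
// Same fields as ActiveJob in the diff above.
val numPartitions = 4
val finished = Array.fill[Boolean](numPartitions)(false)

// Use foreach instead of map, since the mapped-over results are discarded,
// and a shorter name as suggested.
def reset(): Unit = (0 until numPartitions).foreach(finished.update(_, false))
```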
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203615271

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1839,6 +1844,18 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
+
+  /**
+   * Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a
+   * barrier stage.
+   *
+   * An RDD is in a barrier stage, if at least one of its parent RDD(s), or itself, are mapped from
+   * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a
+   * [[ShuffledRDD]] indicates start of a new stage.
+   */
+  def isBarrier(): Boolean = isBarrier_
+
+  @transient private lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
--- End diff --

why do we need a lazy val and a def? can we merge them?
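One plausible answer, sketched with hypothetical names (`MyRDD`, `MyShuffledRDD`): the `lazy val` caches the recursive walk over the lineage once per RDD, while the public `def` gives subclasses a cheap override point, e.g. a shuffled RDD forcing `false` as the scaladoc in the diff describes.

```scala
abstract class MyRDD extends Serializable {
  def dependencies: Seq[MyRDD]

  // Public API: a def, so subclasses can override the behavior.
  def isBarrier(): Boolean = isBarrier_

  // Cached once per RDD: recomputing would walk the lineage on every call.
  @transient private lazy val isBarrier_ : Boolean =
    dependencies.exists(_.isBarrier())
}

class MyShuffledRDD(deps: Seq[MyRDD]) extends MyRDD {
  def dependencies: Seq[MyRDD] = deps
  // A shuffle starts a new stage, so the barrier flag does not propagate.
  override def isBarrier(): Boolean = false
}
```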
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203615062

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1839,6 +1844,18 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
+
+  /**
+   * Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a
+   * barrier stage.
+   *
+   * An RDD is in a barrier stage, if at least one of its parent RDD(s), or itself, are mapped from
+   * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a
+   * [[ShuffledRDD]] indicates start of a new stage.
+   */
+  def isBarrier(): Boolean = isBarrier_
--- End diff --

does this need to be public?
[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203614509

--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala ---
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Carries all task infos of a barrier task.
+ */
+class BarrierTaskInfo(val address: String)
--- End diff --

we need param doc, to say that address is IP v4 address.
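A sketch of what the requested param doc could look like (wording assumed, not the PR's final text):

```scala
/**
 * Carries all task infos of a barrier task.
 *
 * @param address the IPv4 address of the executor that the barrier task is running on
 */
class BarrierTaskInfo(val address: String)
```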
[GitHub] spark issue #21802: [SPARK-23928][SQL] Add shuffle collection function.
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21802

**[Test build #93261 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93261/testReport)** for PR 21802 at commit [`9081e2f`](https://github.com/apache/spark/commit/9081e2f0b0371630aa315842f14931ee490ff461).
[GitHub] spark issue #21608: [SPARK-24626] [SQL] Improve location size calculation in...
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21608

**[Test build #93262 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93262/testReport)** for PR 21608 at commit [`107f4c6`](https://github.com/apache/spark/commit/107f4c675978628bf0effc08924a5f7d397f3719).
[GitHub] spark issue #21802: [SPARK-23928][SQL] Add shuffle collection function.
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21802

Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1112/
Test PASSed.
[GitHub] spark pull request #21789: [SPARK-24829][SQL]In Spark Thrift Server, CAST AS...
Github user zuotingbing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21789#discussion_r203613567

--- Diff: sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala ---
@@ -766,6 +774,14 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
       assert(resultSet.getString(2) === HiveUtils.builtinHiveVersion)
     }
   }
+
+  test("Checks cast as float") {
--- End diff --

HiveThriftJdbcTest is an abstract class; there are two classes, HiveThriftBinaryServerSuite and HiveThriftHttpServerSuite, that extend HiveThriftJdbcTest.
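The added test body is truncated in the diff; a hypothetical sketch of what such a check might look like, assuming the `withJdbcStatement` helper that HiveThriftJdbcTest already provides (the query and expected string are illustrative):

```scala
test("Checks cast as float") {
  withJdbcStatement() { statement =>
    val resultSet = statement.executeQuery("SELECT CAST('4.56' AS FLOAT)")
    resultSet.next()
    // Before the fix, the JDBC layer widened the Float to Double and
    // surfaced 4.559999942779541 instead of 4.56.
    assert(resultSet.getString(1) === "4.56")
  }
}
```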
[GitHub] spark issue #21802: [SPARK-23928][SQL] Add shuffle collection function.
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21802

Merged build finished. Test PASSed.
[GitHub] spark issue #21732: [SPARK-24762][SQL] Aggregator should be able to use Opti...
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21732

> Non top-level and top-level encoders for Option[Product] have a little difference.

Can we treat them the same, but at the end of encoder creation flatten the `Option[Product]`?
[GitHub] spark pull request #21782: [SPARK-24816][SQL] SQL interface support repartit...
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21782#discussion_r203613170

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala ---
@@ -394,6 +394,41 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
       }
     }
   }
+
+  ignore("Pushdown benchmark for RANGE PARTITION BY/DISTRIBUTE BY") {
--- End diff --

Range partitioning produces better-sorted output, so more Parquet row groups can be skipped when filtering. This is an example:

```scala
test("SPARK-24816") {
  withTable("tbl") {
    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "4") {
      spark.range(100).createTempView("tbl")
      spark.sql("select * from tbl DISTRIBUTE BY id SORT BY id")
        .write.parquet("/tmp/spark/parquet/hash")
      spark.sql("select * from tbl RANGE PARTITION BY id SORT BY id")
        .write.parquet("/tmp/spark/parquet/range")
    }
  }
}
```

Column statistics after `HashPartitioning`:

File | id column statistics
--- | ---
part-0 | min: 2, max: 93
part-1 | min: 0, max: 99
part-2 | min: 14, max: 94
part-3 | min: 3, max: 98

Column statistics after `RangePartitioning`:

File | id column statistics
--- | ---
part-0 | min: 0, max: 24
part-1 | min: 25, max: 49
part-2 | min: 50, max: 74
part-3 | min: 75, max: 99

# File meta after `HashPartitioning`:
![image](https://user-images.githubusercontent.com/5399861/42924696-f40c6346-8b5d-11e8-8a78-6f7e49372577.png)

# File meta after `RangePartitioning`:
![image](https://user-images.githubusercontent.com/5399861/42924714-05c40986-8b5e-11e8-8ce1-e9f29aec8db8.png)
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r203613041

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -148,6 +148,19 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
     }
   }

+  test("verify table size calculation is accurate") {
--- End diff --

@maropu, I have fixed the test to verify if the calculation is being done in parallel. Can you review the change?
[GitHub] spark pull request #21789: [SPARK-24829][SQL]In Spark Thrift Server, CAST AS...
Github user zuotingbing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21789#discussion_r203610279

--- Diff: sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/Column.java ---
@@ -349,7 +349,7 @@ public void addValue(Type type, Object field) {
         break;
       case FLOAT_TYPE:
         nulls.set(size, field == null);
-        doubleVars()[size] = field == null ? 0 : ((Float)field).doubleValue();
+        doubleVars()[size] = field == null ? 0 : new Double(field.toString());
--- End diff --

Sorry, I am not sure what you mean. Do you mean `(Double) field`?
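For context on why the change in the diff matters, a quick plain-Scala check; this is standard Java/Scala float-widening behavior, independent of this PR:

```scala
val f = 4.56f
// Widening keeps the Float's binary rounding error in the Double.
val widened: Double = f.toDouble             // 4.559999942779541
// Going through the decimal string re-parses the intended value,
// since Float.toString produces the shortest round-trippable form.
val viaString: Double = f.toString.toDouble  // 4.56
```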