[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203618471
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1386,29 +1418,90 @@ class DAGScheduler(
              )
            }
          }
-          // Mark the map whose fetch failed as broken in the map stage
-          if (mapId != -1) {
-            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
-          }
+        }

-          // TODO: mark the executor as failed only if there were lots of fetch failures on it
-          if (bmAddress != null) {
-            val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled &&
-              unRegisterOutputOnHostOnFetchFailure) {
-              // We had a fetch failure with the external shuffle service, so we
-              // assume all shuffle data on the node is bad.
-              Some(bmAddress.host)
-            } else {
-              // Unregister shuffle data just for one executor (we don't have any
-              // reason to believe shuffle data has been lost for the entire host).
-              None
+      case failure: TaskFailedReason if task.isBarrier =>
+        // Also handle the task failed reasons here.
+        failure match {
+          case Resubmitted =>
+            logInfo("Resubmitted " + task + ", so marking it as still running")
+            stage match {
+              case sms: ShuffleMapStage =>
+                sms.pendingPartitions += task.partitionId
+
+              case _ =>
+                assert(false, "TaskSetManagers should only send Resubmitted task statuses for " +
--- End diff --

`assert(false, ...)` is weird; please throw an exception directly.
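
For illustration, a minimal sketch of the suggested change (names such as `stage`, `sms`, and `task` come from the surrounding diff; the message text is illustrative, not the PR's exact wording):

```scala
stage match {
  case sms: ShuffleMapStage =>
    sms.pendingPartitions += task.partitionId

  case _ =>
    // Fail loudly instead of assert(false, ...), which becomes a no-op when
    // assertions are elided at compile time.
    throw new SparkException("TaskSetManagers should only send Resubmitted task " +
      s"statuses for tasks in ShuffleMapStages, but got: $stage")
}
```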


---




[GitHub] spark pull request #21814: [SPARK-24858][SQL] Avoid unnecessary parquet foot...

2018-07-19 Thread gengliangwang
GitHub user gengliangwang opened a pull request:

https://github.com/apache/spark/pull/21814

[SPARK-24858][SQL] Avoid unnecessary parquet footer reads

## What changes were proposed in this pull request?

Currently the same Parquet footer is read twice in the function `buildReaderWithPartitionValues` of ParquetFileFormat if filter push down is enabled.

Fix it with simple changes.
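
A rough sketch of the idea (illustrative, not the exact patch; `hadoopConf` and `filePath` stand in for the local names in `buildReaderWithPartitionValues`): read the footer once with row-group metadata skipped, then reuse it for both filter push-down and the reader construction.

```scala
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.ParquetFileReader

// Single footer read; previously the push-down path and the reader construction
// each read the footer independently.
val footerFileMetaData =
  ParquetFileReader.readFooter(hadoopConf, filePath, SKIP_ROW_GROUPS).getFileMetaData

// Build the pushed-down filters from footerFileMetaData.getSchema, then reuse the
// same metadata when constructing the record reader.
```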
## How was this patch tested?

Unit test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gengliangwang/spark parquetFooter

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21814.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21814


commit 5667cc57022d840aecb6c7d0c967e2a3448a4928
Author: Gengliang Wang 
Date:   2018-07-19T06:43:45Z

Avoid unnecessary parquet footer reading




---




[GitHub] spark issue #21803: [SPARK-24849][SQL] Converting a value of StructType to a...

2018-07-19 Thread MaxGekk
Github user MaxGekk commented on the issue:

https://github.com/apache/spark/pull/21803
  
@hvanhovell Could you look at the PR, please?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203618106
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1349,6 +1339,48 @@ class DAGScheduler(
              s"longer running")
          }

+          if (mapStage.rdd.isBarrier()) {
+            // Mark all the map as broken in the map stage, to ensure retry all the tasks on
+            // resubmitted stage attempt.
+            mapOutputTracker.unregisterAllMapOutput(shuffleId)
+          } else if (mapId != -1) {
+            // Mark the map whose fetch failed as broken in the map stage
+            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+          }
+
+          if (failedStage.rdd.isBarrier()) {
+            failedStage match {
+              case mapStage: ShuffleMapStage =>
+                // Mark all the map as broken in the map stage, to ensure retry all the tasks on
+                // resubmitted stage attempt.
+                mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+
+              case resultStage: ResultStage =>
+                // Mark all the partitions of the result stage to be not finished, to ensure retry
+                // all the tasks on resubmitted stage attempt.
+                resultStage.activeJob.map(_.markAllPartitionsAsUnfinished())
+            }
+          }
+
+          // TODO: mark the executor as failed only if there were lots of fetch failures on it
+          if (bmAddress != null) {
--- End diff --

why move this before the `if (shouldAbortStage) { ...`?


---




[GitHub] spark issue #21732: [SPARK-24762][SQL] Aggregator should be able to use Opti...

2018-07-19 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/21732
  
At the end of encoder creation? You mean at the end of calling `ExpressionEncoder.apply()`? But it is used both for a top-level encoder, e.g. `Dataset[Option[Product]]`, and a non-top-level encoder, e.g. an `Aggregator`'s encoder. If we flatten it, doesn't that mean that at the top level it is encoded as a row, not a struct column?
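
For illustration, the two usages being distinguished here (`Point` is a stand-in `Product` type; whether the non-top-level/`Aggregator` case is fully supported is exactly what this PR is about):

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

case class Point(x: Int, y: Int)

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// Top-level: Option[Point] is the row type of the Dataset itself. Flattening it
// into a plain row would change how a None row is represented.
val ds: Dataset[Option[Point]] = Seq(Some(Point(1, 2)), None).toDS()

// Non top-level: the same kind of encoder describes a single struct-typed column
// nested inside another row, e.g. an Aggregator's buffer or output encoder.
```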




---




[GitHub] spark issue #21533: [SPARK-24195][Core] Bug fix for local:/ path in SparkCon...

2018-07-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21533
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21533: [SPARK-24195][Core] Bug fix for local:/ path in SparkCon...

2018-07-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21533
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93259/
Test PASSed.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203617306
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1349,6 +1339,48 @@ class DAGScheduler(
              s"longer running")
          }

+          if (mapStage.rdd.isBarrier()) {
+            // Mark all the map as broken in the map stage, to ensure retry all the tasks on
+            // resubmitted stage attempt.
+            mapOutputTracker.unregisterAllMapOutput(shuffleId)
+          } else if (mapId != -1) {
+            // Mark the map whose fetch failed as broken in the map stage
+            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
+          }
+
+          if (failedStage.rdd.isBarrier()) {
+            failedStage match {
+              case mapStage: ShuffleMapStage =>
--- End diff --

Please pick a different name. `mapStage` is already used before.


---




[GitHub] spark issue #21533: [SPARK-24195][Core] Bug fix for local:/ path in SparkCon...

2018-07-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21533
  
**[Test build #93259 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93259/testReport)**
 for PR 21533 at commit 
[`eb46ccf`](https://github.com/apache/spark/commit/eb46ccfec084c2439a26eee38015381f091fe164).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203616623
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1311,17 +1312,6 @@ class DAGScheduler(
 }
 }
 
-  case Resubmitted =>
--- End diff --

why move the handling of `Resubmitted` after `FetchFailure`?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203616384
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala ---
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
   val finished = Array.fill[Boolean](numPartitions)(false)

   var numFinished = 0
+
+  // Mark all the partitions of the stage to be not finished.
+  def markAllPartitionsAsUnfinished(): Unit = {
+    (0 until numPartitions).map(finished.update(_, false))
--- End diff --

is `reset` a better name?


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203616328
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala ---
@@ -60,4 +60,10 @@ private[spark] class ActiveJob(
   val finished = Array.fill[Boolean](numPartitions)(false)

   var numFinished = 0
+
+  // Mark all the partitions of the stage to be not finished.
+  def markAllPartitionsAsUnfinished(): Unit = {
+    (0 until numPartitions).map(finished.update(_, false))
--- End diff --

`map` -> `foreach`
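
For illustration, a standalone sketch combining the two suggestions in this thread (rename to `reset`, and a side-effecting `foreach` instead of `map`, which builds a collection that is then thrown away); the class is a toy stand-in for `ActiveJob`:

```scala
class PartitionTracker(numPartitions: Int) {
  // Which partitions have finished, mirroring ActiveJob.finished.
  val finished: Array[Boolean] = Array.fill(numPartitions)(false)

  // Mark all the partitions as not finished.
  def reset(): Unit = (0 until numPartitions).foreach(i => finished(i) = false)
}
```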


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203615271
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1839,6 +1844,18 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
+
+  /**
+   * Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a
+   * barrier stage.
+   *
+   * An RDD is in a barrier stage, if at least one of its parent RDD(s), or itself, are mapped from
+   * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a
+   * [[ShuffledRDD]] indicates start of a new stage.
+   */
+  def isBarrier(): Boolean = isBarrier_
+
+  @transient private lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
--- End diff --

why do we need a lazy val and a def? can we merge them?
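
For illustration, a toy model of the trade-off (illustrative names, not Spark code): keeping an overridable `def` in front of a private cached `lazy val` lets subclasses such as `ShuffledRDD` force a fixed answer while the recursive walk over the parents is still computed only once. That rationale is inferred from the doc comment above, not stated in this thread.

```scala
abstract class Node {
  def children: Seq[Node]

  // Public, overridable entry point...
  def isBarrier(): Boolean = isBarrierCached

  // ...backed by a cached recursive walk over the parents.
  private lazy val isBarrierCached: Boolean = children.exists(_.isBarrier())
}

// A node that starts a barrier, like an RDD mapped from RDDBarrier.
class BarrierNode(val children: Seq[Node]) extends Node {
  override def isBarrier(): Boolean = true
}

// A stage boundary never propagates the flag, like ShuffledRDD.
class BoundaryNode(val children: Seq[Node]) extends Node {
  override def isBarrier(): Boolean = false
}
```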


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203615062
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1839,6 +1844,18 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
+
+  /**
+   * Whether the RDD is in a barrier stage. Spark must launch all the tasks at the same time for a
+   * barrier stage.
+   *
+   * An RDD is in a barrier stage, if at least one of its parent RDD(s), or itself, are mapped from
+   * a RDDBarrier. This function always returns false for a [[ShuffledRDD]], since a
+   * [[ShuffledRDD]] indicates start of a new stage.
+   */
+  def isBarrier(): Boolean = isBarrier_
--- End diff --

does this need to be public?
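
If it does not have to be user-facing, the narrower visibility hinted at here would look roughly like this (illustrative):

```scala
private[spark] def isBarrier(): Boolean = isBarrier_
```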


---




[GitHub] spark pull request #21758: [SPARK-24795][CORE] Implement barrier execution m...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21758#discussion_r203614509
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskInfo.scala ---
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Carries all task infos of a barrier task.
+ */
+class BarrierTaskInfo(val address: String)
--- End diff --

We need param doc to say that `address` is an IPv4 address.
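
For example, something along these lines (wording illustrative):

```scala
/**
 * Carries all task infos of a barrier task.
 *
 * @param address the IPv4 address of the executor that the barrier task is running on
 */
class BarrierTaskInfo(val address: String)
```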


---




[GitHub] spark issue #21802: [SPARK-23928][SQL] Add shuffle collection function.

2018-07-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21802
  
**[Test build #93261 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93261/testReport)**
 for PR 21802 at commit 
[`9081e2f`](https://github.com/apache/spark/commit/9081e2f0b0371630aa315842f14931ee490ff461).


---




[GitHub] spark issue #21608: [SPARK-24626] [SQL] Improve location size calculation in...

2018-07-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21608
  
**[Test build #93262 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93262/testReport)**
 for PR 21608 at commit 
[`107f4c6`](https://github.com/apache/spark/commit/107f4c675978628bf0effc08924a5f7d397f3719).


---




[GitHub] spark issue #21802: [SPARK-23928][SQL] Add shuffle collection function.

2018-07-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21802
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1112/
Test PASSed.


---




[GitHub] spark pull request #21789: [SPARK-24829][SQL]In Spark Thrift Server, CAST AS...

2018-07-19 Thread zuotingbing
Github user zuotingbing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21789#discussion_r203613567
  
--- Diff: sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala ---
@@ -766,6 +774,14 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
      assert(resultSet.getString(2) === HiveUtils.builtinHiveVersion)
    }
  }
+
+  test("Checks cast as float") {
--- End diff --

HiveThriftJdbcTest is an abstract class; two suites, HiveThriftBinaryServerSuite and HiveThriftHttpServerSuite, extend HiveThriftJdbcTest.


---




[GitHub] spark issue #21802: [SPARK-23928][SQL] Add shuffle collection function.

2018-07-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21802
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21732: [SPARK-24762][SQL] Aggregator should be able to use Opti...

2018-07-19 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21732
  
> Non top-level and top-level encoders for Option[Product] have a little 
difference.

Can we treat them the same, but flatten the `Option[Product]` at the end of encoder creation?


---




[GitHub] spark pull request #21782: [SPARK-24816][SQL] SQL interface support repartit...

2018-07-19 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/21782#discussion_r203613170
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala ---
@@ -394,6 +394,41 @@ class FilterPushdownBenchmark extends SparkFunSuite with BenchmarkBeforeAndAfter
      }
    }
  }
+
+  ignore("Pushdown benchmark for RANGE PARTITION BY/DISTRIBUTE BY") {
--- End diff --

With range partitioning the data is better sorted, so more RowGroups can be skipped when filtering. Here is an example:
```scala
test("SPARK-24816") {
  withTable("tbl") {
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "4") {
  spark.range(100).createTempView("tbl")
  spark.sql("select * from tbl DISTRIBUTE BY id SORT BY 
id").write.parquet("/tmp/spark/parquet/hash")
  spark.sql("select * from tbl RANGE PARTITION BY id SORT BY 
id").write.parquet("/tmp/spark/parquet/range")
}
  }
}
```
Column statistics info after `HashPartitioning`:

File | id column statistics
--- | ---
part-0 | min: 2, max: 93
part-1 | min: 0, max: 99
part-2 | min: 14, max: 94
part-3 | min: 3, max: 98

Column statistics info after `RangePartitioning`:

file | id column statistics
--- | ---
part-0 | min: 0, max: 24
part-1 | min: 25, max: 49
part-2 | min: 50, max: 74
part-3 | min: 75, max: 99


# File meta after `HashPartitioning`:

![image](https://user-images.githubusercontent.com/5399861/42924696-f40c6346-8b5d-11e8-8a78-6f7e49372577.png)

# File meta after `RangePartitioning`:

![image](https://user-images.githubusercontent.com/5399861/42924714-05c40986-8b5e-11e8-8ce1-e9f29aec8db8.png)




---




[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-07-19 Thread Achuth17
Github user Achuth17 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r203613041
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -148,6 +148,19 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton
 }
   }
 
+  test("verify table size calculation is accurate") {
--- End diff --

@maropu, I have fixed the test to verify if the calculation is being done 
in parallel. Can you review the change?


---




[GitHub] spark pull request #21789: [SPARK-24829][SQL]In Spark Thrift Server, CAST AS...

2018-07-19 Thread zuotingbing
Github user zuotingbing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21789#discussion_r203610279
  
--- Diff: sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/Column.java ---
@@ -349,7 +349,7 @@ public void addValue(Type type, Object field) {
        break;
      case FLOAT_TYPE:
        nulls.set(size, field == null);
-        doubleVars()[size] = field == null ? 0 : ((Float)field).doubleValue();
+        doubleVars()[size] = field == null ? 0 : new Double(field.toString());
--- End diff --

Sorry, I am not sure what you mean. "(Double)field"?
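
For context, a small illustration of the precision issue that the string-based conversion avoids (this is one reading of why the patch uses `field.toString`; it is not stated in the thread):

```scala
val f: Float = 12.1f
f.toDouble           // 12.100000381469727 -- widening the binary float exposes noise digits
f.toString.toDouble  // 12.1               -- round-trips through the shortest decimal form
```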


---



