[GitHub] spark issue #23228: [MINOR][DOC]The condition description of serialized shuf...

2018-12-09 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/23228
  
Please update the title to `[MINOR][DOC] Update the condition description of 
serialized shuffle`.


---




[GitHub] spark issue #23222: [SPARK-20636] Add the rule TransposeWindow to the optimi...

2018-12-05 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/23222
  
Shall we add a SQL tag to the title?


---




[GitHub] spark issue #23046: [SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.enableRad...

2018-11-15 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/23046
  
I searched the code and didn't find similar issues, so this is the only one 
that needs to be fixed.


---




[GitHub] spark issue #22912: [SPARK-25901][CORE] Use only one thread in BarrierTaskCo...

2018-11-03 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22912
  
Thanks, merging to master!


---




[GitHub] spark pull request #22723: [SPARK-25729][CORE]It is better to replace `minPa...

2018-10-31 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22723#discussion_r229717747
  
--- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
@@ -48,11 +50,11 @@ private[spark] class WholeTextFileInputFormat
   * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
   * which is set through setMaxSplitSize
   */
-  def setMinPartitions(context: JobContext, minPartitions: Int) {
+  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
--- End diff --

Please update the above comment to explain the new behavior.


---




[GitHub] spark pull request #22723: [SPARK-25729][CORE]It is better to replace `minPa...

2018-10-31 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22723#discussion_r229717581
  
--- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
@@ -48,11 +50,11 @@ private[spark] class WholeTextFileInputFormat
   * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
   * which is set through setMaxSplitSize
   */
-  def setMinPartitions(context: JobContext, minPartitions: Int) {
+  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
     val files = listStatus(context).asScala
     val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
-    val maxSplitSize = Math.ceil(totalLen * 1.0 /
-      (if (minPartitions == 0) 1 else minPartitions)).toLong
+    val minPartNum = Math.max(sc.defaultParallelism, minPartitions)
--- End diff --

This is potentially a behavior change. cc @cloud-fan 
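
For reference, a minimal sketch of the two computations being compared (the helper names `oldMaxSplitSize` and `newMaxSplitSize` are illustrative only, not part of the PR):

```
// Old behavior: the split size is driven only by the user-supplied minPartitions.
def oldMaxSplitSize(totalLen: Long, minPartitions: Int): Long =
  math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong

// New behavior: defaultParallelism becomes a lower bound on the partition count.
// For example, with defaultParallelism = 200 and minPartitions = 10, the input is
// now cut into roughly 200 splits instead of roughly 10, hence the concern above.
def newMaxSplitSize(totalLen: Long, minPartitions: Int, defaultParallelism: Int): Long = {
  val minPartNum = math.max(defaultParallelism, minPartitions)
  math.ceil(totalLen * 1.0 / minPartNum).toLong
}
```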


---




[GitHub] spark issue #22849: [SPARK-25852][Core] we should filter the workOffers with...

2018-10-30 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22849
  
It may happen that a busy executor is marked as lost and later re-registers with 
the driver. In that case we currently call `makeOffers()`, which adds the executor 
into `TaskSchedulerImpl.hostToExecutors`. This is a bad implementation because we 
should not depend on the `makeOffers()` function to update an unrelated protected 
val, but that's what we have for now.


---




[GitHub] spark issue #22849: [SPARK-25852][Core] we should filter the workOffers with...

2018-10-30 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22849
  
What do you mean by "better performance"? If that means we can spend less time in 
`TaskSchedulerImpl.resourceOffers()`, then I agree it's true, but AFAIK it has never 
been reported that this can be a bottleneck for the whole cluster, so the perf gain 
is probably trivial here. If you expect better task distribution over existing 
executors, then I don't see any case that can be improved by this proposed change. 
Please correct me if I'm wrong.


---




[GitHub] spark pull request #22853: [SPARK-25845][SQL] Fix MatchError for calendar in...

2018-10-28 Thread jiangxb1987
Github user jiangxb1987 closed the pull request at:

https://github.com/apache/spark/pull/22853


---




[GitHub] spark issue #22853: [SPARK-25845][SQL] Fix MatchError for calendar interval ...

2018-10-28 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22853
  
Merging to master. I can open another PR against 2.4 if required in the 
future.


---




[GitHub] spark issue #22853: [SPARK-25845][SQL] Fix MatchError for calendar interval ...

2018-10-27 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22853
  
Also cc @gatorsmile @cloud-fan @hvanhovell 


---




[GitHub] spark pull request #22853: [SPARK-25845][SQL] Fix MatchError for calendar in...

2018-10-26 Thread jiangxb1987
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/22853

[SPARK-25845][SQL] Fix MatchError for calendar interval type in range frame 
left boundary

## What changes were proposed in this pull request?

WindowSpecDefinition checks start < last, but CalendarIntervalType is not 
comparable, so it would throw the following exception at runtime:

```
scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)
  at org.apache.spark.sql.catalyst.util.TypeUtils$.getInterpretedOrdering(TypeUtils.scala:58)
  at org.apache.spark.sql.catalyst.expressions.BinaryComparison.ordering$lzycompute(predicates.scala:592)
  at org.apache.spark.sql.catalyst.expressions.BinaryComparison.ordering(predicates.scala:592)
  at org.apache.spark.sql.catalyst.expressions.GreaterThan.nullSafeEval(predicates.scala:797)
  at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:496)
  at org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame.isGreaterThan(windowExpressions.scala:245)
  at org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame.checkInputDataTypes(windowExpressions.scala:216)
  at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:171)
  at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:171)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38)
  at scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.scala:43)
  at scala.collection.mutable.ArrayBuffer.forall(ArrayBuffer.scala:48)
  at org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:183)
  at org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition.resolved$lzycompute(windowExpressions.scala:48)
  at org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition.resolved(windowExpressions.scala:48)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:83)
```

We fix the issue by only performing the check on boundary expressions that are of 
AtomicType.
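
A minimal sketch of that idea (illustrative only, not the exact code added by this PR; assumes `org.apache.spark.sql.catalyst.expressions._` and `org.apache.spark.sql.types._` are in scope):

```
// Hypothetical helper: only evaluate the start/end comparison when the boundary
// type has an ordering; CalendarIntervalType does not, so the check is skipped
// instead of hitting the MatchError above.
private def boundaryGreaterThan(lower: Expression, upper: Expression): Boolean =
  lower.dataType match {
    case _: AtomicType => GreaterThan(lower, upper).eval().asInstanceOf[Boolean]
    case _ => false
  }
```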

## How was this patch tested?

Add new test case in `DataFrameWindowFramesSuite`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiangxb1987/spark windowBoundary

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22853.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22853


commit 9d2a1b27caefb6b61c767d7971782b9a74e5d199
Author: Xingbo Jiang 
Date:   2018-10-26T15:41:32Z

fix CalendarIntervalType window boundary failure




---




[GitHub] spark issue #22813: [SPARK-25818][CORE] WorkDirCleanup should only remove th...

2018-10-24 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22813
  
IIUC it's not expected to share the SPARK_WORK_DIR with any other usage.


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-23 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r227459990
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1364,6 +1385,16 @@ private[spark] class DAGScheduler(
   if (job.numFinished == job.numPartitions) {
 markStageAsFinished(resultStage)
 cleanupStateForJobAndIndependentStages(job)
+try { // cancelTasks will fail if a SchedulerBackend 
does not implement killTask
+  logInfo(
+s"Job ${job.jobId} is finished. Killing 
speculative tasks for this job")
+  // ResultStage is only used by this job. It's safe 
to kill speculative or
+  // zombie tasks in this stage.
+  taskScheduler.cancelTasks(stageId, 
shouldInterruptTaskThread(job))
--- End diff --

IIRC `cancelTasks()` will fail the stage (maybe it's okay here because the 
stage has been marked completed); if we just want to kill speculative/zombie 
tasks, then maybe we should call `killAllTaskAttempts()` instead?
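
A rough sketch of what that alternative call might look like at this spot (simplified; the reason string and the exact parameters are assumptions):

```
// Kill remaining speculative/zombie attempts without failing the stage,
// which cancelTasks() would otherwise do.
taskScheduler.killAllTaskAttempts(
  stageId,
  shouldInterruptTaskThread(job),
  reason = s"Job ${job.jobId} is finished, remaining attempts are no longer needed")
```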


---




[GitHub] spark issue #22674: [SPARK-25680][SQL] SQL execution listener shouldn't happ...

2018-10-15 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22674
  
LGTM, do you have any other concerns @hvanhovell @brkyvz @dongjoon-hyun ?


---




[GitHub] spark issue #22677: [SPARK-25683][Core] Make AsyncEventQueue.lastReportTimes...

2018-10-14 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22677
  
Sounds good!


---




[GitHub] spark issue #22699: [SPARK-25711][Core] Allow start-history-server.sh to sho...

2018-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22699
  
Let's also update the title to include the deprecation changes.


---




[GitHub] spark pull request #22699: [SPARK-25711][Core] Allow history server to show ...

2018-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22699#discussion_r224508691
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
 ---
@@ -34,26 +34,25 @@ private[history] class HistoryServerArguments(conf: 
SparkConf, args: Array[Strin
 
   @tailrec
   private def parse(args: List[String]): Unit = {
-if (args.length == 1) {
-  setLogDirectory(args.head)
-} else {
-  args match {
-case ("--dir" | "-d") :: value :: tail =>
-  setLogDirectory(value)
-  parse(tail)
-
-case ("--help" | "-h") :: tail =>
-  printUsageAndExit(0)
-
-case ("--properties-file") :: value :: tail =>
-  propertiesFile = value
-  parse(tail)
-
-case Nil =>
-
-case _ =>
-  printUsageAndExit(1)
-  }
+args match {
+  case ("--dir" | "-d") :: value :: tail =>
+setLogDirectory(value)
+parse(tail)
+
+  case ("--help" | "-h") :: tail =>
+printUsageAndExit(0)
+
+  case ("--properties-file") :: value :: tail =>
+propertiesFile = value
+parse(tail)
+
+  case dir :: Nil =>
--- End diff --

sounds good.


---




[GitHub] spark pull request #22699: [SPARK-25711][Core] Allow history server to show ...

2018-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22699#discussion_r224508223
  
--- Diff: sbin/start-history-server.sh ---
@@ -28,7 +28,22 @@ if [ -z "${SPARK_HOME}" ]; then
   export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
 fi
 
+# NOTE: This exact class name is matched downstream by SparkSubmit.
+# Any changes need to be reflected there.
+CLASS="org.apache.spark.deploy.history.HistoryServer"
+
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+  echo "Usage: ./sbin/start-history-server.sh [options]"
--- End diff --

Well, I also saw similar code in `start-thriftserver.sh`, which uses a 
`usage()` function. Both are fine to me; just a heads up to make sure we've taken 
that into consideration.


---




[GitHub] spark pull request #22699: [SPARK-25711][Core] Allow history server to show ...

2018-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22699#discussion_r224507524
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
 ---
@@ -34,26 +34,25 @@ private[history] class HistoryServerArguments(conf: 
SparkConf, args: Array[Strin
 
   @tailrec
   private def parse(args: List[String]): Unit = {
-if (args.length == 1) {
-  setLogDirectory(args.head)
-} else {
-  args match {
-case ("--dir" | "-d") :: value :: tail =>
-  setLogDirectory(value)
-  parse(tail)
-
-case ("--help" | "-h") :: tail =>
-  printUsageAndExit(0)
-
-case ("--properties-file") :: value :: tail =>
-  propertiesFile = value
-  parse(tail)
-
-case Nil =>
-
-case _ =>
-  printUsageAndExit(1)
-  }
+args match {
+  case ("--dir" | "-d") :: value :: tail =>
+setLogDirectory(value)
+parse(tail)
+
+  case ("--help" | "-h") :: tail =>
+printUsageAndExit(0)
+
+  case ("--properties-file") :: value :: tail =>
+propertiesFile = value
+parse(tail)
+
+  case dir :: Nil =>
--- End diff --

I'm not against the change, but we should mention it in the PR description.


---




[GitHub] spark pull request #22699: [SPARK-25711][Core] Allow history server to show ...

2018-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22699#discussion_r224504103
  
--- Diff: sbin/start-history-server.sh ---
@@ -28,7 +28,22 @@ if [ -z "${SPARK_HOME}" ]; then
   export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
 fi
 
+# NOTE: This exact class name is matched downstream by SparkSubmit.
+# Any changes need to be reflected there.
+CLASS="org.apache.spark.deploy.history.HistoryServer"
+
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+  echo "Usage: ./sbin/start-history-server.sh [options]"
--- End diff --

nit: why not have a separated `usage()` function?


---




[GitHub] spark pull request #22699: [SPARK-25711][Core] Allow history server to show ...

2018-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22699#discussion_r224504246
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
 ---
@@ -34,26 +34,25 @@ private[history] class HistoryServerArguments(conf: 
SparkConf, args: Array[Strin
 
   @tailrec
   private def parse(args: List[String]): Unit = {
-if (args.length == 1) {
-  setLogDirectory(args.head)
-} else {
-  args match {
-case ("--dir" | "-d") :: value :: tail =>
-  setLogDirectory(value)
-  parse(tail)
-
-case ("--help" | "-h") :: tail =>
-  printUsageAndExit(0)
-
-case ("--properties-file") :: value :: tail =>
-  propertiesFile = value
-  parse(tail)
-
-case Nil =>
-
-case _ =>
-  printUsageAndExit(1)
-  }
+args match {
+  case ("--dir" | "-d") :: value :: tail =>
+setLogDirectory(value)
+parse(tail)
+
+  case ("--help" | "-h") :: tail =>
+printUsageAndExit(0)
+
+  case ("--properties-file") :: value :: tail =>
+propertiesFile = value
+parse(tail)
+
+  case dir :: Nil =>
--- End diff --

IIUC this is not related to the PR description?


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22165
  
Actually my original thinking was like this:
```
  val state = new ContextBarrierState(barrierId, numTasks)
  val requester = mockRequester()
  val request = forgeRequest(numTasks, stageId, stageAttemptId, taskAttemptId, barrierEpoch)
  state.handleRequest(requester, request)
  // Verify states
  ...
  // Verify cleanup
  ...
```
So you don't have to launch a SparkContext for the test. Could you please 
check whether this is feasible?


---




[GitHub] spark issue #22677: [SPARK-25683][Core] Make AsyncEventQueue.lastReportTimes...

2018-10-11 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22677
  
Though it looks a little strange, the log content is actually right; I don't 
think we want to set the last report timestamp to the current time (that could 
confuse users about what happened before that timestamp).


---




[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...

2018-10-09 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22674#discussion_r223729445
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala 
---
@@ -75,95 +76,69 @@ trait QueryExecutionListener {
  */
 @Experimental
 @InterfaceStability.Evolving
-class ExecutionListenerManager private extends Logging {
+class ExecutionListenerManager private[sql](session: SparkSession, 
loadExtensions: Boolean)
--- End diff --

nit: we shall add param comments.


---




[GitHub] spark issue #22674: [SPARK-25680][SQL] SQL execution listener shouldn't happ...

2018-10-09 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22674
  
retest this please


---




[GitHub] spark issue #22325: [SPARK-25318]. Add exception handling when wrapping the ...

2018-09-26 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22325
  
LGTM


---




[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r220589416
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -187,6 +191,12 @@ private[spark] class BarrierCoordinator(
   requesters.clear()
   cancelTimerTask()
 }
+
+// Check for clearing internal data, visible for test only.
+private[spark] def cleanCheck(): Boolean = requesters.isEmpty && 
timerTask == null
+
+// Get currently barrier epoch, visible for test only.
+private[spark] def getBarrierEpoch(): Int = barrierEpoch
--- End diff --

Why not just make `barrierEpoch` visible for testing?


---




[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r220590215
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -187,6 +191,12 @@ private[spark] class BarrierCoordinator(
   requesters.clear()
   cancelTimerTask()
 }
+
+// Check for clearing internal data, visible for test only.
+private[spark] def cleanCheck(): Boolean = requesters.isEmpty && 
timerTask == null
--- End diff --

nit: `cleanCheck()` -> `isInternalStateClear()`


---




[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...

2018-09-26 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22165#discussion_r220591706
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/BarrierCoordinatorSuite.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark._
+import org.apache.spark.rpc.RpcTimeout
+
+class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext 
with Eventually {
+
+  /**
+   * Get the current ContextBarrierState from barrierCoordinator.states by 
ContextBarrierId.
+   */
+  private def getBarrierState(
+  stageId: Int,
+  stageAttemptId: Int,
+  barrierCoordinator: BarrierCoordinator) = {
+val barrierId = ContextBarrierId(stageId, stageAttemptId)
+barrierCoordinator.states.get(barrierId)
+  }
+
+  test("normal test for single task") {
+sc = new SparkContext("local", "test")
+val barrierCoordinator = new BarrierCoordinator(5, sc.listenerBus, 
sc.env.rpcEnv)
+val rpcEndpointRef = sc.env.rpcEnv.setupEndpoint("barrierCoordinator", 
barrierCoordinator)
+val stageId = 0
+val stageAttemptNumber = 0
+rpcEndpointRef.askSync[Unit](
--- End diff --

We are still relying on the RPC framework, can we get rid of this?


---




[GitHub] spark pull request #22458: [SPARK-25459] Add viewOriginalText back to Catalo...

2018-09-25 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22458#discussion_r220410022
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -2348,4 +2348,17 @@ class HiveDDLSuite
   }
 }
   }
+
+  test("desc formatted table should also show viewOriginalText for views") 
{
+withView("v1") {
+  sql("CREATE VIEW v1 AS SELECT 1 AS value")
+  assert(sql("DESC FORMATTED v1").collect().containsSlice(
+Seq(
+  Row("Type", "VIEW", ""),
+  Row("View Text", "SELECT 1 AS value", ""),
+  Row("View Original Text:", "SELECT 1 AS value", "")
--- End diff --

@zheyuan28 This is intended; you should create a view using a previous 
version of Spark, or create a view using Hive directly.


---




[GitHub] spark pull request #22526: [SPARK-25502][CORE][WEBUI]Empty Page when page nu...

2018-09-24 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22526#discussion_r219891354
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -685,7 +685,7 @@ private[ui] class TaskDataSource(
 
   private var _tasksToShow: Seq[TaskData] = null
 
-  override def dataSize: Int = taskCount(stage)
+  override def dataSize: Int = store.taskCount(stage.stageId, 
stage.attemptId).toInt
--- End diff --

nit: after this change, the function `taskCount()` is only referenced by 
`totalTasks`, so we can inline it.
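
A quick sketch of that suggestion (simplified, not the exact `StagePage` code):

```
// With dataSize reading the store directly, totalTasks can do the same and the
// private taskCount() helper becomes unnecessary.
def totalTasks: Int = store.taskCount(stage.stageId, stage.attemptId).toInt
```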


---




[GitHub] spark pull request #22458: [SPARK-25459] Add viewOriginalText back to Catalo...

2018-09-20 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22458#discussion_r219370221
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
---
@@ -467,9 +467,9 @@ private[hive] class HiveClientImpl(
 properties = filteredProperties,
 stats = readHiveStats(properties),
 comment = comment,
-// In older versions of Spark(before 2.2.0), we expand the view 
original text and store
-// that into `viewExpandedText`, and that should be used in view 
resolution. So we get
-// `viewExpandedText` instead of `viewOriginalText` for viewText 
here.
--- End diff --

This comment is for `viewText`, please rephrase and keep it, thanks!


---




[GitHub] spark pull request #22325: [SPARK-25318]. Add exception handling when wrappi...

2018-09-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22325#discussion_r218873184
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -444,36 +444,34 @@ final class ShuffleBlockFetcherIterator(
   throwFetchFailedException(blockId, address, e)
   }
 
-  input = streamWrapper(blockId, in)
-  // Only copy the stream if it's wrapped by compression or 
encryption, also the size of
-  // block is small (the decompressed block is smaller than 
maxBytesInFlight)
-  if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 
3) {
-val originalInput = input
-val out = new ChunkedByteBufferOutputStream(64 * 1024, 
ByteBuffer.allocate)
-try {
+  try {
+input = streamWrapper(blockId, in)
+// Only copy the stream if it's wrapped by compression or 
encryption, also the size of
+// block is small (the decompressed block is smaller than 
maxBytesInFlight)
+if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight 
/ 3) {
+  val out = new ChunkedByteBufferOutputStream(64 * 1024, 
ByteBuffer.allocate)
   // Decompress the whole block at once to detect any 
corruption, which could increase
   // the memory usage tne potential increase the chance of OOM.
   // TODO: manage the memory used here, and spill it into disk 
in case of OOM.
   Utils.copyStream(input, out)
   out.close()
   input = out.toChunkedByteBuffer.toInputStream(dispose = true)
--- End diff --

I'm not the original author of that, but I think so.


---




[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API

2018-09-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22192#discussion_r218861435
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -136,6 +136,26 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses 
the right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)
 
+  private val executorPlugins: Seq[ExecutorPlugin] = {
+val pluginNames = conf.get(EXECUTOR_PLUGINS)
+if (pluginNames.nonEmpty) {
+  logDebug(s"Initializing the following plugins: 
${pluginNames.mkString(", ")}")
+
+  // Plugins need to load using a class loader that includes the 
executor's user classpath
+  val pluginList: Seq[ExecutorPlugin] =
+Utils.withContextClassLoader(replClassLoader) {
+  val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], 
pluginNames, conf)
+  plugins.foreach(_.init())
--- End diff --

nit: Might be good to log whether each `plugin.init()` succeeded.
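
For example, a minimal sketch of the suggested logging (assuming the surrounding `Logging` helpers are available; this is not the code in the PR):

```
val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
plugins.foreach { plugin =>
  try {
    plugin.init()
    logInfo(s"Initialized executor plugin ${plugin.getClass.getName}")
  } catch {
    case e: Exception =>
      logError(s"Failed to initialize executor plugin ${plugin.getClass.getName}", e)
      throw e
  }
}
```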


---




[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API

2018-09-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22192#discussion_r218865220
  
--- Diff: core/src/test/java/org/apache/spark/ExecutorPluginSuite.java ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ExecutorPluginSuite {
+  private static final String EXECUTOR_PLUGIN_CONF_NAME = 
"spark.executor.plugins";
+  private static final String testBadPluginName = 
TestBadShutdownPlugin.class.getName();
+  private static final String testPluginName = 
TestExecutorPlugin.class.getName();
+
+  // Static value modified by testing plugin to ensure plugin loaded 
correctly.
+  public static int numSuccessfulPlugins = 0;
+
+  // Static value modified by testing plugin to verify plugins shut down 
properly.
+  public static int numSuccessfulTerminations = 0;
+
+  private JavaSparkContext sc;
+
+  @Before
+  public void setUp() {
+sc = null;
+numSuccessfulPlugins = 0;
+numSuccessfulTerminations = 0;
+  }
+
+  @After
+  public void tearDown() {
+if (sc != null) {
+  sc.stop();
+  sc = null;
+}
+  }
+
+  private SparkConf initializeSparkConf(String pluginNames) {
+return new SparkConf()
+.setMaster("local")
+.setAppName("test")
+.set(EXECUTOR_PLUGIN_CONF_NAME, pluginNames);
+  }
+
+  @Test
+  public void testPluginClassDoesNotExist() {
+SparkConf conf = initializeSparkConf("nonexistant.plugin");
+try {
+  sc = new JavaSparkContext(conf);
+  fail("No exception thrown for nonexistant plugin");
+} catch (Exception e) {
+  // We cannot catch ClassNotFoundException directly because Java 
doesn't think it'll be thrown
+  
assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException"));
+}
+  }
+
+  @Test
+  public void testAddPlugin() throws InterruptedException {
+// Load the sample TestExecutorPlugin, which will change the value of 
numSuccessfulPlugins
+SparkConf conf = initializeSparkConf(testPluginName);
+sc = new JavaSparkContext(conf);
+assertEquals(1, numSuccessfulPlugins);
+sc.stop();
+sc = null;
+assertEquals(1, numSuccessfulTerminations);
+  }
+
+  @Test
+  public void testAddMultiplePlugins() throws InterruptedException {
--- End diff --

super nit: shall we test whether we can load multiple different plugins?


---




[GitHub] spark pull request #22325: [SPARK-25318]. Add exception handling when wrappi...

2018-09-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22325#discussion_r218857081
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -444,36 +444,34 @@ final class ShuffleBlockFetcherIterator(
   throwFetchFailedException(blockId, address, e)
   }
 
-  input = streamWrapper(blockId, in)
-  // Only copy the stream if it's wrapped by compression or 
encryption, also the size of
-  // block is small (the decompressed block is smaller than 
maxBytesInFlight)
-  if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 
3) {
-val originalInput = input
-val out = new ChunkedByteBufferOutputStream(64 * 1024, 
ByteBuffer.allocate)
-try {
+  try {
+input = streamWrapper(blockId, in)
+// Only copy the stream if it's wrapped by compression or 
encryption, also the size of
+// block is small (the decompressed block is smaller than 
maxBytesInFlight)
+if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight 
/ 3) {
+  val out = new ChunkedByteBufferOutputStream(64 * 1024, 
ByteBuffer.allocate)
   // Decompress the whole block at once to detect any 
corruption, which could increase
   // the memory usage tne potential increase the chance of OOM.
   // TODO: manage the memory used here, and spill it into disk 
in case of OOM.
   Utils.copyStream(input, out)
   out.close()
   input = out.toChunkedByteBuffer.toInputStream(dispose = true)
--- End diff --

We create a new `input` here, so the original input should be closed to 
avoid a memory leak.
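
A hedged sketch of the suggestion (simplified; assumes `Utils.copyStream` can close the streams it is given, otherwise the old stream would need to be closed explicitly in a finally block):

```
val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
// Copy and close the wrapped stream we are about to replace, so the original
// decompression/decryption stream does not leak when `input` is rebound below.
Utils.copyStream(input, out, closeStreams = true)
input = out.toChunkedByteBuffer.toInputStream(dispose = true)
```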


---




[GitHub] spark pull request #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD ...

2018-09-11 Thread jiangxb1987
Github user jiangxb1987 closed the pull request at:

https://github.com/apache/spark/pull/20414


---




[GitHub] spark issue #22351: [MINOR][SQL] Add a debug log when a SQL text is used for...

2018-09-09 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22351
  
Just confirmed that if the view is both created and retrieved on the Spark side, 
then no exception is thrown.


---




[GitHub] spark issue #22351: [MINOR][SQL] Add a debug log when a SQL text is used for...

2018-09-09 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22351
  
This actually reads a view created by Hive, so I don't think it is a problem 
on the view write side.


---




[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-09-05 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22165
  
I think it should be fine to make `ContextBarrierState` private[spark] to 
test it, WDYT @mengxr ?


---




[GitHub] spark issue #22277: [SPARK-25276] Redundant constrains when using alias

2018-09-05 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22277
  
You can have `select * from (select a, a as c from table1 where a > 10) t 
where a > c`


---




[GitHub] spark issue #22277: [SPARK-25276] Redundant constrains when using alias

2018-09-04 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22277
  
Thank you for your interest in this issue; however, I don't think the changes 
proposed in this PR are valid. Consider that you have another predicate like `a > z`: 
it is surely desirable to infer a new constraint `z > z`. Please correct me if 
I'm wrong about this.


---




[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4

2018-09-04 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22240
  
LGTM


---




[GitHub] spark pull request #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recyc...

2018-09-04 Thread jiangxb1987
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/22330

[SPARK-19355][SQL][FOLLOWUP][TEST] Properly recycle SparkSession on 
TakeOrderedAndProjectSuite finishes

## What changes were proposed in this pull request?

Previously in `TakeOrderedAndProjectSuite` the SparkSession will not get 
recycled when the test suite finishes.

## How was this patch tested?

N/A

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiangxb1987/spark SPARK-19355

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22330


commit 0b010669b15781a648f7c7bde13556ddb7c003c3
Author: Xingbo Jiang 
Date:   2018-09-04T12:23:30Z

properly recycle SparkSession on TakeOrderedAndProjectSuite finishes.




---




[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4

2018-09-04 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22240
  
retest this please


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-29 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22112
  
ping @tgravescs @mridulm @squito @markhamstra 


---




[GitHub] spark pull request #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for...

2018-08-29 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22240#discussion_r213754911
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala ---
@@ -82,31 +82,22 @@ private[spark] abstract class Task[T](
 SparkEnv.get.blockManager.registerTask(taskAttemptId)
 // TODO SPARK-24874 Allow create BarrierTaskContext based on 
partitions, instead of whether
 // the stage is barrier.
-context = if (isBarrier) {
--- End diff --

`context` is still used in the following statements.


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-29 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22112
  
retest this please


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-29 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22112
  
retest this please


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-29 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22112
  
retest this please


---




[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...

2018-08-28 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22247#discussion_r213378931
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -108,38 +108,12 @@ def _load_from_socket(port, auth_secret):
 """
 Load data from a given socket, this is a blocking method thus only 
return when the socket
 connection has been closed.
-
-This is copied from context.py, while modified the message protocol.
 """
-sock = None
-# Support for both IPv4 and IPv6.
-# On most of IPv6-ready systems, IPv6 will take precedence.
-for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, 
socket.SOCK_STREAM):
-af, socktype, proto, canonname, sa = res
-sock = socket.socket(af, socktype, proto)
-try:
-# Do not allow timeout for socket reading operation.
-sock.settimeout(None)
-sock.connect(sa)
-except socket.error:
-sock.close()
-sock = None
-continue
-break
-if not sock:
-raise Exception("could not open socket")
-
-# We don't really need a socket file here, it's just for convenience 
that we can reuse the
-# do_server_auth() function and data serialization methods.
-sockfile = sock.makefile("rwb", 65536)
-
+(sockfile, sock) = local_connect_and_auth(port, auth_secret)
--- End diff --

We must set the socket timeout to `None` to allow the `barrier()` call to block 
forever.


---




[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4

2018-08-28 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22240
  
retest this please


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-28 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22112
  
retest this please


---




[GitHub] spark pull request #21976: [SPARK-24909][core] Always unregister pending par...

2018-08-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21976#discussion_r213176636
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2474,19 +2478,21 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 runEvent(makeCompletionEvent(
   taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
 
-// There should be no new attempt of stage submitted,
-// because task(stageId=1, stageAttempt=1, partitionId=1) is still 
running in
-// the current attempt (and hasn't completed successfully in any 
earlier attempts).
-assert(taskSets.size === 4)
+// At this point there should be no active task set for stageId=1 and 
we need
+// to resubmit because the output from (stageId=1, stageAttemptId=0, 
partitionId=1)
+// was ignored due to executor failure
+assert(taskSets.size === 5)
+assert(taskSets(4).stageId === 1 && taskSets(4).stageAttemptId === 2
+  && taskSets(4).tasks.size === 1)
 
-// Complete task(stageId=1, stageAttempt=1, partitionId=1) 
successfully.
+// Complete task(stageId=1, stageAttempt=2, partitionId=1) 
successfully.
 runEvent(makeCompletionEvent(
-  taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))
+  taskSets(4).tasks(0), Success, makeMapStatus("hostB", 2)))
--- End diff --

Yea, thanks for the explanation. BTW, what's the JIRA number of the ongoing 
scheduler integration test?


---




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r213050049
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a ServerSocket to accept method calls from Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+if (isBarrier) {
+  serverSocket = Some(new ServerSocket(/* port */ 0,
+/* backlog */ 1,
+InetAddress.getByName("localhost")))
+  // A call to accept() for ServerSocket shall block infinitely.
+  serverSocket.map(_.setSoTimeout(0))
+  new Thread("accept-connections") {
+setDaemon(true)
+
+override def run(): Unit = {
+  while (!serverSocket.get.isClosed()) {
+var sock: Socket = null
+try {
+  sock = serverSocket.get.accept()
+  // Wait for function call from python side.
+  sock.setSoTimeout(1)
+  val input = new DataInputStream(sock.getInputStream())
--- End diff --

Thanks for catching this, yea I agree it would be better to move the 
authentication before recognising functions.


---




[GitHub] spark pull request #21976: [SPARK-24909][core] Always unregister pending par...

2018-08-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21976#discussion_r213042176
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2474,19 +2478,21 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 runEvent(makeCompletionEvent(
   taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
 
-// There should be no new attempt of stage submitted,
-// because task(stageId=1, stageAttempt=1, partitionId=1) is still 
running in
-// the current attempt (and hasn't completed successfully in any 
earlier attempts).
-assert(taskSets.size === 4)
+// At this point there should be no active task set for stageId=1 and 
we need
+// to resubmit because the output from (stageId=1, stageAttemptId=0, 
partitionId=1)
+// was ignored due to executor failure
+assert(taskSets.size === 5)
+assert(taskSets(4).stageId === 1 && taskSets(4).stageAttemptId === 2
+  && taskSets(4).tasks.size === 1)
 
-// Complete task(stageId=1, stageAttempt=1, partitionId=1) 
successfully.
+// Complete task(stageId=1, stageAttempt=2, partitionId=1) 
successfully.
 runEvent(makeCompletionEvent(
-  taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))
+  taskSets(4).tasks(0), Success, makeMapStatus("hostB", 2)))
--- End diff --

IIUC the test case shall still pass without changing this line right?


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-26 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22112
  
The changes look good from my side. They summarize the current insight we 
have into the data correctness issue caused by input-order-aware operators 
and inconsistent shuffle output order, and they also provide a temporary 
workaround for the issue by failing the job. I feel we can have this in 2.4 and 
continue the investigation in future releases. Let's listen to @tgravescs @mridulm 
@markhamstra, who have been actively tracking the issue, to see whether we can 
move forward with this PR.


---




[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-24 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/21698
  
Thanks everyone! I closed this in favor of #22112


---




[GitHub] spark pull request #21698: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-24 Thread jiangxb1987
Github user jiangxb1987 closed the pull request at:

https://github.com/apache/spark/pull/21698


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-24 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212653282
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
 failedStages += failedStage
 failedStages += mapStage
 if (noResubmitEnqueued) {
+  // If the map stage is INDETERMINATE, which means the map 
tasks may return
+  // different result when re-try, we need to re-try all the 
tasks of the failed
+  // stage and its succeeding stages, because the input data 
will be changed after the
+  // map tasks are re-tried.
+  // Note that, if map stage is UNORDERED, we are fine. The 
shuffle partitioner is
+  // guaranteed to be idempotent, so the input data of the 
reducers will not change even
+  // if the map tasks are re-tried.
+  if (mapStage.rdd.computingRandomLevel == 
RDD.RandomLevel.INDETERMINATE) {
+def rollBackStage(stage: Stage): Unit = stage match {
+  case mapStage: ShuffleMapStage =>
+val numMissingPartitions = 
mapStage.findMissingPartitions().length
+if (numMissingPartitions < mapStage.numTasks) {
+  markStageAsFinished(
+mapStage,
+Some("preceding shuffle map stage with random 
output gets retried."),
+willRetry = true)
+  
mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
--- End diff --

IIRC we didn't cancel running tasks for failed stage attempts because we 
still expect them to finish and write outputs; however, that's not the case when 
you decide to retry all the tasks in a stage. You can call 
`taskScheduler.killAllTaskAttempts()` to kill all running tasks for a specific 
stage without failing the stage.


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-24 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212651948
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -305,17 +306,19 @@ object ShuffleExchangeExec {
 rdd
   }
 
+  // round-robin function is order sensitive if we don't sort the 
input.
+  val orderSensitiveFunc = isRoundRobin && 
!SQLConf.get.sortBeforeRepartition
   if (needToCopyObjectsBeforeShuffle(part)) {
-newRdd.mapPartitionsInternal { iter =>
+newRdd.mapPartitionsWithIndexInternal((_, iter) => {
--- End diff --

sounds good


---




[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

2018-08-24 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22211
  
LGTM


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212383406
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -305,17 +306,19 @@ object ShuffleExchangeExec {
 rdd
   }
 
+  // round-robin function is order sensitive if we don't sort the 
input.
+  val orderSensitiveFunc = isRoundRobin && 
!SQLConf.get.sortBeforeRepartition
   if (needToCopyObjectsBeforeShuffle(part)) {
-newRdd.mapPartitionsInternal { iter =>
+newRdd.mapPartitionsWithIndexInternal((_, iter) => {
--- End diff --

Shouldn't we mark `newRdd` as `IDEMPOTENT` if we insert a local sort (or 
`INDETERMINATE` if we don't sort), so we don't have to mark the function as order 
sensitive?


---




[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212379326
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
 failedStages += failedStage
 failedStages += mapStage
 if (noResubmitEnqueued) {
+  // If the map stage is INDETERMINATE, which means the map 
tasks may return
+  // different result when re-try, we need to re-try all the 
tasks of the failed
+  // stage and its succeeding stages, because the input data 
will be changed after the
+  // map tasks are re-tried.
+  // Note that, if map stage is UNORDERED, we are fine. The 
shuffle partitioner is
+  // guaranteed to be idempotent, so the input data of the 
reducers will not change even
+  // if the map tasks are re-tried.
+  if (mapStage.rdd.computingRandomLevel == 
RDD.RandomLevel.INDETERMINATE) {
+def rollBackStage(stage: Stage): Unit = stage match {
+  case mapStage: ShuffleMapStage =>
+val numMissingPartitions = 
mapStage.findMissingPartitions().length
+if (numMissingPartitions < mapStage.numTasks) {
+  markStageAsFinished(
+mapStage,
+Some("preceding shuffle map stage with random 
output gets retried."),
+willRetry = true)
+  
mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
--- End diff --

We shall also kill all running tasks for this map stage.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212381036
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
 failedStages += failedStage
 failedStages += mapStage
 if (noResubmitEnqueued) {
+  // If the map stage is INDETERMINATE, which means the map 
tasks may return
+  // different result when re-try, we need to re-try all the 
tasks of the failed
+  // stage and its succeeding stages, because the input data 
will be changed after the
+  // map tasks are re-tried.
+  // Note that, if map stage is UNORDERED, we are fine. The 
shuffle partitioner is
+  // guaranteed to be idempotent, so the input data of the 
reducers will not change even
+  // if the map tasks are re-tried.
+  if (mapStage.rdd.computingRandomLevel == 
RDD.RandomLevel.INDETERMINATE) {
+def rollBackStage(stage: Stage): Unit = stage match {
+  case mapStage: ShuffleMapStage =>
+val numMissingPartitions = 
mapStage.findMissingPartitions().length
+if (numMissingPartitions < mapStage.numTasks) {
+  markStageAsFinished(
+mapStage,
+Some("preceding shuffle map stage with random 
output gets retried."),
+willRetry = true)
+  
mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+  failedStages += mapStage
+}
+
+  case resultStage =>
+val numMissingPartitions = 
resultStage.findMissingPartitions().length
+if (numMissingPartitions < resultStage.numTasks) {
+  // TODO: support to rollback result tasks.
+  val errorMessage = "A shuffle map stage with random 
output was failed and " +
+s"retried. However, Spark cannot rollback the 
result stage $resultStage " +
+"to re-process the input data, and has to fail 
this job. Please " +
+"eliminate the randomness by checkpointing the RDD 
before " +
+"repartition/zip and try again."
+  abortStage(failedStage, errorMessage, None)
+}
+}
+
+def rollbackSucceedingStages(stageChain: List[Stage]): 
Unit = {
+  if (stageChain.head.id == failedStage.id) {
+stageChain.foreach { stage =>
+  if (!failedStages.contains(stage)) 
rollBackStage(stage)
+}
+  } else {
+stageChain.head.parents.foreach(s => 
rollbackSucceedingStages(s :: stageChain))
+  }
+}
+
+rollBackStage(failedStage)
--- End diff --

We may need a comment here to explain the tricky corner case.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212368000
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -812,11 +813,13 @@ abstract class RDD[T: ClassTag](
*/
   private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
   f: (Int, Iterator[T]) => Iterator[U],
-  preservesPartitioning: Boolean = false): RDD[U] = withScope {
+  preservesPartitioning: Boolean = false,
+  orderSensitiveFunc: Boolean = false): RDD[U] = withScope {
--- End diff --

nit: add param comment for `orderSensitiveFunc`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...

2018-08-23 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r212376990
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
 dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, 
_]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the random level of this RDD's computing function. Please 
refer to [[RDD.RandomLevel]]
+   * for the definition of random level.
+   *
+   * By default, an RDD without parents(root RDD) is IDEMPOTENT. For RDDs 
with parents, the random
+   * level of current RDD is the random level of the parent which is 
random most.
+   */
+  // TODO: make it public so users can set random level to their custom 
RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different 
random level for different
+  // partitions.
+  private[spark] def computingRandomLevel: RDD.RandomLevel.Value = {
+val parentRandomLevels = dependencies.map {
+  case dep: ShuffleDependency[_, _, _] =>
+if (dep.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) 
{
+  RDD.RandomLevel.INDETERMINATE
--- End diff --

> > All other shuffle cases, we dont know the output order in spark.
> 
> Actually we know. As long as the shuffle map stage RDD is IDEMPOTENT or 
UNORDERED, the reduce RDD is UNORDERED instead of INDETERMINATE.

IIUC the shuffle map side itself works as follows (sketched in code below):

- If an Aggregator and a key ordering are specified:
  - the output becomes idempotent;
- If the Aggregator or the key ordering is not specified:
  - if the input is indeterminate, then the output becomes indeterminate;
  - if the input is idempotent or unordered, then the output becomes unordered.

We also have to cover the case @mridulm raised that the shuffle map side may be 
skipped.
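
A minimal sketch of that rule, assuming nothing about the actual Spark API (the 
enum mirrors `RDD.RandomLevel` from the diff, but the function name and flag are 
illustrative):

```scala
// Propagate the determinism level of a shuffle's output from the level of its input.
object RandomLevel extends Enumeration {
  val IDEMPOTENT, UNORDERED, INDETERMINATE = Value
}

def shuffleOutputLevel(
    inputLevel: RandomLevel.Value,
    hasAggregatorAndKeyOrdering: Boolean): RandomLevel.Value = {
  if (hasAggregatorAndKeyOrdering) {
    // Aggregation plus key ordering pins down both the content and the order.
    RandomLevel.IDEMPOTENT
  } else if (inputLevel == RandomLevel.INDETERMINATE) {
    // Re-running the map side may produce different data, so the reduce side may too.
    RandomLevel.INDETERMINATE
  } else {
    // The content is fixed but the fetch order of shuffle blocks is not.
    RandomLevel.UNORDERED
  }
}
```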


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-22 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22165
  
One general idea is that we don't need to rely on the RPC framework to test 
`ContextBarrierState`: just mocking `RpcCallContext`s should be enough (I haven't 
gone into the details, so correct me if I'm wrong).
We shall cover the following scenarios (a mock-based sketch follows the list):

- `RequestToSync` that carries different `numTasks`;
- `RequestToSync` that carries different `barrierEpoch`;
- Collect enough `RequestToSync` messages before timeout;
- Don't collect enough `RequestToSync` messages before timeout;
- Handle `RequestToSync` from different stage attempts concurrently;
- Make sure we clear all the internal data under each case.
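
A rough sketch of the mock-based setup for the happy path, using Mockito (the 
`barrierState.handleRequest(...)` call and the `request` value are placeholders 
for whatever the real, currently private API looks like; they are assumptions, 
not actual method names):

```scala
import org.apache.spark.rpc.RpcCallContext
import org.mockito.Mockito.{mock, verify}

// One mocked caller per task in the barrier stage.
val numTasks = 4
val requesters = (0 until numTasks).map(_ => mock(classOf[RpcCallContext]))

// Feed all numTasks requests in before the timeout fires.
requesters.foreach(ctx => barrierState.handleRequest(ctx, request))

// Every caller should get exactly one reply, and the internal state
// (requesters list, timer, epoch) should then be cleared.
requesters.foreach(ctx => verify(ctx).reply(()))
```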


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...

2018-08-21 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22165
  
I'll make one pass of this later today :) Thanks for taking this task!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22079: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...

2018-08-21 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22079
  
LGTM, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-21 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211683369
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +99,126 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+def _load_from_socket(port, auth_secret):
+"""
+Load data from a given socket, this is a blocking method thus only 
return when the socket
+connection has been closed.
+"""
+sock = None
+# Support for both IPv4 and IPv6.
+# On most of IPv6-ready systems, IPv6 will take precedence.
+for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, 
socket.SOCK_STREAM):
+af, socktype, proto, canonname, sa = res
+sock = socket.socket(af, socktype, proto)
+try:
+# Do not allow timeout for socket reading operation.
+sock.settimeout(None)
+sock.connect(sa)
+except socket.error:
+sock.close()
+sock = None
+continue
+break
+if not sock:
+raise Exception("could not open socket")
+
+sockfile = sock.makefile("rwb", 65536)
+write_with_length("run".encode("utf-8"), sockfile)
+sockfile.flush()
+do_server_auth(sockfile, auth_secret)
+
+# The socket will be automatically closed when garbage-collected.
+return UTF8Deserializer().loads(sockfile)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_port = None
+_secret = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
+
+@classmethod
+def _getOrCreate(cls):
+"""Internal function to get or create global BarrierTaskContext."""
+if cls._taskContext is None:
--- End diff --

IIUC reusing a Python worker just means we start the Python worker from a daemon 
thread; it shall not affect the input/output files related to worker.py.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22166: [2.3][SPARK-25114][Core][FOLLOWUP] Fix RecordBina...

2018-08-21 Thread jiangxb1987
Github user jiangxb1987 closed the pull request at:

https://github.com/apache/spark/pull/22166


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22166: [2.3][SPARK-25114][Core][FOLLOWUP] Fix RecordBina...

2018-08-21 Thread jiangxb1987
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/22166

[2.3][SPARK-25114][Core][FOLLOWUP] Fix RecordBinaryComparatorSuite build 
failure

## What changes were proposed in this pull request?

Fix RecordBinaryComparatorSuite build failure

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiangxb1987/spark SPARK-25114-2.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22166.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22166


commit 3ec52f802ad19923042d604e8c04725019519c46
Author: Xingbo Jiang 
Date:   2018-08-21T07:07:49Z

fix test suite build failure




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22158: [SPARK-25161][Core] Fix several bugs in failure handling...

2018-08-21 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22158
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22158: [SPARK-25161][Core] Fix several bugs in failure handling...

2018-08-20 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22158
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22158: [SPARK-25161][Core] Fix several bugs in failure h...

2018-08-20 Thread jiangxb1987
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/22158

[SPARK-25161][Core] Fix several bugs in failure handling of barrier 
execution mode

## What changes were proposed in this pull request?

Fix several bugs in failure handling of barrier execution mode:
* Mark TaskSet for a barrier stage as zombie when a task attempt fails;
* Multiple barrier task failures from a single barrier stage should not 
trigger multiple stage retries;
* Barrier task failure from a previous failed stage attempt should not 
trigger stage retry;
* Fail the job when a task from a barrier ResultStage failed;
* RDD.isBarrier() should not rely on `ShuffleDependency`s.

## How was this patch tested?

Added corresponding test cases in `DAGSchedulerSuite` and 
`TaskSchedulerImplSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiangxb1987/spark failure

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22158.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22158


commit 32ea946c68c5f3108fb18f7e936ba440f7537144
Author: Xingbo Jiang 
Date:   2018-08-20T17:19:35Z

update




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...

2018-08-20 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22101
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...

2018-08-20 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22101
  
Thanks @squito, I've added another test case to cover the case where only the 
last byte differs (sketched below).
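
The shape of the new test data, roughly (the actual test goes through Spark's 
unsafe-row machinery; this standalone snippet only illustrates the intent):

```scala
// Two records identical except for the very last byte, forcing the comparator
// past the 8-bytes-at-a-time loop into the trailing byte-by-byte comparison.
val left  = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
val right = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12)
```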


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-20 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r211182337
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +99,124 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+def _load_from_socket(port, auth_secret):
+"""
+Load data from a given socket, this is a blocking method thus only 
return when the socket
+connection has been closed.
+"""
+sock = None
+# Support for both IPv4 and IPv6.
+# On most of IPv6-ready systems, IPv6 will take precedence.
+for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, 
socket.SOCK_STREAM):
+af, socktype, proto, canonname, sa = res
+sock = socket.socket(af, socktype, proto)
+try:
+# Do not allow timeout for socket reading operation.
+sock.settimeout(None)
+sock.connect(sa)
+except socket.error:
+sock.close()
+sock = None
+continue
+break
+if not sock:
+raise Exception("could not open socket")
+
+sockfile = sock.makefile("rwb", 65536)
+do_server_auth(sockfile, auth_secret)
+
+# The socket will be automatically closed when garbage-collected.
+return UTF8Deserializer().loads(sockfile)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_port = None
+_secret = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
+
+@classmethod
+def _getOrCreate(cls):
+"""Internal function to get or create global BarrierTaskContext."""
+if cls._taskContext is None:
+cls._taskContext = BarrierTaskContext()
+return cls._taskContext
+
+@classmethod
+def get(cls):
+"""
+Return the currently active BarrierTaskContext. This can be called 
inside of user functions
+to access contextual information about running tasks.
+
+.. note:: Must be called on the worker, not the driver. Returns 
None if not initialized.
+"""
+return cls._taskContext
+
+@classmethod
+def _initialize(cls, port, secret):
+"""
+Initialize BarrierTaskContext, other methods within 
BarrierTaskContext can only be called
+after BarrierTaskContext is initialized.
+"""
+cls._port = port
+cls._secret = secret
+
+def barrier(self):
+"""
+.. note:: Experimental
+
+Sets a global barrier and waits until all tasks in this stage hit 
this barrier.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._port is None or self._secret is None:
+raise Exception("Not supported to call barrier() before 
initialize " +
+"BarrierTaskContext.")
+else:
+_load_from_socket(self._port, self._secret)
+
+def getTaskInfos(self):
--- End diff --

fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [WIP][SPARK-25095][PySpark] Python support for Ba...

2018-08-17 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r210963511
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +99,124 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+def _load_from_socket(port, auth_secret):
+"""
+Load data from a given socket, this is a blocking method thus only 
return when the socket
+connection has been closed.
+"""
+sock = None
+# Support for both IPv4 and IPv6.
+# On most of IPv6-ready systems, IPv6 will take precedence.
+for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, 
socket.SOCK_STREAM):
+af, socktype, proto, canonname, sa = res
+sock = socket.socket(af, socktype, proto)
+try:
+# Do not allow timeout for socket reading operation.
+sock.settimeout(None)
+sock.connect(sa)
+except socket.error:
+sock.close()
+sock = None
+continue
+break
+if not sock:
+raise Exception("could not open socket")
+
+sockfile = sock.makefile("rwb", 65536)
+do_server_auth(sockfile, auth_secret)
+
+# The socket will be automatically closed when garbage-collected.
+return UTF8Deserializer().loads(sockfile)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_port = None
+_secret = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
+
+@classmethod
+def _getOrCreate(cls):
+"""Internal function to get or create global BarrierTaskContext."""
+if cls._taskContext is None:
+cls._taskContext = BarrierTaskContext()
+return cls._taskContext
+
+@classmethod
+def get(cls):
+"""
+Return the currently active BarrierTaskContext. This can be called 
inside of user functions
+to access contextual information about running tasks.
+
+.. note:: Must be called on the worker, not the driver. Returns 
None if not initialized.
+"""
+return cls._taskContext
+
+@classmethod
+def _initialize(cls, port, secret):
+"""
+Initialize BarrierTaskContext, other methods within 
BarrierTaskContext can only be called
+after BarrierTaskContext is initialized.
+"""
+cls._port = port
+cls._secret = secret
+
+def barrier(self):
+"""
+.. note:: Experimental
+
+Sets a global barrier and waits until all tasks in this stage hit 
this barrier.
+Note this method is only allowed for a BarrierTaskContext.
+
+.. versionadded:: 2.4.0
+"""
+if self._port is None or self._secret is None:
+raise Exception("Not supported to call barrier() before 
initialize " +
+"BarrierTaskContext.")
+else:
+_load_from_socket(self._port, self._secret)
+
+def getTaskInfos(self):
--- End diff --

This is temporarily unavailable.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [WIP][SPARK-25095][PySpark] Python support for Ba...

2018-08-17 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r210963181
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -381,6 +421,45 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
   }
 }
   }
+
+  /**
+   * Gateway to call BarrierTaskContext.barrier().
+   */
+  def barrierAndServe(): Unit = {
--- End diff --

It's not clear yet how to trigger this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22112: [WIP][SPARK-23243][Core] Fix RDD.repartition() data corr...

2018-08-16 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22112
  
> IMO we should traverse the dependency graph and rely on how ShuffledRDD 
is configured

A minor point here: since `ShuffleDependency` is also a DeveloperApi, it's 
possible for users to write a customized RDD that behaves like `ShuffledRDD`, so 
we may want to inspect the dependencies rather than the concrete RDD classes 
(see the sketch below).
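
A minimal sketch of that check (assuming `rdd` is any existing `RDD`):

```scala
import org.apache.spark.ShuffleDependency

// Detect a shuffle boundary by inspecting the dependencies rather than by
// pattern-matching on ShuffledRDD, so custom RDDs backed by a ShuffleDependency
// are handled the same way.
val hasShuffleInput =
  rdd.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])
```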


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...

2018-08-15 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22101
  
ping @gatorsmile @mridulm @squito 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22112: [WIP][SPARK-23243][Core] Fix RDD.repartition() da...

2018-08-15 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r210450123
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1441,6 +1441,18 @@ class DAGScheduler(
 failedStages += failedStage
 failedStages += mapStage
 if (noResubmitEnqueued) {
+  if (!mapStage.rdd.isIdempotent) {
+// The map stage is not idempotent, we have to rerun all 
the tasks for the
+// failed stage to get expected result.
+failedStage match {
+  case s: ShuffleMapStage =>
--- End diff --

We may also have to update the logic in 
`removeExecutorAndUnregisterOutputs`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22112: [WIP][SPARK-23243][Core] Fix RDD.repartition() da...

2018-08-15 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22112#discussion_r210449640
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1441,6 +1441,18 @@ class DAGScheduler(
 failedStages += failedStage
 failedStages += mapStage
 if (noResubmitEnqueued) {
+  if (!mapStage.rdd.isIdempotent) {
+// The map stage is not idempotent, we have to rerun all 
the tasks for the
+// failed stage to get expected result.
+failedStage match {
+  case s: ShuffleMapStage =>
--- End diff --

Like we discussed, we shall also retry the partially finished succeeding 
stages.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-14 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/21698
  
Thanks @cloud-fan, your summary above is super useful, and I think it's clear 
enough.

> So when we see fetch failure and rerun map tasks, we should track which 
reducers have its shuffle blocks being rewritten, and rerun them.

IIUC, patterns like `rdd.map(...).groupBy()` are always at risk if `map()` can 
generate non-deterministic output, right? (A concrete example is sketched at the 
end of this comment.)

> Simply inserting a sort before shuffle doesn't help. The fix for 
dataframe is adding a sort before round-robin, to make it deterministic. If we 
add the sort after round-robin and before shuffle, the problem still exists.

Does this mean that, if we generate non-deterministic output, we can still lose 
some data even if we add a local sort before the shuffle, because the reduce 
tasks may have already finished (or even committed)?

> be more conservative when handling fetch failure and rerun more reduce 
tasks. We can provide an internal API to tag a RDD as deterministic (very 
common in Spark SQL) and then we can safely be optimistic when handling fetch 
failure.

This is somewhat like what I proposed yesterday. One issue we can't resolve is 
that some ResultTasks may have already committed; in that case it seems the best 
effort we can make is to just fail the job.
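
For concreteness, a minimal instance of the risky pattern mentioned above 
(assuming `sc` is an active `SparkContext`; this is only an illustration, not 
code from any of the PRs):

```scala
import scala.util.Random

// The map output is nondeterministic, so if a map task is re-run after a fetch
// failure, the retried reducers see different input than the reducers that have
// already finished (or committed).
val grouped = sc.parallelize(1 to 1000)
  .map(x => (Random.nextInt(10), x)) // nondeterministic key assignment
  .groupByKey()
```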


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-14 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209974729
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
--- End diff --

The major issue here is that we want to make the `barrier()` call blocking: the 
task shall wait until it either times out or succeeds. Do we have other ways to 
achieve this goal besides the current approach here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...

2018-08-14 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22101
  
@squito I've created a new JIRA task and updated the title, thanks for the 
reminder!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-14 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209853276
  
--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +95,92 @@ def getLocalProperty(self, key):
 Get a local property set upstream in the driver, or None if it is 
missing.
 """
 return self._localProperties.get(key, None)
+
+
+class BarrierTaskContext(TaskContext):
+
+"""
+.. note:: Experimental
+
+A TaskContext with extra info and tooling for a barrier stage. To 
access the BarrierTaskContext
+for a running task, use:
+L{BarrierTaskContext.get()}.
+
+.. versionadded:: 2.4.0
+"""
+
+_barrierContext = None
+
+def __init__(self):
+"""Construct a BarrierTaskContext, use get instead"""
+pass
--- End diff --

This just follows `TaskContext.__init__()`; shall we update both?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22101: [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryComparator...

2018-08-14 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22101
  
cc @mridulm @squito 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22101: [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryCom...

2018-08-14 Thread jiangxb1987
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/22101

[SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryComparator when subtraction 
between two words is divisible by Integer.MAX_VALUE.

## What changes were proposed in this pull request?

As pointed out in 
https://github.com/apache/spark/pull/22079#discussion_r209705612, it is possible 
for two objects to be unequal and yet be considered equal by this code, if the 
difference between the long values is divisible by Integer.MAX_VALUE.
This PR fixes the issue.
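
For illustration, the failure mode in isolation (a standalone snippet, not code 
from the patch):

```scala
// Two different 8-byte words whose difference is a multiple of Integer.MAX_VALUE
// compared as equal under the old logic.
val left  = Int.MaxValue.toLong  // 2147483647
val right = 0L
val res = ((left - right) % Integer.MAX_VALUE).toInt
assert(res == 0) // the old comparator would treat these two records as equal
```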

## How was this patch tested?
N/A


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiangxb1987/spark fix-rbc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22101.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22101


commit 1f6b2594ebe8b50e4cb2fcde15181cfa9a17f48c
Author: Xingbo Jiang 
Date:   2018-08-14T04:04:40Z

fix




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22079: [SPARK-23207][SPARK-22905][SQL][BACKPORT-2.2] Shu...

2018-08-13 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22079#discussion_r209822194
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
 ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class RecordBinaryComparator extends RecordComparator {
+
+  // TODO(jiangxb) Add test suite for this.
+  @Override
+  public int compare(
+  Object leftObj, long leftOff, int leftLen, Object rightObj, long 
rightOff, int rightLen) {
+int i = 0;
+int res = 0;
+
+// If the arrays have different length, the longer one is larger.
+if (leftLen != rightLen) {
+  return leftLen - rightLen;
+}
+
+// The following logic uses `leftLen` as the length for both `leftObj` 
and `rightObj`, since
+// we have guaranteed `leftLen` == `rightLen`.
+
+// check if stars align and we can get both offsets to be aligned
+if ((leftOff % 8) == (rightOff % 8)) {
+  while ((leftOff + i) % 8 != 0 && i < leftLen) {
+res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
+(Platform.getByte(rightObj, rightOff + i) & 0xff);
+if (res != 0) return res;
+i += 1;
+  }
+}
+// for architectures that support unaligned accesses, chew it up 8 
bytes at a time
+if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + 
i) % 8 == 0))) {
+  while (i <= leftLen - 8) {
+res = (int) ((Platform.getLong(leftObj, leftOff + i) -
+Platform.getLong(rightObj, rightOff + i)) % 
Integer.MAX_VALUE);
+if (res != 0) return res;
--- End diff --

As far as I can recall, it's actually a mistake. At first the function returned a 
Long value, and later I changed the return type to Integer; let me fix it. Thanks 
for discovering this, @mridulm!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...

2018-08-13 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22001
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-13 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/21698
  
@tgravescs I'm still working on this, but I would be glad if you could also work 
on the "sort the serialized bytes of T" approach. The retry-all-tasks approach 
turns out to be more complex than I expected once the commit protocol gets 
involved (currently a task can only be committed once, so if some tasks have 
already committed and you then hit an ExecutorLost, retry-all-tasks won't work), 
so I hope we can get other approaches like "sort the serialized bytes of T" 
merged into the 2.4 release.

I'll post the benchmark results of the DF.repartition() fix later.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-08-13 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/21698
  
We fixed the DataFrame repartition correctness issue by inserting a local sort 
before the repartition (sketched below), and feedback on that approach has been 
generally negative, because the performance of repartition() drops significantly 
and even queries that don't have a potential correctness issue have to pay for 
the regression (eg. rdd.repartition(...).map(...).collect()). That's the major 
reason why we are trying to resolve the correctness issues in a different way.
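
Roughly, the idea of that existing mitigation looks like this at the RDD level (a 
sketch only; the actual fix lives on the SQL side in ShuffleExchangeExec, and a 
real implementation needs a total order, since hashCode alone can collide):

```scala
// Sort each partition by a deterministic key before the round-robin repartition,
// so that a recomputed map task emits its records in the same order.
val repartitioned = rdd
  .mapPartitions(iter => iter.toSeq.sortBy(_.hashCode).iterator)
  .repartition(200)
```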

I agree that correctness shall be given higher priority than performance, but 
things are not black or white; here we really care about both. We also want to 
guarantee that if you don't have a correctness issue beforehand, you are affected 
as little as possible by the proposed fix approach.

I'm currently working on an extended approach based on the one proposed in this 
PR, which shall handle the cascading-stages issue @mridulm mentioned above 
(rdd1.zip(rdd2).map(v => (computeKey(v._1, v._2), computeValue(v._1, 
v._2))).groupByKey().map().save()). Please also note that this doesn't imply we 
will retry more stages: it's true we will retry more tasks to ensure correctness 
on FetchFailure/ExecutorLost, but we won't retry more stages.

Having said that, IMO the best bet is to implement both approaches (the 
insert-local-sort one and the retry-all-tasks-on-failure one) and create a flag 
for each of them, so users may choose an approach based on their workload 
patterns, though it's also debatable which approach shall be enabled by default. 
Unfortunately we are not able to deliver them in 2.4, but I'm optimistic we can 
include them in 3.0 and of course backport them to all the active branches.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...

2018-08-13 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22001
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22085#discussion_r209490553
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, 
OUT](
 dataOut.writeInt(partitionIndex)
 // Python version of driver
 PythonRDD.writeUTF(pythonVer, dataOut)
+// Init a GatewayServer to port current BarrierTaskContext to 
Python side.
+val isBarrier = context.isInstanceOf[BarrierTaskContext]
+val secret = if (isBarrier) {
+  Utils.createSecret(env.conf)
+} else {
+  ""
+}
+val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+  Some(new GatewayServer.GatewayServerBuilder()
+.entryPoint(context.asInstanceOf[BarrierTaskContext])
+.authToken(secret)
+.javaPort(0)
+.callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, 
GatewayServer.defaultAddress(),
+  secret)
+.build())
--- End diff --

We have to port `BarrierTaskContext` from the Java side to the Python side, 
otherwise there is no way to call `BarrierTaskContext.barrier()` from Python. 
Thus, of course, the JavaGateway is only started when the context is a 
`BarrierTaskContext`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...

2018-08-12 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22079
  
Both seem fine to me; it's just a minor improvement. Normally we don't backport 
an improvement, but since it's a simple and small change I'm confident it is safe 
to also include it in a backport PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

2018-08-12 Thread jiangxb1987
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/22085

[SPARK-25095][PySpark] Python support for BarrierTaskContext

## What changes were proposed in this pull request?

Add method `barrier()` and `getTaskInfos()` in python TaskContext, these 
two methods are only allowed for barrier tasks.

## How was this patch tested?

TBD


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiangxb1987/spark python.barrier

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22085.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22085


commit 7b488299709f715d344e5c38956577f31718ab34
Author: Xingbo Jiang 
Date:   2018-08-12T16:04:20Z

implement python barrier taskcontext




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


