[GitHub] spark issue #23228: [MINOR][DOC]The condition description of serialized shuf...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/23228 Please update the title `[MINOR][DOC] Update the condition description of serialized shuffle` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23222: [SPARK-20636] Add the rule TransposeWindow to the optimi...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/23222 Shall we add a SQL tag to the title?
[GitHub] spark issue #23046: [SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.enableRad...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/23046 I searched the code and didn't find similar issues, so this is the only one that needs to be fixed.
[GitHub] spark issue #22912: [SPARK-25901][CORE] Use only one thread in BarrierTaskCo...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22912 Thanks, merging to master!
[GitHub] spark pull request #22723: [SPARK-25729][CORE]It is better to replace `minPa...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22723#discussion_r229717747
--- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
@@ -48,11 +50,11 @@ private[spark] class WholeTextFileInputFormat
    * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
    * which is set through setMaxSplitSize
    */
-  def setMinPartitions(context: JobContext, minPartitions: Int) {
+  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
--- End diff --
Please update the above comment to explain the new behavior.
[GitHub] spark pull request #22723: [SPARK-25729][CORE]It is better to replace `minPa...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22723#discussion_r229717581
--- Diff: core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---
@@ -48,11 +50,11 @@ private[spark] class WholeTextFileInputFormat
    * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API,
    * which is set through setMaxSplitSize
    */
-  def setMinPartitions(context: JobContext, minPartitions: Int) {
+  def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
     val files = listStatus(context).asScala
     val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
-    val maxSplitSize = Math.ceil(totalLen * 1.0 /
-      (if (minPartitions == 0) 1 else minPartitions)).toLong
+    val minPartNum = Math.max(sc.defaultParallelism, minPartitions)
--- End diff --
This is potentially a behavior change. cc @cloud-fan
[GitHub] spark issue #22849: [SPARK-25852][Core] we should filter the workOffers with...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22849 It may happen that a busy executor is marked as lost and later re-registers with the driver; in that case we currently call `makeOffers()`, which adds the executor to `TaskSchedulerImpl.hostToExecutors`. This is a poor implementation, since we should not depend on the `makeOffers()` function to update an unrelated protected val, but that's what we have for now.
[GitHub] spark issue #22849: [SPARK-25852][Core] we should filter the workOffers with...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22849 What do you mean by "better performance"? If that means we can spend less time in `TaskSchedulerImpl.resourceOffers()`, then I agree it's true, but AFAIK this has never been reported as a bottleneck of the whole cluster, so the perf gain is probably trivial here. If you expect better task distribution over existing executors, then I don't see any case that can be improved by this proposed change. Please correct me if I'm wrong.
[GitHub] spark pull request #22853: [SPARK-25845][SQL] Fix MatchError for calendar in...
Github user jiangxb1987 closed the pull request at: https://github.com/apache/spark/pull/22853
[GitHub] spark issue #22853: [SPARK-25845][SQL] Fix MatchError for calendar interval ...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22853 Merging to master, I can open another PR against 2.4 if required in the future.
[GitHub] spark issue #22853: [SPARK-25845][SQL] Fix MatchError for calendar interval ...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22853 Also cc @gatorsmile @cloud-fan @hvanhovell
[GitHub] spark pull request #22853: [SPARK-25845][SQL] Fix MatchError for calendar in...
GitHub user jiangxb1987 opened a pull request: https://github.com/apache/spark/pull/22853 [SPARK-25845][SQL] Fix MatchError for calendar interval type in range frame left boundary

## What changes were proposed in this pull request?

WindowSpecDefinition checks start < last, but CalendarIntervalType is not comparable, so it would throw the following exception at runtime:

```
scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)
  at org.apache.spark.sql.catalyst.util.TypeUtils$.getInterpretedOrdering(TypeUtils.scala:58)
  at org.apache.spark.sql.catalyst.expressions.BinaryComparison.ordering$lzycompute(predicates.scala:592)
  at org.apache.spark.sql.catalyst.expressions.BinaryComparison.ordering(predicates.scala:592)
  at org.apache.spark.sql.catalyst.expressions.GreaterThan.nullSafeEval(predicates.scala:797)
  at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:496)
  at org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame.isGreaterThan(windowExpressions.scala:245)
  at org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame.checkInputDataTypes(windowExpressions.scala:216)
  at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:171)
  at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:171)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38)
  at scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.scala:43)
  at scala.collection.mutable.ArrayBuffer.forall(ArrayBuffer.scala:48)
  at org.apache.spark.sql.catalyst.expressions.Expression.childrenResolved(Expression.scala:183)
  at org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition.resolved$lzycompute(windowExpressions.scala:48)
  at org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition.resolved(windowExpressions.scala:48)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$childrenResolved$1.apply(Expression.scala:183)
  at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:83)
```

We fix the issue by only performing the check on boundary expressions that are of AtomicType.

## How was this patch tested?

Add a new test case in `DataFrameWindowFramesSuite`.

You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/jiangxb1987/spark windowBoundary
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/22853.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #22853

commit 9d2a1b27caefb6b61c767d7971782b9a74e5d199
Author: Xingbo Jiang
Date: 2018-10-26T15:41:32Z
    fix CalendarIntervalType window boundary failure
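The fix described above — running the boundary comparison only for orderable (atomic) types — can be modeled with a small, self-contained sketch. This is a hypothetical simplification, not the actual Spark patch; the type names and `FrameCheck` object are illustrative stand-ins:

```scala
// Hypothetical, simplified model of the fix: the lower/upper frame-boundary
// comparison is performed only for types that define an ordering; for
// interval-like types with no ordering, the check is skipped instead of
// hitting a MatchError.
sealed trait BoundaryType
case object IntType extends BoundaryType
case object CalendarIntervalLike extends BoundaryType

object FrameCheck {
  // Only "atomic" (orderable) boundary types support the comparison.
  private def isOrderable(t: BoundaryType): Boolean = t match {
    case IntType => true
    case CalendarIntervalLike => false
  }

  // Validate lower <= upper when an ordering exists; otherwise defer
  // validation to runtime semantics.
  def validFrame(t: BoundaryType, lower: Long, upper: Long): Boolean =
    if (isOrderable(t)) lower <= upper else true
}
```

The point is that the match on the boundary's type is now total: the non-orderable case is handled explicitly rather than falling through to an exhaustiveness failure.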
[GitHub] spark issue #22813: [SPARK-25818][CORE] WorkDirCleanup should only remove th...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22813 IIUC it's not expected to share the SPARK_WORK_DIR with any other usage.
[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22771#discussion_r227459990
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1364,6 +1385,16 @@ private[spark] class DAGScheduler(
     if (job.numFinished == job.numPartitions) {
       markStageAsFinished(resultStage)
       cleanupStateForJobAndIndependentStages(job)
+      try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+        logInfo(
+          s"Job ${job.jobId} is finished. Killing speculative tasks for this job")
+        // ResultStage is only used by this job. It's safe to kill speculative or
+        // zombie tasks in this stage.
+        taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
--- End diff --
IIRC `cancelTasks()` will fail the stage (maybe it's okay here since the stage has been marked completed); if we just want to kill speculative/zombie tasks, then maybe we should call `killAllTaskAttempts()`?
[GitHub] spark issue #22674: [SPARK-25680][SQL] SQL execution listener shouldn't happ...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22674 LGTM, do you have any other concerns @hvanhovell @brkyvz @dongjoon-hyun ?
[GitHub] spark issue #22677: [SPARK-25683][Core] Make AsyncEventQueue.lastReportTimes...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22677 Sounds good!
[GitHub] spark issue #22699: [SPARK-25711][Core] Allow start-history-server.sh to sho...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22699 Let's also update the title to include the deprecation changes.
[GitHub] spark pull request #22699: [SPARK-25711][Core] Allow history server to show ...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22699#discussion_r224508691
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala ---
@@ -34,26 +34,25 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin
   @tailrec
   private def parse(args: List[String]): Unit = {
-    if (args.length == 1) {
-      setLogDirectory(args.head)
-    } else {
-      args match {
-        case ("--dir" | "-d") :: value :: tail =>
-          setLogDirectory(value)
-          parse(tail)
-
-        case ("--help" | "-h") :: tail =>
-          printUsageAndExit(0)
-
-        case ("--properties-file") :: value :: tail =>
-          propertiesFile = value
-          parse(tail)
-
-        case Nil =>
-
-        case _ =>
-          printUsageAndExit(1)
-      }
+    args match {
+      case ("--dir" | "-d") :: value :: tail =>
+        setLogDirectory(value)
+        parse(tail)
+
+      case ("--help" | "-h") :: tail =>
+        printUsageAndExit(0)
+
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parse(tail)
+
+      case dir :: Nil =>
--- End diff --
sounds good.
[GitHub] spark pull request #22699: [SPARK-25711][Core] Allow history server to show ...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22699#discussion_r224508223
--- Diff: sbin/start-history-server.sh ---
@@ -28,7 +28,22 @@
 if [ -z "${SPARK_HOME}" ]; then
   export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
 fi

+# NOTE: This exact class name is matched downstream by SparkSubmit.
+# Any changes need to be reflected there.
+CLASS="org.apache.spark.deploy.history.HistoryServer"
+
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+  echo "Usage: ./sbin/start-history-server.sh [options]"
--- End diff --
Well, I also saw similar code in `start-thriftserver.sh`; it uses `usage()`. Both are fine to me, just a heads up to make sure we've taken that into consideration.
[GitHub] spark pull request #22699: [SPARK-25711][Core] Allow history server to show ...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22699#discussion_r224507524
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala ---
@@ -34,26 +34,25 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin
   @tailrec
   private def parse(args: List[String]): Unit = {
-    if (args.length == 1) {
-      setLogDirectory(args.head)
-    } else {
-      args match {
-        case ("--dir" | "-d") :: value :: tail =>
-          setLogDirectory(value)
-          parse(tail)
-
-        case ("--help" | "-h") :: tail =>
-          printUsageAndExit(0)
-
-        case ("--properties-file") :: value :: tail =>
-          propertiesFile = value
-          parse(tail)
-
-        case Nil =>
-
-        case _ =>
-          printUsageAndExit(1)
-      }
+    args match {
+      case ("--dir" | "-d") :: value :: tail =>
+        setLogDirectory(value)
+        parse(tail)
+
+      case ("--help" | "-h") :: tail =>
+        printUsageAndExit(0)
+
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parse(tail)
+
+      case dir :: Nil =>
--- End diff --
I'm not against the change, but we shall mention it in the PR description.
[GitHub] spark pull request #22699: [SPARK-25711][Core] Allow history server to show ...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22699#discussion_r224504103
--- Diff: sbin/start-history-server.sh ---
@@ -28,7 +28,22 @@
 if [ -z "${SPARK_HOME}" ]; then
   export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
 fi

+# NOTE: This exact class name is matched downstream by SparkSubmit.
+# Any changes need to be reflected there.
+CLASS="org.apache.spark.deploy.history.HistoryServer"
+
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+  echo "Usage: ./sbin/start-history-server.sh [options]"
--- End diff --
nit: why not have a separate `usage()` function?
[GitHub] spark pull request #22699: [SPARK-25711][Core] Allow history server to show ...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22699#discussion_r224504246
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala ---
@@ -34,26 +34,25 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin
   @tailrec
   private def parse(args: List[String]): Unit = {
-    if (args.length == 1) {
-      setLogDirectory(args.head)
-    } else {
-      args match {
-        case ("--dir" | "-d") :: value :: tail =>
-          setLogDirectory(value)
-          parse(tail)
-
-        case ("--help" | "-h") :: tail =>
-          printUsageAndExit(0)
-
-        case ("--properties-file") :: value :: tail =>
-          propertiesFile = value
-          parse(tail)
-
-        case Nil =>
-
-        case _ =>
-          printUsageAndExit(1)
-      }
+    args match {
+      case ("--dir" | "-d") :: value :: tail =>
+        setLogDirectory(value)
+        parse(tail)
+
+      case ("--help" | "-h") :: tail =>
+        printUsageAndExit(0)
+
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parse(tail)
+
+      case dir :: Nil =>
--- End diff --
IIUC this is not related to the PR description?
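The `@tailrec` pattern-matching parser discussed in these comments can be sketched as a standalone example. This is a hypothetical simplification — `Opts`, `ArgParser`, and the defaults are illustrative stand-ins, not the actual `HistoryServerArguments` code:

```scala
import scala.annotation.tailrec

// Simplified recursive-descent argument parser in the style of the diff above.
final case class Opts(dir: Option[String] = None, propsFile: Option[String] = None)

object ArgParser {
  @tailrec
  def parse(args: List[String], acc: Opts = Opts()): Opts = args match {
    case ("--dir" | "-d") :: value :: tail =>
      parse(tail, acc.copy(dir = Some(value)))
    case "--properties-file" :: value :: tail =>
      parse(tail, acc.copy(propsFile = Some(value)))
    case dir :: Nil =>
      // A single bare trailing argument is treated as the log directory,
      // mirroring the legacy single-argument invocation.
      acc.copy(dir = Some(dir))
    case Nil =>
      acc
    case other =>
      throw new IllegalArgumentException(s"Unrecognized options: $other")
  }
}
```

Because each `case` either returns or calls `parse` in tail position, the compiler can verify the `@tailrec` annotation and compile the recursion into a loop.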
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22165 Actually my original thinking was like this:
```
val state = new ContextBarrierState(barrierId, numTasks)
val requester = mockRequester()
val request = forgeRequest(numTasks, stageId, stageAttemptId, taskAttemptId, barrierEpoch)
state.handleRequest(requester, request)
// Verify states ...
// Verify cleanup ...
```
So you don't have to launch a SparkContext for the test. Could you please check whether this is feasible?
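The suggestion above — exercising the barrier-state logic directly instead of going through a SparkContext and the RPC stack — can be illustrated with a self-contained toy. `BarrierState` here is a hypothetical stand-in for `ContextBarrierState`, not Spark's actual class:

```scala
// Toy model of barrier-state handling that can be unit-tested without any
// SparkContext or RPC framework: track arrived task attempts, and bump the
// epoch once all tasks in the barrier have arrived.
final class BarrierState(numTasks: Int) {
  private var arrived = Set.empty[Long]
  var barrierEpoch: Int = 0

  // Returns true once all tasks have arrived, then resets for the next epoch.
  def handleRequest(taskAttemptId: Long): Boolean = {
    arrived += taskAttemptId
    if (arrived.size == numTasks) {
      arrived = Set.empty
      barrierEpoch += 1
      true
    } else {
      false
    }
  }

  // Mirrors the "verify cleanup" step: no pending requesters remain.
  def isInternalStateClear: Boolean = arrived.isEmpty
}
```

A test can then construct the state, feed it forged requests, and assert on the epoch and cleanup directly — no cluster machinery involved.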
[GitHub] spark issue #22677: [SPARK-25683][Core] Make AsyncEventQueue.lastReportTimes...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22677 Though it looks a little strange, the log content is actually right; I don't think we want to set the last report timestamp to the current time (that would confuse users about what happened before that timestamp).
[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22674#discussion_r223729445
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala ---
@@ -75,95 +76,69 @@ trait QueryExecutionListener {
  */
 @Experimental
 @InterfaceStability.Evolving
-class ExecutionListenerManager private extends Logging {
+class ExecutionListenerManager private[sql](session: SparkSession, loadExtensions: Boolean)
--- End diff --
nit: we shall add param comments.
[GitHub] spark issue #22674: [SPARK-25680][SQL] SQL execution listener shouldn't happ...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22674 retest this please
[GitHub] spark issue #22325: [SPARK-25318]. Add exception handling when wrapping the ...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22325 LGTM
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r220589416
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -187,6 +191,12 @@ private[spark] class BarrierCoordinator(
     requesters.clear()
     cancelTimerTask()
   }
+
+  // Check for clearing internal data, visible for test only.
+  private[spark] def cleanCheck(): Boolean = requesters.isEmpty && timerTask == null
+
+  // Get currently barrier epoch, visible for test only.
+  private[spark] def getBarrierEpoch(): Int = barrierEpoch
--- End diff --
Why not just make `barrierEpoch` visible for testing?
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r220590215
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -187,6 +191,12 @@ private[spark] class BarrierCoordinator(
     requesters.clear()
     cancelTimerTask()
   }
+
+  // Check for clearing internal data, visible for test only.
+  private[spark] def cleanCheck(): Boolean = requesters.isEmpty && timerTask == null
--- End diff --
nit: `cleanCheck()` -> `isInternalStateClear()`
[GitHub] spark pull request #22165: [SPARK-25017][Core] Add test suite for BarrierCoo...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22165#discussion_r220591706
--- Diff: core/src/test/scala/org/apache/spark/scheduler/BarrierCoordinatorSuite.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark._
+import org.apache.spark.rpc.RpcTimeout
+
+class BarrierCoordinatorSuite extends SparkFunSuite with LocalSparkContext with Eventually {
+
+  /**
+   * Get the current ContextBarrierState from barrierCoordinator.states by ContextBarrierId.
+   */
+  private def getBarrierState(
+      stageId: Int,
+      stageAttemptId: Int,
+      barrierCoordinator: BarrierCoordinator) = {
+    val barrierId = ContextBarrierId(stageId, stageAttemptId)
+    barrierCoordinator.states.get(barrierId)
+  }
+
+  test("normal test for single task") {
+    sc = new SparkContext("local", "test")
+    val barrierCoordinator = new BarrierCoordinator(5, sc.listenerBus, sc.env.rpcEnv)
+    val rpcEndpointRef = sc.env.rpcEnv.setupEndpoint("barrierCoordinator", barrierCoordinator)
+    val stageId = 0
+    val stageAttemptNumber = 0
+    rpcEndpointRef.askSync[Unit](
--- End diff --
We are still relying on the RPC framework, can we get rid of this?
[GitHub] spark pull request #22458: [SPARK-25459] Add viewOriginalText back to Catalo...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22458#discussion_r220410022
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ---
@@ -2348,4 +2348,17 @@ class HiveDDLSuite
     }
   }
 }
+
+  test("desc formatted table should also show viewOriginalText for views") {
+    withView("v1") {
+      sql("CREATE VIEW v1 AS SELECT 1 AS value")
+      assert(sql("DESC FORMATTED v1").collect().containsSlice(
+        Seq(
+          Row("Type", "VIEW", ""),
+          Row("View Text", "SELECT 1 AS value", ""),
+          Row("View Original Text:", "SELECT 1 AS value", "")
--- End diff --
@zheyuan28 This is intended; you should create a view using a previous version of Spark, or create a view using Hive directly.
[GitHub] spark pull request #22526: [SPARK-25502][CORE][WEBUI]Empty Page when page nu...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22526#discussion_r219891354
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---
@@ -685,7 +685,7 @@ private[ui] class TaskDataSource(

   private var _tasksToShow: Seq[TaskData] = null

-  override def dataSize: Int = taskCount(stage)
+  override def dataSize: Int = store.taskCount(stage.stageId, stage.attemptId).toInt
--- End diff --
nit: after this change, the function `taskCount()` is only referenced by `totalTasks`, so we can inline it.
[GitHub] spark pull request #22458: [SPARK-25459] Add viewOriginalText back to Catalo...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22458#discussion_r219370221
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---
@@ -467,9 +467,9 @@ private[hive] class HiveClientImpl(
       properties = filteredProperties,
       stats = readHiveStats(properties),
       comment = comment,
-      // In older versions of Spark(before 2.2.0), we expand the view original text and store
-      // that into `viewExpandedText`, and that should be used in view resolution. So we get
-      // `viewExpandedText` instead of `viewOriginalText` for viewText here.
--- End diff --
This comment is for `viewText`; please rephrase and keep it, thanks!
[GitHub] spark pull request #22325: [SPARK-25318]. Add exception handling when wrappi...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22325#discussion_r218873184
--- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
@@ -444,36 +444,34 @@ final class ShuffleBlockFetcherIterator(
           throwFetchFailedException(blockId, address, e)
         }

-        input = streamWrapper(blockId, in)
-        // Only copy the stream if it's wrapped by compression or encryption, also the size of
-        // block is small (the decompressed block is smaller than maxBytesInFlight)
-        if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
-          val originalInput = input
-          val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
-          try {
+        try {
+          input = streamWrapper(blockId, in)
+          // Only copy the stream if it's wrapped by compression or encryption, also the size of
+          // block is small (the decompressed block is smaller than maxBytesInFlight)
+          if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
+            val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
            // Decompress the whole block at once to detect any corruption, which could increase
            // the memory usage tne potential increase the chance of OOM.
            // TODO: manage the memory used here, and spill it into disk in case of OOM.
            Utils.copyStream(input, out)
            out.close()
            input = out.toChunkedByteBuffer.toInputStream(dispose = true)
--- End diff --
I'm not the original author of that, but I think so.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22192#discussion_r218861435
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -136,6 +136,26 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)

+  private val executorPlugins: Seq[ExecutorPlugin] = {
+    val pluginNames = conf.get(EXECUTOR_PLUGINS)
+    if (pluginNames.nonEmpty) {
+      logDebug(s"Initializing the following plugins: ${pluginNames.mkString(", ")}")
+
+      // Plugins need to load using a class loader that includes the executor's user classpath
+      val pluginList: Seq[ExecutorPlugin] =
+        Utils.withContextClassLoader(replClassLoader) {
+          val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
+          plugins.foreach(_.init())
--- End diff --
nit: Might be good to log whether each `plugin.init()` succeeded.
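The nit above can be sketched as per-plugin initialization with an individual status for each `init()` call. This is a hypothetical illustration — the `Plugin` trait and message strings are stand-ins, not Spark's actual `ExecutorPlugin` API or log format:

```scala
// Initialize plugins one by one, producing a status line per plugin so each
// init() outcome can be logged individually (here, returned as strings).
trait Plugin { def name: String; def init(): Unit }

object PluginInit {
  // A failed init() is reported but does not stop the remaining plugins
  // (illustrative choice; the real code may prefer to fail fast instead).
  def initAll(plugins: Seq[Plugin]): Seq[String] =
    plugins.map { p =>
      try {
        p.init()
        s"Successfully initialized plugin ${p.name}"
      } catch {
        case e: Exception =>
          s"Failed to initialize plugin ${p.name}: ${e.getMessage}"
      }
    }
}
```

Mapping over the plugin list instead of a bare `foreach(_.init())` is what makes a per-plugin status available for logging.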
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22192#discussion_r218865220
--- Diff: core/src/test/java/org/apache/spark/ExecutorPluginSuite.java ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ExecutorPluginSuite {
+  private static final String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins";
+  private static final String testBadPluginName = TestBadShutdownPlugin.class.getName();
+  private static final String testPluginName = TestExecutorPlugin.class.getName();
+
+  // Static value modified by testing plugin to ensure plugin loaded correctly.
+  public static int numSuccessfulPlugins = 0;
+
+  // Static value modified by testing plugin to verify plugins shut down properly.
+  public static int numSuccessfulTerminations = 0;
+
+  private JavaSparkContext sc;
+
+  @Before
+  public void setUp() {
+    sc = null;
+    numSuccessfulPlugins = 0;
+    numSuccessfulTerminations = 0;
+  }
+
+  @After
+  public void tearDown() {
+    if (sc != null) {
+      sc.stop();
+      sc = null;
+    }
+  }
+
+  private SparkConf initializeSparkConf(String pluginNames) {
+    return new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
+      .set(EXECUTOR_PLUGIN_CONF_NAME, pluginNames);
+  }
+
+  @Test
+  public void testPluginClassDoesNotExist() {
+    SparkConf conf = initializeSparkConf("nonexistant.plugin");
+    try {
+      sc = new JavaSparkContext(conf);
+      fail("No exception thrown for nonexistant plugin");
+    } catch (Exception e) {
+      // We cannot catch ClassNotFoundException directly because Java doesn't think it'll be thrown
+      assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException"));
+    }
+  }
+
+  @Test
+  public void testAddPlugin() throws InterruptedException {
+    // Load the sample TestExecutorPlugin, which will change the value of numSuccessfulPlugins
+    SparkConf conf = initializeSparkConf(testPluginName);
+    sc = new JavaSparkContext(conf);
+    assertEquals(1, numSuccessfulPlugins);
+    sc.stop();
+    sc = null;
+    assertEquals(1, numSuccessfulTerminations);
+  }
+
+  @Test
+  public void testAddMultiplePlugins() throws InterruptedException {
--- End diff --
super nit: shall we test whether we can load multiple different plugins?
[GitHub] spark pull request #22325: [SPARK-25318]. Add exception handling when wrappi...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22325#discussion_r218857081 --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala --- @@ -444,36 +444,34 @@ final class ShuffleBlockFetcherIterator( throwFetchFailedException(blockId, address, e) } - input = streamWrapper(blockId, in) - // Only copy the stream if it's wrapped by compression or encryption, also the size of - // block is small (the decompressed block is smaller than maxBytesInFlight) - if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) { -val originalInput = input -val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate) -try { + try { +input = streamWrapper(blockId, in) +// Only copy the stream if it's wrapped by compression or encryption, also the size of +// block is small (the decompressed block is smaller than maxBytesInFlight) +if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) { + val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate) // Decompress the whole block at once to detect any corruption, which could increase // the memory usage tne potential increase the chance of OOM. // TODO: manage the memory used here, and spill it into disk in case of OOM. Utils.copyStream(input, out) out.close() input = out.toChunkedByteBuffer.toInputStream(dispose = true) --- End diff -- We create a new `input` here, so the original input should be closed to avoid a memory leak.
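The leak pattern flagged in the comment above can be sketched in plain Python (a toy model, not the Scala `ShuffleBlockFetcherIterator` code — `copy_then_replace` is invented for this illustration): once the wrapped stream has been fully copied into a new buffer, the original stream should be closed in a `finally` block rather than left for garbage collection.

```python
import io

# Toy sketch of the review comment: after copying the wrapped stream into a
# new buffer, close the original stream explicitly to avoid leaking it.

def copy_then_replace(wrapped):
    try:
        out = io.BytesIO()
        out.write(wrapped.read())  # "decompress" the whole block at once
        out.seek(0)
        return out                 # this buffer becomes the new `input`
    finally:
        wrapped.close()            # close the original stream to avoid the leak

src = io.BytesIO(b"block-bytes")
new_input = copy_then_replace(src)
data = new_input.read()
print(src.closed, data)  # True b'block-bytes'
```

The `finally` mirrors what the Scala side gains from wrapping the copy in a `try` block: the original stream is released whether or not corruption is detected during the copy.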
[GitHub] spark pull request #20414: [SPARK-23243][SQL] Shuffle+Repartition on an RDD ...
Github user jiangxb1987 closed the pull request at: https://github.com/apache/spark/pull/20414
[GitHub] spark issue #22351: [MINOR][SQL] Add a debug log when a SQL text is used for...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22351 Just confirmed: if the view is both created and retrieved on the Spark side, then no exception is thrown.
[GitHub] spark issue #22351: [MINOR][SQL] Add a debug log when a SQL text is used for...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22351 This case actually reads a view created by Hive, so I don't think it is a problem on the view write side.
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22165 I think it should be fine to make `ContextBarrierState` private[spark] to test it, WDYT @mengxr ?
[GitHub] spark issue #22277: [SPARK-25276] Redundant constrains when using alias
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22277 You can have `select * from (select a, a as c from table1 where a > 10) t where a > c`
[GitHub] spark issue #22277: [SPARK-25276] Redundant constrains when using alias
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22277 Thank you for your interest in this issue. However, I don't think the changes proposed in this PR are valid: if you have another predicate like `a > z`, it is surely desirable to infer a new constraint `z > z`. Please correct me if I'm wrong about this.
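The alias point above can be illustrated with a tiny hypothetical model of constraint inference (nothing here is Spark's actual optimizer code — `infer_constraints` and the triple representation of predicates are invented for this sketch): substituting an alias into every known predicate is exactly what makes constraints over the alias derivable, so a rule that blanket-drops alias-derived constraints as redundant could lose useful predicates.

```python
# Toy model: predicates are (left, op, right) triples; aliases maps an alias
# name to the original column it duplicates.

def infer_constraints(predicates, aliases):
    """For every predicate and every (alias, original) pair, add a copy of the
    predicate with the original column replaced by its alias."""
    inferred = set(predicates)
    for alias, orig in aliases.items():
        for left, op, right in list(inferred):
            if left == orig:
                inferred.add((alias, op, right))
            if right == orig:
                inferred.add((left, op, alias))
    return inferred

# `c` is an alias of `a`; predicates come from `... where a > 10 and a > z`
constraints = infer_constraints({("a", ">", "10"), ("a", ">", "z")}, {"c": "a"})
print(sorted(constraints))
# [('a', '>', '10'), ('a', '>', 'z'), ('c', '>', '10'), ('c', '>', 'z')]
```

Here `('c', '>', 'z')` only exists because the alias was substituted into every predicate, which is the behavior the PR discussion is weighing against the cost of the redundant `('c', '>', '10')`.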
[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22240 LGTM
[GitHub] spark pull request #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recyc...
GitHub user jiangxb1987 opened a pull request: https://github.com/apache/spark/pull/22330 [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recycle SparkSession on TakeOrderedAndProjectSuite finishes ## What changes were proposed in this pull request? Previously in `TakeOrderedAndProjectSuite` the SparkSession would not get recycled when the test suite finished. ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/jiangxb1987/spark SPARK-19355 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22330.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22330 commit 0b010669b15781a648f7c7bde13556ddb7c003c3 Author: Xingbo Jiang Date: 2018-09-04T12:23:30Z properly recycle SparkSession on TakeOrderedAndProjectSuite finishes.
[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22240 retest this please
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22112 ping @tgravescs @mridulm @squito @markhamstra
[GitHub] spark pull request #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22240#discussion_r213754911 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Task.scala --- @@ -82,31 +82,22 @@ private[spark] abstract class Task[T]( SparkEnv.get.blockManager.registerTask(taskAttemptId) // TODO SPARK-24874 Allow create BarrierTaskContext based on partitions, instead of whether // the stage is barrier. -context = if (isBarrier) { --- End diff -- `context` is still used in the following statements.
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22112 retest this please
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22112 retest this please
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22112 retest this please
[GitHub] spark pull request #22247: [SPARK-25253][PYSPARK] Refactor local connection ...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22247#discussion_r213378931 --- Diff: python/pyspark/taskcontext.py --- @@ -108,38 +108,12 @@ def _load_from_socket(port, auth_secret): """ Load data from a given socket, this is a blocking method thus only return when the socket connection has been closed. - -This is copied from context.py, while modified the message protocol. """ -sock = None -# Support for both IPv4 and IPv6. -# On most of IPv6-ready systems, IPv6 will take precedence. -for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM): -af, socktype, proto, canonname, sa = res -sock = socket.socket(af, socktype, proto) -try: -# Do not allow timeout for socket reading operation. -sock.settimeout(None) -sock.connect(sa) -except socket.error: -sock.close() -sock = None -continue -break -if not sock: -raise Exception("could not open socket") - -# We don't really need a socket file here, it's just for convenience that we can reuse the -# do_server_auth() function and data serialization methods. -sockfile = sock.makefile("rwb", 65536) - +(sockfile, sock) = local_connect_and_auth(port, auth_secret) --- End diff -- We must set the sock timeout to `None` to allow a `barrier()` call to block forever.
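Why the timeout must be `None` can be shown with a self-contained sketch (plain Python sockets, not PySpark's `local_connect_and_auth` — the slow server below is invented to stand in for the barrier coordinator): the coordinator legitimately holds its reply until every task in the stage has called `barrier()`, which can take longer than any fixed read timeout.

```python
import socket
import threading
import time

# A server that delays its reply, standing in for a barrier coordinator that
# waits for all tasks before releasing anyone.
def slow_server(ports, ready):
    srv = socket.socket()
    srv.bind(("localhost", 0))
    srv.listen(1)
    ports.append(srv.getsockname()[1])
    ready.set()
    conn, _ = srv.accept()
    time.sleep(0.5)      # simulate waiting for the other barrier tasks
    conn.sendall(b"ok")
    conn.close()
    srv.close()

ports, ready = [], threading.Event()
threading.Thread(target=slow_server, args=(ports, ready), daemon=True).start()
ready.wait()

sock = socket.create_connection(("localhost", ports[0]))
sock.settimeout(None)    # disable the read timeout, mirroring the barrier() wait
response = sock.recv(2)  # blocks until the server finally replies
sock.close()
print(response)
```

With any finite `settimeout` value shorter than the coordinator's wait, `recv` would raise `socket.timeout` and the barrier call would spuriously fail — which is the regression the review comment guards against.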
[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22240 retest this please
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22112 retest this please
[GitHub] spark pull request #21976: [SPARK-24909][core] Always unregister pending par...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21976#discussion_r213176636 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2474,19 +2478,21 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(makeCompletionEvent( taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2))) -// There should be no new attempt of stage submitted, -// because task(stageId=1, stageAttempt=1, partitionId=1) is still running in -// the current attempt (and hasn't completed successfully in any earlier attempts). -assert(taskSets.size === 4) +// At this point there should be no active task set for stageId=1 and we need +// to resubmit because the output from (stageId=1, stageAttemptId=0, partitionId=1) +// was ignored due to executor failure +assert(taskSets.size === 5) +assert(taskSets(4).stageId === 1 && taskSets(4).stageAttemptId === 2 + && taskSets(4).tasks.size === 1) -// Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully. +// Complete task(stageId=1, stageAttempt=2, partitionId=1) successfully. runEvent(makeCompletionEvent( - taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2))) + taskSets(4).tasks(0), Success, makeMapStatus("hostB", 2))) --- End diff -- Yea, thanks for the explanation. BTW, what's the JIRA number of the ongoing scheduler integration test?
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r213050049 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a ServerSocket to accept method calls from Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +if (isBarrier) { + serverSocket = Some(new ServerSocket(/* port */ 0, +/* backlog */ 1, +InetAddress.getByName("localhost"))) + // A call to accept() for ServerSocket shall block infinitely. + serverSocket.map(_.setSoTimeout(0)) + new Thread("accept-connections") { +setDaemon(true) + +override def run(): Unit = { + while (!serverSocket.get.isClosed()) { +var sock: Socket = null +try { + sock = serverSocket.get.accept() + // Wait for function call from python side. + sock.setSoTimeout(1) + val input = new DataInputStream(sock.getInputStream()) --- End diff -- Thanks for catching this, yea I agree it would be better to move the authentication before recognising functions.
[GitHub] spark pull request #21976: [SPARK-24909][core] Always unregister pending par...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21976#discussion_r213042176 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2474,19 +2478,21 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(makeCompletionEvent( taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2))) -// There should be no new attempt of stage submitted, -// because task(stageId=1, stageAttempt=1, partitionId=1) is still running in -// the current attempt (and hasn't completed successfully in any earlier attempts). -assert(taskSets.size === 4) +// At this point there should be no active task set for stageId=1 and we need +// to resubmit because the output from (stageId=1, stageAttemptId=0, partitionId=1) +// was ignored due to executor failure +assert(taskSets.size === 5) +assert(taskSets(4).stageId === 1 && taskSets(4).stageAttemptId === 2 + && taskSets(4).tasks.size === 1) -// Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully. +// Complete task(stageId=1, stageAttempt=2, partitionId=1) successfully. runEvent(makeCompletionEvent( - taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2))) + taskSets(4).tasks(0), Success, makeMapStatus("hostB", 2))) --- End diff -- IIUC the test case should still pass without changing this line, right?
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22112 The changes look good from my side: the PR summarizes the current insight we have into the data correctness issue caused by input-order-aware operators and inconsistent shuffle output order, and it also provides a temporary workaround for the above issue by failing the job. I feel we can have this in 2.4 and continue the investigation in future releases. Let's listen to @tgravescs @mridulm @markhamstra, who have been actively tracking the issue, to see whether we can move forward with this PR.
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/21698 Thanks everyone! I closed this in favor of #22112
[GitHub] spark pull request #21698: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user jiangxb1987 closed the pull request at: https://github.com/apache/spark/pull/21698
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212653282 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler( failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { + // If the map stage is INDETERMINATE, which means the map tasks may return + // different result when re-try, we need to re-try all the tasks of the failed + // stage and its succeeding stages, because the input data will be changed after the + // map tasks are re-tried. + // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is + // guaranteed to be idempotent, so the input data of the reducers will not change even + // if the map tasks are re-tried. + if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) { +def rollBackStage(stage: Stage): Unit = stage match { + case mapStage: ShuffleMapStage => +val numMissingPartitions = mapStage.findMissingPartitions().length +if (numMissingPartitions < mapStage.numTasks) { + markStageAsFinished( +mapStage, +Some("preceding shuffle map stage with random output gets retried."), +willRetry = true) + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) --- End diff -- IIRC we didn't cancel running tasks for failed stage attempts because we still expect them to finish and write outputs; however, that's not the case when you decide to retry all the tasks in a stage. You can call `taskScheduler.killAllTaskAttempts()` to kill all running tasks for a specific stage without failing the stage.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212651948 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -305,17 +306,19 @@ object ShuffleExchangeExec { rdd } + // round-robin function is order sensitive if we don't sort the input. + val orderSensitiveFunc = isRoundRobin && !SQLConf.get.sortBeforeRepartition if (needToCopyObjectsBeforeShuffle(part)) { -newRdd.mapPartitionsInternal { iter => +newRdd.mapPartitionsWithIndexInternal((_, iter) => { --- End diff -- sounds good
[GitHub] spark issue #22211: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22211 LGTM
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212383406 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -305,17 +306,19 @@ object ShuffleExchangeExec { rdd } + // round-robin function is order sensitive if we don't sort the input. + val orderSensitiveFunc = isRoundRobin && !SQLConf.get.sortBeforeRepartition if (needToCopyObjectsBeforeShuffle(part)) { -newRdd.mapPartitionsInternal { iter => +newRdd.mapPartitionsWithIndexInternal((_, iter) => { --- End diff -- Shouldn't we mark `newRdd` as `IDEMPOTENT` if we insert a local sort (or `INDETERMINATE` if we don't sort), so we don't have to mark the function as order sensitive?
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212379326 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler( failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { + // If the map stage is INDETERMINATE, which means the map tasks may return + // different result when re-try, we need to re-try all the tasks of the failed + // stage and its succeeding stages, because the input data will be changed after the + // map tasks are re-tried. + // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is + // guaranteed to be idempotent, so the input data of the reducers will not change even + // if the map tasks are re-tried. + if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) { +def rollBackStage(stage: Stage): Unit = stage match { + case mapStage: ShuffleMapStage => +val numMissingPartitions = mapStage.findMissingPartitions().length +if (numMissingPartitions < mapStage.numTasks) { + markStageAsFinished( +mapStage, +Some("preceding shuffle map stage with random output gets retried."), +willRetry = true) + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) --- End diff -- We shall also kill all running tasks for this map stage.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212381036 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler( failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { + // If the map stage is INDETERMINATE, which means the map tasks may return + // different result when re-try, we need to re-try all the tasks of the failed + // stage and its succeeding stages, because the input data will be changed after the + // map tasks are re-tried. + // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is + // guaranteed to be idempotent, so the input data of the reducers will not change even + // if the map tasks are re-tried. + if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) { +def rollBackStage(stage: Stage): Unit = stage match { + case mapStage: ShuffleMapStage => +val numMissingPartitions = mapStage.findMissingPartitions().length +if (numMissingPartitions < mapStage.numTasks) { + markStageAsFinished( +mapStage, +Some("preceding shuffle map stage with random output gets retried."), +willRetry = true) + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + failedStages += mapStage +} + + case resultStage => +val numMissingPartitions = resultStage.findMissingPartitions().length +if (numMissingPartitions < resultStage.numTasks) { + // TODO: support to rollback result tasks. + val errorMessage = "A shuffle map stage with random output was failed and " + +s"retried. However, Spark cannot rollback the result stage $resultStage " + +"to re-process the input data, and has to fail this job. Please " + +"eliminate the randomness by checkpointing the RDD before " + +"repartition/zip and try again." 
+ abortStage(failedStage, errorMessage, None) +} +} + +def rollbackSucceedingStages(stageChain: List[Stage]): Unit = { + if (stageChain.head.id == failedStage.id) { +stageChain.foreach { stage => + if (!failedStages.contains(stage)) rollBackStage(stage) +} + } else { +stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain)) + } +} + +rollBackStage(failedStage) --- End diff -- We may need some comment to explain the tricky corner case here.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212368000 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -812,11 +813,13 @@ abstract class RDD[T: ClassTag]( */ private[spark] def mapPartitionsWithIndexInternal[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], - preservesPartitioning: Boolean = false): RDD[U] = withScope { + preservesPartitioning: Boolean = false, + orderSensitiveFunc: Boolean = false): RDD[U] = withScope { --- End diff -- nit: add param comment for `orderSensitiveFunc`
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212376990 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag]( // RDD chain. @transient protected lazy val isBarrier_ : Boolean = dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier()) + + /** + * Returns the random level of this RDD's computing function. Please refer to [[RDD.RandomLevel]] + * for the definition of random level. + * + * By default, an RDD without parents(root RDD) is IDEMPOTENT. For RDDs with parents, the random + * level of current RDD is the random level of the parent which is random most. + */ + // TODO: make it public so users can set random level to their custom RDDs. + // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different + // partitions. + private[spark] def computingRandomLevel: RDD.RandomLevel.Value = { +val parentRandomLevels = dependencies.map { + case dep: ShuffleDependency[_, _, _] => +if (dep.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) { + RDD.RandomLevel.INDETERMINATE --- End diff -- > > All other shuffle cases, we dont know the output order in spark. > > Actually we know. As long as the shuffle map stage RDD is IDEMPOTENT or UNORDERED, the reduce RDD is UNORDERED instead of INDETERMINATE. IIUC the shuffle map itself works as follows: if Aggregator and key ordering are specified, the output becomes idempotent; if Aggregator or key ordering are not specified, then an indeterminate input yields an indeterminate output, while an idempotent or unordered input yields an unordered output. We also have to include the case @mridulm raised that the shuffle map may be skipped.
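The propagation rule being debated above can be sketched as a tiny Python model (hypothetical code, not Spark's Scala implementation — Spark later shipped a refined version of this idea as `DeterministicLevel`): an RDD inherits the worst level among its parents, and a shuffled parent that is itself IDEMPOTENT or UNORDERED contributes UNORDERED, while an INDETERMINATE one stays INDETERMINATE.

```python
# Levels ordered from most to least deterministic.
IDEMPOTENT, UNORDERED, INDETERMINATE = 0, 1, 2

def computing_random_level(parents):
    """parents is a list of (level, is_shuffle_dependency) pairs; a root RDD
    with no parents defaults to IDEMPOTENT."""
    levels = []
    for level, is_shuffle in parents:
        if is_shuffle:
            # Shuffle output order is unknown, but its *content* per reducer is
            # stable unless the map side is itself indeterminate.
            levels.append(INDETERMINATE if level == INDETERMINATE else UNORDERED)
        else:
            levels.append(level)
    return max(levels, default=IDEMPOTENT)

# A reducer over a deterministic map stage is unordered, not indeterminate:
print(computing_random_level([(IDEMPOTENT, True)]))     # 1 (UNORDERED)
# An indeterminate map stage poisons the reducer:
print(computing_random_level([(INDETERMINATE, True)]))  # 2 (INDETERMINATE)
```

This sketch deliberately ignores the skipped-shuffle-map corner case @mridulm raised; handling it would require the level to account for whether the map stage may be rerun.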
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22165 One general idea is that we don't need to rely on the RPC framework to test `ContextBarrierState`; just mocking `RpcCallContext`s should be enough (I haven't gone into detail, so correct me if I'm wrong). We shall cover the following scenarios: - `RequestToSync` that carries different `numTasks`; - `RequestToSync` that carries different `barrierEpoch`; - Collect enough `RequestToSync` messages before timeout; - Don't collect enough `RequestToSync` messages before timeout; - Handle `RequestToSync` from different stage attempts concurrently; - Make sure we clear all the internal data under each case.
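The mocking idea can be sketched in Python with a toy stand-in for `ContextBarrierState` (the class body below is invented for illustration; the real class lives in Spark's `BarrierCoordinator.scala` and handles timeouts and attempt IDs as well): mock reply contexts let the test drive the state machine directly, with no RPC endpoint.

```python
from unittest.mock import Mock

class ContextBarrierState:
    """Toy stand-in: collect one request per task, and reply to everyone once
    numTasks requests have arrived for the current barrier epoch."""
    def __init__(self, num_tasks):
        self.num_tasks = num_tasks
        self.epoch = 0
        self.waiting = []

    def handle_request(self, task_id, epoch, reply_ctx):
        if epoch != self.epoch:
            reply_ctx.sendFailure("stale barrier epoch")
            return
        self.waiting.append(reply_ctx)
        if len(self.waiting) == self.num_tasks:
            for ctx in self.waiting:       # release every waiting task
                ctx.reply("barrier released")
            self.waiting.clear()
            self.epoch += 1                # internal data cleared per epoch

state = ContextBarrierState(num_tasks=2)
ctx0, ctx1 = Mock(), Mock()
state.handle_request(0, epoch=0, reply_ctx=ctx0)
state.handle_request(1, epoch=0, reply_ctx=ctx1)
ctx0.reply.assert_called_once_with("barrier released")
```

Each bullet above then becomes one test: pass a mismatched `epoch` and assert `sendFailure` was called, send fewer than `num_tasks` requests and assert nobody was released, and so on.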
[GitHub] spark issue #22165: [SPARK-25017][Core] Add test suite for BarrierCoordinato...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22165 I'll make one pass of this later today :) Thanks for taking this task!
[GitHub] spark issue #22079: [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22079 LGTM, thanks!
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211683369 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +99,126 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +def _load_from_socket(port, auth_secret): +""" +Load data from a given socket, this is a blocking method thus only return when the socket +connection has been closed. +""" +sock = None +# Support for both IPv4 and IPv6. +# On most of IPv6-ready systems, IPv6 will take precedence. +for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM): +af, socktype, proto, canonname, sa = res +sock = socket.socket(af, socktype, proto) +try: +# Do not allow timeout for socket reading operation. +sock.settimeout(None) +sock.connect(sa) +except socket.error: +sock.close() +sock = None +continue +break +if not sock: +raise Exception("could not open socket") + +sockfile = sock.makefile("rwb", 65536) +write_with_length("run".encode("utf-8"), sockfile) +sockfile.flush() +do_server_auth(sockfile, auth_secret) + +# The socket will be automatically closed when garbage-collected. +return UTF8Deserializer().loads(sockfile) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_port = None +_secret = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass + +@classmethod +def _getOrCreate(cls): +"""Internal function to get or create global BarrierTaskContext.""" +if cls._taskContext is None: --- End diff -- IIUC reuse python worker just means we start a python worker from a daemon thread, it shall not affect the input/output files related to worker.py. 
[GitHub] spark pull request #22166: [2.3][SPARK-25114][Core][FOLLOWUP] Fix RecordBina...
Github user jiangxb1987 closed the pull request at: https://github.com/apache/spark/pull/22166
[GitHub] spark pull request #22166: [2.3][SPARK-25114][Core][FOLLOWUP] Fix RecordBina...
GitHub user jiangxb1987 opened a pull request: https://github.com/apache/spark/pull/22166 [2.3][SPARK-25114][Core][FOLLOWUP] Fix RecordBinaryComparatorSuite build failure ## What changes were proposed in this pull request? Fix RecordBinaryComparatorSuite build failure ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jiangxb1987/spark SPARK-25114-2.3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22166.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22166 commit 3ec52f802ad19923042d604e8c04725019519c46 Author: Xingbo Jiang Date: 2018-08-21T07:07:49Z fix test suite build failure
[GitHub] spark issue #22158: [SPARK-25161][Core] Fix several bugs in failure handling...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22158 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22158: [SPARK-25161][Core] Fix several bugs in failure handling...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22158 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22158: [SPARK-25161][Core] Fix several bugs in failure h...
GitHub user jiangxb1987 opened a pull request: https://github.com/apache/spark/pull/22158 [SPARK-25161][Core] Fix several bugs in failure handling of barrier execution mode ## What changes were proposed in this pull request? Fix several bugs in failure handling of barrier execution mode: * Mark TaskSet for a barrier stage as zombie when a task attempt fails; * Multiple barrier task failures from a single barrier stage should not trigger multiple stage retries; * Barrier task failure from a previous failed stage attempt should not trigger stage retry; * Fail the job when a task from a barrier ResultStage failed; * RDD.isBarrier() should not rely on `ShuffleDependency`s. ## How was this patch tested? Added corresponding test cases in `DAGSchedulerSuite` and `TaskSchedulerImplSuite`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jiangxb1987/spark failure Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22158.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22158 commit 32ea946c68c5f3108fb18f7e936ba440f7537144 Author: Xingbo Jiang Date: 2018-08-20T17:19:35Z update --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22101 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22101 Thanks @squito, I've added another test case to cover the case where the last byte differs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r211182337

--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +99,124 @@ def getLocalProperty(self, key):
         Get a local property set upstream in the driver, or None if it is missing.
         """
         return self._localProperties.get(key, None)
+
+
+def _load_from_socket(port, auth_secret):
+    """
+    Load data from a given socket, this is a blocking method thus only return when the socket
+    connection has been closed.
+    """
+    sock = None
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, canonname, sa = res
+        sock = socket.socket(af, socktype, proto)
+        try:
+            # Do not allow timeout for socket reading operation.
+            sock.settimeout(None)
+            sock.connect(sa)
+        except socket.error:
+            sock.close()
+            sock = None
+            continue
+        break
+    if not sock:
+        raise Exception("could not open socket")
+
+    sockfile = sock.makefile("rwb", 65536)
+    do_server_auth(sockfile, auth_secret)
+
+    # The socket will be automatically closed when garbage-collected.
+    return UTF8Deserializer().loads(sockfile)
+
+
+class BarrierTaskContext(TaskContext):
+
+    """
+    .. note:: Experimental
+
+    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
+    for a running task, use:
+    L{BarrierTaskContext.get()}.
+
+    .. versionadded:: 2.4.0
+    """
+
+    _port = None
+    _secret = None
+
+    def __init__(self):
+        """Construct a BarrierTaskContext, use get instead"""
+        pass
+
+    @classmethod
+    def _getOrCreate(cls):
+        """Internal function to get or create global BarrierTaskContext."""
+        if cls._taskContext is None:
+            cls._taskContext = BarrierTaskContext()
+        return cls._taskContext
+
+    @classmethod
+    def get(cls):
+        """
+        Return the currently active BarrierTaskContext. This can be called inside of user functions
+        to access contextual information about running tasks.
+
+        .. note:: Must be called on the worker, not the driver. Returns None if not initialized.
+        """
+        return cls._taskContext
+
+    @classmethod
+    def _initialize(cls, port, secret):
+        """
+        Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called
+        after BarrierTaskContext is initialized.
+        """
+        cls._port = port
+        cls._secret = secret
+
+    def barrier(self):
+        """
+        .. note:: Experimental
+
+        Sets a global barrier and waits until all tasks in this stage hit this barrier.
+        Note this method is only allowed for a BarrierTaskContext.
+
+        .. versionadded:: 2.4.0
+        """
+        if self._port is None or self._secret is None:
+            raise Exception("Not supported to call barrier() before initialize " +
+                            "BarrierTaskContext.")
+        else:
+            _load_from_socket(self._port, self._secret)
+
+    def getTaskInfos(self):
--- End diff --

fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
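For context on the blocking behavior barrier() provides: every task blocks at the call until all tasks in the stage have reached it. The sketch below uses threading.Barrier as an in-process analogy only; the real PySpark implementation round-trips a "run" message to the barrier coordinator over a socket.

```python
import threading

NUM_TASKS = 3
barrier = threading.Barrier(NUM_TASKS)
released = []

def task(task_id):
    # ... per-task work would happen here ...
    barrier.wait()            # blocks until all NUM_TASKS threads arrive
    released.append(task_id)  # only reached once every task hit the barrier

threads = [threading.Thread(target=task, args=(i,)) for i in range(NUM_TASKS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert sorted(released) == [0, 1, 2]
```

If any one task never calls wait(), all the others stay blocked, which is why the real barrier() enforces a timeout instead of waiting forever.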
[GitHub] spark pull request #22085: [WIP][SPARK-25095][PySpark] Python support for Ba...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r210963511 --- Diff: python/pyspark/taskcontext.py --- @@ -95,3 +99,124 @@ def getLocalProperty(self, key): Get a local property set upstream in the driver, or None if it is missing. """ return self._localProperties.get(key, None) + + +def _load_from_socket(port, auth_secret): +""" +Load data from a given socket, this is a blocking method thus only return when the socket +connection has been closed. +""" +sock = None +# Support for both IPv4 and IPv6. +# On most of IPv6-ready systems, IPv6 will take precedence. +for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM): +af, socktype, proto, canonname, sa = res +sock = socket.socket(af, socktype, proto) +try: +# Do not allow timeout for socket reading operation. +sock.settimeout(None) +sock.connect(sa) +except socket.error: +sock.close() +sock = None +continue +break +if not sock: +raise Exception("could not open socket") + +sockfile = sock.makefile("rwb", 65536) +do_server_auth(sockfile, auth_secret) + +# The socket will be automatically closed when garbage-collected. +return UTF8Deserializer().loads(sockfile) + + +class BarrierTaskContext(TaskContext): + +""" +.. note:: Experimental + +A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext +for a running task, use: +L{BarrierTaskContext.get()}. + +.. versionadded:: 2.4.0 +""" + +_port = None +_secret = None + +def __init__(self): +"""Construct a BarrierTaskContext, use get instead""" +pass + +@classmethod +def _getOrCreate(cls): +"""Internal function to get or create global BarrierTaskContext.""" +if cls._taskContext is None: +cls._taskContext = BarrierTaskContext() +return cls._taskContext + +@classmethod +def get(cls): +""" +Return the currently active BarrierTaskContext. This can be called inside of user functions +to access contextual information about running tasks. + +.. 
note:: Must be called on the worker, not the driver. Returns None if not initialized. +""" +return cls._taskContext + +@classmethod +def _initialize(cls, port, secret): +""" +Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called +after BarrierTaskContext is initialized. +""" +cls._port = port +cls._secret = secret + +def barrier(self): +""" +.. note:: Experimental + +Sets a global barrier and waits until all tasks in this stage hit this barrier. +Note this method is only allowed for a BarrierTaskContext. + +.. versionadded:: 2.4.0 +""" +if self._port is None or self._secret is None: +raise Exception("Not supported to call barrier() before initialize " + +"BarrierTaskContext.") +else: +_load_from_socket(self._port, self._secret) + +def getTaskInfos(self): --- End diff -- This is not available temporarily. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [WIP][SPARK-25095][PySpark] Python support for Ba...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r210963181

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -381,6 +421,45 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       }
     }
   }
+
+  /**
+   * Gateway to call BarrierTaskContext.barrier().
+   */
+  def barrierAndServe(): Unit = {
--- End diff --

It's not clear yet how to trigger this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22112: [WIP][SPARK-23243][Core] Fix RDD.repartition() data corr...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22112

> IMO we should traverse the dependency graph and rely on how ShuffledRDD is configured

A trivial point here: since `ShuffleDependency` is also a DeveloperApi, it's possible for users to write a customized RDD that behaves like `ShuffledRDD`, so we may want to depend on dependencies rather than RDDs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22101 ping @gatorsmile @mridulm @squito --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22112: [WIP][SPARK-23243][Core] Fix RDD.repartition() da...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r210450123

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1441,6 +1441,18 @@ class DAGScheduler(
           failedStages += failedStage
           failedStages += mapStage
           if (noResubmitEnqueued) {
+            if (!mapStage.rdd.isIdempotent) {
+              // The map stage is not idempotent, we have to rerun all the tasks for the
+              // failed stage to get expected result.
+              failedStage match {
+                case s: ShuffleMapStage =>
--- End diff --

We may also have to update the logic in `removeExecutorAndUnregisterOutputs`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22112: [WIP][SPARK-23243][Core] Fix RDD.repartition() da...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r210449640

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1441,6 +1441,18 @@ class DAGScheduler(
           failedStages += failedStage
           failedStages += mapStage
           if (noResubmitEnqueued) {
+            if (!mapStage.rdd.isIdempotent) {
+              // The map stage is not idempotent, we have to rerun all the tasks for the
+              // failed stage to get expected result.
+              failedStage match {
+                case s: ShuffleMapStage =>
--- End diff --

Like we discussed, we shall also retry the partially finished succeeding stages. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/21698

Thanks @cloud-fan, your summary above is super useful, and I think it's clear enough.

> So when we see fetch failure and rerun map tasks, we should track which reducers have its shuffle blocks being rewritten, and rerun them.

IIUC, patterns like `rdd.map(...).groupBy()` shall always be at risk if `map()` can generate non-deterministic output, right?

> Simply inserting a sort before shuffle doesn't help. The fix for dataframe is adding a sort before round-robin, to make it deterministic. If we add the sort after round-robin and before shuffle, the problem still exists.

Does this mean that, if we can generate non-deterministic output, we can still lose some data even if we add a local sort before shuffle, because the reduce tasks may have already finished (or even committed)?

> be more conservative when handling fetch failure and rerun more reduce tasks. We can provide an internal API to tag a RDD as deterministic (very common in Spark SQL) and then we can safely be optimistic when handling fetch failure.

This is somewhat like what I proposed yesterday. One issue we can't resolve is that some ResultTasks may have committed; in that case, the best effort we can make is to just fail the job. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
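The risk under discussion (non-deterministic map output feeding a round-robin repartition) can be illustrated with a small standalone simulation. This is purely hypothetical code mimicking the positional assignment, not Spark's implementation:

```python
def round_robin_partition(records, num_parts):
    # Round-robin assignment as repartition() does it: the partition a record
    # lands in depends only on the record's position in the iterator, so a
    # nondeterministic upstream ordering changes what each reducer receives.
    parts = [[] for _ in range(num_parts)]
    for i, rec in enumerate(records):
        parts[i % num_parts].append(rec)
    return parts

data = list(range(6))
first_attempt = round_robin_partition(data, 3)
# a retried map task observes the same records in a different order
retry = round_robin_partition([1, 0, 2, 3, 5, 4], 3)
assert first_attempt != retry  # reducers would see different data across attempts
```

If some reducers already fetched (or committed) output from the first attempt while others fetch from the retry, records can be duplicated or lost, which is exactly the correctness bug in SPARK-23243.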
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209974729

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         dataOut.writeInt(partitionIndex)
         // Python version of driver
         PythonRDD.writeUTF(pythonVer, dataOut)
+        // Init a GatewayServer to port current BarrierTaskContext to Python side.
+        val isBarrier = context.isInstanceOf[BarrierTaskContext]
+        val secret = if (isBarrier) {
+          Utils.createSecret(env.conf)
+        } else {
+          ""
+        }
+        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+          Some(new GatewayServer.GatewayServerBuilder()
+            .entryPoint(context.asInstanceOf[BarrierTaskContext])
+            .authToken(secret)
+            .javaPort(0)
+            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
+              secret)
+            .build())
--- End diff --

The major issue here is that we want to make the `barrier()` call blocking: the task shall wait until it times out or succeeds. Do we have other ways to achieve this goal besides the current approach? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22101 @squito I've created a new JIRA task and updated the title, thanks for reminding! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209853276

--- Diff: python/pyspark/taskcontext.py ---
@@ -95,3 +95,92 @@ def getLocalProperty(self, key):
         Get a local property set upstream in the driver, or None if it is missing.
         """
         return self._localProperties.get(key, None)
+
+
+class BarrierTaskContext(TaskContext):
+
+    """
+    .. note:: Experimental
+
+    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
+    for a running task, use:
+    L{BarrierTaskContext.get()}.
+
+    .. versionadded:: 2.4.0
+    """
+
+    _barrierContext = None
+
+    def __init__(self):
+        """Construct a BarrierTaskContext, use get instead"""
+        pass
--- End diff --

This just follows `TaskContext.__init__()`, shall we update both? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22101: [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryComparator...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22101 cc @mridulm @squito --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22101: [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryCom...
GitHub user jiangxb1987 opened a pull request: https://github.com/apache/spark/pull/22101 [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryComparator when subtraction between two words is divisible by Integer.MAX_VALUE. ## What changes were proposed in this pull request? https://github.com/apache/spark/pull/22079#discussion_r209705612 It is possible for two objects to be unequal and yet we consider them as equal with this code, if the long values are separated by Int.MaxValue. This PR fixes the issue. ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/jiangxb1987/spark fix-rbc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22101.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22101 commit 1f6b2594ebe8b50e4cb2fcde15181cfa9a17f48c Author: Xingbo Jiang Date: 2018-08-14T04:04:40Z fix --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22079: [SPARK-23207][SPARK-22905][SQL][BACKPORT-2.2] Shu...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22079#discussion_r209822194

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class RecordBinaryComparator extends RecordComparator {
+
+  // TODO(jiangxb) Add test suite for this.
+  @Override
+  public int compare(
+      Object leftObj, long leftOff, int leftLen, Object rightObj, long rightOff, int rightLen) {
+    int i = 0;
+    int res = 0;
+
+    // If the arrays have different length, the longer one is larger.
+    if (leftLen != rightLen) {
+      return leftLen - rightLen;
+    }
+
+    // The following logic uses `leftLen` as the length for both `leftObj` and `rightObj`, since
+    // we have guaranteed `leftLen` == `rightLen`.
+
+    // check if stars align and we can get both offsets to be aligned
+    if ((leftOff % 8) == (rightOff % 8)) {
+      while ((leftOff + i) % 8 != 0 && i < leftLen) {
+        res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
+            (Platform.getByte(rightObj, rightOff + i) & 0xff);
+        if (res != 0) return res;
+        i += 1;
+      }
+    }
+    // for architectures that support unaligned accesses, chew it up 8 bytes at a time
+    if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
+      while (i <= leftLen - 8) {
+        res = (int) ((Platform.getLong(leftObj, leftOff + i) -
+            Platform.getLong(rightObj, rightOff + i)) % Integer.MAX_VALUE);
+        if (res != 0) return res;
--- End diff --

As far as I can recall, it's actually a mistake: at first the function returned a Long value, and later I changed the return type to Integer. Let me fix it. Thanks for discovering this @mridulm! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
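The bug in the quoted comparison can be shown arithmetically: reducing the 64-bit difference modulo Integer.MAX_VALUE makes two distinct longs compare as equal whenever their difference is an exact multiple of Integer.MAX_VALUE. A Python re-creation of the arithmetic (the helper names `broken_compare` and `fixed_compare` are illustrative, not the Java code itself):

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE

def broken_compare(left, right):
    # Mirrors the original: (left - right) % Integer.MAX_VALUE, using
    # truncated (Java-style) remainder semantics.
    diff = left - right
    return diff % INT_MAX if diff >= 0 else -((-diff) % INT_MAX)

def fixed_compare(left, right):
    # The fix: compare the values directly instead of subtracting.
    return (left > right) - (left < right)

a, b = 2 * INT_MAX, 0
assert a != b
assert broken_compare(a, b) == 0  # bug: unequal words look equal
assert fixed_compare(a, b) == 1
```

The same failure mode motivated the follow-up test case about values "separated by Int.MaxValue" mentioned in the PR description.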
[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22001 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/21698 @tgravescs I'm still working on this, but I would be glad if you could also work on the "sort the serialized bytes of T" approach. Actually, the retry-all-tasks approach seems more complex than I expected once it involves the commit protocol (currently a task can only be committed once, so if you already have some tasks committed and then hit an ExecutorLost, retry-all-tasks won't work), so I hope we can have other approaches like "sort the serialized bytes of T" merged into the 2.4 release. I'll post the benchmark result of the DF.repartition() fix later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/21698

We fixed the DataFrame repartition correctness issue by inserting a local sort before repartition. Feedback on this approach has been generally negative because the performance of repartition() goes down significantly, and even queries that don't have a potential correctness issue have to pay for the performance regression (e.g. rdd.repartition(...).map(...).collect()). That's the major reason why we are trying to resolve the correctness issues in a different way.

I agree that correctness shall be given higher priority than performance, but things are not black or white; here we really care about both. We also want to guarantee that if you don't have a correctness issue beforehand, you are least affected by the proposed fix. I'm currently working on an extended approach based on the one proposed in this PR, which shall handle the cascading-stages issue @mridulm mentioned above (rdd1.zip(rdd2).map(v => (computeKey(v._1, v._2), computeValue(v._1, v._2))).groupByKey().map().save()). Please also note that this doesn't actually imply we will retry more stages; it's true we will retry more tasks to ensure correctness on FetchFailure/ExecutorLost, but we won't retry more stages.

Having said that, IMO the best bet is to implement both approaches (the insert-local-sort one and the retry-all-tasks-on-failure one) and create a flag for each of them, so users may choose an approach based on their workload patterns, though it's also debatable which approach shall be enabled by default. Unfortunately we are not able to deliver them in 2.4, but I'm optimistic we may include them in 3.0 and of course backport them to all the active branches. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
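The insert-local-sort fix described above can be sketched in simulation form. This is a hypothetical helper capturing the idea only; the actual DataFrame fix sorts rows by their binary representation via RecordBinaryComparator before the round-robin pass:

```python
def deterministic_round_robin(records, num_parts):
    # Sort records locally before the round-robin pass, so every task attempt
    # assigns each record to the same partition regardless of the (possibly
    # nondeterministic) order in which the input was produced.
    parts = [[] for _ in range(num_parts)]
    for i, rec in enumerate(sorted(records)):
        parts[i % num_parts].append(rec)
    return parts

# two attempts that observe the same records in different orders now agree
attempt_a = deterministic_round_robin([3, 1, 2, 0], 2)
attempt_b = deterministic_round_robin([0, 1, 2, 3], 2)
assert attempt_a == attempt_b
```

The sort is what costs the performance regression discussed in the thread: it buys determinism across task retries at the price of an extra local pass over every partition.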
[GitHub] spark issue #22001: [SPARK-24819][CORE] Fail fast when no enough slots to la...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22001 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22085#discussion_r209490553

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         dataOut.writeInt(partitionIndex)
         // Python version of driver
         PythonRDD.writeUTF(pythonVer, dataOut)
+        // Init a GatewayServer to port current BarrierTaskContext to Python side.
+        val isBarrier = context.isInstanceOf[BarrierTaskContext]
+        val secret = if (isBarrier) {
+          Utils.createSecret(env.conf)
+        } else {
+          ""
+        }
+        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
+          Some(new GatewayServer.GatewayServerBuilder()
+            .entryPoint(context.asInstanceOf[BarrierTaskContext])
+            .authToken(secret)
+            .javaPort(0)
+            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
+              secret)
+            .build())
--- End diff --

We have to port `BarrierTaskContext` from the Java side to the Python side, otherwise there is no way to call `BarrierTaskContext.barrier()` from Python. Thus, of course, the JavaGateway is only initiated when the context is a `BarrierTaskContext`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartition on ...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22079 Both seem fine to me; it's just a minor improvement. Normally we don't backport an improvement, but since it's a small and simple change, I'm confident it is safe to also include it in a backport PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...
GitHub user jiangxb1987 opened a pull request: https://github.com/apache/spark/pull/22085 [SPARK-25095][PySpark] Python support for BarrierTaskContext ## What changes were proposed in this pull request? Add method `barrier()` and `getTaskInfos()` in python TaskContext, these two methods are only allowed for barrier tasks. ## How was this patch tested? TBD You can merge this pull request into a Git repository by running: $ git pull https://github.com/jiangxb1987/spark python.barrier Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22085.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22085 commit 7b488299709f715d344e5c38956577f31718ab34 Author: Xingbo Jiang Date: 2018-08-12T16:04:20Z implement python barrier taskcontext --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org