[GitHub] spark issue #22995: [SPARK-25998] [CORE] Change TorrentBroadcast to hold wea...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22995 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22609
I will close the PR and document that our tests should use spark.ui.retainedTasks = 1 (or some other very low value). In effect this is equivalent to this PR, and loses information anyway, but it will be specific to the applications which choose to do it (and which do not care about the API).
[GitHub] spark pull request #22609: [SPARK-25594] [Core] Avoid maintaining task infor...
Github user mridulm closed the pull request at: https://github.com/apache/spark/pull/22609
[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22609
> I tried that a lot while writing this code, but the disk store is just too slow
That is unfortunate... I was actually hoping that would be a good compromise if the expanded API needs to be preserved.
> if UI is disabled, any task related metrics were reported as zero:
I was incorrect in this regard: they were not zero, because JobProgressListener had stage/job counters which maintained them. This was limited to the job and stage level, though, which kept the actual overhead fairly minimal and honored spark.ui.retainedStages/spark.ui.retainedJobs. Given this, the current PR would be a functionality regression, since we would end up losing information we would otherwise expose.
[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22609
@vanzin, is there any reason why liveStore is hardcoded to be in-memory? Are there any implications of making it disk-backed? That might be another option to unblock this: it requires a config change for existing applications, but is far better than an OOM.
[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22609
@vanzin, @gengliangwang To clarify the earlier behavior: if the UI is disabled, any task-related metrics were reported as zero, since updates happened at stage granularity. The UI is typically disabled when an application does not want the additional overhead associated with it (users rely on the history server to see the application state), particularly in multi-tenant settings (like the Thrift server) or in very large applications (processing PBs of data). If the UI is enabled (the default) or if the application is not `live` (history server), task events would be processed. Unfortunately, application + config combinations which used to work before 2.3 now throw OOM (the long-running tests are typically run for 7+ days to find any Kerberos-related issues; here a long-running stress test failed within 28 hours), and a large part of the heap was occupied by TaskDataWrapper, making this a regression.
[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22609
This is a regression introduced in 2.3; unfortunately, we did not notice it until now.
On Tue, Oct 2, 2018 at 4:56 AM Shahid wrote:
> Hi @mridulm <https://github.com/mridulm> , Can't we limit the task
> information by setting 'spark.ui.retainedTasks' lesser, to avoid OOM?
> correct me if I am wrong.
[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22609 ok to test
[GitHub] spark issue #22609: [SPARK-25594] [Core] Avoid maintaining task information ...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22609 +CC @vanzin, @tgravescs
[GitHub] spark pull request #22609: [SPARK-25594] [Core] Avoid maintaining task infor...
GitHub user mridulm opened a pull request: https://github.com/apache/spark/pull/22609
[SPARK-25594] [Core] Avoid maintaining task information when UI is disabled

## What changes were proposed in this pull request?

Avoid maintaining task information in a live Spark application when the UI is disabled. For long-running applications with a large number of tasks, this ended up causing OOM in our tests.

## How was this patch tested?

A long-running test ran successfully for 34 hours with steady memory usage; previously it failed with OOM within 28 hours.

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mridulm/spark master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22609.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #22609
commit c2f6b4a50ef3693292f600a2d4d7743ea870b96e
Author: Mridul Muralidharan
Date: 2018-10-02T08:03:41Z
SPARK-25594: Avoid maintaining task information when UI is disabled
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r22067
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +397,20 @@ abstract class RDD[T: ClassTag](
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
+      // Create an instance of external append only map which ignores values.
+      val map = new ExternalAppendOnlyMap[T, Null, Null](
+        createCombiner = value => null,
+        mergeValue = (a, b) => a,
+        mergeCombiners = (a, b) => a)
--- End diff --
scratch that - does not matter
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r220672896
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(!deserial.toString().isEmpty())
   }
+  test("distinct with known partitioner preserves partitioning") {
+    val rdd = sc.parallelize(1.to(100), 10).map(x => (x % 10, x % 10)).sortByKey()
+    val initialPartitioner = rdd.partitioner
+    val distinctRdd = rdd.distinct()
+    val resultingPartitioner = distinctRdd.partitioner
+    assert(initialPartitioner === resultingPartitioner)
+    val distinctRddDifferent = rdd.distinct(5)
+    val distinctRddDifferentPartitioner = distinctRddDifferent.partitioner
+    assert(initialPartitioner != distinctRddDifferentPartitioner)
+    assert(distinctRdd.collect().sorted === distinctRddDifferent.collect().sorted)
--- End diff --
We could also check if the number of stages is what we expect.
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r220672579
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +397,20 @@ abstract class RDD[T: ClassTag](
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
+      // Create an instance of external append only map which ignores values.
+      val map = new ExternalAppendOnlyMap[T, Null, Null](
+        createCombiner = value => null,
+        mergeValue = (a, b) => a,
+        mergeCombiners = (a, b) => a)
--- End diff --
nit: clean them?
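The reason a known partitioner lets `distinct` skip the shuffle is that equal elements always land in the same partition, so removing duplicates inside each partition (the job of `removeDuplicatesInPartition` in the diff above) already yields the global result. A minimal sketch of that property in plain Java, assuming partitions are modeled as in-memory lists and a `LinkedHashSet` stands in for the spillable `ExternalAppendOnlyMap`; `LocalDistinct`, `partitionBy` and `distinctWithinPartitions` are hypothetical names, not Spark APIs:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration: partitions are in-memory lists, and partitionOf
// stands in for a partitioner that is a pure function of each element.
final class LocalDistinct {

    // Route each element to partitionOf(element), mimicking an already-partitioned RDD.
    static <T> List<List<T>> partitionBy(List<T> data, int numPartitions,
                                         Function<T, Integer> partitionOf) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (T x : data) {
            parts.get(partitionOf.apply(x)).add(x);
        }
        return parts;
    }

    // Deduplicate each partition independently; no element moves between partitions.
    static <T> List<List<T>> distinctWithinPartitions(List<List<T>> partitions) {
        List<List<T>> result = new ArrayList<>();
        for (List<T> partition : partitions) {
            // LinkedHashSet drops repeats while keeping first-seen order (in memory only).
            List<T> unique = new ArrayList<>(new LinkedHashSet<>(partition));
            result.add(unique);
        }
        return result;
    }
}
```

If the partition assignment were not a function of the element (for example round-robin), copies of the same value could sit in different partitions and a per-partition pass would no longer be sufficient; that is why this path only applies when a partitioner is known.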
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r218946996
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input order
  *                         is changed. Mostly stateful functions are order-sensitive.
+ * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    knownPartitioner: Option[Partitioner] = None)
   extends RDD[U](prev) {
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  override val partitioner = {
+    if (preservesPartitioning) {
+      firstParent[T].partitioner
+    } else {
+      knownPartitioner
+    }
+  }
--- End diff --
yes
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r218946895
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        def key(x: T): (T, Null) = (x, null)
+        val cleanKey = sc.clean(key _)
+        val keyed = new MapPartitionsRDD[(T, Null), T](
+          this,
+          (context, pid, iter) => iter.map(cleanKey),
+          knownPartitioner = Some(new WrappedPartitioner(p)))
+        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
--- End diff --
Yes, it would use the right partitioner in this case.
[GitHub] spark issue #22433: [SPARK-25442][SQL][K8S] Support STS to run in k8s deploy...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22433
> As this script is common start point for all the resource managers(k8s/yarn/mesos/standalone/local), i guess changing this to fit for all the cases has a value add, instead of doing at each resource manager level. Thoughts?
Please note that I am referring specifically to the need to change the application `name`. The rationale given, that `name` should be DNS-compliant, is a restriction specific to k8s, not to Spark. Instead of doing one-off renames, the right approach would be to handle this name translation so that it benefits not just STS but any user application.
[GitHub] spark issue #22433: [SPARK-25442][SQL][K8S] Support STS to run in k8s deploy...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22433
It is an implementation detail of the k8s integration that the application name is expected to be DNS-compliant. Spark does not have that requirement, and yarn/mesos/standalone/local work without this restriction. The right fix in the k8s integration would be to sanitize the name specified by the user/application so that it complies with k8s requirements. This will help not just with the Thrift server, but with any Spark application.
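A rough sketch of the kind of sanitization suggested here, assuming the target is a Kubernetes DNS-1123 label (lowercase alphanumerics and `-`, starting and ending with an alphanumeric, at most 63 characters). `K8sNameSanitizer`, `toDnsLabel` and the `spark-app` fallback are hypothetical and do not describe what the Spark k8s integration actually does:

```java
import java.util.Locale;

// Hypothetical helper: the class name, method name and fallback value are assumptions.
final class K8sNameSanitizer {
    private static final int MAX_LABEL_LENGTH = 63; // DNS-1123 label length limit

    static String toDnsLabel(String appName) {
        String lower = appName.toLowerCase(Locale.ROOT);
        // Collapse every run of characters outside [a-z0-9] into a single '-'.
        String replaced = lower.replaceAll("[^a-z0-9]+", "-");
        // Strip leading/trailing '-' so the label starts and ends with an alphanumeric.
        String trimmed = replaced.replaceAll("^-+|-+$", "");
        if (trimmed.length() > MAX_LABEL_LENGTH) {
            // Truncate, then drop any '-' left dangling at the cut point.
            trimmed = trimmed.substring(0, MAX_LABEL_LENGTH).replaceAll("-+$", "");
        }
        // Fall back to a fixed name if nothing usable is left.
        return trimmed.isEmpty() ? "spark-app" : trimmed;
    }
}
```

For example, `toDnsLabel("Thrift JDBC/ODBC Server")` returns `thrift-jdbc-odbc-server`; applying such a mapping once in the k8s integration would cover every application rather than a single renamed one.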
[GitHub] spark issue #22433: [SPARK-25442][SQL][K8S] Support STS to run in k8s deploy...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22433
Does it fail in k8s itself, or does the Spark k8s code error out? If the former, why not fix "name" handling in k8s to replace unsupported characters?
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r217901185
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        def key(x: T): (T, Null) = (x, null)
+        val cleanKey = sc.clean(key _)
+        val keyed = new MapPartitionsRDD[(T, Null), T](
+          this,
+          (context, pid, iter) => iter.map(cleanKey),
+          knownPartitioner = Some(new WrappedPartitioner(p)))
+        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
--- End diff --
Ah yes, no partitioner specified => use parent's partitioner.
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r217901179
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input order
  *                         is changed. Mostly stateful functions are order-sensitive.
+ * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    knownPartitioner: Option[Partitioner] = None)
   extends RDD[U](prev) {
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  override val partitioner = {
+    if (preservesPartitioning) {
+      firstParent[T].partitioner
+    } else {
+      knownPartitioner
+    }
+  }
--- End diff --
Since we are already creating a `MapPartitionsRDD` in distinct, overriding `partitioner` should be trivial.
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r217876215
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        def key(x: T): (T, Null) = (x, null)
+        val cleanKey = sc.clean(key _)
+        val keyed = new MapPartitionsRDD[(T, Null), T](
+          this,
+          (context, pid, iter) => iter.map(cleanKey),
+          knownPartitioner = Some(new WrappedPartitioner(p)))
+        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
--- End diff --
Don't you need to specify the partitioner here?
[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22010#discussion_r217876184
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input order
  *                         is changed. Mostly stateful functions are order-sensitive.
+ * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    knownPartitioner: Option[Partitioner] = None)
   extends RDD[U](prev) {
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  override val partitioner = {
+    if (preservesPartitioning) {
+      firstParent[T].partitioner
+    } else {
+      knownPartitioner
+    }
+  }
--- End diff --
We don't need to modify the public API to add support for this. Create a subclass of MapPartitionsRDD that overrides the `partitioner` method to specify what you need. Did I miss something here?
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22192#discussion_r216118263
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -136,6 +136,26 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)
+  private val executorPlugins: Seq[ExecutorPlugin] = {
+    val pluginNames = conf.get(EXECUTOR_PLUGINS)
+    if (pluginNames.nonEmpty) {
+      logDebug(s"Initializing the following plugins: ${pluginNames.mkString(", ")}")
+
+      // Plugins need to load using a class loader that includes the executor's user classpath
+      val pluginList: Seq[ExecutorPlugin] =
+        Utils.withContextClassLoader(replClassLoader) {
+          val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
+          plugins.foreach(_.init())
--- End diff --
On second thought, I am not sure the latter (a plugin's initialization failing) should fail the executor, for example if a plugin is unable to write a file.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22192#discussion_r216116857
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -136,6 +136,26 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)
+  private val executorPlugins: Seq[ExecutorPlugin] = {
+    val pluginNames = conf.get(EXECUTOR_PLUGINS)
+    if (pluginNames.nonEmpty) {
+      logDebug(s"Initializing the following plugins: ${pluginNames.mkString(", ")}")
+
+      // Plugins need to load using a class loader that includes the executor's user classpath
+      val pluginList: Seq[ExecutorPlugin] =
+        Utils.withContextClassLoader(replClassLoader) {
+          val plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginNames, conf)
+          plugins.foreach(_.init())
--- End diff --
Just to confirm: the behavior we are going for is that if `init()` fails, the executor does not come up, right? The cause could be invalid plugins being specified, or the initialization of one or more plugins failing.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22192#discussion_r216116706
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -240,6 +240,19 @@ private[spark] object Utils extends Logging {
     // scalastyle:on classforname
   }
+  /**
+   * Run a segment of code using a different context class loader in the current thread
+   */
+  def withContextClassLoader[T](ctxClassLoader: ClassLoader)(fn: => T): T = {
+    val oldClassLoader = getContextOrSparkClassLoader
--- End diff --
Curious, why `getContextOrSparkClassLoader` and not just `Thread.getContextClassLoader`?
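For reference, the core contract of such a helper is to install the new loader, run the block, and restore the previous loader in a `finally` even if the block throws. A minimal Java sketch of that contract (the reviewed helper is Scala); it saves `Thread.currentThread().getContextClassLoader()`, the simpler choice the question alludes to, rather than Spark's `getContextOrSparkClassLoader`, and `ContextClassLoaders` is a hypothetical name:

```java
import java.util.function.Supplier;

// Hypothetical Java analogue of a withContextClassLoader helper.
final class ContextClassLoaders {

    // Runs fn with ctxClassLoader as the current thread's context class loader and
    // always restores whatever loader was installed before (possibly null).
    static <T> T withContextClassLoader(ClassLoader ctxClassLoader, Supplier<T> fn) {
        Thread current = Thread.currentThread();
        ClassLoader old = current.getContextClassLoader();
        current.setContextClassLoader(ctxClassLoader);
        try {
            return fn.get();
        } finally {
            // Restored even when fn throws, so the thread is not left with a foreign loader.
            current.setContextClassLoader(old);
        }
    }
}
```

The only observable difference between the two choices is which loader gets restored: saving the raw context loader puts back exactly what was there, including `null`.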
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22192#discussion_r215750952
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -218,6 +244,8 @@ private[spark] class Executor(
     env.metricsSystem.report()
     heartbeater.shutdown()
     heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+    executorPluginThread.interrupt()
+    executorPluginThread.join()
--- End diff --
We should be careful about the semantics of `interrupt()` when overriding it: here we are using it to trigger something in addition to thread interruption. For example, the actual shutdown of the plugins is triggered in this caller thread, not in the plugin init thread, which can cause issues since the plugins use a different thread context classloader. This usage is also a bit confusing. A better option, if we want to follow the current design, would be:
* Subclass Thread and expose methods to trigger state changes.
* Once init completes, wait (wait/notify or a condition wait) until plugin shutdown is required.
* On stop(), invoke plugin shutdown in the plugin thread (so that the right classloader is in use).
IIRC, VM shutdown results in `executor.stop`, so plugin shutdown should get invoked even in that case.
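The three steps proposed above can be sketched as a dedicated `Thread` subclass: it initializes the plugins, signals that init is done, parks until a stop is requested, and then runs every `shutdown()` on the same thread, so both phases see the same context class loader. This is a sketch under assumptions: `Plugin` is a simplified stand-in for the `ExecutorPlugin` interface under review, `PluginThread`, `awaitInit()` and `requestStop()` are hypothetical names, and `CountDownLatch` replaces raw wait/notify:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Simplified stand-in for the ExecutorPlugin interface under review (assumption).
interface Plugin {
    void init();
    void shutdown();
}

// Hypothetical sketch: init and shutdown both run on this thread, so both use pluginLoader.
final class PluginThread extends Thread {
    private final List<Plugin> plugins;
    private final CountDownLatch initDone = new CountDownLatch(1);
    private final CountDownLatch stopRequested = new CountDownLatch(1);

    PluginThread(List<Plugin> plugins, ClassLoader pluginLoader) {
        super("executor-plugins");
        this.plugins = new ArrayList<>(plugins);
        setContextClassLoader(pluginLoader);
        setDaemon(true);
    }

    @Override
    public void run() {
        try {
            for (Plugin p : plugins) {
                p.init();
            }
        } finally {
            // Release awaitInit() even if an init() threw; in that case the exception
            // then ends this thread without running the shutdown phase.
            initDone.countDown();
        }
        try {
            stopRequested.await(); // park until requestStop() is called
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (Plugin p : plugins) {
            try {
                p.shutdown();
            } catch (RuntimeException e) {
                // Keep going so one failing plugin does not block the others.
            }
        }
    }

    // Block the caller (e.g. executor startup) until every plugin's init() has returned.
    void awaitInit() throws InterruptedException {
        initDone.await();
    }

    // Ask this thread to run the plugin shutdowns itself, then wait for it to finish.
    void requestStop() throws InterruptedException {
        stopRequested.countDown();
        join();
    }
}
```

In this design an executor would call `awaitInit()` right after `start()`, which also covers waiting for initialization before logging that it finished, and `requestStop()` from its own `stop()`, instead of overloading `interrupt()`.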
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22192#discussion_r215749556
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -136,6 +136,32 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)
+  private val pluginList = conf.get(EXECUTOR_PLUGINS)
+  if (pluginList.nonEmpty) {
+    logDebug(s"Initializing the following plugins: ${pluginList.mkString(", ")}")
+  }
+
+  val executorPluginThread = new Thread {
+    var plugins: Seq[ExecutorPlugin] = Nil
+
+    override def run(): Unit = {
+      plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf)
+      plugins.foreach(_.init())
+    }
+
+    override def interrupt(): Unit = {
+      plugins.foreach(_.shutdown())
+      super.interrupt()
+    }
+  }
+
+  executorPluginThread.setContextClassLoader(replClassLoader)
+  executorPluginThread.start()
+
+  if (pluginList.nonEmpty) {
+    logDebug("Finished initializing plugins")
+  }
+
--- End diff --
We should wait for the plugins to finish initialization (join() on the thread) before moving on, particularly since we emit a message saying that initialization has finished.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22192#discussion_r215749405
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -136,6 +136,32 @@ private[spark] class Executor(
   // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
   env.serializerManager.setDefaultClassLoader(replClassLoader)
+  private val pluginList = conf.get(EXECUTOR_PLUGINS)
+  if (pluginList.nonEmpty) {
+    logDebug(s"Initializing the following plugins: ${pluginList.mkString(", ")}")
+  }
+
+  val executorPluginThread = new Thread {
+    var plugins: Seq[ExecutorPlugin] = Nil
+
+    override def run(): Unit = {
+      plugins = Utils.loadExtensions(classOf[ExecutorPlugin], pluginList, conf)
+      plugins.foreach(_.init())
+    }
+
+    override def interrupt(): Unit = {
+      plugins.foreach(_.shutdown())
+      super.interrupt()
--- End diff --
Call super.interrupt() in a try/finally. It would also be good to isolate the shutdown of each plugin from the others, so that an exception thrown by one plugin does not prevent another plugin from shutting down.
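Both suggestions in this comment can be shown with a small standalone helper: every plugin's shutdown runs even if an earlier one throws, and the final step (here a stand-in for `super.interrupt()`) always runs from a `finally` block. `IsolatedShutdown` and the use of `Runnable` hooks instead of `ExecutorPlugin` instances are assumptions for illustration; a real implementation might log failures rather than rethrow them:

```java
import java.util.List;

// Hypothetical helper illustrating per-plugin shutdown isolation.
final class IsolatedShutdown {

    // Runs every hook even if earlier ones throw, then always runs finallyAction
    // (the super.interrupt() call in the code under review). The first failure is
    // rethrown afterwards, with any later failures attached as suppressed exceptions.
    static void shutdownAll(List<Runnable> shutdownHooks, Runnable finallyAction) {
        RuntimeException first = null;
        try {
            for (Runnable hook : shutdownHooks) {
                try {
                    hook.run();
                } catch (RuntimeException e) {
                    if (first == null) {
                        first = e;
                    } else {
                        first.addSuppressed(e);
                    }
                }
            }
        } finally {
            finallyAction.run();
        }
        if (first != null) {
            throw first;
        }
    }
}
```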
[GitHub] spark issue #22192: [SPARK-24918][Core] Executor Plugin API
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22192
@vanzin @NiharS I am uncomfortable with that change as well, which is why I wanted the initialization pushed into a separate thread (followed by a join) if we really need to set the context classloader.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22192#discussion_r213819091
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -130,6 +130,14 @@ private[spark] class Executor(
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
+  // Load plugins in the current thread, they are expected to not block.
+  // Heavy computation in plugin initialization should be done async.
+  Thread.currentThread().setContextClassLoader(replClassLoader)
+  conf.get(EXECUTOR_PLUGINS).foreach { classes =>
--- End diff --
IMO we should do this in a different thread. It will isolate plugin creation/instantiation from executor initialization. Also, we should do this after `Executor` initialization has completed.
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22192#discussion_r213818362
--- Diff: core/src/main/java/org/apache/spark/ExecutorPlugin.java ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * A plugin which can be automaticaly instantiated within each Spark executor. Users can specify
+ * plugins which should be created with the "spark.executor.plugins" configuration. An instance
+ * of each plugin will be created for every executor, including those created by dynamic allocation,
+ * before the executor starts running any tasks.
+ *
+ * The specific api exposed to the end users still considered to be very unstable. We will
+ * *hopefully* be able to keep compatability by providing default implementations for any methods
+ * added, but make no guarantees this will always be possible across all spark releases.
+ *
+ * Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources
+ * it uses. A plugin acquires the same privileges as the user running the task. A bad plugin
+ * could also intefere with task execution and make the executor fail in unexpected ways.
+ */
+@DeveloperApi
+public interface ExecutorPlugin {
--- End diff --
I agree, explicit start/stop is a good idea; it also makes the lifecycle more concrete. This comes with the explicit caveat that class loading need not result in instantiation, that object creation need not result in a `start()`, and that `stop()` might not be invoked for every `start()` (in case of VM failures).
[GitHub] spark pull request #22192: [SPARK-24918][Core] Executor Plugin API
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22192#discussion_r213821565 --- Diff: core/src/test/java/org/apache/spark/ExecutorPluginSuite.java --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import org.apache.spark.api.java.JavaSparkContext; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +// Tests loading plugins into executors +public class ExecutorPluginSuite { + // Static value modified by testing plugin to ensure plugin loaded correctly. 
+ public static int numSuccessfulPlugins = 0; + private JavaSparkContext sc; + + private String EXECUTOR_PLUGIN_CONF_NAME = "spark.executor.plugins"; + private String testPluginName = "org.apache.spark.ExecutorPluginSuite$TestExecutorPlugin"; + + @Before + public void setUp() { +sc = null; +numSuccessfulPlugins = 0; + } + + private SparkConf initializeSparkConf(String pluginNames) { +return new SparkConf() +.setMaster("local") +.setAppName("test") +.set(EXECUTOR_PLUGIN_CONF_NAME, pluginNames); + } + + @Test + public void testPluginClassDoesNotExist() { +SparkConf conf = initializeSparkConf("nonexistant.plugin"); +try { + sc = new JavaSparkContext(conf); +} catch (Exception e) { + // We cannot catch ClassNotFoundException directly because Java doesn't think it'll be thrown + assertTrue(e.toString().startsWith("java.lang.ClassNotFoundException")); +} finally { + if (sc != null) { +sc.stop(); +sc = null; + } +} + } + + @Test + public void testAddPlugin() throws InterruptedException { +// Load the sample TestExecutorPlugin, which will change the value of pluginExecutionSuccessful +SparkConf conf = initializeSparkConf(testPluginName); + +try { + sc = new JavaSparkContext(conf); +} catch (Exception e) { + fail("Failed to start SparkContext with exception " + e.toString()); +} + +// Wait a moment since plugins run on separate threads +Thread.sleep(500); + +assertEquals(1, numSuccessfulPlugins); --- End diff -- After introducing an explicit start(), we can replace the `sleep` with a timed wait on a condition.
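Here is one way the timed wait could look, as a sketch. A hypothetical `LatchedTestPlugin` counts down a shared `java.util.concurrent.CountDownLatch` from `start()`, and the test waits on that latch with a timeout instead of calling `Thread.sleep(500)`. The sketch assumes the plugin runs in the same JVM as the test (as with a `local` master), which is what lets a static field carry the signal.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical test plugin: start() signals a shared latch instead of bumping a
// static counter that the test checks after a fixed sleep.
final class LatchedTestPlugin {
  // One count per plugin start the test expects; replaced by expectStarts().
  private static volatile CountDownLatch started = new CountDownLatch(1);

  /** Resets the latch before a test; n is the number of expected starts. */
  static void expectStarts(int n) {
    started = new CountDownLatch(n);
  }

  void start() {
    started.countDown();
  }

  /** Waits up to timeoutMs for all expected starts; returns false on timeout. */
  static boolean awaitStarted(long timeoutMs) {
    try {
      return started.await(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // preserve the interrupt for the caller
      return false;
    }
  }
}
```

In the suite above, `assertTrue(LatchedTestPlugin.awaitStarted(5000))` would replace the sleep-then-check pair. The test then passes as soon as the plugin starts and only waits the full timeout when it fails.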
[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the aggregated stage me...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22209#discussion_r212772176 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -350,11 +350,16 @@ private[spark] class AppStatusListener( val e = it.next() if (job.stageIds.contains(e.getKey()._1)) { val stage = e.getValue() - stage.status = v1.StageStatus.SKIPPED - job.skippedStages += stage.info.stageId - job.skippedTasks += stage.info.numTasks - it.remove() - update(stage, now) + // Only update the stage if it has not finished already + if (v1.StageStatus.ACTIVE.equals(stage.status) || + v1.StageStatus.PENDING.equals(stage.status)) { +stage.status = v1.StageStatus.SKIPPED +job.skippedStages += stage.info.stageId +job.skippedTasks += stage.info.numTasks +job.activeStages -= 1 +it.remove() --- End diff -- To clarify, by 'this' I meant the job end event being received before the stage end event (for a stage that is part of the job). I was not referring to task end events (those can arrive after the stage or job end). Thanks for clarifying @vanzin ... given the snippet is not trying to recover from dropped events, I am wondering why "non"-skipped stages would even be in the list: I would expect all of them to be skipped?
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212705976 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1865,6 +1871,57 @@ abstract class RDD[T: ClassTag]( // RDD chain. @transient protected lazy val isBarrier_ : Boolean = dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier()) + + /** + * Returns the random level of this RDD's output. Please refer to [[RandomLevel]] for the + * definition. + * + * By default, an reliably checkpointed RDD, or RDD without parents(root RDD) is IDEMPOTENT. For + * RDDs with parents, we will generate a random level candidate per parent according to the + * dependency. The random level of the current RDD is the random level candidate that is random + * most. Please override [[getOutputRandomLevel]] to provide custom logic of calculating output + * random level. + */ + // TODO: make it public so users can set random level to their custom RDDs. + // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different + // partitions. + private[spark] final lazy val outputRandomLevel: RandomLevel.Value = { +if (checkpointData.exists(_.isInstanceOf[ReliableRDDCheckpointData[_]])) { + RandomLevel.IDEMPOTENT +} else { + getOutputRandomLevel +} + } + + @DeveloperApi + protected def getOutputRandomLevel: RandomLevel.Value = { +val randomLevelCandidates = dependencies.map { + case dep: ShuffleDependency[_, _, _] => --- End diff -- Sorry, I meant `dep.rdd.partitioner`.
[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the issue where stage p...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22209#discussion_r212705634 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -350,11 +350,16 @@ private[spark] class AppStatusListener( val e = it.next() if (job.stageIds.contains(e.getKey()._1)) { val stage = e.getValue() - stage.status = v1.StageStatus.SKIPPED - job.skippedStages += stage.info.stageId - job.skippedTasks += stage.info.numTasks - it.remove() - update(stage, now) + // Only update the stage if it has not finished already + if (v1.StageStatus.ACTIVE.equals(stage.status) || + v1.StageStatus.PENDING.equals(stage.status)) { +stage.status = v1.StageStatus.SKIPPED +job.skippedStages += stage.info.stageId +job.skippedTasks += stage.info.numTasks +job.activeStages -= 1 +it.remove() --- End diff -- This can happen when events get dropped ... Spark makes a best-effort attempt to deliver events in order, but when events get dropped, the UI becomes inconsistent. I assume this might be an effort to recover in that case? Any thoughts @tgravescs, @vanzin?
[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the issue where stage p...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22209#discussion_r212491921 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -350,11 +350,16 @@ private[spark] class AppStatusListener( val e = it.next() if (job.stageIds.contains(e.getKey()._1)) { val stage = e.getValue() - stage.status = v1.StageStatus.SKIPPED - job.skippedStages += stage.info.stageId - job.skippedTasks += stage.info.numTasks - it.remove() - update(stage, now) + // Only update the stage if it has not finished already + if (v1.StageStatus.ACTIVE.equals(stage.status) || + v1.StageStatus.PENDING.equals(stage.status)) { +stage.status = v1.StageStatus.SKIPPED +job.skippedStages += stage.info.stageId +job.skippedTasks += stage.info.numTasks +job.activeStages -= 1 +it.remove() --- End diff -- Btw, there is an existing bug: we are not updating the pool, etc., which we do in onStageCompleted ...
[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the issue where stage p...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22209#discussion_r212490964 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -350,11 +350,16 @@ private[spark] class AppStatusListener( val e = it.next() if (job.stageIds.contains(e.getKey()._1)) { val stage = e.getValue() - stage.status = v1.StageStatus.SKIPPED - job.skippedStages += stage.info.stageId - job.skippedTasks += stage.info.numTasks - it.remove() - update(stage, now) + // Only update the stage if it has not finished already + if (v1.StageStatus.ACTIVE.equals(stage.status) || + v1.StageStatus.PENDING.equals(stage.status)) { +stage.status = v1.StageStatus.SKIPPED +job.skippedStages += stage.info.stageId +job.skippedTasks += stage.info.numTasks +job.activeStages -= 1 +it.remove() --- End diff -- I am not sure I follow: if that is the case, why are we doing this for active stages here? onStageCompleted/onTaskEnd would be fired for active stages as well.
[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the issue where stage p...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22209#discussion_r212468772 --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala --- @@ -350,11 +350,16 @@ private[spark] class AppStatusListener( val e = it.next() if (job.stageIds.contains(e.getKey()._1)) { val stage = e.getValue() - stage.status = v1.StageStatus.SKIPPED - job.skippedStages += stage.info.stageId - job.skippedTasks += stage.info.numTasks - it.remove() - update(stage, now) + // Only update the stage if it has not finished already + if (v1.StageStatus.ACTIVE.equals(stage.status) || + v1.StageStatus.PENDING.equals(stage.status)) { +stage.status = v1.StageStatus.SKIPPED +job.skippedStages += stage.info.stageId +job.skippedTasks += stage.info.numTasks +job.activeStages -= 1 +it.remove() --- End diff -- Removal from the iterator should always happen, not only when the stage is ACTIVE or PENDING.
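Here is a simplified model of that suggestion, as a sketch. It uses hypothetical `StageStatus`, `LiveStage`, `LiveJob` and `JobEndCleanup` types. The real listener keys stages by `(stageId, attemptId)` and also calls `update(stage, now)`; both are omitted here. Only the SKIPPED bookkeeping is guarded by the status check. Removal from the iterator is unconditional, so stages that already finished do not linger in the live map.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the listener's per-job cleanup on job end.
enum StageStatus { ACTIVE, PENDING, COMPLETE, FAILED, SKIPPED }

final class LiveStage {
  final int stageId;
  final int numTasks;
  StageStatus status;

  LiveStage(int stageId, int numTasks, StageStatus status) {
    this.stageId = stageId;
    this.numTasks = numTasks;
    this.status = status;
  }
}

final class LiveJob {
  final Set<Integer> stageIds;
  final Set<Integer> skippedStages = new HashSet<>();
  int skippedTasks = 0;

  LiveJob(Set<Integer> stageIds) {
    this.stageIds = stageIds;
  }
}

final class JobEndCleanup {
  /** Drops every stage of the job from liveStages; only unfinished ones count as SKIPPED. */
  static void onJobEnd(LiveJob job, Map<Integer, LiveStage> liveStages) {
    Iterator<Map.Entry<Integer, LiveStage>> it = liveStages.entrySet().iterator();
    while (it.hasNext()) {
      LiveStage stage = it.next().getValue();
      if (!job.stageIds.contains(stage.stageId)) {
        continue;  // not part of this job: leave it alone
      }
      if (stage.status == StageStatus.ACTIVE || stage.status == StageStatus.PENDING) {
        stage.status = StageStatus.SKIPPED;
        job.skippedStages.add(stage.stageId);
        job.skippedTasks += stage.numTasks;
      }
      // Removal is unconditional, whatever the stage's status was.
      it.remove();
    }
  }
}
```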
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212462874 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler( failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { + // If the map stage is INDETERMINATE, which means the map tasks may return + // different result when re-try, we need to re-try all the tasks of the failed + // stage and its succeeding stages, because the input data will be changed after the + // map tasks are re-tried. + // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is + // guaranteed to be idempotent, so the input data of the reducers will not change even + // if the map tasks are re-tried. + if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) { +def rollBackStage(stage: Stage): Unit = stage match { + case mapStage: ShuffleMapStage => +val numMissingPartitions = mapStage.findMissingPartitions().length +if (numMissingPartitions < mapStage.numTasks) { + markStageAsFinished( +mapStage, +Some("preceding shuffle map stage with random output gets retried."), +willRetry = true) + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + failedStages += mapStage +} + + case resultStage => +val numMissingPartitions = resultStage.findMissingPartitions().length +if (numMissingPartitions < resultStage.numTasks) { + // TODO: support to rollback result tasks. + val errorMessage = "A shuffle map stage with random output was failed and " + +s"retried. However, Spark cannot rollback the result stage $resultStage " + +"to re-process the input data, and has to fail this job. Please " + +"eliminate the randomness by checkpointing the RDD before " + +"repartition/zip and try again." 
+ abortStage(failedStage, errorMessage, None) +} +} + +def rollbackSucceedingStages(stageChain: List[Stage]): Unit = { + if (stageChain.head.id == failedStage.id) { +stageChain.foreach { stage => + if (!failedStages.contains(stage)) rollBackStage(stage) +} + } else { +stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain)) + } +} --- End diff -- Something like this sketch is what I meant:
```
def rollbackSucceedingStages(stageChain: List[Stage], alreadyProcessed: Set[Int]): Set[Int] = {
  var processed = alreadyProcessed
  val stage = stageChain.head
  if (stage.id == failedStage.id) {
    stageChain.foreach { stage =>
      if (!failedStages.contains(stage)) rollBackStage(stage)
    }
  } else {
    stage.parents.foreach(s => if (!processed.contains(s.id)) {
      processed = rollbackSucceedingStages(s :: stageChain, processed)
    })
  }
  processed + failedStage.id
}
```
(or perhaps with a mutable Set to make it simpler?) This will reduce the need to reprocess stages we have already handled in large DAGs, where a stage subtree appears in multiple places in the DAG.
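Another way to memoize the traversal, sketched in Java with a hypothetical `StageNode` type, treats it as reachability over parent links. For each stage id, it caches whether that stage transitively depends on the target stage (`failedStage` in the quoted code). Each stage is resolved once, so the walk is linear in stages plus edges. The set of stages to roll back is computed before any rollback happens. This is an illustration, not the code in the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical minimal stage: an id plus its parent stages.
final class StageNode {
  final int id;
  final List<StageNode> parents = new ArrayList<>();

  StageNode(int id, StageNode... parents) {
    this.id = id;
    for (StageNode p : parents) {
      this.parents.add(p);
    }
  }
}

final class RollbackPlanner {
  /**
   * Returns ids of stages (excluding target) that transitively depend on target,
   * walking parent links from the given final stages.
   */
  static Set<Integer> stagesToRollback(List<StageNode> finalStages, StageNode target) {
    Map<Integer, Boolean> memo = new HashMap<>();
    Set<Integer> result = new TreeSet<>();
    for (StageNode s : finalStages) {
      dependsOn(s, target, memo, result);
    }
    return result;
  }

  private static boolean dependsOn(StageNode stage, StageNode target,
      Map<Integer, Boolean> memo, Set<Integer> result) {
    if (stage.id == target.id) {
      return true;
    }
    Boolean cached = memo.get(stage.id);
    if (cached != null) {
      return cached;  // already resolved via another path
    }
    boolean found = false;
    for (StageNode p : stage.parents) {
      // No short-circuit: every parent that depends on target must itself be collected.
      if (dependsOn(p, target, memo, result)) {
        found = true;
      }
    }
    memo.put(stage.id, found);
    if (found) {
      result.add(stage.id);
    }
    return found;
  }
}
```

The sketch caches a yes/no answer per stage rather than skipping stages already seen. A descendant that reaches an already-explored subtree through a different path is therefore still collected for rollback.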
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212452014 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2627,6 +2632,81 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(countSubmittedMapStageAttempts() === 2) } + test("SPARK-23207: retry all the succeeding stages when the map stage is random") { --- End diff -- We need to add a test where the final result stage has not started (yet) but intermediate stages are retried on failure. This will validate that if the result stage has not yet started, the job won't fail.
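The case that test should pin down can be stated as a small decision rule. It is sketched here with hypothetical `StageKind`, `RollbackAction` and `RollbackDecision` names, and it mirrors the `numMissingPartitions < numTasks` guard in the quoted `rollBackStage`. A stage with no finished partitions needs no rollback. A partially finished shuffle map stage is rerun in full. A partially finished result stage aborts the job, because rolling back result tasks is still a TODO in the quoted diff.

```java
// Hypothetical classification of a stage for the rollback decision.
enum StageKind { SHUFFLE_MAP, RESULT }

enum RollbackAction { NOTHING, RESUBMIT_ALL_TASKS, ABORT_JOB }

final class RollbackDecision {
  /** Decides what to do with a succeeding stage when an indeterminate map stage is retried. */
  static RollbackAction decide(StageKind kind, int numTasks, int numMissingPartitions) {
    if (numMissingPartitions >= numTasks) {
      // No partition has finished yet, so it will simply read the new map output later.
      return RollbackAction.NOTHING;
    }
    return kind == StageKind.SHUFFLE_MAP
        ? RollbackAction.RESUBMIT_ALL_TASKS  // unregister its map output and rerun every task
        : RollbackAction.ABORT_JOB;          // result task rollback is not supported
  }
}
```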
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212451081 --- Diff: core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala --- @@ -95,6 +99,18 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag] rdd2 = null f = null } + + private def isRandomOrder(rdd: RDD[_]): Boolean = { +rdd.computingRandomLevel == RDD.RandomLevel.UNORDERED --- End diff -- `isRandomOrder` gives the impression that both cases (INDETERMINATE and UNORDERED) are 'random'.
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21698 @jiangxb1987 I am guessing we should close this PR?
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212386645 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag]( // RDD chain. @transient protected lazy val isBarrier_ : Boolean = dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier()) + + /** + * Returns the random level of this RDD's computing function. Please refer to [[RDD.RandomLevel]] + * for the definition of random level. + * + * By default, an RDD without parents(root RDD) is IDEMPOTENT. For RDDs with parents, the random + * level of current RDD is the random level of the parent which is random most. + */ + // TODO: make it public so users can set random level to their custom RDDs. + // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different + // partitions. + private[spark] def computingRandomLevel: RDD.RandomLevel.Value = { +val parentRandomLevels = dependencies.map { + case dep: ShuffleDependency[_, _, _] => +if (dep.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) { + RDD.RandomLevel.INDETERMINATE --- End diff -- Crap, brain fart ... you are right, it is UNORDERED and not INDETERMINATE ... I am still getting my head around the terms :-(
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212385688 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag]( // RDD chain. @transient protected lazy val isBarrier_ : Boolean = dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier()) + + /** + * Returns the random level of this RDD's computing function. Please refer to [[RDD.RandomLevel]] + * for the definition of random level. + * + * By default, an RDD without parents(root RDD) is IDEMPOTENT. For RDDs with parents, the random + * level of current RDD is the random level of the parent which is random most. + */ + // TODO: make it public so users can set random level to their custom RDDs. + // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different + // partitions. + private[spark] def computingRandomLevel: RDD.RandomLevel.Value = { +val parentRandomLevels = dependencies.map { + case dep: ShuffleDependency[_, _, _] => +if (dep.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) { + RDD.RandomLevel.INDETERMINATE --- End diff -- RE: checkpoint. I wanted to handle two cases: * The checkpoint is being done as part of the current job (and not by a previous job which forced materialization of the checkpointed RDD). * The checkpoint is written to a reliable store, not a local one (where it is lost when the node fails). It looks like `dep.rdd.isCheckpointed` is the wrong way to go about it (relying on `dependencies` is insufficient for both cases). A better option seems to be:
```
// If checkpointed already, then always the same order
case dep: Dependency[_] if dep.rdd.getCheckpointFile.isDefined => RDD.RandomLevel.IDEMPOTENT
```
> Actually we know. As long as the shuffle map stage RDD is IDEMPOTENT or UNORDERED, the reduce RDD is UNORDERED instead of INDETERMINATE.

It does not matter what the output order of the map stage was: after we shuffle the map output, the order is always indeterminate, except for the specific cases I referred to above.
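The propagation rules proposed in this thread can be sketched as follows, assuming hypothetical `OutputLevel`, `DepInfo` and `LevelRules` types:

- A reliably checkpointed parent yields IDEMPOTENT.
- A narrow dependency inherits the parent's level.
- A shuffle yields INDETERMINATE if the parent is INDETERMINATE, IDEMPOTENT if an aggregator and key ordering are set, and UNORDERED otherwise.

An RDD takes the least deterministic candidate across its dependencies. The same-partitioner case from the thread is left out, and this is not Spark's final implementation.

```java
import java.util.List;

// Hypothetical levels, ordered from most to least deterministic.
enum OutputLevel {
  IDEMPOTENT, UNORDERED, INDETERMINATE;

  /** The less deterministic of two levels. */
  static OutputLevel max(OutputLevel a, OutputLevel b) {
    return a.compareTo(b) >= 0 ? a : b;
  }
}

// Hypothetical view of one dependency edge of an RDD.
final class DepInfo {
  final OutputLevel parentLevel;
  final boolean parentReliablyCheckpointed;
  final boolean isShuffle;
  final boolean hasAggregatorAndKeyOrdering;  // only consulted for shuffles

  DepInfo(OutputLevel parentLevel, boolean parentReliablyCheckpointed,
      boolean isShuffle, boolean hasAggregatorAndKeyOrdering) {
    this.parentLevel = parentLevel;
    this.parentReliablyCheckpointed = parentReliablyCheckpointed;
    this.isShuffle = isShuffle;
    this.hasAggregatorAndKeyOrdering = hasAggregatorAndKeyOrdering;
  }
}

final class LevelRules {
  /** Least deterministic candidate over all dependencies; a root RDD is IDEMPOTENT. */
  static OutputLevel levelOf(List<DepInfo> deps) {
    OutputLevel result = OutputLevel.IDEMPOTENT;
    for (DepInfo d : deps) {
      result = OutputLevel.max(result, candidate(d));
    }
    return result;
  }

  static OutputLevel candidate(DepInfo d) {
    if (d.parentReliablyCheckpointed) {
      return OutputLevel.IDEMPOTENT;  // recomputation re-reads the same checkpoint files
    }
    if (!d.isShuffle) {
      return d.parentLevel;  // narrow dependency: same records, same order as the parent
    }
    if (d.parentLevel == OutputLevel.INDETERMINATE) {
      return OutputLevel.INDETERMINATE;
    }
    // Per the thread, reducer input order is not fixed after a shuffle unless the
    // output is aggregated and sorted by key.
    return d.hasAggregatorAndKeyOrdering ? OutputLevel.IDEMPOTENT : OutputLevel.UNORDERED;
  }
}
```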
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212192600 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -855,16 +858,17 @@ abstract class RDD[T: ClassTag]( * a map on the other). */ def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope { -zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) => - new Iterator[(T, U)] { -def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match { - case (true, true) => true - case (false, false) => false - case _ => throw new SparkException("Can only zip RDDs with " + -"same number of elements in each partition") +zipPartitionsInternal(other, preservesPartitioning = false, orderSensitiveFunc = true) { + (thisIter, otherIter) => +new Iterator[(T, U)] { + def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match { +case (true, true) => true +case (false, false) => false +case _ => throw new SparkException("Can only zip RDDs with " + + "same number of elements in each partition") + } + def next(): (T, U) = (thisIter.next(), otherIter.next()) } -def next(): (T, U) = (thisIter.next(), otherIter.next()) - } --- End diff -- The bulk of the change here is simply indentation, right (except for zipPartitions -> zipPartitionsInternal and the flag)? I want to make sure I did not miss something here.
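The iterator being re-indented pairs elements strictly by position. That is why `zip` is order sensitive: if either side comes back in a different order on recompute, the pairs change. Here is a Java analogue, as a sketch. `ZipIterator` is a hypothetical name, and `IllegalStateException` stands in for `SparkException`.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

// Pairs elements positionally and fails fast when the two sides differ in length.
final class ZipIterator<T, U> implements Iterator<Map.Entry<T, U>> {
  private final Iterator<T> left;
  private final Iterator<U> right;

  ZipIterator(Iterator<T> left, Iterator<U> right) {
    this.left = left;
    this.right = right;
  }

  @Override
  public boolean hasNext() {
    boolean l = left.hasNext();
    boolean r = right.hasNext();
    if (l != r) {
      throw new IllegalStateException(
          "Can only zip RDDs with same number of elements in each partition");
    }
    return l;
  }

  @Override
  public Map.Entry<T, U> next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return new AbstractMap.SimpleImmutableEntry<>(left.next(), right.next());
  }
}
```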
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212199604 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler( failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { + // If the map stage is INDETERMINATE, which means the map tasks may return + // different result when re-try, we need to re-try all the tasks of the failed + // stage and its succeeding stages, because the input data will be changed after the + // map tasks are re-tried. + // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is + // guaranteed to be idempotent, so the input data of the reducers will not change even + // if the map tasks are re-tried. + if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) { +def rollBackStage(stage: Stage): Unit = stage match { + case mapStage: ShuffleMapStage => +val numMissingPartitions = mapStage.findMissingPartitions().length +if (numMissingPartitions < mapStage.numTasks) { + markStageAsFinished( +mapStage, +Some("preceding shuffle map stage with random output gets retried."), +willRetry = true) + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + failedStages += mapStage +} + + case resultStage => +val numMissingPartitions = resultStage.findMissingPartitions().length +if (numMissingPartitions < resultStage.numTasks) { + // TODO: support to rollback result tasks. + val errorMessage = "A shuffle map stage with random output was failed and " + +s"retried. However, Spark cannot rollback the result stage $resultStage " + +"to re-process the input data, and has to fail this job. Please " + +"eliminate the randomness by checkpointing the RDD before " + +"repartition/zip and try again." + abortStage(failedStage, errorMessage, None) --- End diff -- This ends up aborting the job, if I am not wrong, while we want to retry the stage.
+CC @markhamstra, @tgravescs in case there is a better way to accomplish result stage rollback.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212195284 --- Diff: core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala --- @@ -95,6 +99,18 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag] rdd2 = null f = null } + + private def isRandomOrder(rdd: RDD[_]): Boolean = { +rdd.computingRandomLevel == RDD.RandomLevel.UNORDERED --- End diff -- Instead of this, check whether the level is not IDEMPOTENT. In both the UNORDERED and INDETERMINATE cases, we should return INDETERMINATE for this zip RDD from computingRandomLevel.
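Here is a sketch of the suggested check, assuming a hypothetical `OutputLevel` enum and `ZipLevel` helper. An order-sensitive function such as `zip` pairs records by position. So any parent that is not IDEMPOTENT (UNORDERED included) makes the zipped output INDETERMINATE.

```java
import java.util.List;

// Hypothetical levels, ordered from most to least deterministic.
enum OutputLevel { IDEMPOTENT, UNORDERED, INDETERMINATE }

final class ZipLevel {
  /** Level of an order-sensitive combination (such as zip) of the given parents. */
  static OutputLevel forOrderSensitive(List<OutputLevel> parents) {
    for (OutputLevel l : parents) {
      if (l != OutputLevel.IDEMPOTENT) {
        // A reordered parent changes which records get paired, not just their order.
        return OutputLevel.INDETERMINATE;
      }
    }
    return OutputLevel.IDEMPOTENT;
  }
}
```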
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212199007 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler( failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { + // If the map stage is INDETERMINATE, which means the map tasks may return + // different result when re-try, we need to re-try all the tasks of the failed + // stage and its succeeding stages, because the input data will be changed after the + // map tasks are re-tried. + // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is + // guaranteed to be idempotent, so the input data of the reducers will not change even + // if the map tasks are re-tried. + if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) { +def rollBackStage(stage: Stage): Unit = stage match { + case mapStage: ShuffleMapStage => +val numMissingPartitions = mapStage.findMissingPartitions().length +if (numMissingPartitions < mapStage.numTasks) { + markStageAsFinished( +mapStage, +Some("preceding shuffle map stage with random output gets retried."), +willRetry = true) + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + failedStages += mapStage +} + + case resultStage => +val numMissingPartitions = resultStage.findMissingPartitions().length +if (numMissingPartitions < resultStage.numTasks) { + // TODO: support to rollback result tasks. + val errorMessage = "A shuffle map stage with random output was failed and " + +s"retried. However, Spark cannot rollback the result stage $resultStage " + +"to re-process the input data, and has to fail this job. Please " + +"eliminate the randomness by checkpointing the RDD before " + +"repartition/zip and try again." 
+ abortStage(failedStage, errorMessage, None) +} +} + +def rollbackSucceedingStages(stageChain: List[Stage]): Unit = { + if (stageChain.head.id == failedStage.id) { +stageChain.foreach { stage => + if (!failedStages.contains(stage)) rollBackStage(stage) +} + } else { +stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain)) + } +} + +rollBackStage(failedStage) --- End diff -- This would be handled implicitly anyway, right? Any reason to do it specifically here?
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212193814 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1876,6 +1920,22 @@ abstract class RDD[T: ClassTag]( */ object RDD { + /** + * The random level of RDD's computing function, which indicates the behavior when rerun the + * computing function. There are 3 random levels, ordered by the randomness from low to high: + * 1. IDEMPOTENT: The computing function always return the same result with same order when rerun. + * 2. UNORDERED: The computing function returns same data set in potentially a different order + * when rerun. + * 3. INDETERMINATE. The computing function may return totally different result when rerun. + * + * Note that, the output of the computing function usually relies on parent RDDs. When a + * parent RDD's computing function is random, it's very likely this computing function is also + * random. + */ + object RandomLevel extends Enumeration { --- End diff -- While reviewing the PR, I found this enumeration a bit unclear. We are modelling two things here: * the behavior of `compute` in the RDD: whether it is idempotent, order sensitive or indeterminate; and also * the input order of the tuples coming in from the partition to `RDD.compute`. For example, the shuffle output of a sorted RDD would be `IDEMPOTENT` by this definition, but idempotence is a functional property of a closure, not of input order. Perhaps we need to convey this better? Or does an 'idempotent' order make sense? Thoughts @cloud-fan, @markhamstra?
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212198632 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -70,7 +70,8 @@ class MyRDD( numPartitions: Int, dependencies: List[Dependency[_]], locations: Seq[Seq[String]] = Nil, -@(transient @param) tracker: MapOutputTrackerMaster = null) +@(transient @param) tracker: MapOutputTrackerMaster = null, +isRandom: Boolean = false) --- End diff -- isRandom -> unordered
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212197939 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler( failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { + // If the map stage is INDETERMINATE, which means the map tasks may return + // different result when re-try, we need to re-try all the tasks of the failed + // stage and its succeeding stages, because the input data will be changed after the + // map tasks are re-tried. + // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is + // guaranteed to be idempotent, so the input data of the reducers will not change even + // if the map tasks are re-tried. + if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) { +def rollBackStage(stage: Stage): Unit = stage match { + case mapStage: ShuffleMapStage => +val numMissingPartitions = mapStage.findMissingPartitions().length +if (numMissingPartitions < mapStage.numTasks) { + markStageAsFinished( +mapStage, +Some("preceding shuffle map stage with random output gets retried."), +willRetry = true) + mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId) + failedStages += mapStage +} + + case resultStage => +val numMissingPartitions = resultStage.findMissingPartitions().length +if (numMissingPartitions < resultStage.numTasks) { + // TODO: support to rollback result tasks. + val errorMessage = "A shuffle map stage with random output was failed and " + +s"retried. However, Spark cannot rollback the result stage $resultStage " + +"to re-process the input data, and has to fail this job. Please " + +"eliminate the randomness by checkpointing the RDD before " + +"repartition/zip and try again." 
+ abortStage(failedStage, errorMessage, None) +} +} + +def rollbackSucceedingStages(stageChain: List[Stage]): Unit = { + if (stageChain.head.id == failedStage.id) { +stageChain.foreach { stage => + if (!failedStages.contains(stage)) rollBackStage(stage) +} + } else { +stageChain.head.parents.foreach(s => rollbackSucceedingStages(s :: stageChain)) + } +} --- End diff -- This method looks expensive for large DAGs; memoization should help reduce the cost. Compute the set of stages to roll back first, then roll back the stages found.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212192065 --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala --- @@ -1876,6 +1920,22 @@ abstract class RDD[T: ClassTag]( */ object RDD { + /** + * The random level of RDD's computing function, which indicates the behavior when rerun the + * computing function. There are 3 random levels, ordered by the randomness from low to high: + * 1. IDEMPOTENT: The computing function always return the same result with same order when rerun. + * 2. UNORDERED: The computing function returns same data set in potentially a different order + * when rerun. + * 3. INDETERMINATE. The computing function may return totally different result when rerun. + * + * Note that, the output of the computing function usually relies on parent RDDs. When a + * parent RDD's computing function is random, it's very likely this computing function is also + * random. + */ + object RandomLevel extends Enumeration { --- End diff -- RandomLevel is not very descriptive, particularly after the levels were renamed to UNORDERED/INDETERMINATE. Perhaps OrderSensitivity, or something else more descriptive?
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212192772

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
     dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the random level of this RDD's computing function. Please refer to [[RDD.RandomLevel]]
+   * for the definition of random level.
+   *
+   * By default, an RDD without parents(root RDD) is IDEMPOTENT. For RDDs with parents, the random
+   * level of current RDD is the random level of the parent which is random most.
+   */
+  // TODO: make it public so users can set random level to their custom RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different
+  // partitions.
+  private[spark] def computingRandomLevel: RDD.RandomLevel.Value = {
--- End diff --

We will need to expose this with the `@Experimental` tag; we can't keep it `private[spark]`, given the implications for custom RDDs.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212193206

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag](
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean =
     dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
+
+  /**
+   * Returns the random level of this RDD's computing function. Please refer to [[RDD.RandomLevel]]
+   * for the definition of random level.
+   *
+   * By default, an RDD without parents(root RDD) is IDEMPOTENT. For RDDs with parents, the random
+   * level of current RDD is the random level of the parent which is random most.
+   */
+  // TODO: make it public so users can set random level to their custom RDDs.
+  // TODO: this can be per-partition. e.g. UnionRDD can have different random level for different
+  // partitions.
+  private[spark] def computingRandomLevel: RDD.RandomLevel.Value = {
+    val parentRandomLevels = dependencies.map {
+      case dep: ShuffleDependency[_, _, _] =>
+        if (dep.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+          RDD.RandomLevel.INDETERMINATE
--- End diff --

It does not matter what the parent RDD's order was: after a shuffle, currently, the output is always going to be UNORDERED, unless both an Aggregator and a key ordering are specified in the dependency. Given [this comment](https://github.com/apache/spark/pull/22112#issuecomment-414034703), and adding a few missing cases, this becomes:

```
// If already checkpointed, the order is always the same.
case dep: Dependency[_] if dep.rdd.isCheckpointed =>
  RDD.RandomLevel.IDEMPOTENT
// If the partitioner is the same, no shuffle is done.
case dep: ShuffleDependency[_, _, _] if dep.partitioner == partitioner =>
  dep.rdd.computingRandomLevel
// If an aggregator (and so unique keys) and a key ordering are specified, the ordering is consistent.
case dep: ShuffleDependency[_, _, _] if dep.keyOrdering.isDefined && dep.aggregator.isDefined =>
  RDD.RandomLevel.IDEMPOTENT
// For all other shuffle cases, we don't know the output order in Spark.
case dep: ShuffleDependency[_, _, _] =>
  RDD.RandomLevel.INDETERMINATE
```
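The case analysis above, plus the "most random parent wins" rule from the diff's doc comment, can be sketched in Java. This is a sketch under stated assumptions. `OrderSensitivity`, `DepView` and `OrderSensitivityRules` are hypothetical names, not Spark classes. The rule that a narrow (non-shuffle) dependency simply inherits its parent's level is an assumption, because the comment lists only the checkpoint and shuffle cases.

```java
import java.util.*;

// Hypothetical levels, ordered from least to most random (declaration order matters).
enum OrderSensitivity { IDEMPOTENT, UNORDERED, INDETERMINATE }

// Hypothetical, flattened view of one dependency edge; not Spark's Dependency class.
final class DepView {
    final boolean shuffle;
    final boolean parentCheckpointed;
    final boolean samePartitioner;
    final boolean keyOrderingDefined;
    final boolean aggregatorDefined;
    final OrderSensitivity parentLevel;

    DepView(boolean shuffle, boolean parentCheckpointed, boolean samePartitioner,
            boolean keyOrderingDefined, boolean aggregatorDefined, OrderSensitivity parentLevel) {
        this.shuffle = shuffle;
        this.parentCheckpointed = parentCheckpointed;
        this.samePartitioner = samePartitioner;
        this.keyOrderingDefined = keyOrderingDefined;
        this.aggregatorDefined = aggregatorDefined;
        this.parentLevel = parentLevel;
    }
}

final class OrderSensitivityRules {
    // Level contributed by one dependency, checked in the same order as the cases above.
    static OrderSensitivity levelOf(DepView dep) {
        if (dep.parentCheckpointed) return OrderSensitivity.IDEMPOTENT;  // checkpointed: fixed order
        if (!dep.shuffle) return dep.parentLevel;                         // assumption: narrow deps inherit
        if (dep.samePartitioner) return dep.parentLevel;                  // no shuffle actually done
        if (dep.keyOrderingDefined && dep.aggregatorDefined) return OrderSensitivity.IDEMPOTENT;
        return OrderSensitivity.INDETERMINATE;                            // unknown shuffle output order
    }

    // An RDD takes the most random level among its dependencies; a root RDD is IDEMPOTENT.
    static OrderSensitivity combine(List<DepView> deps) {
        OrderSensitivity result = OrderSensitivity.IDEMPOTENT;
        for (DepView dep : deps) {
            OrderSensitivity level = levelOf(dep);
            if (level.compareTo(result) > 0) result = level;
        }
        return result;
    }
}
```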
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212192261

--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -54,4 +58,12 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
   @transient protected lazy override val isBarrier_ : Boolean =
     isFromBarrier || dependencies.exists(_.rdd.isBarrier())
+
+  override private[spark] def computingRandomLevel = {
--- End diff --

Rename computingRandomLevel -> computeOrderSensitivity (the suffix should match whatever `RandomLevel` gets renamed to).
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212190267

--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -32,12 +32,16 @@ import org.apache.spark.{Partition, TaskContext}
  *                   doesn't modify the keys.
  * @param isFromBarrier Indicates whether this RDD is transformed from an RDDBarrier, a stage
  *                      containing at least one RDDBarrier shall be turned into a barrier stage.
+ * @param orderSensitiveFunc whether or not the zip function is order-sensitive. If it's order
--- End diff --

Remove 'zip'.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r212196598

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1502,6 +1502,53 @@ private[spark] class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is INDETERMINATE, which means the map tasks may return
+              // different result when re-try, we need to re-try all the tasks of the failed
+              // stage and its succeeding stages, because the input data will be changed after the
+              // map tasks are re-tried.
+              // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
+              // guaranteed to be idempotent, so the input data of the reducers will not change even
+              // if the map tasks are re-tried.
+              if (mapStage.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    val numMissingPartitions = mapStage.findMissingPartitions().length
+                    if (numMissingPartitions < mapStage.numTasks) {
+                      markStageAsFinished(
+                        mapStage,
+                        Some("preceding shuffle map stage with random output gets retried."),
+                        willRetry = true)
+                      mapOutputTracker.unregisterAllMapOutput(mapStage.shuffleDep.shuffleId)
+                      failedStages += mapStage
+                    }
+
+                  case resultStage =>
+                    val numMissingPartitions = resultStage.findMissingPartitions().length
+                    if (numMissingPartitions < resultStage.numTasks) {
--- End diff --

IIRC this can be a valid case. For example, if the result stage is being run on only a subset of partitions (first(), for example), the number of missing partitions can legitimately be > numTasks while relevant partitions are still missing. I might be a bit rusty here though, +CC @markhamstra
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22112

@tgravescs:

> The shuffle simply transfers the bytes it's supposed to. Spark's shuffle of those bytes is not consistent in that the order it fetches from can change and without the sort happening on that data the order can be different on rerun. I guess maybe you mean the ShuffledRDD as a whole or do you mean something else here?

By shuffle, I am referring to the output of the shuffle, which is consumed by an RDD with a `ShuffleDependency` as input. More specifically, I mean the output of `SparkEnv.get.shuffleManager.getReader(...).read()`, which RDDs (both user and Spark implementations) use to fetch the output of the shuffle machinery. This output is not just the deserialized shuffle bytes. It also has aggregation applied (if specified) and ordering imposed (if specified). ShuffledRDD is one such usage within Spark core, but others exist within Spark core and in user code.

> All I'm saying is zip is just another variant of this, you could document it as such and do nothing internal to spark to "fix it".

I agree. Repartition + shuffle, zip, sample, and MLlib usages are all variants of the same problem: inconsistent shuffle output order.

> I guess we can separate out these 2 discussions. I think the point of this pr is to temporarily workaround the data loss/corruption issue with repartition by failing. So if everyone agrees on that lets move the discussion to a jira about what to do with the rest of the operators and fix repartition here. thoughts?

Sounds good to me.
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22112

Catching up on the discussion ...

@cloud-fan

> shuffled RDD will never be deterministic unless the shuffle key is the entire record and key ordering is specified.

Let me rephrase that: key ordering with an aggregator specified. Unfortunately, this means it will apply only to custom user code, because the default Spark APIs do not set both.

> The reduce task fetches multiple remote shuffle blocks at the same time, so the order is always random.

This is not a characteristic of shuffle in MR-based systems, but an implementation detail of shuffle in Spark. In Hadoop MapReduce, for example, shuffle output is always ordered and this problem does not occur.

> In Addition, Spark SQL never specifies key ordering.

Spark SQL has re-implemented a lot of the Spark core primitives. Given this, I would expect Spark SQL to:
* introduce a local sort where appropriate when an RDD view is generated off a DataFrame, as has already been done in SPARK-23207 for the repartition case, and/or
* appropriately expose IDEMPOTENT, UNORDERED and INDETERMINATE in the RDD view.

@tgravescs

> I don't agree that " We actually cannot support random output". Users can do this now in MR and spark and we can't really stop them other then say we don't support and if you do failure handling will cause different results.

What I mentioned was not specific to Spark; it applies generally to any MR-like system. It applies even in Hadoop MapReduce, and it used to be a bug in some of our Pig UDFs :-) For example, suppose the mapper generates random output and there are node failures during the reducer phase (after all mappers have completed). The exact same problem would then occur. We cannot, of course, stop users from doing this, but we do not guarantee correct results (just as Hadoop MapReduce does not in this scenario).

> I don't want us to document it away now and then change our mind in next release. Our end decision should be final.

My current thought is as follows. Without making the shuffle output order repeatable, we do not have a way to properly fix this. My understanding from @jiangxb1987, who has looked at it in detail with @sameeragarwal and others, is that this invariant is very difficult to achieve for shuffle in general in the current Spark codebase. (Please holler if I am off base @jiangxb1987 !)

Assuming we cannot currently fix this, explicitly warning the user and/or rescheduling all tasks/stages for correctness might be a good stop gap. Users could mitigate the performance impact via checkpointing [1], and I would expect this to be the go-to solution. When failures occur after this patch is applied, the perf characteristics and SLA violations for any non-trivial job are going to be terrible, but we should not have any data loss. In the future, we might resolve this issue in a more principled manner.

[1] As @cloud-fan pointed out [here](https://github.com/apache/spark/pull/22112#issuecomment-414034703), sort is not guaranteed to work unless keys are unique. Ordering is defined only on the key and not on the value, so values can still be reordered.

@cloud-fan

> This is the problem we are resolving here. This assumption is incorrect, and the RDD closure should handle it, or use what I proposed in this PR: the retry strategy.

I would disagree with this. This is an artifact of an implementation detail of Spark shuffle, and it is not the expected behavior for an MR-based system. Unfortunately, this has been the behavior since the beginning (at least since 0.6). IMO this was not a conscious design choice, but rather an oversight.

> IIRC @mridulm didn't agree with it. One problem is that, it's hard for users to realize that Spark returns wrong result, so they don't know when to handle it.

Actually, I would expect users to end up doing one of these two things. The perf characteristics and the lack of SLA predictability after this patch are going to force users to choose one of them.
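Footnote [1] can be illustrated in Java. Sorting records by key alone does not fix the output order when keys repeat, because a stable sort keeps tied records in arrival order, and arrival order can change on rerun. The sketch below models records as hypothetical `{key, value}` int pairs. It also shows that sorting on the whole record (key, then value) does give a repeatable order.

```java
import java.util.*;

final class SortDeterminism {
    // Sorts (key, value) records by key only. List.sort is stable, so records with equal
    // keys keep their arrival order, which may differ between runs after a shuffle.
    static List<String> sortByKeyOnly(List<int[]> records) {
        List<int[]> copy = new ArrayList<>(records);
        copy.sort(Comparator.<int[]>comparingInt(r -> r[0]));
        return render(copy);
    }

    // Sorts by key and then by value, so the result no longer depends on arrival order.
    static List<String> sortByKeyAndValue(List<int[]> records) {
        List<int[]> copy = new ArrayList<>(records);
        copy.sort(Comparator.<int[]>comparingInt(r -> r[0]).thenComparingInt(r -> r[1]));
        return render(copy);
    }

    private static List<String> render(List<int[]> records) {
        List<String> out = new ArrayList<>();
        for (int[] r : records) out.add(r[0] + "=" + r[1]);
        return out;
    }
}
```

With two arrival orders of the same three records, `[(1,10), (0,5), (1,20)]` and `[(1,20), (1,10), (0,5)]`, the key-only sort yields `0=5, 1=10, 1=20` and `0=5, 1=20, 1=10` respectively, while the key-and-value sort yields the same list for both.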
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22112

@tgravescs Please see https://github.com/apache/spark/pull/22112#discussion_r210788359 for further elaboration. We actually cannot support random order (except for a small subset of cases, such as map-only jobs). Ideally, I would like to see order-sensitive closures fixed, and fixing repartition + shuffle would fix this for all order-sensitive closures in the general case. This PR does not fix the problem; rather, it fails and retries the job as a workaround, which, as you mention, can be terribly expensive for large jobs. Of course, data correctness trumps performance, so I am fine with this as a stop gap. I would expect most non-trivial applications to simply work around this by checkpointing to HDFS, like what we did in YST.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r210963665

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1864,6 +1877,22 @@ abstract class RDD[T: ClassTag](
   // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
+
+  /**
+   * Whether the RDD's computing function is idempotent. Idempotent means the computing function
+   * not only satisfies the requirement, but also produce the same output sequence(the output order
+   * can't vary) given the same input sequence. Spark assumes all the RDDs are idempotent, except
+   * for the shuffle RDD and RDDs derived from non-idempotent RDD.
+   */
--- End diff --

This will mean that all RDDs which read, directly or indirectly, from an unsorted shuffle output are not 'idempotent'.
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r210963213

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -853,6 +861,11 @@ abstract class RDD[T: ClassTag](
    * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
    * partitions* and the *same number of elements in each partition* (e.g. one was made through
    * a map on the other).
+   *
+   * Note that, `zip` violates the requirement of the RDD computing function. If the order of input
+   * data changes, `zip` will return different result. Because of this, Spark may return unexpected
+   * result if there is a shuffle after `zip`, and the shuffle failed and retried. To workaround
+   * this, users can call `zipPartitions` and sort the input data before zip.
--- End diff --

All zip methods are affected by this, not just this one. I added a list of other methods I have used, from memory (though unfortunately it is not exhaustive).
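The workaround in the doc comment (sort the input before zipping inside `zipPartitions`) can be sketched in Java for a single partition. `ZipOrder` is a hypothetical helper, not Spark's API. For brevity, the sketch stops at the shorter list, whereas Spark's zip requires equal element counts. Positional pairing changes when one side arrives in a different order. Sorting both sides first makes the pairs repeatable, provided that, as with footnote [1] above, the sort covers the whole element.

```java
import java.util.*;

final class ZipOrder {
    // Pairs elements positionally, as zip does within one partition; the result depends on order.
    static List<String> zip(List<Integer> left, List<Integer> right) {
        List<String> out = new ArrayList<>();
        int n = Math.min(left.size(), right.size());
        for (int i = 0; i < n; i++) out.add(left.get(i) + ":" + right.get(i));
        return out;
    }

    // Workaround from the doc comment: sort each side first, then zip.
    static List<String> zipSorted(List<Integer> left, List<Integer> right) {
        List<Integer> l = new ArrayList<>(left);
        List<Integer> r = new ArrayList<>(right);
        Collections.sort(l);
        Collections.sort(r);
        return zip(l, r);
    }
}
```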
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r210967814

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1441,6 +1441,44 @@ class DAGScheduler(
             failedStages += failedStage
             failedStages += mapStage
             if (noResubmitEnqueued) {
+              // If the map stage is not idempotent(produces data in a different order when retry)
+              // and the shuffle partitioner is order sensitive, we have to retry all the tasks of
+              // the failed stage and its succeeding stages, because the input data of the failed
+              // stage will be changed after the map tasks are re-tried.
+              if (!mapStage.rdd.isIdempotent && mapStage.shuffleDep.orderSensitivePartitioner) {
+                def rollBackStage(stage: Stage): Unit = stage match {
+                  case mapStage: ShuffleMapStage =>
+                    if (mapStage.findMissingPartitions().length < mapStage.numPartitions) {
--- End diff --

This assumes that all partitions of a stage are being computed, which is not necessarily true in the general case (see numTasks vs numPartitions).
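A Java sketch of why comparing against `numPartitions` can mislead, under stated assumptions. `RollbackCheck` and its methods are hypothetical. The model treats a stage as computing only a list of requested partitions, as a job like `first()` does with partition 0 of a 100-partition RDD. When none of those partitions has finished, the missing count is 1. That is below `numPartitions` (100), which wrongly suggests that some output already exists. It is not below the number of tasks actually launched (1).

```java
import java.util.*;

final class RollbackCheck {
    // Partitions this stage was asked to compute that have not finished yet.
    static List<Integer> findMissing(List<Integer> partitionsToCompute, Set<Integer> finished) {
        List<Integer> missing = new ArrayList<>();
        for (int p : partitionsToCompute) {
            if (!finished.contains(p)) missing.add(p);
        }
        return missing;
    }

    // Check as written in the diff: "fewer missing than numPartitions" is read as
    // "some partitions were already computed".
    static boolean someDoneVsNumPartitions(int numPartitions, List<Integer> toCompute, Set<Integer> finished) {
        return findMissing(toCompute, finished).size() < numPartitions;
    }

    // Compares against the number of tasks actually launched (one per partition to compute).
    static boolean someDoneVsNumTasks(List<Integer> toCompute, Set<Integer> finished) {
        return findMissing(toCompute, finished).size() < toCompute.size();
    }
}
```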
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r210964794

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1864,6 +1877,22 @@ abstract class RDD[T: ClassTag](
   // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long
   // RDD chain.
   @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
+
+  /**
+   * Whether the RDD's computing function is idempotent. Idempotent means the computing function
+   * not only satisfies the requirement, but also produce the same output sequence(the output order
+   * can't vary) given the same input sequence. Spark assumes all the RDDs are idempotent, except
+   * for the shuffle RDD and RDDs derived from non-idempotent RDD.
+   */
+  // TODO: Add public APIs to allow users to mark their RDD as non-idempotent.
+  // TODO: this can be per-partition. e.g. UnionRDD can have part of its partitions idempotent.
+  private[spark] def isIdempotent: Boolean = {
+    dependencies.forall { dep =>
+      // Shuffle RDD is always considered as non-idempotent, because its computing function needs
+      // to fetch remote shuffle blocks, and these fetched blocks may arrive in a random order.
+      !dep.isInstanceOf[ShuffleDependency[_, _, _]] && dep.rdd.isIdempotent
--- End diff --

This is too strict. As I discussed with @jiangxb1987, something like this would be better:

```
dep => dep match {
  case shuffleDep: ShuffleDependency[_, _, _] => shuffleDep.keyOrdering.isDefined
  // IIRC this is not comprehensive if checkpoint is happening as part of this job.
  case checkpointedDep: Dependency[_] if checkpointedDep.rdd.isCheckpointed => true
  case _ => dep.rdd.isIdempotent
}
```

Note that this method can end up with stack overflow errors. Please refer to `DAGScheduler.stageDependsOn`, which does a similar dependency traversal (but for a different purpose).
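The stack-overflow concern can be avoided by walking the lineage with an explicit stack instead of recursion, which is the kind of traversal the comment points to in `DAGScheduler.stageDependsOn`. Below is a Java sketch of the suggested rules (shuffle: idempotent only if a key ordering is set; checkpointed parent: treated as stable; otherwise keep walking). `LineageNode`, `LineageDep` and `Idempotence` are hypothetical types, not Spark's `RDD` or `Dependency`.

```java
import java.util.*;

// Hypothetical, simplified lineage node; not Spark's RDD class.
final class LineageNode {
    final boolean checkpointed;
    final List<LineageDep> deps = new ArrayList<>();

    LineageNode(boolean checkpointed) { this.checkpointed = checkpointed; }
}

// Hypothetical dependency edge: whether it is a shuffle and, if so, whether keyOrdering is set.
final class LineageDep {
    final LineageNode parent;
    final boolean shuffle;
    final boolean keyOrderingDefined;

    LineageDep(LineageNode parent, boolean shuffle, boolean keyOrderingDefined) {
        this.parent = parent;
        this.shuffle = shuffle;
        this.keyOrderingDefined = keyOrderingDefined;
    }
}

final class Idempotence {
    // Same rules as the suggested match, but iterative, so a long lineage cannot
    // overflow the call stack.
    static boolean isIdempotent(LineageNode rdd) {
        Deque<LineageNode> stack = new ArrayDeque<>();
        Set<LineageNode> visited = new HashSet<>();  // identity-based: equals/hashCode not overridden
        stack.push(rdd);
        while (!stack.isEmpty()) {
            LineageNode node = stack.pop();
            if (!visited.add(node)) continue;
            for (LineageDep dep : node.deps) {
                if (dep.shuffle) {
                    // Shuffle output order is stable only if a key ordering is specified.
                    if (!dep.keyOrderingDefined) return false;
                } else if (!dep.parent.checkpointed) {
                    // Checkpointed parents are treated as stable; otherwise keep walking.
                    stack.push(dep.parent);
                }
            }
        }
        return true;
    }
}
```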
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r210788359

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -112,6 +112,11 @@ abstract class RDD[T: ClassTag](
   /**
    * :: DeveloperApi ::
    * Implemented by subclasses to compute a given partition.
+   *
+   * Spark requires the computing function to always output the same data set(the order can vary)
+   * given the same input data set. For example, a computing function that increases each input
+   * value by 1 is valid, a computing function that increases each input by a random value is
+   * invalid.
--- End diff --

"a computing function that increases each input by a random value is invalid." is too stringent a restriction IMO. It is also not in line with the preceding requirement, which is essentially to generate the same output given the same input data iterator.

As I mentioned before, typical MR-based systems have three general types of closures:

* Truly random output: MR systems do not support this well except in some very specific cases, primarily because repeatability of the computation is not guaranteed.
  * I am not considering pseudo-random repeatable computations here.
* Order-sensitive output: task output depends on both the input order and the computation.
  * In Spark core we have:
    * coalesce with shuffle (repartition uses this)
    * \*sample\*
    * zip*
    * random*
    * glom, take, etc.
    * I am ignoring the \*approx\* variants, since they are approximations by definition.
  * There should be a lot of usage within MLlib (at least Yahoo's internal BigML library had a lot of it).
  * I vaguely remember a bunch of usages in GraphX when I last checked it a few years back.
  * I don't have much context on SQL, but you should know about that better :-)
* Record dependent: output for a single record depends only on the current input record.
  * Per-record filter, map, etc.
  * A variant of this is where the closure processes the entire partition without regard to order: the input iterator is consumed in mapPartitions, but the closure is agnostic to tuple order (count() in Spark core is a trivial example).

Hadoop MapReduce systems support order-sensitive output because of the sort done as part of the shuffle (all keys are comparable). In Spark, however, this requires specifying keyOrdering to explicitly enable the shuffle output sort (as part of a global sort or a per-partition sort). This allows Spark to avoid the cost when the sort is not needed (when the shuffle map task is only record dependent), but this assumption breaks for order-sensitive inputs.
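A small Java illustration of the difference between order-sensitive and record-dependent closures. `PartitionAssignment` is hypothetical. `roundRobin` is a simplified round-robin assignment in the spirit of repartition: the i-th record goes to partition `i % n`. Spark's actual `distributePartition` logic differs in its details, for example in the starting position. `byHash` assigns each record by its own hash code. When the same records arrive in a different order, only the round-robin assignment changes.

```java
import java.util.*;

final class PartitionAssignment {
    // Order sensitive: the i-th record goes to partition i % n, so the same records
    // arriving in a different order can land in different partitions.
    static Map<String, Integer> roundRobin(List<String> records, int numPartitions) {
        Map<String, Integer> out = new HashMap<>();
        for (int i = 0; i < records.size(); i++) out.put(records.get(i), i % numPartitions);
        return out;
    }

    // Record dependent: the partition is a function of the record alone.
    static Map<String, Integer> byHash(List<String> records, int numPartitions) {
        Map<String, Integer> out = new HashMap<>();
        for (String r : records) out.put(r, Math.floorMod(r.hashCode(), numPartitions));
        return out;
    }
}
```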
[GitHub] spark pull request #22112: [SPARK-23243][Core] Fix RDD.repartition() data co...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r210756079

--- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
@@ -94,6 +94,16 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     shuffleId, _rdd.partitions.length, this)
   _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
+
+  /**
+   * whether the partitioner is order sensitive to the input data and will partition shuffle output
+   * differently if the input data order changes. For example, hash and range partitioners are
+   * order insensitive, round-robin partitioner is order sensitive. This is a property of
+   * `ShuffleDependency` instead of `Partitioner`, because it's common that a map task partitions
+   * its output by itself and use a dummy partitioner later.
+   */
+  // This is defined as a `var` here instead of the constructor, to pass the mima check.
+  private[spark] var orderSensitivePartitioner: Boolean = false
--- End diff --

Output order is orthogonal to what the partitioner is; it is enforced by whether `keyOrdering` is defined in the shuffle dependency. We should not associate order sensitivity with the partitioner, which has no influence on order.

> "For example, hash and range partitioners are order insensitive, round-robin partitioner is order sensitive".

There is no round-robin partitioner in Spark core. Similarly:

> "... because it's common that a map task partitions its output by itself and use a dummy partitioner later."

Tasks do not partition; that is the responsibility of the partitioner. There can be implementations where tasks and the partitioner work in tandem, but that is an implementation detail of user code (`distributePartition` is essentially user code that happens to be bundled as part of Spark). In general, something like this would suffice here (to tell whether the RDD is ordered or not): `def isOrdered = keyOrdering.isDefined`
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22112

@tgravescs To understand better: are you suggesting that we do not support any API and/or user closure that depends on input order? If so, that would break not just repartition + shuffle, but also other publicly exposed APIs in Spark core and (my guess) non-trivial aspects of MLlib. Or is it that we support repartition and possibly a few other high-priority cases (sampling in MLlib, for example?) and not the rest? My (unproven) contention is that the solution for repartition + shuffle would be a general solution (or very close to one), which would then work for all other cases with suitable modifications as required. By "expand solution to cover all later.", I meant that, in the interest of time, these other use cases would later be changed to leverage whatever we build for repartition (for example, by setting appropriate parameters).
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22112

@tgravescs I was specifically in agreement with

> Personally I don't want to talk about implementation until we decide what we want our semantics to be around the unordered operations because that affects any implementation.

and

> I would propose we fix the things that are using the round robin type partitioning (repartition) but then unordered things like zip/MapPartitions (via user code) we document or perhaps give the user the option to sort.

IMO a fix in Spark core for repartition should work for most (if not all) order-dependent closures. We might choose not to implement it for the others due to time constraints, but the basic idea should be fairly similar. Given this, I am fine with documenting the potential issue for the others and fixing a core subset, on the assumption that we will later expand the solution to cover all of them.
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22112

I agree @tgravescs. I was looking at the implementation to understand what the expectations are for the newly introduced methods/fields and whether they make sense; I did not see any details furnished. I don't think we can hack our way out of this. I would expect a solution for repartition to also apply to other order-dependent closures. We might choose to fix those later, but the basic approach should ideally be transferable.
[GitHub] spark issue #22112: [WIP][SPARK-23243][Core] Fix RDD.repartition() data corr...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22112

You are perfectly correct @jiangxb1987. That was a silly mistake on my part, and not a trivial one at all! We should rely on the shuffle dependency, not ShuffledRDD, when traversing the dependency tree.
[GitHub] spark issue #22112: [WIP][SPARK-23243][Core] Fix RDD.repartition() data corr...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22112

I am not sure what the definition of `isIdempotent` is here. For example, from MapPartitionsRDD:

```
override private[spark] def isIdempotent = {
  if (inputOrderSensitive) {
    prev.isIdempotent
  } else {
    true
  }
}
```

Consider `val rdd1 = rdd.groupBy().map(...).repartition(...).filter(...)`. By the definition above, rdd1 would be idempotent. Whether this code is correct or wrong depends on the definition of idempotent (partition level, record level, etc.). Similarly, I am not sure why idempotency or ordering should depend on `Partitioner`. IMO we should traverse the dependency graph and rely on how `ShuffledRDD` is configured. That means checking whether a key ordering is specified (this applies to both global and per-partition sort), whether the data comes from a checkpoint or is marked for checkpointing, whether it comes from a stable input source, etc.
[GitHub] spark issue #22101: [SPARK-25114][Core] Fix RecordBinaryComparator when subt...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/22101

LGTM pending Xiao Li's excellent suggestion :-)
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21698

> I guess on the RDD side its not called RoundRobinPartitioner

Thanks for clarifying @tgravescs! I was looking at `RangePartitioner` and its variants and wondering what I was missing; I did not make the obvious connection with SQL :-)

> If we can't come up with another solution, I would actually be ok with failing short term, its better then corruption

If I understand correctly, the proposal is:
* In `ShuffledRDD`, add a flag `orderSensitiveReducer` (?) to track the specific patterns identified as order sensitive (repartition with shuffle = true, zip, etc.).
* If a task is being re-executed as part of a stage re-execution and the flag is true, fail the job.
* Task re-execution within the same stage, speculative execution, etc. should not be an issue, since only one task completes.
* ResultStage should not be affected.
* I am unsure how cached data interacts here; this might need some investigation.

This looks like a reasonable stop gap until we fix the issue. It also lets users make progress by inserting a checkpoint before the order-sensitive closure to unblock themselves.
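The decision logic in the bullets above can be written out as a tiny Java function. It encodes only the proposal as summarized. `RerunReason` and `OrderSensitiveRerunPolicy` are hypothetical names, not Spark scheduler APIs, and the open question about cached data is not modeled.

```java
// Hypothetical classification of why a task is running again.
enum RerunReason { TASK_RETRY_SAME_STAGE_ATTEMPT, SPECULATIVE_COPY, STAGE_RESUBMISSION }

final class OrderSensitiveRerunPolicy {
    // Fail the job only when an order-sensitive reducer's stage is re-executed.
    // In-stage retries and speculative copies are fine because only one attempt's output
    // is kept, and ResultStages are left alone.
    static boolean shouldFailJob(boolean orderSensitiveReducer, boolean isResultStage, RerunReason reason) {
        if (!orderSensitiveReducer || isResultStage) return false;
        return reason == RerunReason.STAGE_RESUBMISSION;
    }
}
```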
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21698

@cloud-fan I think we have to be clear on the boundaries of the solution we can provide in Spark.

> RDD#mapPartitions and its friends can take arbitrary user functions, which may produce random result

As stated above, this is something we do not support in Spark. Spark, MR, etc. assume that computation is idempotent: we do not support nondeterminism in computation, but computation can be sensitive to input order. For a given input partition (iterator of tuples), the closure must always generate the same output partition (iterator of tuples). This is why 'randomness' (or rather pseudo-randomness) is seeded using invariants like the partition id, which result in the same output partition on task re-execution. The problem we have here is that even if user code satisfies this constraint, nondeterminism in input order changes the output when the closure is order sensitive.

Given this, analyzing the statement below:

> Step 3 is problematic: assuming we have 5 map tasks and 5 reduce tasks, and the input data is random. Let's say reduce task 1,2,3,4 are finished, reduce task 5 failed with FetchFailed, and Spark rerun map task 3 and 4. Map task 3 and 4 reprocess the random data and create different shuffle blocks for reduce task 3, 4, 5. So not only reduce task 5 needs to rerun, reduce task 3, 4, 5 all need to rerun, because their input data changed.

Here, map tasks 3 and 4 will always produce the same output partition for supported closures, as long as the input partition remains the same. When reading off checkpoints, immutable HDFS files, etc., this invariant is satisfied. With @squito's suggestion implemented, it will be satisfied for shuffle input as well. With a deterministic input partition, the output of map tasks 3 and 4 will always be the same, and so will the inputs of reduce tasks 3/4/5. So only reduce task 5 will need to be rerun, and none of the other inputs will change.
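The distinction above (deterministic for a given input order, yet order sensitive) can be illustrated with a small, hypothetical Java sketch. `sampleBySeed` seeds its RNG from the partition id, so re-running it on the same input reproduces the output. `everyOther` is deterministic but positional, so it changes whenever the same records arrive in a different order. Both names are illustrative only and are not Spark APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical sketch: closures that satisfy the "same input -> same output" contract
// but are still sensitive to the order of their input.
final class ClosureDeterminism {

    // Pseudo-random sampling seeded from an invariant (the partition id), never from time,
    // so a re-executed task that sees identical input emits identical output.
    static List<Integer> sampleBySeed(List<Integer> input, int partitionId, double fraction) {
        Random rng = new Random(partitionId);
        List<Integer> out = new ArrayList<>();
        for (int x : input) {
            if (rng.nextDouble() < fraction) {
                out.add(x);
            }
        }
        return out;
    }

    // Deterministic but order sensitive: keeps elements by position, not by value.
    static List<Integer> everyOther(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < input.size(); i += 2) {
            out.add(input.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> partition = Arrays.asList(1, 2, 3, 4);
        List<Integer> reordered = Arrays.asList(4, 3, 2, 1); // same records, different fetch order
        // Re-execution on identical input reproduces the output.
        System.out.println(sampleBySeed(partition, 3, 0.5).equals(sampleBySeed(partition, 3, 0.5))); // true
        // Same records in a different order give a different output.
        System.out.println(everyOther(partition) + " vs " + everyOther(reordered)); // [1, 3] vs [4, 2]
    }
}
```

This is the case the comment describes: user code meets the contract, but a non-deterministic input order (for example, from shuffle fetches) still changes the result.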
[GitHub] spark pull request #22101: [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryCom...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22101#discussion_r209862610

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java ---
@@ -42,16 +42,16 @@ public int compare(
       while ((leftOff + i) % 8 != 0 && i < leftLen) {
         res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
             (Platform.getByte(rightObj, rightOff + i) & 0xff);
-        if (res != 0) return res;
+        if (res != 0) return (int) res;
         i += 1;
       }
     }
     // for architectures that support unaligned accesses, chew it up 8 bytes at a time
     if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
       while (i <= leftLen - 8) {
-        res = (int) ((Platform.getLong(leftObj, leftOff + i) -
-            Platform.getLong(rightObj, rightOff + i)) % Integer.MAX_VALUE);
-        if (res != 0) return res;
+        res = Platform.getLong(leftObj, leftOff + i) -
+            Platform.getLong(rightObj, rightOff + i);
+        if (res != 0) return res > 0 ? 1 : -1;
--- End diff --

The subtraction is buggy: the difference of two longs can overflow and flip the sign of `res`. Why not simply use:

```
final long v1 = Platform.getLong(leftObj, leftOff + i);
final long v2 = Platform.getLong(rightObj, rightOff + i);
if (v1 != v2) {
  return v1 > v2 ? 1 : -1;
}
```
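The overflow can be shown with plain longs. The following minimal sketch does not use Spark's `Platform`. It compares the subtraction-sign pattern from the diff with an explicit comparison and with the JDK's `Long.compare`.

```java
// Sketch: why the sign of `a - b` is not a safe ordering for longs.
final class LongCompareDemo {

    // The pattern under review: derive the ordering from the sign of a subtraction.
    // Long subtraction wraps on overflow, so the sign can come out wrong.
    static int subtractionSign(long a, long b) {
        long res = a - b;
        if (res != 0) return res > 0 ? 1 : -1;
        return 0;
    }

    // Overflow-free comparison, equivalent to the suggested snippet.
    static int safeCompare(long a, long b) {
        if (a != b) {
            return a > b ? 1 : -1;
        }
        return 0;
    }

    public static void main(String[] args) {
        long big = Long.MAX_VALUE;
        long small = -1L;
        // big > small, but big - small wraps around to Long.MIN_VALUE.
        System.out.println(subtractionSign(big, small)); // -1 (wrong)
        System.out.println(safeCompare(big, small));     // 1
        System.out.println(Long.compare(big, small) > 0); // true
    }
}
```

`Long.compare` is the standard-library form of the same idea and avoids the overflow.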
[GitHub] spark pull request #22101: [SPARK-23207][Core][FOLLOWUP] Fix RecordBinaryCom...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22101#discussion_r209860397

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java ---
@@ -42,16 +42,16 @@ public int compare(
       while ((leftOff + i) % 8 != 0 && i < leftLen) {
         res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
             (Platform.getByte(rightObj, rightOff + i) & 0xff);
-        if (res != 0) return res;
+        if (res != 0) return (int) res;
--- End diff --

We can restrict the scope of `res` to an `int` here and avoid the type promotions/conversions.
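Here is a minimal sketch of the byte-wise prefix loop with `res` declared as a loop-local `int`, written over plain `byte[]` rather than Spark's `Platform`. Each masked difference lies in [-255, 255], so an `int` always holds it and no cast is needed. The length tie-break at the end is a simplification for this sketch only.

```java
// Sketch: byte-by-byte unsigned comparison with an int-scoped `res`.
final class ByteCompareDemo {

    static int compareUnsignedBytes(byte[] left, byte[] right) {
        int len = Math.min(left.length, right.length);
        for (int i = 0; i < len; i++) {
            // Masking with 0xff treats bytes as 0..255; the difference fits in an int.
            int res = (left[i] & 0xff) - (right[i] & 0xff);
            if (res != 0) return res;
        }
        // Simplified tie-break for this sketch: shorter array sorts first.
        return Integer.compare(left.length, right.length);
    }

    public static void main(String[] args) {
        byte[] a = {(byte) 0x80};
        byte[] b = {(byte) 0x01};
        // 0x80 is compared as 128, not -128.
        System.out.println(compareUnsignedBytes(a, b)); // 127
    }
}
```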
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21698 @squito @tgravescs I am probably missing something about why the hash partitioner helps; can you please clarify? IIRC the partitioner for CoalescedRDD when shuffle is enabled is HashPartitioner ... the issue is the `distributePartition` step before the shuffle, which is order sensitive but not deterministic, since its input is not deterministic if it is derived from one or more shuffle outputs. Btw, when shuffle = false, it does not suffer from the problem. I had mentally assumed that it had an issue too; on a recheck now, I find it interesting that it does not (I never used that, so had never checked in detail!)
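The following hypothetical Java model shows why a round-robin step is order sensitive. It is a simplified stand-in for `distributePartition`. As far as we understand it, the real code derives a start position from the input partition index, increments it per record, and hash-partitions on that position. Here the start is a plain parameter, and `floorMod` stands in for the hash partitioner on an `Int` key.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: each record's target partition depends on its position in the input.
final class RoundRobinDemo {

    static Map<String, Integer> assign(List<String> records, int start, int numOut) {
        Map<String, Integer> target = new LinkedHashMap<>();
        int position = start;
        for (String r : records) {
            position += 1;
            // Stand-in for hash partitioning on the integer position.
            target.put(r, Math.floorMod(position, numOut));
        }
        return target;
    }

    public static void main(String[] args) {
        List<String> firstRun = Arrays.asList("a", "b", "c");
        List<String> retry = Arrays.asList("c", "a", "b"); // shuffle fetch order changed on retry
        System.out.println(assign(firstRun, 0, 3)); // {a=1, b=2, c=0}
        System.out.println(assign(retry, 0, 3));    // {c=1, a=2, b=0}
    }
}
```

Even with an identical start position, record `a` moves from output 1 to output 2 when the input order changes. That is exactly the non-determinism a re-executed task would expose.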
[GitHub] spark issue #21698: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21698 @tgravescs I vaguely remember someone at Y! Labs telling me (more than a decade back) that MR always sorts as part of its shuffle, which avoids a variant of this problem by design. Essentially it boils down to Imran's suggestion, even for arbitrary byte writables [1], [2] ...

[1] https://hadoop.apache.org/docs/r0.23.11/api/src-html/org/apache/hadoop/io/BytesWritable.html
[2] https://hadoop.apache.org/docs/r0.23.11/api/src-html/org/apache/hadoop/io/WritableComparator.html#line.154
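The next sketch shows why sorting by raw bytes removes the dependence on arrival order. It orders serialized records with an unsigned lexicographic comparator, in the spirit of Hadoop's byte comparison. This comparator is written for the sketch and is not Hadoop's implementation. Two different arrival orders then produce the same canonical sequence.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sketch: sorting records by their raw bytes yields one canonical order
// regardless of the order in which they arrived.
final class SortedShuffleDemo {

    // Unsigned, lexicographic byte comparison; a shorter prefix sorts first.
    static final Comparator<byte[]> LEXICOGRAPHIC_UNSIGNED = (l, r) -> {
        int n = Math.min(l.length, r.length);
        for (int i = 0; i < n; i++) {
            int diff = (l[i] & 0xff) - (r[i] & 0xff);
            if (diff != 0) return diff;
        }
        return l.length - r.length;
    };

    static List<byte[]> canonicalOrder(List<byte[]> records) {
        List<byte[]> copy = new ArrayList<>(records);
        copy.sort(LEXICOGRAPHIC_UNSIGNED);
        return copy;
    }

    public static void main(String[] args) {
        byte[] x = {1, 2};
        byte[] y = {(byte) 0xff};
        byte[] z = {1};
        List<byte[]> run1 = canonicalOrder(Arrays.asList(x, y, z));
        List<byte[]> run2 = canonicalOrder(Arrays.asList(y, z, x)); // different arrival order
        for (int i = 0; i < run1.size(); i++) {
            System.out.println(Arrays.toString(run1.get(i)) + " == " + Arrays.toString(run2.get(i)));
        }
    }
}
```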
[GitHub] spark pull request #22079: [SPARK-23207][SQL][BACKPORT-2.2] Shuffle+Repartit...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22079#discussion_r209705612

--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java ---
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution;
+
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class RecordBinaryComparator extends RecordComparator {
+
+  // TODO(jiangxb) Add test suite for this.
+  @Override
+  public int compare(
+      Object leftObj, long leftOff, int leftLen, Object rightObj, long rightOff, int rightLen) {
+    int i = 0;
+    int res = 0;
+
+    // If the arrays have different length, the longer one is larger.
+    if (leftLen != rightLen) {
+      return leftLen - rightLen;
+    }
+
+    // The following logic uses `leftLen` as the length for both `leftObj` and `rightObj`, since
+    // we have guaranteed `leftLen` == `rightLen`.
+
+    // check if stars align and we can get both offsets to be aligned
+    if ((leftOff % 8) == (rightOff % 8)) {
+      while ((leftOff + i) % 8 != 0 && i < leftLen) {
+        res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
+            (Platform.getByte(rightObj, rightOff + i) & 0xff);
+        if (res != 0) return res;
+        i += 1;
+      }
+    }
+    // for architectures that support unaligned accesses, chew it up 8 bytes at a time
+    if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
+      while (i <= leftLen - 8) {
+        res = (int) ((Platform.getLong(leftObj, leftOff + i) -
+            Platform.getLong(rightObj, rightOff + i)) % Integer.MAX_VALUE);
+        if (res != 0) return res;
--- End diff --

It is possible for two objects to be unequal and yet be treated as equal by this code: if the long values differ by Int.MaxValue (or any multiple of it), the remainder is 0. Any particular reason why this code is written with this inaccuracy, rather than as the idiomatic comparator comparison?
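The collision can be reproduced with plain longs. This sketch copies the reviewed expression into a helper. Two values that differ by exactly `Integer.MAX_VALUE` (or a multiple of it) produce a remainder of 0, which the comparator reads as "equal".

```java
// Sketch: the `% Integer.MAX_VALUE` step throws away ordering information.
final class ModuloCompareDemo {

    // Same arithmetic as the reviewed line, applied to two plain longs.
    static int moduloDiff(long left, long right) {
        return (int) ((left - right) % Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        long left = Integer.MAX_VALUE; // 2147483647
        long right = 0L;
        System.out.println(left != right);                 // true: the values differ
        System.out.println(moduloDiff(left, right));       // 0: reported as "equal"
        System.out.println(Long.compare(left, right) > 0); // true: the idiomatic comparison is correct
    }
}
```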
[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21895 I merged to master, thanks for the work @mgaido91 !
[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21895#discussion_r207641341

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -779,6 +808,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             listing.delete(classOf[LogInfo], log.logPath)
           }
         }
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
--- End diff --

I misread it as MAX_LOG_AGE_S ... CLEAN_INTERVAL_S should be fine here, you are right.
[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21895#discussion_r207493280

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -161,6 +162,29 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  // Visible for testing
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    val expired = new mutable.ArrayBuffer[String]
+    blacklist.asScala.foreach {
+      case (path, creationTime) if creationTime < expiredThreshold => expired += path
+    }
+    expired.foreach(blacklist.remove(_))
--- End diff --

Instead of this, why not simply:

```
blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
```
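For comparison, here is the same single-pass cleanup in Java, as a hedged analog of the Scala `retain` suggestion. `ConcurrentHashMap.values().removeIf` removes matching entries in place, so no temporary buffer of expired keys is needed. The class and method names are illustrative only.

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch: single-pass removal of expired blacklist entries. Java's removeIf plays the
// role of Scala's `retain`, with the predicate inverted.
final class BlacklistCleanup {

    private final ConcurrentHashMap<String, Long> blacklist = new ConcurrentHashMap<>();

    void add(String name, long timeMillis) {
        blacklist.put(name, timeMillis);
    }

    boolean contains(String name) {
        return blacklist.containsKey(name);
    }

    // Drops every entry created before `nowMillis - expireSeconds * 1000`.
    void clearExpired(long nowMillis, long expireSeconds) {
        long threshold = nowMillis - expireSeconds * 1000;
        blacklist.values().removeIf(creationTime -> creationTime < threshold);
    }

    public static void main(String[] args) {
        BlacklistCleanup cleanup = new BlacklistCleanup();
        cleanup.add("old.log", 1_000L);
        cleanup.add("new.log", 50_000L);
        cleanup.clearExpired(60_000L, 30); // threshold = 30_000
        System.out.println(cleanup.contains("old.log") + " " + cleanup.contains("new.log")); // false true
    }
}
```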
[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21895#discussion_r207493081

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -779,6 +808,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             listing.delete(classOf[LogInfo], log.logPath)
           }
         }
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
--- End diff --

My only concern is that, if there happens to be a transient ACL issue when initially accessing the file, we will never see it in the application list, even after the ACL is fixed, without an SHS restart. I wonder if the clean interval here could be a fraction of CLEAN_INTERVAL_S, so that these files have a chance of making it into the app list without much overhead on the NN.
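The suggestion above can be sketched with a hypothetical blacklist that expires entries after a configurable fraction of the clean interval. A file rejected because of a transient ACL problem is then retried on a later scan without a restart. The injectable `LongSupplier` clock, the class name and the `fraction` parameter are all assumptions made for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical sketch: blacklist entries expire after a fraction of the clean interval.
final class RetryingBlacklist {

    private final ConcurrentHashMap<String, Long> entries = new ConcurrentHashMap<>();
    private final LongSupplier clockMillis; // injectable clock, for testing
    private final long retryAfterMillis;

    RetryingBlacklist(LongSupplier clockMillis, long cleanIntervalMillis, double fraction) {
        this.clockMillis = clockMillis;
        this.retryAfterMillis = (long) (cleanIntervalMillis * fraction);
    }

    void blacklist(String path) {
        entries.put(path, clockMillis.getAsLong());
    }

    // A path is skipped only while its entry is younger than retryAfterMillis.
    boolean isBlacklisted(String path) {
        Long since = entries.get(path);
        if (since == null) return false;
        if (clockMillis.getAsLong() - since >= retryAfterMillis) {
            entries.remove(path, since); // expired: let the next scan try the file again
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        RetryingBlacklist bl = new RetryingBlacklist(() -> now[0], 60_000L, 0.25); // retry after 15s
        bl.blacklist("eventlog-1");
        now[0] = 10_000L;
        System.out.println(bl.isBlacklisted("eventlog-1")); // true
        now[0] = 15_000L;
        System.out.println(bl.isBlacklisted("eventlog-1")); // false
    }
}
```

A smaller fraction retries sooner but costs more NameNode calls. That trade-off is what the comment alludes to.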
[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21895 +CC @jerryshao
[GitHub] spark pull request #21895: [SPARK-24948][SHS] Delegate check access permissi...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21895#discussion_r205948923

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -973,6 +973,38 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
+private[history] trait CachedFileSystemHelper extends Logging {
+  protected def fs: FileSystem
+
+  /**
+   * Cache containing the result for the already checked files.
+   */
+  // Visible for testing.
+  private[history] val cache = new mutable.HashMap[String, Boolean]
--- End diff --

For a long-running history server in busy clusters (particularly where `spark.history.fs.cleaner.maxAge` is configured to be low), this unbounded map will eventually cause an OOM. Either an LRU cache or a disk-backed map with periodic cleanup (based on maxAge) might be better?
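Here is a minimal sketch of the first alternative, a bounded LRU cache built on `LinkedHashMap` in access order. `maxEntries` is an assumed configuration knob. The class is not thread-safe, so a history server would need to wrap it, for example with `Collections.synchronizedMap`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: bounded LRU cache for "already checked" results. Not thread-safe.
final class LruCheckCache<K, V> extends LinkedHashMap<K, V> {

    private final int maxEntries;

    LruCheckCache(int maxEntries) {
        super(16, 0.75f, true); // accessOrder = true: get() refreshes recency
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries; // evict the least recently used entry once over capacity
    }

    public static void main(String[] args) {
        LruCheckCache<String, Boolean> cache = new LruCheckCache<>(2);
        cache.put("a.log", true);
        cache.put("b.log", false);
        cache.get("a.log");       // a.log becomes most recently used
        cache.put("c.log", true); // evicts b.log
        System.out.println(cache.keySet()); // [a.log, c.log]
    }
}
```

Memory use is capped at `maxEntries`. An evicted file is simply checked again the next time it is seen.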
[GitHub] spark pull request #21653: [SPARK-13343] speculative tasks that didn't commi...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21653#discussion_r204199580

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -723,6 +723,21 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
+    // Check if any other attempt succeeded before this and this attempt has not been handled
+    if (successful(index) && killedByOtherAttempt.contains(tid)) {
+      calculatedTasks -= 1
+
+      val resultSizeAcc = result.accumUpdates.find(a =>
+        a.name == Some(InternalAccumulator.RESULT_SIZE))
+      if (resultSizeAcc.isDefined) {
+        totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
--- End diff --

I agree, I don't see a better option.
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21589 @MaxGekk We are going in circles. I don't think this is a good API to expose currently: the data is available through multiple other means, as I detailed, and while not a succinct one-liner, it is usable. Not to mention @markhamstra's comment. Unless there is some other compelling reason for introducing this that I have missed, I am -1 on introducing this change.
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21589 @MaxGekk The example you cite is literally one of a handful of usages that is not easily overridden, and it is prefixed with a 'HACK ALERT'! A few others are in mllib, typically for reading schema. I will reiterate the solutions currently available to users:

* Rely on `defaultParallelism`: this gives the expected result, unless explicitly overridden by the user.
* If you need fine-grained information about executors, use a Spark listener (it is trivial to keep a count with `onExecutorAdded`/`onExecutorRemoved`).
* If you simply want the current value without your own listener, use the REST API to query for the current executors.

Having said this, I will caution against this approach if you are concerned about performance. `defaultParallelism` exists to give a default when the user does not explicitly override it when creating an `RDD`, and it reflects the current total number of executor cores. Particularly when dynamic resource allocation is enabled, this value is not optimal: Spark will acquire or release resources based on pending tasks. Using the available cluster resources (from the cluster manager, not Spark) as a way to model parallelism would be a better approach: externalize your configs and populate them based on the resources available to the application (in your example: the difference between test/staging/production).
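The listener-based option can be modeled without a Spark dependency. In the following hypothetical sketch, `executorAdded`/`executorRemoved` would be called from a `SparkListener`'s `onExecutorAdded`/`onExecutorRemoved` callbacks, with the executor id and core count taken from the event. The class and its method signatures are illustrative and are not Spark API.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, dependency-free model of counting executors and cores from listener events.
final class ExecutorTracker {

    private final ConcurrentHashMap<String, Integer> coresByExecutor = new ConcurrentHashMap<>();

    // Would be driven by onExecutorAdded in a real listener.
    void executorAdded(String executorId, int totalCores) {
        coresByExecutor.put(executorId, totalCores);
    }

    // Would be driven by onExecutorRemoved in a real listener.
    void executorRemoved(String executorId) {
        coresByExecutor.remove(executorId);
    }

    int numExecutors() {
        return coresByExecutor.size();
    }

    int totalCores() {
        int sum = 0;
        for (int cores : coresByExecutor.values()) {
            sum += cores;
        }
        return sum;
    }

    public static void main(String[] args) {
        ExecutorTracker tracker = new ExecutorTracker();
        tracker.executorAdded("1", 4);
        tracker.executorAdded("2", 4);
        tracker.executorRemoved("1");
        System.out.println("executors=" + tracker.numExecutors() + " cores=" + tracker.totalCores()); // executors=1 cores=4
    }
}
```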
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r203489913

--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
     execId: String,
-    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+    executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+    execId: String,
+    stageId: Int,
+    stageAttemptId: Int,
+    executorMetrics: Array[Long])
--- End diff --

I am completely sold on the idea of an enum-like approach. My main concern was avoiding `MatchError`s in Scala, along with the other potential failures you elaborated on above.
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21589 +CC @markhamstra since you were looking at API stability.
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21589 I am not convinced by the rationale given in the JIRA for adding the new APIs. The examples given there can easily be modeled using `defaultParallelism` (to get the current state) and executor events (to get numCores and memory per executor). For example: `df.repartition(5 * sc.defaultParallelism)`. The other argument seems to be that users can override this value and set it to a static constant. Users are not expected to override it unless they want fine-grained control over the value, and Spark is expected to honor it when specified. One thing to keep in mind is that dynamic resource allocation will kick in after tasks are submitted (when there are insufficient resources available), so trying to fine-tune an application using these APIs in the presence of DRA is not going to be effective anyway. If there are corner cases where `defaultParallelism` is not accurate, we should fix those to reflect the current value.
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203322643

--- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
    * to a new position (in the new data array).
    */
   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-    if (_size > _growThreshold) {
+    if (_occupied > _growThreshold) {
--- End diff --

For accuracy's sake: my example snippet above will fail much earlier, due to `OpenHashSet.MAX_CAPACITY`. Though that is probably not the point anyway :-)
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203322056

--- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
    * to a new position (in the new data array).
    */
   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-    if (_size > _growThreshold) {
+    if (_occupied > _growThreshold) {
--- End diff --

There is no explicit entry here: these are simply unoccupied slots in an array. A freed slot can be used by some other (new) entry when insert is called. It should be easy to see how very bad behavior can happen while the actual size of the set stays very small: a series of add/removes results in unending growth of the set. Something like this, for example, is enough to make the set blow up to 2B entries:

```
var i = 0
while (i < Int.MaxValue) {
  set.add(1)
  set.remove(1)
  assert (0 == set.size)
  i += 1
}
```
[GitHub] spark issue #21758: [SPARK-24795][CORE] Implement barrier execution mode
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21758 I had left a few comments on SPARK-24375 @jiangxb1987 ... unfortunately the JIRAs have moved around a bit. If this is the active PR for introducing the feature, it would be great to get clarity on them.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r203319952

--- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
@@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
     execId: String,
-    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+    executorUpdates: Option[Array[Long]] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+    execId: String,
+    stageId: Int,
+    stageAttemptId: Int,
+    executorMetrics: Array[Long])
--- End diff --

+1 on enums @squito! The only concern would be evolving the enums in a later release: changing an enum could result in source incompatibility.
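Here is a hypothetical Java sketch of an `Array[Long]` indexed by enum ordinal, and of the evolution concern. An array written by an older release may be shorter than the current enum, so reads should fall back to a default rather than fail. The metric names are illustrative and are not the actual `MetricGetter` values.

```java
// Hypothetical sketch: metrics stored positionally, indexed by enum ordinal.
final class EnumIndexedMetrics {

    // Illustrative metric names only; new constants must be appended, never reordered.
    enum Metric { JVM_HEAP_MEMORY, JVM_OFF_HEAP_MEMORY, ON_HEAP_EXECUTION_MEMORY }

    // Reads a metric by ordinal, tolerating arrays written before the metric existed.
    static long get(long[] metrics, Metric m) {
        int idx = m.ordinal();
        return idx < metrics.length ? metrics[idx] : 0L; // default for older layouts
    }

    public static void main(String[] args) {
        long[] fromOlderRelease = {512L, 64L}; // written when only two metrics existed
        for (Metric m : Metric.values()) {
            System.out.println(m + " = " + get(fromOlderRelease, m));
        }
    }
}
```

Ordinal indexing stays compatible only while constants are appended. Reordering or removing a constant silently shifts every later metric, which is one concrete form of the evolution risk raised above.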
[GitHub] spark issue #21729: [SPARK-24755][Core] Executor loss can cause task to not ...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21729 Looks good to me, thanks for fixing this @hthuynh2 !
[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/21589 I am not seeing the utility of these two methods. `defaultParallelism` already captures the current number of cores. For monitoring use cases, existing events fired via the listener can be used to keep track of the current executor population (if that is the intended use case). Given that this duplicates information already exposed, I am not very keen on adding more API.
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203311755

--- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -85,9 +85,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
   protected var _capacity = nextPowerOf2(initialCapacity)
   protected var _mask = _capacity - 1
   protected var _size = 0
+  protected var _occupied = 0
   protected var _growThreshold = (loadFactor * _capacity).toInt
+  def g: Int = _growThreshold
+  def o: Int = _occupied
   protected var _bitset = new BitSet(_capacity)
+  protected var _bitsetDeleted: BitSet = null
--- End diff --

Why protected? Make it private instead.
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203314045

--- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
    * to a new position (in the new data array).
    */
   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-    if (_size > _growThreshold) {
+    if (_occupied > _growThreshold) {
--- End diff --

I don't see any value in `_occupied`; on the contrary, it can cause very bad behavior if a lot of removes are expected. `_size` is a better metric for deciding when to rehash and grow.
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203311109

--- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -85,9 +85,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
   protected var _capacity = nextPowerOf2(initialCapacity)
   protected var _mask = _capacity - 1
   protected var _size = 0
+  protected var _occupied = 0
   protected var _growThreshold = (loadFactor * _capacity).toInt
+  def g: Int = _growThreshold
+  def o: Int = _occupied
--- End diff --

Also, please use more descriptive and comprehensible names.