[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user viirya closed the pull request at: https://github.com/apache/spark/pull/7021 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-121071475 Merged build triggered.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-120117177 As discussed I have opened a patch #7279 that refactors all of this. After that one is merged I'll fix SPARK-8582 in the new refactored code based on the changes here. @viirya would you mind closing this then? Thanks for your time.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33995655

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala ---
    @@ -114,7 +145,7 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
           rdd.markCheckpointed(newRDD)   // Update the RDD's dependencies and partitions
           cpState = Checkpointed
    --- End diff --

    The state transition here is incorrect. At this point the RDD has not been checkpointed yet. It's not safe to truncate the RDD's lineage until we drain the iterator.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-119031150 Ah, one thing we could do is the following: in `doCheckpoint`, we check if the iterator still has values. If it does, then just keep calling `next` until it is fully drained. This ensures the RDD will always be fully checkpointed after an action.
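The draining idea in this comment can be illustrated with a minimal, Spark-free sketch. Everything here is an assumption made for illustration: `RecordingIterator`, its `written` buffer (a stand-in for the checkpoint file) and `drain()` are hypothetical names, not part of the patch or of Spark.

```scala
import scala.collection.mutable.ArrayBuffer

object DrainSketch {
  /** Pass-through iterator that records every element it hands out.
    * The buffer is a hypothetical stand-in for the checkpoint file. */
  final class RecordingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
    val written: ArrayBuffer[T] = ArrayBuffer.empty[T]

    override def hasNext: Boolean = underlying.hasNext

    override def next(): T = {
      val value = underlying.next()
      written += value // "checkpoint" the element as it passes through
      value
    }

    /** Consume whatever the caller left behind so `written` ends up complete. */
    def drain(): Unit = while (hasNext) next()
  }
}
```

If the caller stops after two of five elements, calling `drain()` afterwards pulls the remaining three through `next()`, so the recorded copy is complete before any lineage would be truncated. The cost is that the rest of the partition is computed even though the action did not need it.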
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33995579

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala ---
    @@ -66,6 +88,27 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
         cpFile
       }

    +  // Get the iterator used to write checkpoint data to HDFS
    +  def getCheckpointIterator(
    +      rddIterator: Iterator[T],
    +      context: TaskContext,
    +      partitionId: Int): Iterator[T] = {
    +    RDDCheckpointData.synchronized {
    +      if (cpState == Initialized) {
    +        // Create the output path for the checkpoint
    +        val path = new Path(checkpointDir.get, "rdd-" + rddId)
    +        CheckpointingIterator[T](
    +          rddIterator,
    +          path.toString,
    +          broadcastedConf,
    +          partitionId,
    +          context)
    +      } else {
    +        rddIterator
    --- End diff --

    This seems like an error condition. If we are not in `Initialized`, then we shouldn't be doing this. I would throw an exception here instead.
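A rough sketch of the "throw instead of silently passing through" suggestion, written without Spark. The names `Checkpointer`, `wrap` and the two-state `State` type are hypothetical and only mirror the shape of the code under review.

```scala
object GuardSketch {
  sealed trait State
  case object Initialized extends State
  case object InProgress extends State

  final class Checkpointer {
    private var state: State = Initialized

    /** Hand out the iterator only from the Initialized state; any other
      * state is treated as a programming error rather than ignored. */
    def wrap[T](iter: Iterator[T]): Iterator[T] = synchronized {
      if (state != Initialized) {
        throw new IllegalStateException(s"Cannot start checkpointing from state $state")
      }
      state = InProgress
      iter
    }
  }
}
```

Failing fast makes a second, unexpected call visible immediately, whereas the `else` branch in the diff would return the plain iterator and hide it.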
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/7021#issuecomment-119030716

    @viirya Thanks for tackling this issue, but I believe the existing implementation is not fully correct. There are two high-level problems:

    First, if the checkpointing iterator is not fully consumed by the user, then we end up checkpointing only a subset of the computed data. I think we should ensure that the iterator is fully drained before we can safely truncate the RDD's lineage through `rdd.markCheckpointed`.

    Second, the state transition from `Initialized` -> `CheckpointingInProgress` -> `Checkpointed` is not respected. In the new model, we should transition into `CheckpointingInProgress` as soon as the iterator is returned (so multiple calls to it will not lead to the RDD being checkpointed many times). Then only after we fully iterate through the iterator can we declare the RDD as `Checkpointed`.

    I actually don't have a great idea on how to fix the first issue, however. We do not really have any visibility into how the higher-level caller will use the iterator, and if we consume it eagerly ourselves then the application might fail. @tdas this seems like a fundamentally difficult problem.
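The state transitions described in the second point can be sketched in plain Scala. This is only an illustration under stated assumptions: `Tracker`, `wrap`, `current` and `markDone` are hypothetical names, and the real `RDDCheckpointData` tracks more than this. Here a repeated call passes the iterator through unchanged, which is one possible reading of "multiple calls will not lead to the RDD being checkpointed many times".

```scala
object StateSketch {
  sealed trait CheckpointState
  case object Initialized extends CheckpointState
  case object CheckpointingInProgress extends CheckpointState
  case object Checkpointed extends CheckpointState

  final class Tracker {
    @volatile private var state: CheckpointState = Initialized
    def current: CheckpointState = state

    /** First call wraps the iterator and moves to CheckpointingInProgress;
      * later calls return the input unchanged so nothing is checkpointed twice. */
    def wrap[T](iter: Iterator[T]): Iterator[T] = synchronized {
      if (state == Initialized) {
        state = CheckpointingInProgress
        new Iterator[T] {
          override def hasNext: Boolean = {
            val more = iter.hasNext
            if (!more) markDone() // only a fully drained iterator completes the checkpoint
            more
          }
          override def next(): T = iter.next()
        }
      } else {
        iter
      }
    }

    private def markDone(): Unit = synchronized {
      if (state == CheckpointingInProgress) state = Checkpointed
    }
  }
}
```

A partially consumed iterator leaves the tracker in `CheckpointingInProgress`, which is exactly the situation the first point worries about: nothing in this sketch forces the caller to finish.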
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33970605

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala ---
    @@ -66,6 +88,27 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
         cpFile
       }

    +  // Get the iterator used to write checkpoint data to HDFS
    +  def getCheckpointIterator(
    +      rddIterator: Iterator[T],
    +      context: TaskContext,
    +      partitionId: Int): Iterator[T] = {
    +    RDDCheckpointData.synchronized {
    +      if (cpState == Initialized) {
    +        // Create the output path for the checkpoint
    +        val path = new Path(checkpointDir.get, "rdd-" + rddId)
    --- End diff --

    shouldn't this read from `RDDCheckpointData.rddCheckpointDataPath`?
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33970908

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala ---
    @@ -44,6 +46,26 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])

       import CheckpointState._

    +  // Because SparkContext is transient in RDD, so we can't get the id and checkpointDir later.
    +  // So keep a copy of the id and checkpointDir.
    +  // The id of RDD
    +  val rddId: Int = rdd.id
    +
    +  // The path the checkpoint data will write to.
    +  val checkpointDir = rdd.context.checkpointDir
    +  @transient var checkpointPath: Path = null
    +  @transient var fs: FileSystem = null
    +  if (checkpointDir.isDefined) {
    +    checkpointPath = new Path(checkpointDir.get, "rdd-" + rddId)
    +    fs = checkpointPath.getFileSystem(rdd.context.hadoopConfiguration)
    +    if (!fs.mkdirs(checkpointPath)) {
    +      throw new SparkException("Failed to create checkpoint path " + checkpointPath)
    +    }
    +  }
    +
    +  val broadcastedConf = rdd.context.broadcast(
    --- End diff --

    please make all of these private
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33970855

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala ---
    @@ -66,6 +88,27 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
         cpFile
       }

    +  // Get the iterator used to write checkpoint data to HDFS
    +  def getCheckpointIterator(
    +      rddIterator: Iterator[T],
    +      context: TaskContext,
    +      partitionId: Int): Iterator[T] = {
    +    RDDCheckpointData.synchronized {
    +      if (cpState == Initialized) {
    +        // Create the output path for the checkpoint
    +        val path = new Path(checkpointDir.get, "rdd-" + rddId)
    --- End diff --

    I see, because the rdd field is transient. Then instead we should update `rddCheckpointDataPath` down there to not take in a `SparkContext`.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33971284

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala ---
    @@ -66,6 +88,27 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
         cpFile
       }

    +  // Get the iterator used to write checkpoint data to HDFS
    +  def getCheckpointIterator(
    +      rddIterator: Iterator[T],
    +      context: TaskContext,
    +      partitionId: Int): Iterator[T] = {
    +    RDDCheckpointData.synchronized {
    +      if (cpState == Initialized) {
    +        // Create the output path for the checkpoint
    +        val path = new Path(checkpointDir.get, "rdd-" + rddId)
    --- End diff --

    actually, the path here should just be `checkpointPath`. Right now this duplicates some code.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33971873

    --- Diff: core/src/main/scala/org/apache/spark/util/CheckpointingIterator.scala ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.IOException
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark._
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.rdd.CheckpointRDD
    +import org.apache.spark.serializer.SerializationStream
    +
    +/**
    + * Wrapper around an iterator which writes checkpoint data to HDFS while running action on
    + * a RDD to support checkpointing RDD.
    + */
    +private[spark] class CheckpointingIterator[A: ClassTag](
    +    values: Iterator[A],
    +    path: String,
    +    broadcastedConf: Broadcast[SerializableConfiguration],
    +    partitionId: Int,
    +    context: TaskContext,
    +    blockSize: Int = -1) extends Iterator[A] with Logging {
    +
    +  private val env = SparkEnv.get
    +  private var fs: FileSystem = null
    +  private val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    +  private var serializeStream: SerializationStream = null
    +
    +  private var finalOutputPath: Path = null
    +  private var tempOutputPath: Path = null
    +
    +  /**
    +   * Initialize this iterator by creating temporary output path and serializer instance.
    +   *
    +   */
    +  def init(): this.type = {
    --- End diff --

    can we remove this `init` method? It doesn't seem necessary since we can just do all of this in the constructor. The advantage of removing it is that we won't have a bunch of `var`s initialized to `null`.
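To make the "do it in the constructor" point concrete, here is a small Spark-free sketch. `WritingIterator` and its in-memory `sink` are hypothetical stand-ins for `CheckpointingIterator` and the HDFS output stream; only the initialization pattern is the point.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

object ConstructorInitSketch {
  /** Writes each Int to `sink` as it is consumed. */
  final class WritingIterator(values: Iterator[Int], sink: ByteArrayOutputStream)
      extends Iterator[Int] {
    // Set up once in the constructor body, so this can be an immutable `val`
    // instead of a `var` that starts as null and is filled in by `init()`.
    private val out = new DataOutputStream(sink)

    override def hasNext: Boolean = {
      val more = values.hasNext
      if (!more) out.flush() // nothing left: make sure everything reached the sink
      more
    }

    override def next(): Int = {
      val v = values.next()
      out.writeInt(v) // 4 bytes per element
      v
    }
  }
}
```

With this shape there is no window in which a caller can use the iterator before `init()` has run, and no field is ever observed as `null`.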
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33970514

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala ---
    @@ -66,6 +88,27 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
         cpFile
       }

    +  // Get the iterator used to write checkpoint data to HDFS
    --- End diff --

    can you make this a real java doc:
    ```
    /**
     * Wrap the given iterator in a checkpointing iterator, which checkpoints values
     * as the original iterator is consumed. This allows us to checkpoint the RDD
     * without computing it more than once (SPARK-8582).
     */
    ```
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33970095

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -238,11 +238,16 @@ abstract class RDD[T: ClassTag](
        * subclasses of RDD.
        */
       final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    -    if (storageLevel != StorageLevel.NONE) {
    +    val iter = if (storageLevel != StorageLevel.NONE) {
           SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
         } else {
           computeOrReadCheckpoint(split, context)
         }
    +    if (checkpointData.isDefined) {
    +      checkpointData.get.getCheckpointIterator(iter, context, split.index)
    +    } else {
    +      iter
    +    }
    --- End diff --

    this could be
    ```
    checkpointData
      .map(_.getCheckpointIterator(iter, context, split.index))
      .getOrElse(iter)
    ```
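The two forms are behaviorally equivalent, which a small stand-alone sketch can show. The `wrapper` option below is a hypothetical stand-in for `checkpointData`, and `tag` stands in for `getCheckpointIterator`; neither is Spark API.

```scala
object OptionSketch {
  type Wrap = Iterator[Int] => Iterator[Int]

  // Hypothetical stand-in for the wrapping done by getCheckpointIterator.
  val tag: Wrap = _.map(_ + 100)

  /** The shape used in the diff: explicit isDefined / get. */
  def withIfElse(wrapper: Option[Wrap], iter: Iterator[Int]): Iterator[Int] =
    if (wrapper.isDefined) wrapper.get.apply(iter) else iter

  /** The shape suggested in the review: map, then fall back with getOrElse. */
  def withMap(wrapper: Option[Wrap], iter: Iterator[Int]): Iterator[Int] =
    wrapper.map(f => f(iter)).getOrElse(iter)
}
```

The `map`/`getOrElse` version avoids calling `get` on an `Option`, so there is no code path that can throw if the surrounding condition is later edited incorrectly.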
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33970274

    --- Diff: core/src/test/scala/org/apache/spark/util/CheckpointingIteratorSuite.scala ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.File
    +
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark._
    +import org.apache.spark.rdd.CheckpointRDD
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkFunSuite
    +
    +class CheckpointingIteratorSuite extends SparkFunSuite with LocalSparkContext with Logging {
    +  var checkpointDir: File = _
    +  val partitioner = new HashPartitioner(2)
    +
    +  override def beforeEach() {
    +    super.beforeEach()
    +    checkpointDir = File.createTempFile("temp", "", Utils.createTempDir())
    +    checkpointDir.delete()
    +    sc = new SparkContext("local", "test")
    +    sc.setCheckpointDir(checkpointDir.toString)
    +  }
    +
    +  override def afterEach() {
    --- End diff --

    nit: need `Unit` return type here and other places
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33970320

    --- Diff: core/src/test/scala/org/apache/spark/util/CheckpointingIteratorSuite.scala ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.File
    +
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark._
    +import org.apache.spark.rdd.CheckpointRDD
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkFunSuite
    +
    +class CheckpointingIteratorSuite extends SparkFunSuite with LocalSparkContext with Logging {
    +  var checkpointDir: File = _
    +  val partitioner = new HashPartitioner(2)
    +
    +  override def beforeEach() {
    +    super.beforeEach()
    +    checkpointDir = File.createTempFile("temp", "", Utils.createTempDir())
    +    checkpointDir.delete()
    +    sc = new SparkContext("local", "test")
    +    sc.setCheckpointDir(checkpointDir.toString)
    +  }
    +
    +  override def afterEach() {
    +    super.afterEach()
    +    Utils.deleteRecursively(checkpointDir)
    +  }
    +
    --- End diff --

    could you add a test that verifies the input data is only read once? i.e. a test that verifies SPARK-8582 is actually fixed?
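One way such a "read only once" check could be structured is sketched below without Spark. `CountingSource` and `sumAndCheckpoint` are hypothetical names invented for this illustration; in a real Spark test a counter updated from inside the RDD computation would presumably play the role of `computeCalls`.

```scala
import java.util.concurrent.atomic.AtomicInteger

object ReadOnceSketch {
  /** Source whose `compute()` counts how many times it is invoked. */
  final class CountingSource(data: Seq[Int]) {
    val computeCalls = new AtomicInteger(0)

    def compute(): Iterator[Int] = {
      computeCalls.incrementAndGet()
      data.iterator
    }
  }

  /** Runs an "action" (sum) while saving a copy of every element in the same pass. */
  def sumAndCheckpoint(source: CountingSource): (Int, Vector[Int]) = {
    val saved = Vector.newBuilder[Int]
    val total = source.compute().map { x => saved += x; x }.sum
    (total, saved.result())
  }
}
```

The assertion that matters is that the counter stays at 1 after both the action result and the saved copy are available; a checkpoint implemented as a separate second job would push it to 2.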
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33971537

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala ---
    @@ -44,6 +46,26 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])

       import CheckpointState._

    +  // Because SparkContext is transient in RDD, so we can't get the id and checkpointDir later.
    +  // So keep a copy of the id and checkpointDir.
    --- End diff --

    Not sure if I understand this comment. How did it work before then? Even before this patch `doCheckpoint` directly calls `rdd.id`
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33971573

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala ---
    @@ -82,25 +125,13 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
           }
         }

    -    // Create the output path for the checkpoint
    -    val path = RDDCheckpointData.rddCheckpointDataPath(rdd.context, rdd.id).get
    -    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
    -    if (!fs.mkdirs(path)) {
    -      throw new SparkException(s"Failed to create checkpoint path $path")
    -    }
    -
    -    // Save to file, and reload it as an RDD
    -    val broadcastedConf = rdd.context.broadcast(
    -      new SerializableConfiguration(rdd.context.hadoopConfiguration))
    +    val path = checkpointPath
    --- End diff --

    no need to declare another variable here? Just use `checkpointPath`
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33971391

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala ---
    @@ -44,6 +46,26 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])

       import CheckpointState._

    +  // Because SparkContext is transient in RDD, so we can't get the id and checkpointDir later.
    +  // So keep a copy of the id and checkpointDir.
    +  // The id of RDD
    +  val rddId: Int = rdd.id
    +
    +  // The path the checkpoint data will write to.
    +  val checkpointDir = rdd.context.checkpointDir
    +  @transient var checkpointPath: Path = null
    +  @transient var fs: FileSystem = null
    +  if (checkpointDir.isDefined) {
    +    checkpointPath = new Path(checkpointDir.get, "rdd-" + rddId)
    +    fs = checkpointPath.getFileSystem(rdd.context.hadoopConfiguration)
    +    if (!fs.mkdirs(checkpointPath)) {
    +      throw new SparkException("Failed to create checkpoint path " + checkpointPath)
    +    }
    +  }
    --- End diff --

    these don't need to be vars right? In fact, `fs`, `rddId` and `checkpointDir` don't even need to exist. You can just do
    ```
    @transient val checkpointPath: Path = {
      val path = RDDCheckpointData.rddCheckpointDataPath(rdd.context, rdd.id)
      val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
      if (!fs.mkdirs(path)) { ... }
      path
    }
    ```
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33971692

    --- Diff: core/src/main/scala/org/apache/spark/util/CheckpointingIterator.scala ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.IOException
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark._
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.rdd.CheckpointRDD
    +import org.apache.spark.serializer.SerializationStream
    +
    +/**
    + * Wrapper around an iterator which writes checkpoint data to HDFS while running action on
    + * a RDD to support checkpointing RDD.
    + */
    +private[spark] class CheckpointingIterator[A: ClassTag](
    +    values: Iterator[A],
    +    path: String,
    +    broadcastedConf: Broadcast[SerializableConfiguration],
    +    partitionId: Int,
    +    context: TaskContext,
    +    blockSize: Int = -1) extends Iterator[A] with Logging {
    --- End diff --

    (blockSize doesn't seem to be used anywhere?)
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33971634

    --- Diff: core/src/main/scala/org/apache/spark/util/CheckpointingIterator.scala ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.IOException
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark._
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.rdd.CheckpointRDD
    +import org.apache.spark.serializer.SerializationStream
    +
    +/**
    + * Wrapper around an iterator which writes checkpoint data to HDFS while running action on
    + * a RDD to support checkpointing RDD.
    + */
    +private[spark] class CheckpointingIterator[A: ClassTag](
    +    values: Iterator[A],
    +    path: String,
    +    broadcastedConf: Broadcast[SerializableConfiguration],
    +    partitionId: Int,
    +    context: TaskContext,
    +    blockSize: Int = -1) extends Iterator[A] with Logging {
    --- End diff --

    could you document what each of these variables represent?
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33971920

    --- Diff: core/src/main/scala/org/apache/spark/util/CheckpointingIterator.scala ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.IOException
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark._
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.rdd.CheckpointRDD
    +import org.apache.spark.serializer.SerializationStream
    +
    +/**
    + * Wrapper around an iterator which writes checkpoint data to HDFS while running action on
    + * a RDD to support checkpointing RDD.
    + */
    +private[spark] class CheckpointingIterator[A: ClassTag](
    +    values: Iterator[A],
    +    path: String,
    +    broadcastedConf: Broadcast[SerializableConfiguration],
    +    partitionId: Int,
    +    context: TaskContext,
    +    blockSize: Int = -1) extends Iterator[A] with Logging {
    +
    +  private val env = SparkEnv.get
    +  private var fs: FileSystem = null
    +  private val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    +  private var serializeStream: SerializationStream = null
    +
    +  private var finalOutputPath: Path = null
    +  private var tempOutputPath: Path = null
    +
    +  /**
    +   * Initialize this iterator by creating temporary output path and serializer instance.
    +   *
    +   */
    +  def init(): this.type = {
    +    val outputDir = new Path(path)
    +    fs = outputDir.getFileSystem(broadcastedConf.value.value)
    +
    +    val finalOutputName = CheckpointRDD.splitIdToFile(partitionId)
    +    finalOutputPath = new Path(outputDir, finalOutputName)
    +    tempOutputPath =
    +      new Path(outputDir, "." + finalOutputName + "-attempt-" + context.attemptNumber)
    +
    +    if (fs.exists(tempOutputPath)) {
    +      // There are more than one iterator of the RDD is consumed.
    +      // Don't checkpoint data in this iterator.
    +      doCheckpoint = false
    +      return this
    +    }
    +
    +    val fileOutputStream = if (blockSize < 0) {
    +      fs.create(tempOutputPath, false, bufferSize)
    +    } else {
    +      // This is mainly for testing purpose
    +      fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
    +    }
    +    val serializer = env.serializer.newInstance()
    +    serializeStream = serializer.serializeStream(fileOutputStream)
    +    this
    +  }
    +
    +  /**
    +   * Called when this iterator is on the latest element by `hasNext`.
    +   * This method will rename temporary output path to final output path of checkpoint data.
    +   */
    +  def completion(): Unit = {
    +    if (!doCheckpoint) {
    +      return
    +    }
    +
    +    serializeStream.close()
    +
    +    if (!fs.rename(tempOutputPath, finalOutputPath)) {
    +      if (!fs.exists(finalOutputPath)) {
    +        logInfo("Deleting tempOutputPath " + tempOutputPath)
    +        fs.delete(tempOutputPath, false)
    +        throw new IOException("Checkpoint failed: failed to save output of task: " +
    +          context.attemptNumber + " and final output path does not exist")
    +      } else {
    +        // Some other copy of this task must've finished before us and renamed it
    +        logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
    +        fs.delete(tempOutputPath, false)
    +      }
    +    }
    +  }
    +
    +  def checkpointing(item: A): Unit = {
    +    serializeStream.writeObject(item)
    +  }
    +
    +  override def next(): A = {
    +    val item = values.next()
    +    if (doCheckpoint) {
    +      checkpointing(item)
    +    }
    +    // If this the latest item, call hasNext will write to final output early.
    +    hasNext
    +    item
    +  }
    +
    +  private[this] var doCheckpoint = true
    +  private[this] var completed = false
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-119035563 @viirya by the way, I'm currently working on a major refactoring of all of this code in parallel. There will likely be a lot of conflicts to resolve at this rate. If you prefer, I could take up this issue and use your patch as a basis. What do you think?
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user viirya commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-119040054 @andrewor14 no problem, thanks.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-118386386 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-118387355 [Test build #36509 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36509/consoleFull) for PR 7021 at commit [`2f43ff3`](https://github.com/apache/spark/commit/2f43ff3c6d1a4a428e5cbe8f4a4e4347274fc95c).
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-118387170 Merged build started.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-118387151 Merged build triggered.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-118386117 [Test build #36508 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36508/consoleFull) for PR 7021 at commit [`a829a7d`](https://github.com/apache/spark/commit/a829a7d563d1a12e7d99cc1aecf87e264a016f7b).
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-118406411 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-118406123

    [Test build #36509 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36509/console) for PR 7021 at commit [`2f43ff3`](https://github.com/apache/spark/commit/2f43ff3c6d1a4a428e5cbe8f4a4e4347274fc95c).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-118386384

    [Test build #36508 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36508/console) for PR 7021 at commit [`a829a7d`](https://github.com/apache/spark/commit/a829a7d563d1a12e7d99cc1aecf87e264a016f7b).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-118385985 Merged build started.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user viirya commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-118386010 @andrewor14 Thanks. I have added a few tests for the new iterator. The other comments are addressed too.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-118385960 Merged build triggered.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33739129

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala ---
    @@ -71,6 +92,27 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
         RDDCheckpointData.synchronized { cpFile }
       }
    +  // Get the iterator used to write checkpoint data to HDFS
    +  def getCheckpointIterator(
    +    rddIterator: Iterator[T],
    +    context: TaskContext,
    +    partitionId: Int): Iterator[T] = {
    --- End diff --

    indent by 2 spaces, see the style guide: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33739091

    --- Diff: core/src/main/scala/org/apache/spark/util/CheckpointingIterator.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.IOException
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark._
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.rdd.CheckpointRDD
    +import org.apache.spark.serializer.SerializationStream
    +
    +/**
    + * Wrapper around an iterator which writes checkpoint data to HDFS while running action on
    + * a RDD to support checkpointing RDD.
    + */
    +private[spark] class CheckpointingIterator[A: ClassTag,
    +    I <: Iterator[A]](
    +  sub: I,
    +  path: String,
    +  broadcastedConf: Broadcast[SerializableConfiguration],
    +  partitionId: Int,
    +  context: TaskContext,
    +  blockSize: Int = -1) extends Iterator[A] with Logging {
    --- End diff --

    these should all be indented two spaces
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33739188

    --- Diff: core/src/main/scala/org/apache/spark/util/CheckpointingIterator.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.IOException
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark._
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.rdd.CheckpointRDD
    +import org.apache.spark.serializer.SerializationStream
    +
    +/**
    + * Wrapper around an iterator which writes checkpoint data to HDFS while running action on
    + * a RDD to support checkpointing RDD.
    + */
    +private[spark] class CheckpointingIterator[A: ClassTag,
    +    I <: Iterator[A]](
    --- End diff --

    this type signature is rather complex. Could you rewrite this with only `A`? Then `sub` can just be of type `Iterator[A]`.
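A minimal sketch of the simplification suggested in the comment above, assuming nothing about the rest of the patch: a wrapper that takes a plain `Iterator[A]` needs only the element type as a type parameter. The class name `CountingIterator` and its `consumed` member are hypothetical and exist only to make the example observable; they are not part of the pull request.

```scala
// Hypothetical stand-in for the wrapper under review; no Spark types are involved.
// Only the element type A is a type parameter; the wrapped iterator is a plain Iterator[A].
class CountingIterator[A](values: Iterator[A]) extends Iterator[A] {
  private var seen = 0

  // Number of elements handed out so far.
  def consumed: Int = seen

  override def hasNext: Boolean = values.hasNext

  override def next(): A = {
    val item = values.next()
    seen += 1
    item
  }
}

object CountingIteratorDemo {
  def main(args: Array[String]): Unit = {
    val it = new CountingIterator(List("a", "b", "c").iterator)
    println(it.toList)
    println(it.consumed)
  }
}
```

Callers can still pass any subtype of `Iterator[A]`, so the second type parameter buys nothing unless the wrapper has to return the concrete iterator type.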
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33739215

    --- Diff: core/src/main/scala/org/apache/spark/util/CheckpointingIterator.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.IOException
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark._
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.rdd.CheckpointRDD
    +import org.apache.spark.serializer.SerializationStream
    +
    +/**
    + * Wrapper around an iterator which writes checkpoint data to HDFS while running action on
    + * a RDD to support checkpointing RDD.
    + */
    +private[spark] class CheckpointingIterator[A: ClassTag,
    +    I <: Iterator[A]](
    +  sub: I,
    +  path: String,
    +  broadcastedConf: Broadcast[SerializableConfiguration],
    +  partitionId: Int,
    +  context: TaskContext,
    +  blockSize: Int = -1) extends Iterator[A] with Logging {
    +
    +  val env = SparkEnv.get
    +  var fs: FileSystem = null
    +  val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    +  var serializeStream: SerializationStream = null
    +
    +  var finalOutputPath: Path = null
    +  var tempOutputPath: Path = null
    --- End diff --

    many of these variables can be private. It would be great to tighten the visibility as much as possible.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33739199

    --- Diff: core/src/main/scala/org/apache/spark/util/CheckpointingIterator.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.IOException
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark._
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.rdd.CheckpointRDD
    +import org.apache.spark.serializer.SerializationStream
    +
    +/**
    + * Wrapper around an iterator which writes checkpoint data to HDFS while running action on
    + * a RDD to support checkpointing RDD.
    + */
    +private[spark] class CheckpointingIterator[A: ClassTag,
    +    I <: Iterator[A]](
    --- End diff --

    also I would rename `sub` to `values` or something
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-117861897 @viirya could you add some tests for this new iterator? In particular, we should have a test that fails before but no longer fails afterwards.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7021#discussion_r33739248

    --- Diff: core/src/main/scala/org/apache/spark/util/CheckpointingIterator.scala ---
    @@ -0,0 +1,143 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.IOException
    +
    +import scala.reflect.ClassTag
    +
    +import org.apache.hadoop.fs.FileSystem
    +import org.apache.hadoop.fs.Path
    +
    +import org.apache.spark._
    +import org.apache.spark.broadcast.Broadcast
    +import org.apache.spark.rdd.CheckpointRDD
    +import org.apache.spark.serializer.SerializationStream
    +
    +/**
    + * Wrapper around an iterator which writes checkpoint data to HDFS while running action on
    + * a RDD to support checkpointing RDD.
    + */
    +private[spark] class CheckpointingIterator[A: ClassTag,
    +    I <: Iterator[A]](
    +  sub: I,
    +  path: String,
    +  broadcastedConf: Broadcast[SerializableConfiguration],
    +  partitionId: Int,
    +  context: TaskContext,
    +  blockSize: Int = -1) extends Iterator[A] with Logging {
    +
    +  val env = SparkEnv.get
    +  var fs: FileSystem = null
    +  val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    +  var serializeStream: SerializationStream = null
    +
    +  var finalOutputPath: Path = null
    +  var tempOutputPath: Path = null
    +
    +  def init(): this.type = {
    +    val outputDir = new Path(path)
    +    fs = outputDir.getFileSystem(broadcastedConf.value.value)
    +
    +    val finalOutputName = CheckpointRDD.splitIdToFile(partitionId)
    +    finalOutputPath = new Path(outputDir, finalOutputName)
    +    tempOutputPath =
    +      new Path(outputDir, "." + finalOutputName + "-attempt-" + context.attemptNumber)
    +
    +    if (fs.exists(tempOutputPath)) {
    +      // There are more than one iterator of the RDD is consumed.
    +      // Don't checkpoint data in this iterator.
    +      doCheckpoint = false
    +      return this
    +    }
    +
    +    val fileOutputStream = if (blockSize < 0) {
    +      fs.create(tempOutputPath, false, bufferSize)
    +    } else {
    +      // This is mainly for testing purpose
    +      fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
    +    }
    +    val serializer = env.serializer.newInstance()
    +    serializeStream = serializer.serializeStream(fileOutputStream)
    +    this
    +  }
    +
    +  def completion(): Unit = {
    --- End diff --

    what does `completion` mean? Can you add some java docs here and everywhere?
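For readers following the `completion` question above, the pattern in the quoted diff can be sketched roughly as follows: each element is written to a temporary file as it is consumed, and once the wrapped iterator is exhausted the temporary file is published under its final name. This is only an illustration under stated assumptions, not the patch's implementation: `java.nio.file` stands in for the Hadoop `FileSystem`, elements are written as text lines instead of going through a Spark serializer, the attempt number is fixed at 0, and the class name `TeeToFileIterator` is hypothetical.

```scala
import java.io.BufferedWriter
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical sketch: java.nio replaces the Hadoop FileSystem used in the patch,
// and each element is written as one text line rather than through a serializer.
class TeeToFileIterator[A](values: Iterator[A], finalPath: Path) extends Iterator[A] {
  // Temporary file next to the final one, e.g. ".part-00000-attempt-0" (attempt fixed at 0 here).
  private val tempPath: Path =
    finalPath.resolveSibling("." + finalPath.getFileName.toString + "-attempt-0")
  private val writer: BufferedWriter =
    Files.newBufferedWriter(tempPath, StandardCharsets.UTF_8)
  private var completed = false

  // Close the temporary file and publish it under its final name, at most once.
  private def completion(): Unit = {
    if (!completed) {
      completed = true
      writer.close()
      if (Files.exists(finalPath)) {
        // Some other writer already published the output; keep it and drop ours.
        Files.delete(tempPath)
      } else {
        Files.move(tempPath, finalPath)
      }
    }
  }

  override def hasNext: Boolean = {
    val more = values.hasNext
    if (!more) completion()
    more
  }

  override def next(): A = {
    val item = values.next()
    writer.write(item.toString + "\n")
    hasNext // finalizes eagerly when `item` was the last element
    item
  }
}
```

Unlike the quoted `init()`, this sketch does not check whether the temporary file already exists before writing, so it does not cover the case where two iterators over the same partition are consumed at once.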
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115654097 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115654066

    [Test build #35854 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35854/console) for PR 7021 at commit [`3c5b203`](https://github.com/apache/spark/commit/3c5b203fd2b85f4110795a1fc6ca3e289ca0d837).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115716483

    [Test build #35859 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35859/console) for PR 7021 at commit [`3c5b203`](https://github.com/apache/spark/commit/3c5b203fd2b85f4110795a1fc6ca3e289ca0d837).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user viirya commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115719357 retest this please.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115716694 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115719989 Merged build started.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115720317 [Test build #35862 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35862/consoleFull) for PR 7021 at commit [`3c5b203`](https://github.com/apache/spark/commit/3c5b203fd2b85f4110795a1fc6ca3e289ca0d837).
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115719914 Merged build triggered.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user viirya commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115719321 Unrelated failure again. Looks like Jenkins is unstable now?
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115793340 [Test build #35862 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35862/console) for PR 7021 at commit [`3c5b203`](https://github.com/apache/spark/commit/3c5b203fd2b85f4110795a1fc6ca3e289ca0d837). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115793659 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115617127 Merged build triggered.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115617185 Merged build started.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115618429 [Test build #35854 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35854/consoleFull) for PR 7021 at commit [`3c5b203`](https://github.com/apache/spark/commit/3c5b203fd2b85f4110795a1fc6ca3e289ca0d837).
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/7021#discussion_r33342683
--- Diff: core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala ---
@@ -63,6 +63,9 @@ class PartitionerAwareUnionRDD[T: ClassTag](
   require(rdds.forall(_.partitioner.isDefined))
   require(rdds.flatMap(_.partitioner).toSet.size == 1,
     "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))
+  require(rdds.map(_.partitioner.get.numPartitions).toSet.size == 1,
--- End diff --
The partitioners of the RDDs might have different `numPartitions`, which would cause an error later.
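The guard added in this diff can be tried outside Spark. The following is only a sketch: `SketchPartitioner` and `SketchRdd` are hypothetical stand-ins (not Spark classes), the partitioner's equality is assumed to ignore `numPartitions` so that the new check has something to catch, and the message of the third `require` is made up because the quoted diff cuts off before it.

```scala
// Hypothetical stand-in for a partitioner whose equality ignores numPartitions.
// This is an assumption for illustration, not how Spark's partitioners behave.
final class SketchPartitioner(val kind: String, val numPartitions: Int) {
  override def equals(other: Any): Boolean = other match {
    case p: SketchPartitioner => p.kind == kind
    case _ => false
  }
  override def hashCode: Int = kind.hashCode
}

// Hypothetical stand-in for an RDD: only the optional partitioner matters here.
final case class SketchRdd(name: String, partitioner: Option[SketchPartitioner])

object PartitionerCheck {
  // Mirrors the shape of the three require calls in the diff.
  def validate(rdds: Seq[SketchRdd]): Unit = {
    require(rdds.forall(_.partitioner.isDefined))
    require(rdds.flatMap(_.partitioner).toSet.size == 1,
      "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))
    // Illustrative message only; the original one is not visible in the diff.
    require(rdds.map(_.partitioner.get.numPartitions).toSet.size == 1,
      "Parent RDDs have different numbers of partitions")
  }
}
```

With two parents whose partitioners compare equal but report 2 and 4 partitions, the first two checks pass and the third throws an `IllegalArgumentException`, so the mismatch surfaces at construction time rather than later.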
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/7021#discussion_r33342843
--- Diff: core/src/main/scala/org/apache/spark/util/CheckpointingIterator.scala ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.IOException
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark._
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.CheckpointRDD
+import org.apache.spark.serializer.SerializationStream
+
+/**
+ * Wrapper around an iterator which writes checkpoint data to HDFS while running action on
+ * a RDD to support checkpointing RDD.
+ */
+private[spark] class CheckpointingIterator[A: ClassTag, +I <: Iterator[A]](
+    sub: I,
+    path: String,
+    broadcastedConf: Broadcast[SerializableConfiguration],
+    partitionId: Int,
+    context: TaskContext,
+    blockSize: Int = -1) extends Iterator[A] with Logging {
+
+  val env = SparkEnv.get
+  var fs: FileSystem = null
+  val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
+  var serializeStream: SerializationStream = null
+
+  var finalOutputPath: Path = null
+  var tempOutputPath: Path = null
+
+  def init(): this.type = {
+    val outputDir = new Path(path)
+    fs = outputDir.getFileSystem(broadcastedConf.value.value)
+
+    val finalOutputName = CheckpointRDD.splitIdToFile(partitionId)
+    finalOutputPath = new Path(outputDir, finalOutputName)
+    tempOutputPath =
+      new Path(outputDir, "." + finalOutputName + "-attempt-" + context.attemptNumber)
+
+    if (fs.exists(tempOutputPath)) {
--- End diff --
More than one iterator for the same split may be created and used, e.g., by `CartesianRDD`. Only one of them needs to write checkpoint data to disk.
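The "only one iterator per split writes" rule from this comment can be sketched without Hadoop. This is a simplified illustration, not the PR's code: it swaps the `fs.exists` check on a Hadoop `FileSystem` for `java.nio.file.Files.createFile`, which creates the file atomically and fails if it already exists. `CheckpointClaim` and `tryClaim` are hypothetical names.

```scala
import java.nio.file.{FileAlreadyExistsException, Files, Path}

object CheckpointClaim {
  // Returns true if this caller created the temp file and should therefore
  // write the checkpoint data; returns false if another iterator for the
  // same split and attempt has already claimed it.
  def tryClaim(tempOutputPath: Path): Boolean =
    try {
      // createFile checks for existence and creates the file in one atomic step.
      Files.createFile(tempOutputPath)
      true
    } catch {
      case _: FileAlreadyExistsException => false
    }
}
```

An iterator that gets `false` back would simply pass elements through, which corresponds to setting `doCheckpoint = false` in the quoted code. Combining the check and the creation in one call avoids the window between a separate existence test and the later create.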
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/7021#discussion_r33343168
--- Diff: core/src/main/scala/org/apache/spark/util/CheckpointingIterator.scala ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.IOException
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark._
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.CheckpointRDD
+import org.apache.spark.serializer.SerializationStream
+
+/**
+ * Wrapper around an iterator which writes checkpoint data to HDFS while running action on
+ * a RDD to support checkpointing RDD.
+ */
+private[spark] class CheckpointingIterator[A: ClassTag, +I <: Iterator[A]](
+    sub: I,
+    path: String,
+    broadcastedConf: Broadcast[SerializableConfiguration],
+    partitionId: Int,
+    context: TaskContext,
+    blockSize: Int = -1) extends Iterator[A] with Logging {
+
+  val env = SparkEnv.get
+  var fs: FileSystem = null
+  val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
+  var serializeStream: SerializationStream = null
+
+  var finalOutputPath: Path = null
+  var tempOutputPath: Path = null
+
+  def init(): this.type = {
+    val outputDir = new Path(path)
+    fs = outputDir.getFileSystem(broadcastedConf.value.value)
+
+    val finalOutputName = CheckpointRDD.splitIdToFile(partitionId)
+    finalOutputPath = new Path(outputDir, finalOutputName)
+    tempOutputPath =
+      new Path(outputDir, "." + finalOutputName + "-attempt-" + context.attemptNumber)
+
+    if (fs.exists(tempOutputPath)) {
+      // There are more than one iterator of the RDD is consumed.
+      // Don't checkpoint data in this iterator.
+      doCheckpoint = false
+      return this
+    }
+
+    val fileOutputStream = if (blockSize < 0) {
+      fs.create(tempOutputPath, false, bufferSize)
+    } else {
+      // This is mainly for testing purpose
+      fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
+    }
+    val serializer = env.serializer.newInstance()
+    serializeStream = serializer.serializeStream(fileOutputStream)
+    this
+  }
+
+  def completion(): Unit = {
+    if (!doCheckpoint) {
+      return
+    }
+
+    serializeStream.close()
+
+    if (!fs.rename(tempOutputPath, finalOutputPath)) {
+      if (!fs.exists(finalOutputPath)) {
+        logInfo("Deleting tempOutputPath " + tempOutputPath)
+        fs.delete(tempOutputPath, false)
+        throw new IOException("Checkpoint failed: failed to save output of task: "
+          + context.attemptNumber + " and final output path does not exist")
+      } else {
+        // Some other copy of this task must've finished before us and renamed it
+        logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
+        fs.delete(tempOutputPath, false)
+      }
+    }
+  }
+
+  def checkpointing(item: A): Unit = {
+    serializeStream.writeObject(item)
+  }
+
+  override def next(): A = {
+    val item = sub.next()
+    if (doCheckpoint) {
+      checkpointing(item)
+    }
+    // If this the latest item, call hasNext will write to final output early.
+    hasNext
--- End diff --
Sometimes the iterator returned by `rdd.iterator` is not consumed to the point where `hasNext` returns false, e.g., when the caller already knows the number of elements and calls `next()` exactly that many times. In that case, the checkpoint data must be written to the final output path early, when the last element is returned.
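The early-completion behaviour described in this comment can be illustrated with a plain Scala iterator wrapper. This is a minimal sketch under stated assumptions: `EagerFinishIterator` and `onComplete` are hypothetical names, and the items are buffered in memory rather than serialized to a checkpoint file.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplification of the idea: items are recorded in memory and
// handed to onComplete once, as soon as the underlying iterator is exhausted.
final class EagerFinishIterator[A](sub: Iterator[A], onComplete: Seq[A] => Unit)
    extends Iterator[A] {
  private val written = ArrayBuffer.empty[A]
  private var completed = false

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      onComplete(written.toList)
    }
    more
  }

  override def next(): A = {
    val item = sub.next()
    written += item
    // Probe for exhaustion right away, so completion still runs when the
    // caller stops after the last element and never calls hasNext again.
    hasNext
    item
  }
}
```

A caller that knows there are three elements and calls `next()` exactly three times still triggers `onComplete` on the third call. One cost of this design is that `sub.hasNext` is evaluated eagerly after every element, which for some underlying iterators may mean computing the following element ahead of time.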
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/7021#discussion_r33343438
--- Diff: core/src/test/scala/org/apache/spark/CheckpointSuite.scala ---
@@ -359,7 +359,7 @@ class CheckpointSuite extends SparkFunSuite with LocalSparkContext with Logging
    * have large size.
    */
   def generateFatPairRDD(): RDD[(Int, Int)] = {
-    new FatPairRDD(sc.makeRDD(1 to 100, 4), partitioner).mapValues(x => x)
+    new FatPairRDD(sc.makeRDD(1 to 100, 2), partitioner).mapValues(x => x)
--- End diff --
The `partitioner` in `CheckpointSuite` is a `HashPartitioner` with 2 partitions, so for `PartitionerAwareUnionRDD` to work, `FatPairRDD` should have 2 partitions too.
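To see why the partition counts have to agree, it may help to look at how a hash-style partitioner assigns keys. The sketch below assumes a non-negative modulo of the key's hash code; `HashPartitionSketch` and `partitionFor` are hypothetical names, not Spark's `HashPartitioner` API.

```scala
object HashPartitionSketch {
  // Maps a key to a partition index in [0, numPartitions), using a
  // non-negative modulo so that negative hash codes stay in range.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }
}
```

With `numPartitions = 2`, the keys 1 to 100 only ever land in partitions 0 and 1. An RDD that reports such a partitioner but is actually split into 4 partitions would have two partitions the partitioner never refers to, which is the mismatch the test change avoids.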
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115661632 Merged build triggered.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115675340 [Test build #35859 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35859/consoleFull) for PR 7021 at commit [`3c5b203`](https://github.com/apache/spark/commit/3c5b203fd2b85f4110795a1fc6ca3e289ca0d837).
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115675264 Merged build started.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user viirya commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115660152 retest this please.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115327583 [Test build #35796 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35796/consoleFull) for PR 7021 at commit [`1a3055e`](https://github.com/apache/spark/commit/1a3055ea6ca67fcb23c1188b7c3344c726b054b3).
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115323998 [Test build #35794 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35794/consoleFull) for PR 7021 at commit [`d863516`](https://github.com/apache/spark/commit/d8635168a9f01e3be2b53a27cc5918a1a0ed1612).
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/7021
[SPARK-8582][Core] Add CheckpointingIterator to optimize checkpointing
JIRA: https://issues.apache.org/jira/browse/SPARK-8582
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/viirya/spark-1 optimize_checkpoint
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/7021.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #7021
commit d8635168a9f01e3be2b53a27cc5918a1a0ed1612
Author: Liang-Chi Hsieh vii...@gmail.com
Date: 2015-06-25T16:50:45Z
    Add CheckpointingIterator to optimize checkpointing.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115323136 Merged build triggered.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115323156 Merged build started.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115324407 [Test build #35794 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35794/console) for PR 7021 at commit [`d863516`](https://github.com/apache/spark/commit/d8635168a9f01e3be2b53a27cc5918a1a0ed1612). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115326341 Merged build triggered.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115326371 Merged build started.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user sujkh85 commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115322604 NAVER - http://www.naver.com/ The mail you sent to su...@naver.com, "[spark] [SPARK-8582][Core] Add CheckpointingIterator to optimize checkpointing (#7021)", could not be delivered for the following reason: the recipient has blocked mail from you.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user sujkh85 commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115323273 NAVER - http://www.naver.com/ The mail you sent to su...@naver.com, "Re: [spark] [SPARK-8582][Core] Add CheckpointingIterator to optimize checkpointing (#7021)", could not be delivered for the following reason: the recipient has blocked mail from you.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115324412 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115361838 [Test build #35796 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35796/console) for PR 7021 at commit [`1a3055e`](https://github.com/apache/spark/commit/1a3055ea6ca67fcb23c1188b7c3344c726b054b3). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-8582][Core] Add CheckpointingIterator t...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7021#issuecomment-115361885 Merged build finished. Test FAILed.