Hi Dibyendu, I am not sure I understand completely. But are you suggesting
that there is currently no way to have the checkpoint directory be in
Tachyon?
Thanks
Nikunj

On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi,
>
> Recently I was working on a PR to use Tachyon as the OFF_HEAP store for
> Spark Streaming, making sure Spark Streaming can recover from Driver
> failure and recover the blocks from Tachyon.
>
> The motivation for this PR is:
>
> If a Streaming application stores its blocks OFF_HEAP, it may not need a
> WAL-like feature to recover from Driver failure. As long as the writing of
> blocks to Tachyon from the Streaming receiver is durable, the blocks should
> be recoverable from Tachyon directly on Driver failure.
> This can avoid the expensive WAL writes and the duplication of blocks in
> both MEMORY and the WAL, while still guaranteeing an end-to-end
> no-data-loss channel using the OFF_HEAP store.
>
> https://github.com/apache/spark/pull/8817
>
> This PR is still under review, but having done various failover tests in
> my environment, I have seen it work perfectly fine without any data loss.
> Let's see what TD and others have to say on this PR.
>
> Below is the configuration I used to test this PR:
>
>
> Spark: 1.6 from master
> Tachyon: 0.7.1
>
> SparkConf details:
>
> SparkConf conf = new SparkConf().setAppName("TestTachyon")
>     .set("spark.streaming.unpersist", "true")
>     .set("spark.local.dir", "/mnt1/spark/tincan")
>     .set("tachyon.zookeeper.address", "10.252.5.113:2182")
>     .set("tachyon.usezookeeper", "true")
>     .set("spark.externalBlockStore.url",
>          "tachyon-ft://ip-10-252-5-113.asskickery.us:19998")
>     .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
>     .set("spark.externalBlockStore.folderName", "pearson")
>     .set("spark.externalBlockStore.dirId", "subpub")
>     .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown",
>          "true");
>
> JavaStreamingContext jsc = new JavaStreamingContext(conf,
>     new Duration(10000));
>
> String checkpointDirectory =
>     "hdfs://10.252.5.113:9000/user/hadoop/spark/wal";
>
> jsc.checkpoint(checkpointDirectory);
>
> // I am using my receiver-based consumer
> // (https://github.com/dibbhatt/kafka-spark-consumer), but
> // KafkaUtils.createStream will also work.
>
> JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
>     jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
>
>
> Regards,
> Dibyendu
>
> On Sat, Sep 26, 2015 at 11:59 AM, N B <nb.nos...@gmail.com> wrote:
>
>> Hi Dibyendu,
>>
>> How does one go about configuring Spark Streaming to use Tachyon as its
>> place for storing checkpoints? Also, can one do this with Tachyon running
>> on a completely different node than where the Spark processes are running?
>>
>> Thanks
>> Nikunj
>>
>>
>> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi Tathagata,
>>>
>>> Thanks for looking into this. Investigating further, I found that the
>>> issue is that Tachyon does not support file append. When the streaming
>>> receiver that writes to the WAL fails and is restarted, it is not able to
>>> append to the same WAL file after the restart.
>>>
>>> I raised this with the Tachyon user group, and Haoyuan said that Tachyon
>>> file append should be ready within three months' time. I will revisit this
>>> issue then.
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Looks like somehow the file size reported by the FSInputDStream of
>>>> Tachyon's FileSystem interface is returning zero.
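For anyone landing on this thread with the same question as Nikunj:
pointing the checkpoint directory itself at Tachyon is just a matter of
handing jsc.checkpoint() a Tachyon URI, and Tachyon can run on entirely
different nodes from Spark as long as its master is reachable. A minimal
sketch follows; the tachyon-master host, port, and paths are placeholders
rather than values from this thread, and it assumes the Tachyon client jar
is on the Spark classpath so the tachyon-ft:// scheme resolves to a Hadoop
FileSystem.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class TachyonCheckpointSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("TachyonCheckpointSketch")
        // Block store on Tachyon, as in the configuration above. The
        // Tachyon master need not be co-located with any Spark process.
        .set("spark.externalBlockStore.url",
             "tachyon-ft://tachyon-master:19998")
        .set("spark.externalBlockStore.baseDir", "/sparkstreaming");

    JavaStreamingContext jsc =
        new JavaStreamingContext(conf, new Duration(10000));

    // Checkpoint directory on Tachyon instead of HDFS.
    jsc.checkpoint("tachyon-ft://tachyon-master:19998/checkpoint");

    // ... create the DStreams, then jsc.start() and jsc.awaitTermination().
  }
}

Note the caveat reported further down in this thread: with the checkpoint
location on Tachyon the throughput was much higher, but data was lost on
receiver failure, so HDFS remained the safer choice at the time.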
>>>>
>>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>
>>>>> Just to follow up further on this thread.
>>>>>
>>>>> I was doing some fault-tolerance testing of Spark Streaming with
>>>>> Tachyon as the OFF_HEAP block store. As I said in an earlier email, I
>>>>> was able to solve the BlockNotFoundException when I used Tachyon's
>>>>> Hierarchical Storage, which is good.
>>>>>
>>>>> I continued testing with the Spark Streaming WAL and checkpoint files
>>>>> also stored in Tachyon. Here are a few findings:
>>>>>
>>>>> When I store the Spark Streaming checkpoint location in Tachyon, the
>>>>> throughput is much higher. I tested the Driver and Receiver failure
>>>>> cases, and Spark Streaming is able to recover without any data loss on
>>>>> Driver failure.
>>>>>
>>>>> *But on Receiver failure, Spark Streaming loses data*, as I see an
>>>>> exception while reading the WAL file from the Tachyon "receivedData"
>>>>> location for the Receiver id that just failed.
>>>>>
>>>>> If I change the checkpoint location back to HDFS, Spark Streaming can
>>>>> recover from both Driver and Receiver failure.
>>>>>
>>>>> Here are the log details from when the Spark Streaming receiver
>>>>> failed. I raised a JIRA for this issue:
>>>>> https://issues.apache.org/jira/browse/SPARK-7525
>>>>>
>>>>>
>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>>>>> (epoch 1)*
>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
>>>>> remove executor 2 from BlockManagerMaster.
>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
>>>>> block manager BlockManagerId(2, 10.252.5.54, 45789)
>>>>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>>>>> successfully in removeExecutor
>>>>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker -
>>>>> *Registered receiver for stream 2 from 10.252.5.62*:47255
>>>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in
>>>>> stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException:
>>>>> *Could not read data from write ahead log record
>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>> Caused by: java.lang.IllegalArgumentException: *Seek position is past
>>>>> EOF: 645603894, fileSize = 0*
>>>>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>> ... 15 more
>>>>>
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2
>>>>> in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in
>>>>> stage 103.0 (TID 422) on executor 10.252.5.61:
>>>>> org.apache.spark.SparkException (Could not read data from write ahead
>>>>> log record
>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>> [duplicate 1]
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3
>>>>> in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
>>>>> INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor
>>>>> updated: app-20150511104442-0048/2 is now LOST (worker lost)
>>>>> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
>>>>> - Executor app-20150511104442-0048/2 removed: worker lost
>>>>> ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
>>>>> - Asked to remove non-existent executor 2
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in
>>>>> stage 103.0 (TID 423) on executor 10.252.5.62:
>>>>> org.apache.spark.SparkException (Could not read data from write ahead
>>>>> log record
>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>> [duplicate 2]
>>>>> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage
>>>>> 103.0 failed 4 times; aborting job
>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet
>>>>> 103.0, whose tasks have all completed, from pool
>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage
>>>>> 103
>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103
>>>>> (foreachRDD at Consumer.java:92) failed in 0.943 s
>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed:
>>>>> foreachRDD at Consumer.java:92, took 0.953482 s
>>>>> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error
>>>>> running job streaming job 1431341145000 ms.0
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Task 2 in stage 103.0 failed 4 times, most recent failure: Lost task
>>>>> 2.3 in stage 103.0 (TID 423, 10.252.5.62):
>>>>> org.apache.spark.SparkException: Could not
>>>>> read data from write ahead log record
>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>> Caused by: java.lang.IllegalArgumentException: Seek position is past
>>>>> EOF: 645603894, fileSize = 0
>>>>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>> ... 15 more
>>>>>
>>>>>
>>>>> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <haoyuan...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the updates!
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Haoyuan
>>>>>>
>>>>>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <
>>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>>
>>>>>>> Just a follow-up on this thread.
>>>>>>>
>>>>>>> I tried Hierarchical Storage on Tachyon
>>>>>>> (http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and
>>>>>>> that seems to have worked: I did not see any Spark job fail due to
>>>>>>> BlockNotFoundException. Below are my hierarchical storage settings:
>>>>>>>
>>>>>>> -Dtachyon.worker.hierarchystore.level.max=2
>>>>>>> -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>>>>> -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>>>>> -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>>>>> -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>>>>> -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>>>>> -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>>>>> -Dtachyon.worker.evict.strategy=LRU
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dibyendu
>>>>>>>
>>>>>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <
>>>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>>>
>>>>>>> > Dear All,
>>>>>>> >
>>>>>>> > I have been playing with Spark Streaming on Tachyon as the
>>>>>>> > OFF_HEAP block store. The primary reason for evaluating Tachyon is
>>>>>>> > to find out whether Tachyon can solve the Spark
>>>>>>> > BlockNotFoundException.
>>>>>>> >
>>>>>>> > With the traditional MEMORY_ONLY StorageLevel, when blocks are
>>>>>>> > evicted, jobs fail due to block-not-found exceptions, and storing
>>>>>>> > blocks in MEMORY_AND_DISK is not a good option either, as it
>>>>>>> > impacts the throughput a lot.
>>>>>>> >
>>>>>>> > To test how Tachyon behaves, I took the latest Spark 1.4 from
>>>>>>> > master, used Tachyon 0.6.4, and configured Tachyon in
>>>>>>> > fault-tolerant mode. Tachyon is running in a 3-node AWS x-large
>>>>>>> > cluster and Spark is running in a 3-node AWS x-large cluster.
>>>>>>> >
>>>>>>> > I have used the low-level receiver-based Kafka consumer
>>>>>>> > (https://github.com/dibbhatt/kafka-spark-consumer), which I have
>>>>>>> > written, to pull from Kafka and write blocks to Tachyon.
>>>>>>> >
>>>>>>> > I found a similar improvement in throughput (as in the MEMORY_ONLY
>>>>>>> > case) but very good overall memory utilization (as it is an
>>>>>>> > off-heap store).
>>>>>>> >
>>>>>>> > But I found one issue on which I need clarification.
>>>>>>> >
>>>>>>> > In the Tachyon case I also see BlockNotFoundException, but for a
>>>>>>> > different reason. From what I see, TachyonBlockManager.scala puts
>>>>>>> > the blocks with the WriteType.TRY_CACHE configuration. Because of
>>>>>>> > this, blocks are evicted from the Tachyon cache, and when Spark
>>>>>>> > tries to find a block it throws BlockNotFoundException.
>>>>>>> >
>>>>>>> > I see a pull request which discusses the same:
>>>>>>> >
>>>>>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>>>>>> >
>>>>>>> > When I modified the WriteType to CACHE_THROUGH, the
>>>>>>> > BlockDropException is gone, but it again impacts the throughput.
>>>>>>> >
>>>>>>> > Just curious to know if Tachyon has any setting that can handle
>>>>>>> > block eviction from cache to disk, other than explicitly setting
>>>>>>> > CACHE_THROUGH?
>>>>>>> >
>>>>>>> > Regards,
>>>>>>> > Dibyendu
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Haoyuan Li
>>>>>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>>>>>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
>>>>>
>>>>>
>>>>
>>>
>>
>
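To make the TRY_CACHE vs CACHE_THROUGH trade-off discussed above concrete,
here is a rough sketch against the Tachyon 0.6/0.7-era client API. The
class and method names are recalled from that API generation and shifted
between releases, so treat this as illustrative rather than exact; the
master address and paths are placeholders.

import tachyon.TachyonURI;
import tachyon.client.OutStream;
import tachyon.client.TachyonFS;
import tachyon.client.TachyonFile;
import tachyon.client.WriteType;

public class WriteTypeSketch {
  public static void main(String[] args) throws Exception {
    TachyonFS tfs =
        TachyonFS.get(new TachyonURI("tachyon://tachyon-master:19998"));
    byte[] block = new byte[]{1, 2, 3};

    // TRY_CACHE (what TachyonBlockManager.scala used): write to worker
    // memory only. Fast, but an evicted block is simply gone, which
    // surfaces in Spark as a BlockNotFoundException.
    TachyonFile fast =
        tfs.getFile(tfs.createFile(new TachyonURI("/blocks/try_cache")));
    OutStream fastOut = fast.getOutStream(WriteType.TRY_CACHE);
    fastOut.write(block);
    fastOut.close();

    // CACHE_THROUGH: also persist the block to the under-filesystem, so
    // it survives eviction from the cache, at the cost of the extra write
    // (the throughput hit reported above).
    TachyonFile safe =
        tfs.getFile(tfs.createFile(new TachyonURI("/blocks/cache_through")));
    OutStream safeOut = safe.getOutStream(WriteType.CACHE_THROUGH);
    safeOut.write(block);
    safeOut.close();

    tfs.close();
  }
}

Hierarchical storage, as configured above, is effectively the third option:
blocks evicted from the memory tier spill to the HDD tier instead of
disappearing, which is why the BlockNotFoundException went away without
paying the CACHE_THROUGH write penalty on every block.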
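And for the "Seek position is past EOF: ..., fileSize = 0" stack traces
earlier in the thread: Spark's FileBasedWriteAheadLogRandomReader recovers
a block by seeking to the (offset, length) recorded in a
FileBasedWriteAheadLogSegment and reading from there. The sketch below is a
simplified restatement of that logic, not the actual Spark source; it shows
why a FileSystem implementation that reports a zero file size fails at the
seek, before a single byte of the WAL is read.

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WalReadSketch {
  // Read one WAL record given the segment's path, byte offset, and length.
  public static ByteBuffer readSegment(String path, long offset, int length)
      throws Exception {
    FileSystem fs = new Path(path).getFileSystem(new Configuration());
    FSDataInputStream in = fs.open(new Path(path));
    try {
      // Tachyon's HdfsFileInputStream reported fileSize = 0 for the
      // reopened WAL file, so this seek threw IllegalArgumentException
      // and the whole block recovery failed.
      in.seek(offset);
      byte[] buf = new byte[length];
      in.readFully(buf);
      return ByteBuffer.wrap(buf);
    } finally {
      in.close();
    }
  }
}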