Hi Dibyendu,

I am not sure I understand completely, but are you suggesting that there
is currently no way to place the checkpoint directory in Tachyon?

Thanks
Nikunj


On Fri, Sep 25, 2015 at 11:49 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi,
>
> Recently I was working on a PR to use Tachyon as the OFF_HEAP store for Spark
> Streaming and to make sure Spark Streaming can recover from Driver failure and
> recover the blocks from Tachyon.
>
> The motivation for this PR is:
>
> If a Streaming application stores its blocks OFF_HEAP, it may not need any
> WAL-like feature to recover from Driver failure. As long as the receiver's
> writes of blocks to Tachyon are durable, the blocks should be recoverable
> directly from Tachyon on Driver failure.
> This can remove the expensive WAL writes and the duplication of blocks in both
> memory and the WAL, and still guarantee an end-to-end no-data-loss channel
> using the OFF_HEAP store.
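>
> For contrast, this is roughly the receiver WAL setup that the OFF_HEAP approach
> would make unnecessary (the flag is the standard Spark Streaming one; the snippet
> is only an illustration, it is not part of this PR):
>
> SparkConf walConf = new SparkConf()
>         // With the receiver WAL enabled, every received block is also written under
>         // the checkpoint directory, duplicating the in-memory copy on every write.
>         .set("spark.streaming.receiver.writeAheadLog.enable", "true");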
>
> https://github.com/apache/spark/pull/8817
>
> This PR is still under review, but having done various failover tests in my
> environment, I have seen it work fine without any data loss. Let's see what TD
> and others have to say on this PR.
>
> Below is the configuration I used to test this PR:
>
>
> Spark : 1.6 from Master
> Tachyon : 0.7.1
>
> Spark configuration details:
>
> SparkConf conf = new SparkConf().setAppName("TestTachyon")
>         .set("spark.streaming.unpersist", "true")
>         .set("spark.local.dir", "/mnt1/spark/tincan")
>         .set("tachyon.zookeeper.address", "10.252.5.113:2182")
>         .set("tachyon.usezookeeper", "true")
>         .set("spark.externalBlockStore.url", "tachyon-ft://ip-10-252-5-113.asskickery.us:19998")
>         .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
>         .set("spark.externalBlockStore.folderName", "pearson")
>         .set("spark.externalBlockStore.dirId", "subpub")
>         .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown", "true");
>
> JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(10000));
>
> String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark/wal";
>
> jsc.checkpoint(checkpointDirectory);
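>
> (In earlier tests I also pointed the checkpoint directory at Tachyon; that would
> look roughly like the sketch below. The path is only an example, not the exact
> one from my setup.)
>
> String tachyonCheckpointDir =
>         "tachyon-ft://ip-10-252-5-113.asskickery.us:19998/sparkstreaming/checkpoint";
> jsc.checkpoint(tachyonCheckpointDir);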
>
>
> // I am using my receiver-based consumer (https://github.com/dibbhatt/kafka-spark-consumer),
> // but KafkaUtils.createStream will also work (see the sketch after the launch call below).
>
> JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
>         jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
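>
> For reference, a rough equivalent with the built-in receiver-based Kafka stream
> (imports omitted; the topic name, consumer group, and ZooKeeper quorum below are
> placeholders, not values from my setup):
>
> Map<String, Integer> topics = new HashMap<String, Integer>();
> topics.put("mytopic", 1);  // placeholder topic and receiver-thread count
>
> // Receiver-based stream whose blocks go to the external (OFF_HEAP) block store.
> JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(
>         jsc, "10.252.5.113:2181", "my-consumer-group", topics, StorageLevel.OFF_HEAP());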
>
>
>
>
> Regards,
> Dibyendu
>
> On Sat, Sep 26, 2015 at 11:59 AM, N B <nb.nos...@gmail.com> wrote:
>
>> Hi Dibyendu,
>>
>> How does one configure Spark Streaming to use Tachyon as the place for storing
>> its checkpoints? Also, can one do this with Tachyon running on a completely
>> different node from where the Spark processes are running?
>>
>> Thanks
>> Nikunj
>>
>>
>> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi Tathagata,
>>>
>>> Thanks for looking into this. Investigating further, I found that the issue is
>>> that Tachyon does not support file append. A streaming receiver that writes to
>>> the WAL and then fails is, after being restarted, not able to append to the same
>>> WAL file.
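>>>
>>> To make the failure mode concrete, this is roughly what re-opening an existing
>>> WAL segment after a receiver restart boils down to (a simplified sketch using the
>>> Hadoop FileSystem API; the segment path is a placeholder):
>>>
>>> FileSystem fs = FileSystem.get(
>>>         URI.create("tachyon-ft://10.252.5.113:19998"), new Configuration());
>>> Path segment = new Path("/tachyon/checkpoint/receivedData/2/log-xxx");  // placeholder name
>>> // The WAL writer appends when the segment already exists; if the FileSystem does
>>> // not support append() (Tachyon at this point), this is where the restart breaks.
>>> FSDataOutputStream out = fs.exists(segment) ? fs.append(segment) : fs.create(segment);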
>>>
>>> I raised this with the Tachyon user group, and Haoyuan said that Tachyon file
>>> append should be ready within about three months. I will revisit this issue
>>> then.
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Looks like somehow the file size reported by the FSInputDStream of
>>>> Tachyon's FileSystem interface is returning zero.
>>>>
>>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>
>>>>> Just to follow up on this thread further.
>>>>>
>>>>> I was doing some fault-tolerance testing of Spark Streaming with
>>>>> Tachyon as the OFF_HEAP block store. As I said in an earlier email, I was able
>>>>> to solve the BlockNotFound exception when I used Tachyon's Hierarchical
>>>>> Storage, which is good.
>>>>>
>>>>> I continued testing with the Spark Streaming WAL and checkpoint files also
>>>>> stored in Tachyon. Here are a few findings.
>>>>>
>>>>>
>>>>> When I store the Spark Streaming checkpoint location in Tachyon, the
>>>>> throughput is much higher. I tested the Driver and Receiver failure cases,
>>>>> and Spark Streaming is able to recover without any data loss on Driver
>>>>> failure.
>>>>>
>>>>> *But on Receiver failure, Spark Streaming loses data*, as I see an
>>>>> exception while reading the WAL file from the Tachyon "receivedData" location
>>>>> for the same Receiver id that just failed.
>>>>>
>>>>> If I change the checkpoint location back to HDFS, Spark Streaming can
>>>>> recover from both Driver and Receiver failure.
>>>>>
>>>>> Here are the log details from when the Spark Streaming receiver failed. I
>>>>> raised a JIRA for this issue:
>>>>> https://issues.apache.org/jira/browse/SPARK-7525
>>>>>
>>>>>
>>>>>
>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>>>>> (epoch 1)*
>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
>>>>> remove executor 2 from BlockManagerMaster.
>>>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
>>>>> block manager BlockManagerId(2, 10.252.5.54, 45789)
>>>>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>>>>> successfully in removeExecutor
>>>>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
>>>>> receiver for stream 2 from 10.252.5.62*:47255
>>>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in
>>>>> stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: 
>>>>> *Could
>>>>> not read data from write ahead log record
>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)*
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>> at
>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at
>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>> at
>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>> at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>> Caused by: java.lang.IllegalArgumentException:* Seek position is past
>>>>> EOF: 645603894, fileSize = 0*
>>>>> at
>>>>> tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>> at
>>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>> at
>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>> at
>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>> ... 15 more
>>>>>
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2
>>>>> in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in
>>>>> stage 103.0 (TID 422) on executor 10.252.5.61:
>>>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>> [duplicate 1]
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3
>>>>> in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
>>>>> INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor
>>>>> updated: app-20150511104442-0048/2 is now LOST (worker lost)
>>>>> INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
>>>>> - Executor app-20150511104442-0048/2 removed: worker lost
>>>>> ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend
>>>>> - Asked to remove non-existent executor 2
>>>>> INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in
>>>>> stage 103.0 (TID 423) on executor 10.252.5.62:
>>>>> org.apache.spark.SparkException (Could not read data from write ahead log
>>>>> record FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919))
>>>>> [duplicate 2]
>>>>> ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage
>>>>> 103.0 failed 4 times; aborting job
>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet
>>>>> 103.0, whose tasks have all completed, from pool
>>>>> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage
>>>>> 103
>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103
>>>>> (foreachRDD at Consumer.java:92) failed in 0.943 s
>>>>> INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed:
>>>>> foreachRDD at Consumer.java:92, took 0.953482 s
>>>>> ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error
>>>>> running job streaming job 1431341145000 ms.0
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>> Task 2 in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 
>>>>> in
>>>>> stage 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could
>>>>> not read data from write ahead log record
>>>>> FileBasedWriteAheadLogSegment(tachyon-ft://
>>>>> 10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>>>> )
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>>>> at
>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at
>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>> at
>>>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>>> at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>> Caused by: java.lang.IllegalArgumentException: Seek position is past
>>>>> EOF: 645603894, fileSize = 0
>>>>> at
>>>>> tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>>>> at
>>>>> org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>>>> at
>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
>>>>> at
>>>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
>>>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>>>> ... 15 more
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, May 8, 2015 at 11:03 PM, Haoyuan Li <haoyuan...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the updates!
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Haoyuan
>>>>>>
>>>>>> On Fri, May 8, 2015 at 8:40 AM, Dibyendu Bhattacharya <
>>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>>
>>>>>>> Just a follow-up on this thread.
>>>>>>>
>>>>>>> I tried Hierarchical Storage on Tachyon (
>>>>>>> http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html ), and that
>>>>>>> seems to have worked: I did not see any Spark job fail due to
>>>>>>> BlockNotFoundException.
>>>>>>> Below are my Hierarchical Storage settings:
>>>>>>>
>>>>>>>   -Dtachyon.worker.hierarchystore.level.max=2
>>>>>>>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>>>>>>>   -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>>>>>>>   -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>>>>>>>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>>>>>>>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>>>>>>>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>>>>>>>   -Dtachyon.worker.evict.strategy=LRU
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dibyendu
>>>>>>>
>>>>>>> On Thu, May 7, 2015 at 1:46 PM, Dibyendu Bhattacharya <
>>>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>>>
>>>>>>> > Dear All ,
>>>>>>> >
>>>>>>> > I have been playing with Spark Streaming on Tachyon as the OFF_HEAP
>>>>>>> > block store. The primary reason for evaluating Tachyon is to find out
>>>>>>> > whether Tachyon can solve the Spark BlockNotFoundException.
>>>>>>> >
>>>>>>> > With the traditional MEMORY_ONLY StorageLevel, when blocks are evicted,
>>>>>>> > jobs fail due to a block-not-found exception, and storing blocks in
>>>>>>> > MEMORY_AND_DISK is not a good option either, as it impacts the throughput
>>>>>>> > a lot.
>>>>>>> >
>>>>>>> >
>>>>>>> > To test how Tachyon behaves, I took the latest Spark 1.4 from master,
>>>>>>> > used Tachyon 0.6.4, and configured Tachyon in fault-tolerant mode. Tachyon
>>>>>>> > is running on a 3-node AWS x-large cluster and Spark is running on a
>>>>>>> > 3-node AWS x-large cluster.
>>>>>>> >
>>>>>>> > I have used the low-level receiver-based Kafka consumer (
>>>>>>> > https://github.com/dibbhatt/kafka-spark-consumer), which I have written,
>>>>>>> > to pull from Kafka and write blocks to Tachyon.
>>>>>>> >
>>>>>>> > I found a similar improvement in throughput (as in the MEMORY_ONLY case)
>>>>>>> > but very good overall memory utilization (as it is an off-heap store).
>>>>>>> >
>>>>>>> >
>>>>>>> > But I found one issue on which I need clarification.
>>>>>>> >
>>>>>>> >
>>>>>>> > In the Tachyon case I also see BlockNotFoundException, but for a
>>>>>>> > different reason. What I see is that TachyonBlockManager.scala writes the
>>>>>>> > blocks with the WriteType.TRY_CACHE configuration. Because of this, blocks
>>>>>>> > are evicted from the Tachyon cache, and when Spark tries to find a block
>>>>>>> > it throws BlockNotFoundException.
>>>>>>> >
>>>>>>> > I see a pull request which discusses the same:
>>>>>>> >
>>>>>>> > https://github.com/apache/spark/pull/158#discussion_r11195271
>>>>>>> >
>>>>>>> >
>>>>>>> > When I modified the WriteType to CACHE_THROUGH, the BlockDropException
>>>>>>> > is gone, but it again impacts the throughput (a rough sketch of the
>>>>>>> > difference is below).
>>>>>>> >
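>>>>>>> > A rough sketch of the two write paths (the Tachyon 0.6.x client calls here
>>>>>>> > are from memory and should be treated as approximate; the path and payload
>>>>>>> > are placeholders):
>>>>>>> >
>>>>>>> > TachyonFS client = TachyonFS.get(new TachyonURI("tachyon-ft://10.252.5.113:19998"));
>>>>>>> > TachyonURI path = new TachyonURI("/sparkstreaming/pearson/some_block");  // placeholder
>>>>>>> > if (!client.exist(path)) {
>>>>>>> >     client.createFile(path);
>>>>>>> > }
>>>>>>> > TachyonFile file = client.getFile(path);
>>>>>>> > byte[] blockBytes = new byte[]{1, 2, 3};  // placeholder payload
>>>>>>> > // TRY_CACHE keeps the block only in Tachyon memory, so eviction loses it;
>>>>>>> > // CACHE_THROUGH also writes it to the under-filesystem, at a throughput cost.
>>>>>>> > OutStream os = file.getOutStream(WriteType.CACHE_THROUGH);  // vs. WriteType.TRY_CACHE
>>>>>>> > os.write(blockBytes);
>>>>>>> > os.close();
>>>>>>> >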
>>>>>>> >
>>>>>>> > Just curious to know if Tachyon has any settings that can handle the
>>>>>>> > block eviction from cache to disk, other than explicitly setting
>>>>>>> > CACHE_THROUGH?
>>>>>>> >
>>>>>>> > Regards,
>>>>>>> > Dibyendu
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Haoyuan Li
>>>>>> CEO, Tachyon Nexus <http://www.tachyonnexus.com/>
>>>>>> AMPLab, EECS, UC Berkeley http://www.cs.berkeley.edu/~haoyuan/
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
