Hello All,

We are using Spark 1.6.2 with the write-ahead log (WAL) enabled and HDFS as 
our checkpoint directory. We encounter data loss whenever the exception/warning 
shown below occurs.


Questions are:


1. Is this a bug in Spark or an issue with our configuration? The source looks 
like the following. Which file already exists, and who is supposed to set the 
hdfs.append.support configuration? Why doesn't this happen all the time?


private[streaming] object HdfsUtils {

  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    val dfsPath = new Path(path)
    val dfs = getFileSystemForPath(dfsPath, conf)
    // If the file exists and we have append support, append instead of creating a new file
    val stream: FSDataOutputStream = {
      if (dfs.isFile(dfsPath)) {
        if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
          dfs.append(dfsPath)
        } else {
          throw new IllegalStateException("File exists and there is no append support!")
        }
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }
  // (remaining members of HdfsUtils omitted from this excerpt)
}
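
For reference, the flag read in getOutputStream above is an ordinary Hadoop Configuration key, so in principle the application can set it. Below is a minimal sketch, assuming Spark's usual behaviour of copying spark.hadoop.* properties into the Hadoop Configuration. The names WalConfSketch and walConf are hypothetical. Whether the WAL writers on both the driver and the receivers see this value, and whether enabling it is the intended fix, are assumptions rather than confirmed facts.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper: builds a SparkConf with the receiver WAL enabled and
// the append flag that HdfsUtils.getOutputStream checks.
object WalConfSketch {
  def walConf(): SparkConf =
    new SparkConf()
      // Documented switch that enables the receiver write-ahead log.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      // Assumption: the spark.hadoop. prefix is stripped and the rest
      // ("hdfs.append.support") lands in the Hadoop Configuration.
      .set("spark.hadoop.hdfs.append.support", "true")
}
```

The same key could presumably also be set directly with sparkContext.hadoopConfiguration.setBoolean("hdfs.append.support", true) before the StreamingContext is created.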


2. Why does the job not retry and eventually fail when this error occurs? The 
job skips processing exactly the number of events dumped in the log. In this 
particular example, 987 + 4686 events were not processed and are lost forever 
(they are not recovered even on restart).



16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write to write ahead log after 3 failures
16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
java.lang.IllegalStateException: File exists and there is no append support!
        at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597)))) to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
        at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,WriteAheadLogBasedStoreResult(input-3-1478553647042,Some(4686),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/3/log-1478553818624-1478553878624,0,197473)))) to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
        at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
        at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
        at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 INFO BlockManagerInfo: Added input-3-1478553647043 in memory on 10.0.0.11:42316 (size: 283.1 KB, free: 2.6 GB)

I am sure Spark Streaming is not expected to lose data when WAL is enabled. So 
what are we doing wrong here?

Thanks, Arijit