[jira] [Commented] (SPARK-19645) structured streaming job restart bug

2017-02-17 Thread Hongyao Zhao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15871529#comment-15871529
 ] 

Hongyao Zhao commented on SPARK-19645:
--

It seems that this is related to the filesystem type.
All of the test cases that first stop the stream and then restart it do not 
throw this rename exception. 
But if the local filesystem is replaced with HDFS, the exception is thrown.
Maybe we should add an overwrite option to the rename call when the filesystem 
is HDFS?
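
As an illustration, here is a minimal sketch of what an overwrite-tolerant 
commit rename could look like, using only standard Hadoop FileSystem calls. 
The renameWithOverwrite helper is hypothetical, not the actual 
HDFSBackedStateStoreProvider code; Hadoop's 
FileContext.rename(src, dst, Options.Rename.OVERWRITE) would be another way to 
get the same effect.

{code}
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: commit a temp delta file, tolerating a target that
// already exists (for example one left behind by the batch that ran just
// before a restart).
def renameWithOverwrite(fs: FileSystem, src: Path, dst: Path): Unit = {
  if (fs.exists(dst)) {
    // The delta for this batch was already committed before the restart,
    // so drop the stale target; a plain HDFS rename would otherwise fail.
    fs.delete(dst, false)
  }
  if (!fs.rename(src, dst)) {
    throw new IOException(s"Failed to rename $src to $dst")
  }
}

// Usage sketch:
// val fs = FileSystem.get(new Configuration())
// renameWithOverwrite(fs, tempDeltaFile, finalDeltaFile)
{code}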

> structured streaming job restart bug
> 
>
> Key: SPARK-19645
> URL: https://issues.apache.org/jira/browse/SPARK-19645
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: guifeng
>Priority: Critical
>
> We are trying to use Structured Streaming in production, however there 
> currently exists a bug in the streaming job restart process. 
>   The following is the concrete error message:  
> {quote}
>Caused by: java.lang.IllegalStateException: Error committing version 2 
> into HDFSStateStore[id = (op=0, part=136), dir = 
> /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136]
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:162)
>   at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:173)
>   at 
> org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3.apply(StatefulAggregate.scala:123)
>   at 
> org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:99)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Failed to rename 
> /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/temp--5345709896617324284 
> to /tmp/Pipeline_112346-continueagg-bxaxs/state/0/136/2.delta
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:259)
>   at 
> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:156)
>   ... 14 more
> {quote}
>  The bug can be easily reproduced by restarting a previous streaming job, and 
> the main reason is that on restart Spark recomputes the WAL offsets and 
> generates the same HDFS delta file (the latest delta file generated before 
> the restart, named "currentBatchId.delta"). In my opinion, this is a bug. 
> If you also consider this a bug, I can fix it.
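
The filesystem-dependent part of that failure can be seen with a minimal 
sketch against a plain Hadoop FileSystem (assuming fs.defaultFS points at an 
HDFS cluster; the paths below are placeholders, and the behaviour on the local 
filesystem can differ):

{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())

val temp  = new Path("/tmp/state/0/136/temp-delta") // written by the restarted batch
val delta = new Path("/tmp/state/0/136/2.delta")    // committed before the restart

fs.create(temp).close()
fs.create(delta).close()

// On HDFS this rename returns false because the destination file already
// exists; the state store then turns that false result into the
// "Failed to rename ..." IOException shown in the stack trace above.
val renamed = fs.rename(temp, delta)
println(s"rename succeeded: $renamed")
{code}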






[jira] [Commented] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-28 Thread Hongyao Zhao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15398615#comment-15398615
 ] 

Hongyao Zhao commented on SPARK-16746:
--

I did some tests yesterday. It seems that the Spark 1.6 direct API can consume 
messages from Kafka 0.9 brokers, so I can work around this problem by using the 
direct API. That is good news for me, but I think what I mentioned in the issue 
has nothing to do with what kind of receiver I use, because ReceiverTracker is 
an internal class in the Spark source code. 
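
For reference, a minimal sketch of that receiver-less workaround with the 
Spark 1.6 direct Kafka API; the broker list, topic name and batch interval are 
placeholders:

{code}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("DirectKafkaWorkaround")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("input-topic")

// The direct API tracks Kafka offsets itself instead of going through
// receivers, so ReceiverTracker and its write-ahead log are not involved.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// For the sketch, just count the records per batch; the real job would
// transform them and write the result back to the output topic.
stream.map { case (_, value) => value }.count().print()

ssc.start()
ssc.awaitTermination()
{code}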

> Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs 
> timeout
> ---
>
> Key: SPARK-16746
> URL: https://issues.apache.org/jira/browse/SPARK-16746
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.1
>Reporter: Hongyao Zhao
>Priority: Minor
>
> I wrote a Spark Streaming program which consumed 1000 messages from one topic 
> of Kafka, did some transformation, and wrote the result back to another 
> topic, but I only found 988 messages in the second topic. I checked the logs 
> and confirmed that all messages were received by the receivers, but I also 
> found an HDFS write timeout message printed by the class BatchedWriteAheadLog. 
> 
> I checked out the source code and found code like this: 
>   
> {code:borderStyle=solid}
> /** Add received block. This event will get written to the write ahead 
> log (if enabled). */ 
>   def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
> try { 
>   val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 
>   if (writeResult) { 
> synchronized { 
>   getReceivedBlockQueue(receivedBlockInfo.streamId) += 
> receivedBlockInfo 
> } 
> logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 
>   s"block ${receivedBlockInfo.blockStoreResult.blockId}") 
>   } else { 
> logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} 
> receiving " + 
>   s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write 
> Ahead Log.") 
>   } 
>   writeResult 
> } catch { 
>   case NonFatal(e) => 
> logError(s"Error adding block $receivedBlockInfo", e) 
> false 
> } 
>   } 
> {code}
> 
> It seems that ReceiverTracker tries to write the block info to HDFS, but the 
> write operation times out. This causes the writeToLog function to return 
> false, so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += 
> receivedBlockInfo" is skipped and the block info is lost. 
> The Spark version I use is 1.6.1 and I did not turn on 
> spark.streaming.receiver.writeAheadLog.enable. 
> 
> I want to know whether or not this is intended behaviour. 






[jira] [Comment Edited] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-27 Thread Hongyao Zhao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15395354#comment-15395354
 ] 

Hongyao Zhao edited comment on SPARK-16746 at 7/27/16 10:06 AM:


I think that if spark.streaming.receiver.writeAheadLog.enable is not set to 
true, the driver should store the received block info in memory, regardless of 
whether or not the write to HDFS succeeds.  

It seems that ReceiverTracker will write block info to HDFS whenever a 
checkpoint dir is set, regardless of whether or not 
spark.streaming.receiver.writeAheadLog.enable is set.
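
A rough sketch of what that suggestion could look like, adapted from the 
addBlock code quoted in the issue description. The isWriteAheadLogEnabled flag 
is assumed to reflect spark.streaming.receiver.writeAheadLog.enable, and the 
other members come from the quoted tracker code; this is only an illustration, 
not an actual Spark patch.

{code}
// Illustration only, assuming this sits inside the tracker class that owns
// writeToLog, getReceivedBlockQueue and the logging helpers quoted below.
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult || !isWriteAheadLogEnabled) {
      // Keep the block info in memory even if the (optional) HDFS write
      // timed out, so the next batch can still include these blocks.
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
      true
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} " +
        s"receiving block ${receivedBlockInfo.blockStoreResult.blockId} " +
        "in the Write Ahead Log.")
      false
    }
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
{code}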


was (Author: andyzhao):
I think that if spark.streaming.receiver.writeAheadLog.enable is not set to 
true, the driver should store the received block info in memory, regardless of 
whether or not the write to HDFS succeeds.  

> Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs 
> timeout
> ---
>
> Key: SPARK-16746
> URL: https://issues.apache.org/jira/browse/SPARK-16746
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.1
>Reporter: Hongyao Zhao
>Priority: Minor
>
> I wrote a Spark Streaming program which consumed 1000 messages from one topic 
> of Kafka, did some transformation, and wrote the result back to another 
> topic, but I only found 988 messages in the second topic. I checked the logs 
> and confirmed that all messages were received by the receivers, but I also 
> found an HDFS write timeout message printed by the class BatchedWriteAheadLog. 
> 
> I checked out the source code and found code like this: 
>   
> {code:borderStyle=solid}
> /** Add received block. This event will get written to the write ahead 
> log (if enabled). */ 
>   def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
> try { 
>   val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 
>   if (writeResult) { 
> synchronized { 
>   getReceivedBlockQueue(receivedBlockInfo.streamId) += 
> receivedBlockInfo 
> } 
> logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 
>   s"block ${receivedBlockInfo.blockStoreResult.blockId}") 
>   } else { 
> logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} 
> receiving " + 
>   s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write 
> Ahead Log.") 
>   } 
>   writeResult 
> } catch { 
>   case NonFatal(e) => 
> logError(s"Error adding block $receivedBlockInfo", e) 
> false 
> } 
>   } 
> {code}
> 
> It seems that ReceiverTracker tries to write the block info to HDFS, but the 
> write operation times out. This causes the writeToLog function to return 
> false, so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += 
> receivedBlockInfo" is skipped and the block info is lost. 
> The Spark version I use is 1.6.1 and I did not turn on 
> spark.streaming.receiver.writeAheadLog.enable. 
> 
> I want to know whether or not this is intended behaviour. 






[jira] [Commented] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-27 Thread Hongyao Zhao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15395354#comment-15395354
 ] 

Hongyao Zhao commented on SPARK-16746:
--

I think that if spark.streaming.receiver.writeAheadLog.enable is not set to 
true, the driver should store the received block info in memory, regardless of 
whether or not the write to HDFS succeeds.  

> Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs 
> timeout
> ---
>
> Key: SPARK-16746
> URL: https://issues.apache.org/jira/browse/SPARK-16746
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.1
>Reporter: Hongyao Zhao
>Priority: Minor
>
> I wrote a Spark Streaming program which consumed 1000 messages from one topic 
> of Kafka, did some transformation, and wrote the result back to another 
> topic, but I only found 988 messages in the second topic. I checked the logs 
> and confirmed that all messages were received by the receivers, but I also 
> found an HDFS write timeout message printed by the class BatchedWriteAheadLog. 
> 
> I checked out the source code and found code like this: 
>   
> {code:borderStyle=solid}
> /** Add received block. This event will get written to the write ahead 
> log (if enabled). */ 
>   def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
> try { 
>   val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 
>   if (writeResult) { 
> synchronized { 
>   getReceivedBlockQueue(receivedBlockInfo.streamId) += 
> receivedBlockInfo 
> } 
> logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 
>   s"block ${receivedBlockInfo.blockStoreResult.blockId}") 
>   } else { 
> logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} 
> receiving " + 
>   s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write 
> Ahead Log.") 
>   } 
>   writeResult 
> } catch { 
>   case NonFatal(e) => 
> logError(s"Error adding block $receivedBlockInfo", e) 
> false 
> } 
>   } 
> {code}
> 
> It seems that ReceiverTracker tries to write the block info to HDFS, but the 
> write operation times out. This causes the writeToLog function to return 
> false, so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += 
> receivedBlockInfo" is skipped and the block info is lost. 
> The Spark version I use is 1.6.1 and I did not turn on 
> spark.streaming.receiver.writeAheadLog.enable. 
> 
> I want to know whether or not this is intended behaviour. 






[jira] [Updated] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-26 Thread Hongyao Zhao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongyao Zhao updated SPARK-16746:
-
Description: 
I wrote a Spark Streaming program which consumed 1000 messages from one topic of 
Kafka, did some transformation, and wrote the result back to another topic, but 
I only found 988 messages in the second topic. I checked the logs and confirmed 
that all messages were received by the receivers, but I also found an HDFS write 
timeout message printed by the class BatchedWriteAheadLog. 

I checked out the source code and found code like this: 
  
{code:borderStyle=solid}
/** Add received block. This event will get written to the write ahead log 
(if enabled). */ 
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
try { 
  val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 
  if (writeResult) { 
synchronized { 
  getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo 
} 
logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId}") 
  } else { 
logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} 
receiving " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write 
Ahead Log.") 
  } 
  writeResult 
} catch { 
  case NonFatal(e) => 
logError(s"Error adding block $receivedBlockInfo", e) 
false 
} 
  } 
{code}

It seems that ReceiverTracker tries to write the block info to HDFS, but the 
write operation times out. This causes the writeToLog function to return false, 
so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo" is skipped and the block info is lost. 

   The Spark version I use is 1.6.1 and I did not turn on 
spark.streaming.receiver.writeAheadLog.enable. 

   I want to know whether or not this is intended behaviour. 

  was:
I wrote a Spark Streaming program which consumed 1000 messages from one topic of 
Kafka, did some transformation, and wrote the result back to another topic, but 
I only found 988 messages in the second topic. I checked the logs and confirmed 
that all messages were received by the receivers, but I also found an HDFS write 
timeout message printed by the class BatchedWriteAheadLog. 

I checked out the source code and found code like this: 
  
{code:title=code|borderStyle=solid}
/** Add received block. This event will get written to the write ahead log 
(if enabled). */ 
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
try { 
  val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 
  if (writeResult) { 
synchronized { 
  getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo 
} 
logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId}") 
  } else { 
logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} 
receiving " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write 
Ahead Log.") 
  } 
  writeResult 
} catch { 
  case NonFatal(e) => 
logError(s"Error adding block $receivedBlockInfo", e) 
false 
} 
  } 
{code}

It seems that ReceiverTracker tries to write the block info to HDFS, but the 
write operation times out. This causes the writeToLog function to return false, 
so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo" is skipped and the block info is lost. 

   The Spark version I use is 1.6.1 and I did not turn on 
spark.streaming.receiver.writeAheadLog.enable. 

   I want to know whether or not this is intended behaviour. 


> Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs 
> timeout
> ---
>
> Key: SPARK-16746
> URL: https://issues.apache.org/jira/browse/SPARK-16746
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.1
>Reporter: Hongyao Zhao
>Priority: Minor
>
> I wrote a Spark Streaming program which consumed 1000 messages from one topic 
> of Kafka, did some transformation, and wrote the result back to another 
> topic, but I only found 988 messages in the second topic. I checked the logs 
> and confirmed that all messages were received by the receivers, but I also 
> found an HDFS write timeout message printed by the class BatchedWriteAheadLog. 
> 
> I checked out the source code and found code like this: 
>   
> {code:borderStyle=solid}
> /** Add received block. This event will get written to the write ahead 
> log (if enabled). */ 
>   def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
> try { 
>   

[jira] [Updated] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-26 Thread Hongyao Zhao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongyao Zhao updated SPARK-16746:
-
Description: 
I wrote a Spark Streaming program which consumed 1000 messages from one topic of 
Kafka, did some transformation, and wrote the result back to another topic, but 
I only found 988 messages in the second topic. I checked the logs and confirmed 
that all messages were received by the receivers, but I also found an HDFS write 
timeout message printed by the class BatchedWriteAheadLog. 

I checked out the source code and found code like this: 
  
{code:title=code|borderStyle=solid}
/** Add received block. This event will get written to the write ahead log 
(if enabled). */ 
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
try { 
  val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 
  if (writeResult) { 
synchronized { 
  getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo 
} 
logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId}") 
  } else { 
logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} 
receiving " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write 
Ahead Log.") 
  } 
  writeResult 
} catch { 
  case NonFatal(e) => 
logError(s"Error adding block $receivedBlockInfo", e) 
false 
} 
  } 
{code}

It seems that ReceiverTracker tries to write the block info to HDFS, but the 
write operation times out. This causes the writeToLog function to return false, 
so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo" is skipped and the block info is lost. 

   The Spark version I use is 1.6.1 and I did not turn on 
spark.streaming.receiver.writeAheadLog.enable. 

   I want to know whether or not this is intended behaviour. 

  was:
I wrote a Spark Streaming program which consumed 1000 messages from one topic of 
Kafka, did some transformation, and wrote the result back to another topic, but 
I only found 988 messages in the second topic. I checked the logs and confirmed 
that all messages were received by the receivers, but I also found an HDFS write 
timeout message printed by the class BatchedWriteAheadLog. 

I checked out the source code and found code like this: 
  
{code:title=Bar.scala|borderStyle=solid}
/** Add received block. This event will get written to the write ahead log 
(if enabled). */ 
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
try { 
  val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 
  if (writeResult) { 
synchronized { 
  getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo 
} 
logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId}") 
  } else { 
logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} 
receiving " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write 
Ahead Log.") 
  } 
  writeResult 
} catch { 
  case NonFatal(e) => 
logError(s"Error adding block $receivedBlockInfo", e) 
false 
} 
  } 
{code}

It seems that ReceiverTracker tries to write the block info to HDFS, but the 
write operation times out. This causes the writeToLog function to return false, 
so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo" is skipped and the block info is lost. 

   The Spark version I use is 1.6.1 and I did not turn on 
spark.streaming.receiver.writeAheadLog.enable. 

   I want to know whether or not this is intended behaviour. 


> Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs 
> timeout
> ---
>
> Key: SPARK-16746
> URL: https://issues.apache.org/jira/browse/SPARK-16746
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.1
>Reporter: Hongyao Zhao
>Priority: Minor
>
> I wrote a Spark Streaming program which consumed 1000 messages from one topic 
> of Kafka, did some transformation, and wrote the result back to another 
> topic, but I only found 988 messages in the second topic. I checked the logs 
> and confirmed that all messages were received by the receivers, but I also 
> found an HDFS write timeout message printed by the class BatchedWriteAheadLog. 
> 
> I checked out the source code and found code like this: 
>   
> {code:title=code|borderStyle=solid}
> /** Add received block. This event will get written to the write ahead 
> log (if enabled). */ 
>   def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean 

[jira] [Updated] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-26 Thread Hongyao Zhao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongyao Zhao updated SPARK-16746:
-
Description: 
I wrote a Spark Streaming program which consumed 1000 messages from one topic of 
Kafka, did some transformation, and wrote the result back to another topic, but 
I only found 988 messages in the second topic. I checked the logs and confirmed 
that all messages were received by the receivers, but I also found an HDFS write 
timeout message printed by the class BatchedWriteAheadLog. 

I checked out the source code and found code like this: 
  
{code:title=Bar.scala|borderStyle=solid}
/** Add received block. This event will get written to the write ahead log 
(if enabled). */ 
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
try { 
  val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 
  if (writeResult) { 
synchronized { 
  getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo 
} 
logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId}") 
  } else { 
logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} 
receiving " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write 
Ahead Log.") 
  } 
  writeResult 
} catch { 
  case NonFatal(e) => 
logError(s"Error adding block $receivedBlockInfo", e) 
false 
} 
  } 
{code}

It seems that ReceiverTracker tries to write the block info to HDFS, but the 
write operation times out. This causes the writeToLog function to return false, 
so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo" is skipped and the block info is lost. 

   The Spark version I use is 1.6.1 and I did not turn on 
spark.streaming.receiver.writeAheadLog.enable. 

   I want to know whether or not this is intended behaviour. 

  was:
I wrote a Spark Streaming program which consumed 1000 messages from one topic of 
Kafka, did some transformation, and wrote the result back to another topic, but 
I only found 988 messages in the second topic. I checked the logs and confirmed 
that all messages were received by the receivers, but I also found an HDFS write 
timeout message printed by the class BatchedWriteAheadLog. 

I checked out the source code and found code like this: 
  
{quote}
/** Add received block. This event will get written to the write ahead log 
(if enabled). */ 
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
try { 
  val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 
  if (writeResult) { 
synchronized { 
  getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo 
} 
logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId}") 
  } else { 
logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} 
receiving " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write 
Ahead Log.") 
  } 
  writeResult 
} catch { 
  case NonFatal(e) => 
logError(s"Error adding block $receivedBlockInfo", e) 
false 
} 
  } 
{quote}

It seems that ReceiverTracker tries to write the block info to HDFS, but the 
write operation times out. This causes the writeToLog function to return false, 
so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo" is skipped and the block info is lost. 

   The Spark version I use is 1.6.1 and I did not turn on 
spark.streaming.receiver.writeAheadLog.enable. 

   I want to know whether or not this is intended behaviour. 


> Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs 
> timeout
> ---
>
> Key: SPARK-16746
> URL: https://issues.apache.org/jira/browse/SPARK-16746
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.1
>Reporter: Hongyao Zhao
>Priority: Minor
>
> I wrote a Spark Streaming program which consumed 1000 messages from one topic 
> of Kafka, did some transformation, and wrote the result back to another 
> topic, but I only found 988 messages in the second topic. I checked the logs 
> and confirmed that all messages were received by the receivers, but I also 
> found an HDFS write timeout message printed by the class BatchedWriteAheadLog. 
> 
> I checked out the source code and found code like this: 
>   
> {code:title=Bar.scala|borderStyle=solid}
> /** Add received block. This event will get written to the write ahead 
> log (if enabled). */ 
>   def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
> try { 
>

[jira] [Updated] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-26 Thread Hongyao Zhao (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongyao Zhao updated SPARK-16746:
-
Description: 
I wrote a Spark Streaming program which consumed 1000 messages from one topic of 
Kafka, did some transformation, and wrote the result back to another topic, but 
I only found 988 messages in the second topic. I checked the logs and confirmed 
that all messages were received by the receivers, but I also found an HDFS write 
timeout message printed by the class BatchedWriteAheadLog. 

I checked out the source code and found code like this: 
  
{quote}
/** Add received block. This event will get written to the write ahead log 
(if enabled). */ 
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
try { 
  val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 
  if (writeResult) { 
synchronized { 
  getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo 
} 
logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId}") 
  } else { 
logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} 
receiving " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write 
Ahead Log.") 
  } 
  writeResult 
} catch { 
  case NonFatal(e) => 
logError(s"Error adding block $receivedBlockInfo", e) 
false 
} 
  } 
{quote}

It seems that ReceiverTracker tries to write the block info to HDFS, but the 
write operation times out. This causes the writeToLog function to return false, 
so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo" is skipped and the block info is lost. 

   The Spark version I use is 1.6.1 and I did not turn on 
spark.streaming.receiver.writeAheadLog.enable. 

   I want to know whether or not this is intended behaviour. 

  was:
I wrote a Spark Streaming program which consumed 1000 messages from one topic of 
Kafka, did some transformation, and wrote the result back to another topic, but 
I only found 988 messages in the second topic. I checked the logs and confirmed 
that all messages were received by the receivers, but I also found an HDFS write 
timeout message printed by the class BatchedWriteAheadLog. 

I checked out the source code and found code like this: 
  
```
/** Add received block. This event will get written to the write ahead log 
(if enabled). */ 
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
try { 
  val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 
  if (writeResult) { 
synchronized { 
  getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo 
} 
logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId}") 
  } else { 
logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} 
receiving " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write 
Ahead Log.") 
  } 
  writeResult 
} catch { 
  case NonFatal(e) => 
logError(s"Error adding block $receivedBlockInfo", e) 
false 
} 
  } 
```

It seems that ReceiverTracker tries to write the block info to HDFS, but the 
write operation times out. This causes the writeToLog function to return false, 
so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo" is skipped and the block info is lost. 

   The Spark version I use is 1.6.1 and I did not turn on 
spark.streaming.receiver.writeAheadLog.enable. 

   I want to know whether or not this is intended behaviour. 


> Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs 
> timeout
> ---
>
> Key: SPARK-16746
> URL: https://issues.apache.org/jira/browse/SPARK-16746
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.1
>Reporter: Hongyao Zhao
>Priority: Minor
>
> I wrote a Spark Streaming program which consumed 1000 messages from one topic 
> of Kafka, did some transformation, and wrote the result back to another 
> topic, but I only found 988 messages in the second topic. I checked the logs 
> and confirmed that all messages were received by the receivers, but I also 
> found an HDFS write timeout message printed by the class BatchedWriteAheadLog. 
> 
> I checked out the source code and found code like this: 
>   
> {quote}
> /** Add received block. This event will get written to the write ahead 
> log (if enabled). */ 
>   def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
> try { 
>   val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 

[jira] [Created] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-26 Thread Hongyao Zhao (JIRA)
Hongyao Zhao created SPARK-16746:


 Summary: Spark streaming lost data when ReceiverTracker writes 
Blockinfo to hdfs timeout
 Key: SPARK-16746
 URL: https://issues.apache.org/jira/browse/SPARK-16746
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.6.1
Reporter: Hongyao Zhao
Priority: Minor


I wrote a Spark Streaming program which consumed 1000 messages from one topic of 
Kafka, did some transformation, and wrote the result back to another topic, but 
I only found 988 messages in the second topic. I checked the logs and confirmed 
that all messages were received by the receivers, but I also found an HDFS write 
timeout message printed by the class BatchedWriteAheadLog. 

I checked out the source code and found code like this: 
  
```
/** Add received block. This event will get written to the write ahead log 
(if enabled). */ 
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 
try { 
  val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 
  if (writeResult) { 
synchronized { 
  getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo 
} 
logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId}") 
  } else { 
logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} 
receiving " + 
  s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write 
Ahead Log.") 
  } 
  writeResult 
} catch { 
  case NonFatal(e) => 
logError(s"Error adding block $receivedBlockInfo", e) 
false 
} 
  } 
```

It seems that ReceiverTracker tries to write the block info to HDFS, but the 
write operation times out. This causes the writeToLog function to return false, 
so the line "getReceivedBlockQueue(receivedBlockInfo.streamId) += 
receivedBlockInfo" is skipped and the block info is lost. 

   The Spark version I use is 1.6.1 and I did not turn on 
spark.streaming.receiver.writeAheadLog.enable. 

   I want to know whether or not this is intended behaviour. 


