[jira] [Commented] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-28 Thread Hongyao Zhao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15398615#comment-15398615
 ] 

Hongyao Zhao commented on SPARK-16746:
--

I did some testing yesterday. It seems that the Spark 1.6 direct API can consume 
messages from Kafka 0.9 brokers, so I can work around this problem by using the 
direct API. That is good news for me, but I think the problem I described in this 
issue has nothing to do with what kind of receiver I use, because ReceiverTracker 
is an internal class in the Spark source code. 
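
For reference, here is a minimal sketch of that direct-API workaround (Spark 1.6 
with the spark-streaming-kafka 0.8-style connector). The broker list, topic names, 
and batch interval are placeholders, not values from this report:

{code:borderStyle=solid}
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectApiSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("direct-api-sketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder broker list and input topic.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics = Set("input-topic")

    // The direct stream tracks Kafka offsets itself, so it does not go through
    // receivers or the ReceiverTracker block bookkeeping discussed in this issue.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.map(_._2).foreachRDD { rdd =>
      // Transformation and write-back to the output topic would go here.
      rdd.foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}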

> Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs 
> timeout
> ---
>
> Key: SPARK-16746
> URL: https://issues.apache.org/jira/browse/SPARK-16746
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.6.1
>Reporter: Hongyao Zhao
>Priority: Minor
>
> I wrote a Spark Streaming program that consumes 1000 messages from one Kafka 
> topic, does some transformation, and writes the results back to another 
> topic, but I found only 988 messages in the second topic. I checked the logs 
> and confirmed that all messages were received by the receivers, but I also 
> found an HDFS write timeout message printed by the class BatchedWriteAheadLog. 
> 
> I checked the source code and found code like this: 
> 
> {code:borderStyle=solid}
>   /** Add received block. This event will get written to the write ahead log (if enabled). */
>   def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
>     try {
>       val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
>       if (writeResult) {
>         synchronized {
>           getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
>         }
>         logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
>           s"block ${receivedBlockInfo.blockStoreResult.blockId}")
>       } else {
>         logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
>           s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
>       }
>       writeResult
>     } catch {
>       case NonFatal(e) =>
>         logError(s"Error adding block $receivedBlockInfo", e)
>         false
>     }
>   }
> {code}
> 
> It seems that ReceiverTracker tried to write the block info to HDFS, but the 
> write operation timed out. This caused writeToLog to return false, so the line 
> "getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo" was 
> skipped and the block info was lost. 
> The Spark version I use is 1.6.1, and I did not turn on 
> spark.streaming.receiver.writeAheadLog.enable. 
> 
> I want to know whether or not this is the designed behaviour. 
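
(For context on that flag: below is a hedged sketch of how the receiver 
write-ahead log is normally enabled; the app name and checkpoint path are 
placeholders, not configuration from this report.)

{code:borderStyle=solid}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("wal-enabled-app")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(5))

// The WAL segments are written under the checkpoint directory, so a reliable
// (typically HDFS) checkpoint directory has to be set as well.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")
{code}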





[jira] [Commented] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-28 Thread Cody Koeninger (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15397520#comment-15397520
 ] 

Cody Koeninger commented on SPARK-16746:


From the conversation on the mailing list, it wasn't clear whether this was 
using the official Kafka integration or dibbhatt's receiver from spark-packages. 
If the latter, it should probably be brought up with him first.






[jira] [Commented] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-27 Thread Hongyao Zhao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15395354#comment-15395354
 ] 

Hongyao Zhao commented on SPARK-16746:
--

I think that if spark.streaming.receiver.writeAheadLog.enable is not set to 
true, the driver should store the received block info in memory regardless of 
whether the write to HDFS succeeds. 
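
A rough, hypothetical sketch of the behaviour argued for here, written against 
the addBlock method quoted in the description (this is not actual Spark code or 
a proposed patch; isWriteAheadLogEnabled stands in for whatever check the 
tracker would use):

{code:borderStyle=solid}
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  // When no WAL is configured, there is nothing to acknowledge, so the
  // in-memory queue update would not depend on any HDFS write or timeout.
  val writeResult =
    if (isWriteAheadLogEnabled) writeToLog(BlockAdditionEvent(receivedBlockInfo))
    else true
  if (writeResult) {
    synchronized {
      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
    }
  }
  writeResult
}
{code}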






[jira] [Commented] (SPARK-16746) Spark streaming lost data when ReceiverTracker writes Blockinfo to hdfs timeout

2016-07-27 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15395196#comment-15395196
 ] 

Sean Owen commented on SPARK-16746:
---

If you're saying that the error is in timing out or being unable to write to 
HDFS successfully, that seems like an environment or network-architecture 
problem. What's the expected outcome here if you can't write to the WAL? The 
data has nowhere to be logged.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org