Also, my understanding is that max failures doesn't stop the entire stream; it
fails the job created for that specific batch, but subsequent batches still
proceed. Isn't that right? And the question remains: how do we keep track of
those failed batches?

From: amit assudani <aassud...@impetus.com<mailto:aassud...@impetus.com>>
Date: Friday, June 26, 2015 at 11:21 AM
To: Cody Koeninger <c...@koeninger.org<mailto:c...@koeninger.org>>
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>, Tathagata Das 
<t...@databricks.com<mailto:t...@databricks.com>>
Subject: Re: How to recover in case user errors in streaming

Thanks for the quick response.

My question here is: how do I know that the max retries are done (in my code I
never know whether a failure is the first try or the last), so that I can
handle this message? Is there any callback?

Also, I know about the checkpoint limitation when upgrading code, but my main
focus here is mitigating connectivity issues to the persistent store, which
resolve after a while. How do I know which messages failed and need rework?

Regards,
Amit

From: Cody Koeninger <c...@koeninger.org<mailto:c...@koeninger.org>>
Date: Friday, June 26, 2015 at 11:16 AM
To: amit assudani <aassud...@impetus.com<mailto:aassud...@impetus.com>>
Cc: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>, Tathagata Das 
<t...@databricks.com<mailto:t...@databricks.com>>
Subject: Re: How to recover in case user errors in streaming

If you're consistently throwing exceptions and thus failing tasks, once you 
reach max failures the whole stream will stop.

It's up to you to either catch those exceptions, or restart your stream 
appropriately once it stops.

Keep in mind that if you're relying on checkpoints, and fixing the error 
requires changing your code, you may not be able to recover the checkpoint.
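
To make "catch those exceptions" concrete: instead of letting the exception propagate and fail the task, you can catch it per record and divert the failing record to a dead-letter buffer for later analysis or replay. Below is a minimal sketch of that pattern in plain Java, outside Spark, so it stands alone; the process method and the "bad" marker are hypothetical stand-ins for the real sink call and failure condition:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeadLetterExample {

    // Hypothetical sink call: fails for records containing "bad".
    static void process(String record) throws Exception {
        if (record.contains("bad")) {
            throw new Exception("user and/or service exception: " + record);
        }
    }

    // Processes a batch; records that fail are returned instead of thrown,
    // so the task never fails and nothing is silently lost.
    public static List<String> processBatch(List<String> batch) {
        List<String> deadLetters = new ArrayList<String>();
        for (String record : batch) {
            try {
                process(record);
            } catch (Exception e) {
                deadLetters.add(record); // keep for logging / error queue / replay
            }
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        System.out.println(processBatch(Arrays.asList("ok-1", "bad-2", "ok-3")));
    }
}
```

Inside foreachRDD the same try/catch would wrap the per-record work, with the dead letters written to an error topic or table instead of returned.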

On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani 
<aassud...@impetus.com<mailto:aassud...@impetus.com>> wrote:
Problem: how do we recover from user errors (connectivity issues / storage 
service down / etc.)?
Environment: Spark streaming using Kafka Direct Streams
Code Snippet:

HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "smallest");

JavaPairInputDStream<String, String> messages = KafkaUtils
        .createDirectStream(jssc, String.class, String.class, StringDecoder.class,
                StringDecoder.class, kafkaParams, topicsSet);

JavaDStream<String> inputStream = messages
        .map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {

    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty()) {
            rdd.foreach(new VoidFunction<String>() {
                @Override
                public void call(String arg0) throws Exception {
                    System.out.println("------------------------rdd----------" + arg0);
                    Thread.sleep(1000);

                    throw new Exception(":::::::::::::::user and/or service "
                            + "exception::::::::::::::" + arg0);
                }
            });
        }
        return null;
    }
});

Detailed Description: Using Spark Streaming, I read text messages from Kafka
via the direct API. For the sake of simplicity, all the processing does is
print each message to the console and sleep for 1 second as a placeholder for
actual processing. Assume we get a user error, perhaps due to a bad record, a
format error, service connectivity issues, or, say, persistent-store downtime.
I've represented that by throwing an Exception from the foreach block. I
understand Spark retries this a configurable number of times and then proceeds.
The question is: what happens to those failed messages? Does Spark retry them
(and if so, when)? If not, is there any callback so the user can log / dump
them to an error queue and provision them for further analysis and / or manual
retries? Also, FYI, checkpoints are enabled and the above code is in the
create-context method to recover from Spark driver / worker failures.
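
One way to answer the "how do I know the max retries are done" part without relying on Spark's task-level retry is to do the retrying yourself inside the foreach, so the last attempt is explicit and you, not Spark, decide what happens to the failed message. (For knowing which messages a failed batch covered, the direct stream's RDDs also carry their Kafka offset ranges via HasOffsetRanges, which can be logged before processing.) Here is a sketch of such a helper in plain Java; the names and retry policy are my own, not a Spark API:

```java
import java.util.concurrent.Callable;

public class RetryHelper {

    // Runs the action up to maxAttempts times, sleeping between attempts.
    // Returns true on success, false once all attempts are exhausted --
    // that false branch is effectively the "callback" point where the
    // failed message can be written to an error queue for manual rework.
    public static boolean runWithRetries(Callable<Void> action,
                                         int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.call();
                return true;
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    return false; // last try failed: handle the message here
                }
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        boolean ok = runWithRetries(new Callable<Void>() {
            public Void call() throws Exception {
                throw new Exception("service down");
            }
        }, 3, 10);
        System.out.println("succeeded: " + ok);
    }
}
```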

________________________________






NOTE: This message may contain information that is confidential, proprietary, 
privileged or otherwise protected by law. The message is intended solely for 
the named addressee. If received in error, please destroy and notify the 
sender. Any use of this email is prohibited when received in error. Impetus 
does not represent, warrant and/or guarantee, that the integrity of this 
communication has been maintained nor that the communication is free of errors, 
virus, interception or interference.


---------------------------------------------------------------------
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>
For additional commands, e-mail: 
user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>



