Hi TD,

In my code I wrote this:


dstream.foreachRDD { rdd =>
   try {
      // run the output action on rdd
   } catch {
      case e: Exception =>
         // handle the failed batch
   }
}


It still throws an exception, and the driver is killed.


It seems I have to catch the exception inside rdd.foreachPartition, as in
the following code, which means I need to implement the retry myself:
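dstream.foreachRDD { rdd =>
   try {
      rdd.foreachPartition { records =>
         records.foreach { record =>
            try {
               // process record
            } catch {
               case e: Exception =>
                  // retry or log the failed record
            }
         }
      }
   } catch {
      case e: Exception =>
         // handle a failure of the whole job
   }
}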
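
For the hand-written retry mentioned above, a bounded retry loop is one
option. The following is a minimal Java sketch and not Spark API:
RetrySketch, withRetry, maxAttempts and backoffMillis are hypothetical names
chosen for illustration. It assumes the failure is transient (for example a
store that comes back after a short while) and that the action is safe to
repeat.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Spark: retries an action a bounded number of times.
final class RetrySketch {

    // Runs the action up to maxAttempts times, sleeping backoffMillis between tries.
    // Rethrows the last exception if every attempt fails.
    static <T> T withRetry(Callable<T> action, int maxAttempts, long backoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Simulated write that fails twice, then succeeds on the third attempt.
        String result = withRetry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("store unavailable");
            }
            return "saved";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
        // prints "saved after 3 attempts"
    }
}
```

Inside foreachPartition, each write could be wrapped in a call like this, so
that only records that still fail after the last attempt need separate
handling.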

------------------ Original Message ------------------
From: "Tathagata Das" <t...@databricks.com>
Sent: Tuesday, June 30, 2015, 5:24
To: "Amit Assudani" <aassud...@impetus.com>
Cc: "Cody Koeninger" <c...@koeninger.org>;
"user@spark.apache.org" <user@spark.apache.org>
Subject: Re: How to recover in case user errors in streaming



I recommend using dstream.foreachRDD and calling
rdd.saveAsNewAPIHadoopFile inside a try/catch block. See the implementation of
dstream.saveAsNewAPIHadoopFiles:

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L716



On Mon, Jun 29, 2015 at 8:44 AM, Amit Assudani <aassud...@impetus.com> wrote:
  Also, how do you suggest catching exceptions when using a connector API
such as saveAsNewAPIHadoopFiles?
 
 
   From: amit assudani <aassud...@impetus.com>
 Date: Monday, June 29, 2015 at 9:55 AM
 To: Tathagata Das <t...@databricks.com>
 Cc: Cody Koeninger <c...@koeninger.org>, "user@spark.apache.org" 
<user@spark.apache.org>
 Subject: Re: How to recover in case user errors in streaming
 


 
 
   Thanks TD, this helps. 
 
 
 I look forward to a fix in which the framework handles batch failures through
callback methods. That would avoid having to write try/catch in every
transformation / action.
 
 
 Regards,
 Amit
 
 
   From: Tathagata Das <t...@databricks.com>
 Date: Saturday, June 27, 2015 at 5:14 AM
 To: amit assudani <aassud...@impetus.com>
 Cc: Cody Koeninger <c...@koeninger.org>, "user@spark.apache.org" 
<user@spark.apache.org>
 Subject: Re: How to recover in case user errors in streaming
 
 
 
   I looked at the code and found that batch exceptions are indeed ignored.
This is worth fixing: batch exceptions should not be silently ignored.

 Also, you can catch failed batch jobs (irrespective of the number of retries)
by catching the exception in foreachRDD. Here is an example.
 
 
 dstream.foreachRDD { rdd =>
    try {
       // run the output action on rdd
    } catch {
       case e: Exception =>
          // handle the failed batch job
    }
 }
 
 
 
 
 This catches failures at the granularity of the job, after all the retries of
a task have been exhausted. But at that level it is hard to identify the
failed record(s) and push them somewhere. To do that, I would use rdd.foreach
or rdd.foreachPartition, inside which I would catch the exception, push that
record out to another Kafka topic, and continue normal processing of the other
records. This prevents the task processing the partition from failing (as you
are catching the bad records).
 
 
 dstream.foreachRDD { rdd =>
     rdd.foreachPartition { iterator =>
         // Create Kafka producer for bad records

         iterator.foreach { record =>
             try {
                 // process record
             } catch {
                 case e: ExpectedException =>
                     // publish bad record to error topic in Kafka using
                     // the producer created above
             }
         }
     }
 }
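
 Since the original question uses the Java API, the per-record pattern above
can also be sketched in plain Java. This is only an illustration under stated
assumptions: BadRecordSketch and processPartition are hypothetical names, the
deadLetter consumer stands in for the Kafka producer that publishes to the
error topic, and RuntimeException stands in for whatever exception type is
actually expected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: the per-partition body with per-record error handling.
final class BadRecordSketch {

    // Processes every record of one partition. A record whose processing throws
    // is handed to deadLetter (a stand-in for a Kafka error-topic producer) and
    // the loop continues, so the task itself does not fail.
    // Returns the number of records that failed.
    static int processPartition(Iterator<String> records,
                                Consumer<String> process,
                                Consumer<String> deadLetter) {
        int failed = 0;
        while (records.hasNext()) {
            String record = records.next();
            try {
                process.accept(record);
            } catch (RuntimeException e) {
                deadLetter.accept(record);
                failed++;
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();
        int failed = processPartition(
                Arrays.asList("1", "two", "3").iterator(),
                record -> Integer.parseInt(record), // throws NumberFormatException on "two"
                errors::add);
        System.out.println(failed + " failed: " + errors);
        // prints "1 failed: [two]"
    }
}
```

 In a streaming job, a method like this would be called from the function
passed to rdd.foreachPartition, with the producer created once per partition
as in the Scala example.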
 
 
 
 
 TD
 
 
 
 PS: Apologies for the Scala examples, hope you get the idea :)
 
 
 On Fri, Jun 26, 2015 at 9:56 AM, Amit Assudani  <aassud...@impetus.com> wrote:
   Also, TaskContext.get() returns null when I call it in the foreach function
below (it works when I use it in map, but the whole point here is to handle
something that breaks in an action). Please help. :(
 
 
   From: amit assudani <aassud...@impetus.com>
 Date: Friday, June 26, 2015 at 11:41 AM  
 To: Cody Koeninger <c...@koeninger.org>
 Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das 
<t...@databricks.com>
 Subject: Re: How to recover in case user errors in streaming
 
 
 
   
 
   Hmm, not sure why, but when I run this code, it keeps consuming from Kafka
and proceeds, ignoring the previously failed batches.


 Also, now that I get the attempt number from TaskContext and I know the max
retries, I am supposed to handle it in the try/catch block. But does that mean
I have to handle these kinds of exceptions / errors in every transformation
step (map, reduce, transform, etc.)? Isn't there any callback that says the
task has been retried the max number of times, so that before it is ignored I
have a handle to do whatever I want with the batch / message in hand?
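
 The check described here, whether the current try is the last one, can be
sketched as a small helper. It assumes the attempt number is zero-based, as
TaskContext.attemptNumber() reports it, and that the caller passes in the
configured value of spark.task.maxFailures. AttemptSketch and isLastAttempt
are hypothetical names, and the sketch does not account for attempts that
restart because a whole stage is resubmitted.

```java
// Hypothetical helper, not part of Spark.
final class AttemptSketch {

    // attemptNumber: zero-based attempt index (0 for the first try).
    // maxFailures: the value configured for spark.task.maxFailures,
    // supplied by the caller (assumption of this sketch).
    static boolean isLastAttempt(int attemptNumber, int maxFailures) {
        return attemptNumber >= maxFailures - 1;
    }

    public static void main(String[] args) {
        int maxFailures = 4;
        for (int attempt = 0; attempt < maxFailures; attempt++) {
            System.out.println("attempt " + attempt
                    + " is last: " + isLastAttempt(attempt, maxFailures));
        }
    }
}
```

 Inside a catch block, a check like this could decide between rethrowing (so
that the task is retried) and diverting the message to an error queue.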
 
 
 Regards,
 Amit
 
 
   From: Cody Koeninger <c...@koeninger.org>
 Date: Friday, June 26, 2015 at 11:32 AM
 To: amit assudani <aassud...@impetus.com>
 Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das 
<t...@databricks.com>
 Subject: Re: How to recover in case user errors in streaming
 
 
 
   No, if you have a bad message that you are continually throwing exceptions 
on, your stream will not progress to future batches.
 
 On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani  <aassud...@impetus.com> wrote:
   Also, my understanding is that reaching max failures doesn't stop the
entire stream: it fails the job created for that specific batch, but the
subsequent batches still proceed. Is that right? And the question remains:
how do I keep track of those failed batches?
 
 
   From: amit assudani <aassud...@impetus.com>
 Date: Friday, June 26, 2015 at 11:21 AM
 To: Cody Koeninger <c...@koeninger.org>  
 Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das 
<t...@databricks.com>
 Subject: Re: How to recover in case user errors in streaming
 
 
 
   
 
   Thanks for the quick response.


 My question here is how do I know that the max retries are done (in my code I
never know whether a failure is from the first try or the last try) so that I
can handle this message. Is there any callback?


 Also, I know the limitation of checkpoints when upgrading the code, but my
main focus here is to mitigate connectivity issues with the persistent store,
which get resolved after a while. How do I know which messages failed and
need rework?
 
 
 Regards,
 Amit
 
 
   From: Cody Koeninger <c...@koeninger.org>
 Date: Friday, June 26, 2015 at 11:16 AM
 To: amit assudani <aassud...@impetus.com>
 Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das 
<t...@databricks.com>
 Subject: Re: How to recover in case user errors in streaming
 
 
 
   If you're consistently throwing exceptions and thus failing tasks, once you 
reach max failures the whole stream will stop. 
 
 It's up to you to either catch those exceptions, or restart your stream 
appropriately once it stops.
 
 
 Keep in mind that if you're relying on checkpoints, and fixing the error 
requires changing your code, you may not be able to recover the checkpoint.
 
 
 On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani  <aassud...@impetus.com> wrote:
     
Problem: how do we recover from user errors (connectivity issues / storage 
service down / etc.)?
 
Environment: Spark streaming using Kafka Direct Streams
 
Code Snippet: 
 
 
 
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "smallest");

JavaPairInputDStream<String, String> messages = KafkaUtils
        .createDirectStream(jssc, String.class, String.class, StringDecoder.class,
                StringDecoder.class, kafkaParams, topicsSet);

JavaDStream<String> inputStream = messages
        .map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty()) {
            rdd.foreach(new VoidFunction<String>() {
                @Override
                public void call(String arg0) throws Exception {
                    System.out.println("------------------------rdd----------" + arg0);
                    Thread.sleep(1000);
                    throw new Exception(
                            " :::::::::::::::user and/or service exception::::::::::::::" + arg0);
                }
            });
        }
        return null;
    }
});
 
 
 
Detailed Description: Using Spark Streaming, I read text messages from Kafka
using the direct API. For the sake of simplicity, all I do in processing is
print each message on the console and sleep for 1 second as a placeholder for
actual processing. Assume we get a user error, perhaps due to a bad record, a
format error, service connectivity issues or, let's say, persistent store
downtime. I've represented that by throwing an Exception from the foreach
block. I understand Spark retries this a configurable number of times and then
proceeds. The question is what happens to those failed messages: does Spark
retry them (and if yes, when)? If not, does it have any callback method so
that the user can log / dump them in an error queue and provision them for
further analysis and / or manual retries? Also, FYI, checkpoints are enabled
and the above code is in the create-context method, to recover from Spark
driver / worker failures.
 
 
 
 
 
 
 
 
 
 
 NOTE: This message may contain information that is confidential, proprietary, 
privileged or otherwise protected by law. The message is intended solely for 
the named addressee. If received in error, please destroy and notify the 
sender. Any use of this email  is prohibited when received in error. Impetus 
does not represent, warrant and/or guarantee, that the integrity of this 
communication has been maintained nor that the communication is free of errors, 
virus, interception or interference.
 
 
 
 ---------------------------------------------------------------------
 To unsubscribe, e-mail:  user-unsubscr...@spark.apache.org
 For additional commands, e-mail:  user-h...@spark.apache.org
  
 
 
 
 
 
 
 
 
 
 
 
 