No, if you have a bad message that you are continually throwing exceptions on, your stream will not progress to future batches.
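
One way to keep a single bad message from blocking the stream is to catch the exception per record, retry a bounded number of times, and set aside whatever still fails. The sketch below is plain Java with no Spark dependency, so it only illustrates the shape of the logic. SafeRecordHandler, RecordProcessor, FailedRecord and processAll are hypothetical names, not part of any Spark API. In the job from this thread, something like processAll would presumably be called from inside rdd.foreach or foreachPartition, and the returned failures would need to be written somewhere durable (an error topic or table, for example), which is left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: bounded per-record retries plus a dead-letter list (all names hypothetical). */
final class SafeRecordHandler {

    /** Stand-in for the real work, e.g. a write to a persistent store. */
    public interface RecordProcessor {
        void process(String record) throws Exception;
    }

    /** A record that exhausted its attempts, with the last error seen. */
    public static final class FailedRecord {
        public final String record;
        public final Exception lastError;

        FailedRecord(String record, Exception lastError) {
            this.record = record;
            this.lastError = lastError;
        }
    }

    /**
     * Tries each record up to maxAttempts times. Exceptions are caught here
     * rather than propagated, so they never fail the enclosing task.
     * Returns the records that failed on every attempt.
     */
    public static List<FailedRecord> processAll(
            Iterable<String> records, RecordProcessor processor, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        List<FailedRecord> deadLetters = new ArrayList<>();
        for (String record : records) {
            Exception lastError = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    processor.process(record);
                    lastError = null; // success: clear any earlier failure
                    break;
                } catch (Exception e) {
                    lastError = e; // remember the error and try again
                }
            }
            if (lastError != null) {
                // This is the point where you know the last try has failed.
                deadLetters.add(new FailedRecord(record, lastError));
            }
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("ok-1", "bad-2", "ok-3");
        List<FailedRecord> failed = processAll(batch, r -> {
            if (r.startsWith("bad")) {
                throw new Exception("user and/or service exception: " + r);
            }
        }, 3);
        for (FailedRecord f : failed) {
            System.out.println("dead letter: " + f.record + " (" + f.lastError.getMessage() + ")");
        }
    }
}
```

The retries here are immediate, with no backoff, so this helps with a bad record more than with a store that is down for minutes. For a longer outage you would likely want a delay between attempts, or to let the batch fail and restart the stream as described above.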

On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani <aassud...@impetus.com> wrote:

> Also, what I understand is, max failures doesn’t stop the entire stream,
> it fails the job created for the specific batch, but the subsequent batches
> still proceed, isn’t it right ? And question still remains, how to keep
> track of those failed batches ?
>
> From: amit assudani <aassud...@impetus.com>
> Date: Friday, June 26, 2015 at 11:21 AM
> To: Cody Koeninger <c...@koeninger.org>
> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
> Subject: Re: How to recover in case user errors in streaming
>
> Thanks for quick response,
>
> My question here is how do I know that the max retries are done (
> because in my code I never know whether it is failure of first try or the
> last try ) and I need to handle this message, is there any callback ?
>
> Also, I know the limitation of checkpoint in upgrading the code, but my
> main focus here to mitigate the connectivity issues to persistent store
> which gets resolved in a while, but how do I know which all messages failed
> and need rework ?
>
> Regards,
> Amit
>
> From: Cody Koeninger <c...@koeninger.org>
> Date: Friday, June 26, 2015 at 11:16 AM
> To: amit assudani <aassud...@impetus.com>
> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <t...@databricks.com>
> Subject: Re: How to recover in case user errors in streaming
>
> If you're consistently throwing exceptions and thus failing tasks, once
> you reach max failures the whole stream will stop.
>
> It's up to you to either catch those exceptions, or restart your stream
> appropriately once it stops.
>
> Keep in mind that if you're relying on checkpoints, and fixing the error
> requires changing your code, you may not be able to recover the checkpoint.
>
> On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani <aassud...@impetus.com> wrote:
>
>> *Problem:* how do we recover from user errors (connectivity issues /
>> storage service down / etc.)?
>>
>> *Environment:* Spark streaming using Kafka Direct Streams
>>
>> *Code Snippet:*
>>
>> HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("metadata.broker.list", "localhost:9092");
>> kafkaParams.put("auto.offset.reset", "smallest");
>>
>> JavaPairInputDStream<String, String> messages = KafkaUtils
>>     .createDirectStream(jssc, String.class, String.class,
>>         StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
>>
>> JavaDStream<String> inputStream = messages
>>     .map(new Function<Tuple2<String, String>, String>() {
>>       @Override
>>       public String call(Tuple2<String, String> tuple2) {
>>         return tuple2._2();
>>       }
>>     });
>>
>> inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>   @Override
>>   public Void call(JavaRDD<String> rdd) throws Exception {
>>     if (!rdd.isEmpty()) {
>>       rdd.foreach(new VoidFunction<String>() {
>>         @Override
>>         public void call(String arg0) throws Exception {
>>           System.out.println("------------------------rdd----------" + arg0);
>>           Thread.sleep(1000);
>>           throw new Exception(" :::::::::::::::user and/or service exception::::::::::::::" + arg0);
>>         }
>>       });
>>     }
>>     return null;
>>   }
>> });
>>
>> *Detailed Description*: Using spark streaming I read the text messages
>> from kafka using direct API. For sake of simplicity, all I do in processing
>> is printing each message on console and sleep of 1 sec. as a placeholder
>> for actual processing.
>> Assume we get a user error, maybe due to a bad record, a format error,
>> service connectivity issues, or, let’s say, persistent store downtime.
>> I’ve represented that by throwing an Exception from the foreach block.
>> I understand Spark retries this a configurable number of times and then
>> proceeds ahead. The question is what happens to those failed messages:
>> does Spark retry them, and if so, when? If not, is there any callback
>> method so the user can log them or dump them in an error queue for
>> further analysis and/or manual retries? Also, FYI, checkpoints are
>> enabled and the above code is in the create-context method, to recover
>> from Spark driver / worker failures.
>>
>> ------------------------------
>>
>> NOTE: This message may contain information that is confidential,
>> proprietary, privileged or otherwise protected by law. The message is
>> intended solely for the named addressee. If received in error, please
>> destroy and notify the sender. Any use of this email is prohibited when
>> received in error. Impetus does not represent, warrant and/or guarantee,
>> that the integrity of this communication has been maintained nor that the
>> communication is free of errors, virus, interception or interference.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org