I recommend using dstream.foreachRDD, and then calling rdd.saveAsNewAPIHadoopFile inside a try/catch. See the implementation of dstream.saveAsNewAPIHadoopFiles:

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L716
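For example, a minimal sketch of that pattern, assuming dstream is a DStream of Hadoop-writable key/value pairs (the output path and the Text/TextOutputFormat types are placeholders, not part of the original recommendation):

    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

    dstream.foreachRDD { (rdd, time) =>
      try {
        // Save each batch under a time-stamped directory (placeholder path).
        rdd.saveAsNewAPIHadoopFile(
          s"hdfs:///tmp/output-${time.milliseconds}",
          classOf[Text],
          classOf[Text],
          classOf[TextOutputFormat[Text, Text]])
      } catch {
        case e: Exception =>
          // The whole batch job failed after task retries were exhausted;
          // log it (or divert the batch) here instead of losing it silently.
          System.err.println(s"Batch at $time failed to save: $e")
      }
    }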
On Mon, Jun 29, 2015 at 8:44 AM, Amit Assudani <aassud...@impetus.com> wrote:

Also, how do you suggest catching exceptions when using a connector API like saveAsNewAPIHadoopFiles?

On Monday, June 29, 2015 at 9:55 AM, Amit Assudani <aassud...@impetus.com> wrote:

Thanks TD, this helps.

Looking forward to a fix where the framework handles batch failures via callback methods. That would avoid having to write try/catch in every transformation / action.

Regards,
Amit

On Saturday, June 27, 2015 at 5:14 AM, Tathagata Das <t...@databricks.com> wrote:

I looked at the code and found that batch exceptions are indeed ignored. This is worth fixing: batch exceptions should not be silently ignored.

Also, you can catch failed batch jobs (irrespective of the number of retries) by catching the exception in foreachRDD. Here is an example.

    dstream.foreachRDD { rdd =>
      try {
        // run the output action on the RDD here
      } catch {
        case e: Exception =>
          // the batch job failed after all task retries; handle it here
      }
    }

This will catch failures at the granularity of the job, after all the retries of a task have been exhausted. But it will be hard to filter out the failed record(s) and push them somewhere. To do that, I would use rdd.foreach or rdd.foreachPartition, inside which I would catch the exception, push that record out to another Kafka topic, and continue normal processing of the other records. This prevents the task processing the partition from failing (as you are catching the bad records).

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { iterator =>
        // create a Kafka producer for bad records here
        iterator.foreach { record =>
          try {
            // process record
          } catch {
            case e: ExpectedException =>
              // publish the bad record to an error topic in Kafka
              // using the producer created above
          }
        }
      }
    }

TD

PS: Apologies for the Scala examples, hope you get the idea :)
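One way to flesh out the producer part of that sketch; the broker address, serializer settings, and error-topic name below are assumptions, and the record type is assumed to be String:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { iterator =>
        // One producer per partition: KafkaProducer is not serializable,
        // so it must be created on the executor, not the driver.
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        try {
          iterator.foreach { record =>
            try {
              // process record
            } catch {
              case e: Exception =>
                // divert the bad record to a hypothetical error topic and keep going
                producer.send(new ProducerRecord[String, String]("error-topic", record))
            }
          }
        } finally {
          producer.close()
        }
      }
    }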
On Fri, Jun 26, 2015 at 9:56 AM, Amit Assudani <aassud...@impetus.com> wrote:

Also, I get TaskContext.get() null when used in the foreach function below (I get it when I use it in map, but the whole point here is to handle something that is breaking in an action). Please help. :(

On Friday, June 26, 2015 at 11:41 AM, Amit Assudani <aassud...@impetus.com> wrote:

Hmm, not sure why, but when I run this code it always keeps on consuming from Kafka and proceeds, ignoring the previous failed batches.

Also, now that I get the attempt number from TaskContext and I know the max retries, I am supposed to handle it in the try/catch block. But does that mean I have to handle these kinds of exceptions / errors in every transformation step (map, reduce, transform, etc.)? Isn't there any callback which says the message has been retried the maximum number of times, so that before it is ignored you have a handle to do whatever you want with the batch / message in hand?

Regards,
Amit

On Friday, June 26, 2015 at 11:32 AM, Cody Koeninger <c...@koeninger.org> wrote:

No, if you have a bad message that you are continually throwing exceptions on, your stream will not progress to future batches.

On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani <aassud...@impetus.com> wrote:

Also, what I understand is that max failures doesn't stop the entire stream; it fails the job created for the specific batch, but the subsequent batches still proceed. Isn't that right? And the question still remains: how to keep track of those failed batches?

On Friday, June 26, 2015 at 11:21 AM, Amit Assudani <aassud...@impetus.com> wrote:

Thanks for the quick response.

My question here is how do I know that the max retries are done (because in my code I never know whether it is the failure of the first try or the last try) and that I need to handle this message. Is there any callback?

Also, I know the limitation of checkpoints when upgrading the code, but my main focus here is to mitigate connectivity issues to the persistent store, which get resolved in a while. How do I know which messages failed and need rework?

Regards,
Amit

On Friday, June 26, 2015 at 11:16 AM, Cody Koeninger <c...@koeninger.org> wrote:

If you're consistently throwing exceptions and thus failing tasks, once you reach max failures the whole stream will stop.

It's up to you to either catch those exceptions, or restart your stream appropriately once it stops.

Keep in mind that if you're relying on checkpoints, and fixing the error requires changing your code, you may not be able to recover the checkpoint.
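A minimal restart-from-checkpoint sketch of that advice; the checkpoint directory, app name, and batch interval are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // placeholder path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("recoverable-stream")
      val ssc = new StreamingContext(conf, Seconds(10))
      ssc.checkpoint(checkpointDir)
      // define the DStreams and output operations here, before returning
      ssc
    }

    // Recover from the checkpoint if one exists, otherwise build a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()

As Cody notes, after a code change the checkpoint may fail to deserialize; deleting the checkpoint directory and starting fresh is then the only option.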
On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani <aassud...@impetus.com> wrote:

Problem: how do we recover from user errors (connectivity issues / storage service down / etc.)?

Environment: Spark Streaming using Kafka Direct Streams

Code Snippet:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import kafka.serializer.StringDecoder;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import scala.Tuple2;

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "localhost:9092");
    kafkaParams.put("auto.offset.reset", "smallest");

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);

    JavaDStream<String> inputStream = messages
        .map(new Function<Tuple2<String, String>, String>() {
          @Override
          public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
          }
        });

    inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty()) {
          rdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String arg0) throws Exception {
              // Placeholder processing: print the message and sleep for 1 second.
              System.out.println("------------------------rdd----------" + arg0);
              Thread.sleep(1000);
              // Simulated user / service error.
              throw new Exception(" :::::::::::::::user and/or service exception::::::::::::::" + arg0);
            }
          });
        }
        return null;
      }
    });

Detailed Description: Using Spark Streaming, I read text messages from Kafka via the direct API. For the sake of simplicity, all the processing does is print each message to the console and sleep for 1 second, as a placeholder for actual processing. Suppose we hit a user error, perhaps due to a bad record, a format error, a service connectivity issue, or, say, persistent-store downtime. I've represented that by throwing an Exception from the foreach block. I understand Spark retries a configurable number of times and then proceeds. The question is what happens to those failed messages: does Spark retry them, and if so, when? If not, is there any callback method so the user can log / dump them to an error queue and provision them for further analysis and / or manual retries? Also, FYI, checkpoints are enabled, and the above code is in the create-context method, to recover from Spark driver / worker failures.
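On the "how do I know this is the last retry" question raised above, there is no framework callback, but a heuristic sketch is possible with TaskContext (assuming Spark 1.3+, where TaskContext.attemptNumber is available; the max-failures value is hard-coded here because tasks cannot read spark.task.maxFailures directly):

    import org.apache.spark.TaskContext

    // Assumed to match spark.task.maxFailures (default 4).
    val maxFailures = 4

    rdd.foreachPartition { iterator =>
      val ctx = TaskContext.get() // non-null only inside a task on an executor
      iterator.foreach { record =>
        try {
          // process record
        } catch {
          case e: Exception =>
            if (ctx != null && ctx.attemptNumber() >= maxFailures - 1) {
              // last allowed attempt: divert the record (e.g. to an error
              // topic) instead of rethrowing, so the batch can succeed
            } else {
              throw e // earlier attempts: let Spark retry the task
            }
        }
      }
    }

Note that rethrowing makes Spark reprocess the whole partition on the next attempt, so the per-record processing should be idempotent for this to be safe.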