Recovery for Spark Streaming Kafka Direct with OffsetOutOfRangeException
Hey Cody, I would have responded to the mailing list but it looks like this thread got aged off. I have a problem where one of my topics dumps more data than my Spark job can keep up with. We limit the input rate with maxRatePerPartition. Eventually, when the data is aged off, I get the OffsetOutOfRangeException from Kafka, as we would expect.

As we work toward more efficient processing of that topic, or until we get more resources, I'd like to be able to log the error and continue the application without failing. Is there a place where I can catch that error before it gets to org.apache.spark.util.Utils$.logUncaughtExceptions? Maybe somewhere in DirectKafkaInputDStream::compute?

https://mail-archives.apache.org/mod_mbox/spark-user/201512.mbox/%3CCAKWX9VUoNd4ATGF+0TkNJ+9=b8r2nr9pt7sbgr-bv4nnttk...@mail.gmail.com%3E

-- Dan ✆
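One way to avoid the crash described above is to clamp the requested starting offsets to the earliest offsets Kafka still retains, logging whatever was skipped, before constructing the stream. This is only a sketch under assumptions: the `TopicPartition` case class and `clampToEarliest` helper below are hypothetical names defined here for illustration; in a real job the earliest offsets would come from Kafka itself (e.g. KafkaCluster's leader-offset lookups), not from a hand-built map.

```scala
// Hypothetical helper: clamp desired starting offsets to what Kafka still retains,
// so an aged-off range is logged and skipped instead of throwing OffsetOutOfRangeException.
case class TopicPartition(topic: String, partition: Int)

def clampToEarliest(
    desired: Map[TopicPartition, Long],
    earliestAvailable: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
  desired.map { case (tp, offset) =>
    val earliest = earliestAvailable.getOrElse(tp, offset)
    if (offset < earliest) {
      // Data between `offset` and `earliest` was aged off; log and move on.
      println(s"WARN: $tp offset $offset aged off; resuming from $earliest")
      tp -> earliest
    } else tp -> offset
  }
```

The result can then be passed as the fromOffsets argument when building the direct stream, so the job resumes at the oldest data that still exists rather than failing.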
Re: Spark Stream Monitoring with Kafka Direct API
I'm on Spark version 1.4.1. I couldn't find documentation that said it was fixed, so I thought maybe it was still an open issue. Any idea what the fix version is?

On Wed, Dec 9, 2015 at 11:10 AM Cody Koeninger wrote:
> Which version of Spark are you on? I thought that was added to the Spark UI in recent versions.
>
> The direct API doesn't have any inherent interaction with ZooKeeper. If you need the number of messages per batch and aren't on a recent enough version of Spark to see them in the UI, you can get them programmatically from the offset ranges. See the definition of count() in recent versions of KafkaRDD for an example.
>
> On Wed, Dec 9, 2015 at 9:39 AM, Dan Dutrow wrote:
>> Is there documentation for how to update the metrics (# messages per batch) in the Spark Streaming tab when using the Direct API? Does the Streaming tab get its information from ZooKeeper or something else internally?
>> -- Dan ✆

-- Dan ✆
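Getting per-batch counts from the offset ranges, as suggested above, is simple arithmetic. A sketch: the `OffsetRange` case class below mirrors the shape of Spark's org.apache.spark.streaming.kafka.OffsetRange but is redefined here so the example stands alone; in a real job you would read `rdd.asInstanceOf[HasOffsetRanges].offsetRanges` inside foreachRDD instead of building the ranges by hand.

```scala
// Stand-in for org.apache.spark.streaming.kafka.OffsetRange, redefined for illustration.
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  // Number of messages in this partition's range -- the same idea as KafkaRDD.count().
  def count: Long = untilOffset - fromOffset
}

// Total messages in a batch = sum of the per-partition range sizes.
def batchCount(ranges: Seq[OffsetRange]): Long = ranges.map(_.count).sum
```

For example, `batchCount(Seq(OffsetRange("t", 0, 10, 25), OffsetRange("t", 1, 0, 5)))` yields 20, which could then be fed to whatever metrics sink you use.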
Spark Stream Monitoring with Kafka Direct API
Is there documentation for how to update the metrics (#messages per batch) in the Spark Streaming tab when using the Direct API? Does the Streaming tab get its information from Zookeeper or something else internally? -- Dan ✆
Re: Kafka - streaming from multiple topics
Hey Cody, I'm convinced that I'm not going to get the functionality I want without using the Direct Stream API. I'm now looking through https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md#exactly-once-using-transactional-writes where you say "For the very first time the job is run, the table can be pre-loaded with appropriate starting offsets."

Could you provide some guidance on how to determine valid starting offsets the very first time, particularly in my case where I have 10+ topics in multiple different deployment environments with an unknown and potentially dynamic number of partitions per topic per environment? I'd be happy if I could initialize all consumers with auto.offset.reset = "largest", record the partitions and offsets as they flow through Spark, and then use those discovered offsets from there on out.

I'm thinking I can probably just do some if/else logic: use the basic createDirectStream, or the more advanced createDirectStream(...fromOffsets...) if the offsets for my topic name exist in the database. Any reason that wouldn't work? If I don't include an offset range for a particular partition, will that partition be ignored?

On Wed, Dec 2, 2015 at 3:17 PM Cody Koeninger wrote:
> Use the direct stream. You can put multiple topics in a single stream, and differentiate them on a per-partition basis using the offset range.
>
> On Wed, Dec 2, 2015 at 2:13 PM, dutrow wrote:
>> I found the JIRA ticket: https://issues.apache.org/jira/browse/SPARK-2388
>> It was marked as invalid.
>>
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-from-multiple-topics-tp8678p25550.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org

-- Dan ✆
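The if/else logic described above can be sketched as a small selector that decides, per topic, whether to resume from stored offsets or fall back to auto.offset.reset. This is a sketch under assumptions: `TopicAndPartition` is redefined minimally here (the real one lives in the Kafka client library), and the database lookup is represented abstractly as an `Option`; the caller would then invoke the matching createDirectStream overload.

```scala
// Minimal stand-in for kafka.common.TopicAndPartition, for illustration only.
case class TopicAndPartition(topic: String, partition: Int)

// Decide how to start the stream: Right(offsets) means call the fromOffsets overload,
// Left(resetPolicy) means this is the very first run and the basic overload applies.
def chooseStartingPoint(
    storedOffsets: Option[Map[TopicAndPartition, Long]]): Either[String, Map[TopicAndPartition, Long]] =
  storedOffsets match {
    case Some(offsets) if offsets.nonEmpty => Right(offsets) // resume from database
    case _ => Left("largest") // first run: rely on auto.offset.reset
  }
```

The point of the Either is just to make the two code paths explicit: the first run takes the Left branch and uses the simple createDirectStream, while every later run takes the Right branch with the offsets the job recorded as they flowed through.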
Re: Kafka - streaming from multiple topics
Sigh... I want to use the direct stream and have recently brought in Redis to persist the offsets, but I really like and need to have real-time metrics on the GUI, so I'm hoping to have the Direct and Receiver streams both working.

On Wed, Dec 2, 2015 at 3:17 PM Cody Koeninger wrote:
> Use the direct stream. You can put multiple topics in a single stream, and differentiate them on a per-partition basis using the offset range.
>
> On Wed, Dec 2, 2015 at 2:13 PM, dutrow wrote:
>> I found the JIRA ticket: https://issues.apache.org/jira/browse/SPARK-2388
>> It was marked as invalid.
>>
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-from-multiple-topics-tp8678p25550.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.

-- Dan ✆
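Persisting offsets in Redis mostly reduces to choosing a stable key/value encoding per topic-partition. A minimal sketch, under assumptions: the `<app>:offsets:<topic>:<partition>` key scheme and the helper names below are made up for illustration, not an established convention, and a real Redis client (e.g. Jedis) would sit behind the maps shown here.

```scala
// Hypothetical flat encoding of offsets for a key-value store such as Redis.
// Keys look like "myapp:offsets:mytopic:0", values are the offset as a string.
def encodeOffsets(appId: String, offsets: Map[(String, Int), Long]): Map[String, String] =
  offsets.map { case ((topic, partition), offset) =>
    s"$appId:offsets:$topic:$partition" -> offset.toString
  }

// Inverse: rebuild the (topic, partition) -> offset map from stored entries.
// Assumes topic names contain no ':' -- a real scheme would need escaping.
def decodeOffsets(entries: Map[String, String]): Map[(String, Int), Long] =
  entries.map { case (key, value) =>
    val parts = key.split(":")
    ((parts(2), parts(3).toInt), value.toLong)
  }
```

Writing the encoded map after each batch and reading it back at startup gives the fromOffsets map for the direct stream, independent of the Spark checkpoint.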
Re: Different Kafka createDirectStream implementations
I see... the first method takes the offsets as its third parameter, while the second method just takes topic names; that's the primary reason the implementations are different. In that case, what I'm noticing is that setting the messageHandler is unavailable in the second method. This isn't a killer for me, but maybe someone else would want to set that.

On Tue, Sep 8, 2015 at 2:32 PM Cody Koeninger wrote:
> What exactly do you think should be done with auto offset reset if someone has explicitly provided offsets?
>
> auto offset reset is only useful for determining whether to start the stream at the beginning or the end of the log... if someone's provided explicit offsets, it's pretty clear where to start the stream.
>
> On Tue, Sep 8, 2015 at 1:19 PM, Dan Dutrow wrote:
>> Yes, but one implementation checks those flags and the other one doesn't. I would think they should be consistent.
>>
>> On Tue, Sep 8, 2015 at 1:32 PM Cody Koeninger wrote:
>>> If you're providing starting offsets explicitly, then auto offset reset isn't relevant.
>>>
>>> On Tue, Sep 8, 2015 at 11:44 AM, Dan Dutrow wrote:
>>>> The two methods of createDirectStream appear to have different implementations: the second checks the offset.reset flags and does some error handling while the first does not. Besides the use of a messageHandler, are they intended to be used in different situations?
>>>>
>>>> def createDirectStream[
>>>>     K: ClassTag, V: ClassTag,
>>>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag,
>>>>     R: ClassTag](
>>>>   ssc: StreamingContext,
>>>>   kafkaParams: Map[String, String],
>>>>   fromOffsets: Map[TopicAndPartition, Long],
>>>>   messageHandler: MessageAndMetadata[K, V] => R)
>>>>
>>>> def createDirectStream[
>>>>     K: ClassTag, V: ClassTag,
>>>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
>>>>   ssc: StreamingContext,
>>>>   kafkaParams: Map[String, String],
>>>>   topics: Set[String])
>>>> -- Dan📱

-- Dan📱
Re: Different Kafka createDirectStream implementations
Yes, but one implementation checks those flags and the other one doesn't. I would think they should be consistent.

On Tue, Sep 8, 2015 at 1:32 PM Cody Koeninger wrote:
> If you're providing starting offsets explicitly, then auto offset reset isn't relevant.
>
> On Tue, Sep 8, 2015 at 11:44 AM, Dan Dutrow wrote:
>> The two methods of createDirectStream appear to have different implementations: the second checks the offset.reset flags and does some error handling while the first does not. Besides the use of a messageHandler, are they intended to be used in different situations?
>>
>> def createDirectStream[
>>     K: ClassTag, V: ClassTag,
>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag,
>>     R: ClassTag](
>>   ssc: StreamingContext,
>>   kafkaParams: Map[String, String],
>>   fromOffsets: Map[TopicAndPartition, Long],
>>   messageHandler: MessageAndMetadata[K, V] => R)
>>
>> def createDirectStream[
>>     K: ClassTag, V: ClassTag,
>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
>>   ssc: StreamingContext,
>>   kafkaParams: Map[String, String],
>>   topics: Set[String])
>> -- Dan📱

-- Dan📱
Different Kafka createDirectStream implementations
The two methods of createDirectStream appear to have different implementations: the second checks the offset.reset flags and does some error handling while the first does not. Besides the use of a messageHandler, are they intended to be used in different situations?

def createDirectStream[
    K: ClassTag, V: ClassTag,
    KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag,
    R: ClassTag](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  fromOffsets: Map[TopicAndPartition, Long],
  messageHandler: MessageAndMetadata[K, V] => R)

def createDirectStream[
    K: ClassTag, V: ClassTag,
    KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Set[String])

-- Dan📱
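For context on the messageHandler parameter discussed in this thread: it is just a function from each consumed record to whatever type you want in the stream. A sketch, with `MessageAndMetadata` redefined minimally here so the example stands alone (the real class is kafka.message.MessageAndMetadata, whose fields differ slightly):

```scala
// Minimal stand-in for kafka.message.MessageAndMetadata, for illustration only.
case class MessageAndMetadata[K, V](topic: String, partition: Int, offset: Long, key: K, message: V)

// Example handler: keep the topic alongside the payload, so multiple topics
// consumed by one direct stream can be told apart downstream.
def handler(mmd: MessageAndMetadata[String, String]): (String, String) =
  (mmd.topic, mmd.message)
```

This is why only the fromOffsets overload exposes it: the simpler overload fixes the element type to (key, message) pairs, while the handler lets you project out topic, partition, or offset per record.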
Re: Maintaining Kafka Direct API Offsets
Thanks. Looking at the KafkaCluster.scala code (https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L253), it seems a little hacky for me to alter and recompile Spark to expose those methods, so I'll use the receiver API for the time being and watch for changes as this API evolves to make those methods a little bit more accessible. Meanwhile, I'll look into incorporating a database, maybe Tachyon, to persist offset and state data across redeployments.

On Fri, Aug 14, 2015 at 3:21 PM Cody Koeninger wrote:
> I don't entirely agree with that assessment. Not paying for extra cores to run receivers was about as important as delivery semantics, as far as motivations for the api.
>
> As I said in the jira tickets on the topic, if you want to use the direct api and save offsets to ZK, you can. The right way to make that easier is to expose the (currently private) methods that already exist in KafkaCluster.scala for committing offsets through Kafka's api. I don't think adding another "do the wrong thing" option is beneficial.
>
> On Fri, Aug 14, 2015 at 11:34 AM, dutrow wrote:
>> In summary, it appears that the use of the Direct API was intended specifically to enable exactly-once semantics. This can be achieved for idempotent transformations and with transactional processing using the database to guarantee an "onto" mapping of results based on inputs. For the latter, you need to store your offsets in the database of record.
>>
>> If you as a developer do not necessarily need exactly-once semantics, then you can probably get by fine using the receiver API.
>>
>> The hope is that one day the Direct API could be augmented with Spark-abstracted offset storage (with ZooKeeper, Kafka, or something else outside of the Spark checkpoint), since this would allow developers to easily take advantage of the Direct API's performance benefits and simplification of parallelism. I think it would be worth adding, even if it were to come with some "buyer beware" caveats.
>>
>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-Kafka-Direct-API-Offsets-tp24246p24273.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
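The "store your offsets in the database of record" pattern discussed in this thread amounts to: in each batch, write results and that batch's offset ranges together, and on restart read the stored offsets back as fromOffsets. A minimal in-memory sketch of the commit side (the `OffsetStore` class and its method names are assumptions standing in for a real transactional table, not any Spark or Kafka API):

```scala
// In-memory stand-in for a transactional offsets table keyed by (topic, partition).
final class OffsetStore {
  private var committed = Map.empty[(String, Int), Long]

  // Record a batch's untilOffsets. Only offsets that move forward are kept,
  // so a replayed (duplicate) batch cannot rewind the stored position --
  // the property that makes reprocessing after a failure safe.
  def commitBatch(ranges: Map[(String, Int), Long]): Unit = synchronized {
    committed = committed ++ ranges.filter { case (tp, until) =>
      committed.get(tp).forall(_ <= until)
    }
  }

  // On restart, these become the fromOffsets for the direct stream.
  def loadFromOffsets: Map[(String, Int), Long] = synchronized(committed)
}
```

In a real deployment the results and the offset update would share one database transaction, which is what gives the exactly-once guarantee the thread describes; the monotonic filter above is the piece that keeps replays idempotent.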