Recovery for Spark Streaming Kafka Direct with OffsetOutOfRangeException

2016-01-21 Thread Dan Dutrow
Hey Cody, I would have responded to the mailing list but it looks like this
thread got aged off. I have a problem where one of my topics dumps more
data than my Spark job can keep up with. We limit the input rate with
maxRatePerPartition. Eventually, when the data is aged off, I get the
OffsetOutOfRangeException from Kafka, as we would expect. As we work
towards more efficient processing of that topic, or get more resources, I'd
like to be able to log the error and continue the application without
failing. Is there a place where I can catch that error before it gets to
org.apache.spark.util.Utils$.logUncaughtExceptions? Maybe somewhere in
DirectKafkaInputDStream::compute?

https://mail-archives.apache.org/mod_mbox/spark-user/201512.mbox/%3CCAKWX9VUoNd4ATGF+0TkNJ+9=b8r2nr9pt7sbgr-bv4nnttk...@mail.gmail.com%3E
-- 
Dan ✆
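
A minimal driver-side sketch of one possible workaround, assuming the fetch
failure surfaces when the batch's output action runs; process and log are
placeholders, and this has not been verified against the 1.4.x internals:

import org.apache.spark.SparkException

stream.foreachRDD { rdd =>
  try {
    // Any action that materializes the batch will surface task failures here.
    rdd.foreach(process)
  } catch {
    case e: SparkException =>
      // Likely wraps kafka.common.OffsetOutOfRangeException once data has
      // aged off; log it and move on to the next batch instead of dying.
      log.warn("Skipping batch after Kafka fetch failure", e)
  }
}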


Re: Spark Stream Monitoring with Kafka Direct API

2015-12-09 Thread Dan Dutrow
I'm on Spark version 1.4.1. I couldn't find documentation that said it was
fixed, so I thought maybe it was still an open issue. Any idea what the fix
version is?

On Wed, Dec 9, 2015 at 11:10 AM Cody Koeninger  wrote:

> Which version of Spark are you on?  I thought that was added to the Spark
> UI in recent versions.
>
> The direct API doesn't have any inherent interaction with ZooKeeper.  If you
> need the number of messages per batch and aren't on a recent enough version of
> Spark to see them in the UI, you can get them programmatically from the
> offset ranges.  See the definition of count() in recent versions of
> KafkaRDD for an example.
>
> On Wed, Dec 9, 2015 at 9:39 AM, Dan Dutrow  wrote:
>
>> Is there documentation for how to update the metrics (#messages per
>> batch) in the Spark Streaming tab when using the Direct API? Does the
>> Streaming tab get its information from Zookeeper or something else
>> internally?
>> --
>> Dan ✆
>>
>
> --
Dan ✆
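
Following Cody's pointer, a minimal sketch of deriving the per-batch message
count from the offset ranges, with stream standing for the DStream returned by
createDirectStream (the cast only works on its RDDs, before any other
transformation):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // Same arithmetic that count() performs in later versions of KafkaRDD.
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val messagesInBatch = ranges.map(r => r.untilOffset - r.fromOffset).sum
  println(s"Batch contained $messagesInBatch messages across ${ranges.length} partitions")
}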


Spark Stream Monitoring with Kafka Direct API

2015-12-09 Thread Dan Dutrow
Is there documentation for how to update the metrics (#messages per batch)
in the Spark Streaming tab when using the Direct API? Does the Streaming
tab get its information from Zookeeper or something else internally?
-- 
Dan ✆


Re: Kafka - streaming from multiple topics

2015-12-03 Thread Dan Dutrow
Hey Cody, I'm convinced that I'm not going to get the functionality I want
without using the Direct Stream API.

I'm now looking through
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md#exactly-once-using-transactional-writes
where you say "For the very first time the job is run, the table can be
pre-loaded with appropriate starting offsets."

Could you provide some guidance on how to determine valid starting offsets
the very first time, particularly in my case where I have 10+ topics in
multiple different deployment environments with an unknown and potentially
dynamic number of partitions per topic per environment?

I'd be happy if I could initialize all consumers with auto.offset.reset =
"largest", record the partitions and offsets as they flow through Spark, and
then use those discovered offsets from then on.

I'm thinking I can probably just do some if/else logic: use the basic
createDirectStream, or the more advanced createDirectStream(...fromOffsets...)
if offsets for my topic name already exist in the database. Any reason that
wouldn't work? If I don't include an offset range for a particular partition,
will that partition be ignored?
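
A minimal sketch of that if/else, assuming a hypothetical loadOffsets helper
that returns whatever offsets were recorded on a previous run (empty the very
first time), with plain string keys and values:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical: look up previously recorded offsets for these topics.
def loadOffsets(topics: Set[String]): Map[TopicAndPartition, Long] = ???

def createStream(ssc: StreamingContext,
                 kafkaParams: Map[String, String],   // includes auto.offset.reset = "largest"
                 topics: Set[String]) = {
  val saved = loadOffsets(topics)
  if (saved.nonEmpty) {
    // Resume exactly where the previous run left off.
    val handler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, saved, handler)
  } else {
    // First run: let auto.offset.reset decide where to start.
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
  }
}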




On Wed, Dec 2, 2015 at 3:17 PM Cody Koeninger  wrote:

> Use the direct stream.  You can put multiple topics in a single stream,
> and differentiate them on a per-partition basis using the offset range.
>
> On Wed, Dec 2, 2015 at 2:13 PM, dutrow  wrote:
>
>> I found the JIRA ticket: https://issues.apache.org/jira/browse/SPARK-2388
>>
>> It was marked as invalid.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-from-multiple-topics-tp8678p25550.html
>> --
Dan ✆


Re: Kafka - streaming from multiple topics

2015-12-02 Thread Dan Dutrow
Sigh... I want to use the direct stream and have recently brought in Redis
to persist the offsets, but I really like and need to have real-time metrics
in the GUI, so I'm hoping to have both the Direct and Receiver streams working.

On Wed, Dec 2, 2015 at 3:17 PM Cody Koeninger  wrote:

> Use the direct stream.  You can put multiple topics in a single stream,
> and differentiate them on a per-partition basis using the offset range.
>
> On Wed, Dec 2, 2015 at 2:13 PM, dutrow  wrote:
>
>> I found the JIRA ticket: https://issues.apache.org/jira/browse/SPARK-2388
>>
>> It was marked as invalid.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-from-multiple-topics-tp8678p25550.html
>> --
Dan ✆


Re: Different Kafka createDirectStream implementations

2015-09-08 Thread Dan Dutrow
I see... the first method takes the offsets as its third parameter while
the second method just takes topic names, and that's the primary reason why
the implementations are different.

In that case, what I am noticing is that the messageHandler parameter is
unavailable in the second method. This isn't a killer for me, but maybe
someone else would want to set it.

On Tue, Sep 8, 2015 at 2:32 PM Cody Koeninger  wrote:

> What exactly do you think should be done with auto offset reset if someone
> has explicitly provided offsets?
>
> auto offset reset is only useful for determining whether to start the
> stream at the beginning or the end of the log... if someone's provided
> explicit offsets, it's pretty clear where to start the stream.
>
> On Tue, Sep 8, 2015 at 1:19 PM, Dan Dutrow  wrote:
>
>> Yes, but one implementation checks those flags and the other one doesn't.
>> I would think they should be consistent.
>>
>> On Tue, Sep 8, 2015 at 1:32 PM Cody Koeninger  wrote:
>>
>>> If you're providing starting offsets explicitly, then auto offset reset
>>> isn't relevant.
>>>
>>> On Tue, Sep 8, 2015 at 11:44 AM, Dan Dutrow 
>>> wrote:
>>>
>>>> The two methods of createDirectStream appear to have different
>>>> implementations, the second checks the offset.reset flags and does some
>>>> error handling while the first does not. Besides the use of a
>>>> messageHandler, are they intended to be used in different situations?
>>>>
>>>> def createDirectStream[
>>>>     K: ClassTag, V: ClassTag,
>>>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag,
>>>>     R: ClassTag](
>>>>     ssc: StreamingContext,
>>>>     kafkaParams: Map[String, String],
>>>>     fromOffsets: Map[TopicAndPartition, Long],
>>>>     messageHandler: MessageAndMetadata[K, V] => R)
>>>>
>>>> def createDirectStream[
>>>>     K: ClassTag, V: ClassTag,
>>>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
>>>>     ssc: StreamingContext,
>>>>     kafkaParams: Map[String, String],
>>>>     topics: Set[String])
>>>> --
>>>> Dan📱
>>>
>>>
>>> --
>> Dan📱
>
>
> --
Dan📱


Re: Different Kafka createDirectStream implementations

2015-09-08 Thread Dan Dutrow
Yes, but one implementation checks those flags and the other one doesn't. I
would think they should be consistent.

On Tue, Sep 8, 2015 at 1:32 PM Cody Koeninger  wrote:

> If you're providing starting offsets explicitly, then auto offset reset
> isn't relevant.
>
> On Tue, Sep 8, 2015 at 11:44 AM, Dan Dutrow  wrote:
>
>> The two methods of createDirectStream appear to have different
>> implementations, the second checks the offset.reset flags and does some
>> error handling while the first does not. Besides the use of a
>> messageHandler, are they intended to be used in different situations?
>>
>> def createDirectStream[
>>     K: ClassTag, V: ClassTag,
>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag,
>>     R: ClassTag](
>>     ssc: StreamingContext,
>>     kafkaParams: Map[String, String],
>>     fromOffsets: Map[TopicAndPartition, Long],
>>     messageHandler: MessageAndMetadata[K, V] => R)
>>
>> def createDirectStream[
>>     K: ClassTag, V: ClassTag,
>>     KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
>>     ssc: StreamingContext,
>>     kafkaParams: Map[String, String],
>>     topics: Set[String])
>> --
>> Dan📱
>
>
> --
Dan📱


Different Kafka createDirectStream implementations

2015-09-08 Thread Dan Dutrow
The two methods of createDirectStream appear to have different
implementations: the second checks the offset.reset flags and does some
error handling while the first does not. Besides the use of a
messageHandler, are they intended to be used in different situations?

def createDirectStream[
    K: ClassTag, V: ClassTag,
    KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag,
    R: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R)

def createDirectStream[
    K: ClassTag, V: ClassTag,
    KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String])
-- 
Dan📱
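
For what the extra messageHandler parameter is useful for, a small sketch:
tagging each record with its topic, which the topics-only overload (it returns
plain (key, message) pairs) does not let you do. topicTaggedStream is just an
illustrative name:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

// Keep the originating topic alongside each message so that several topics
// multiplexed into one direct stream can be told apart downstream.
def topicTaggedStream(ssc: StreamingContext,
                      kafkaParams: Map[String, String],
                      fromOffsets: Map[TopicAndPartition, Long]) = {
  val handler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message)
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffsets, handler)
}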


Re: Maintaining Kafka Direct API Offsets

2015-08-14 Thread Dan Dutrow
Thanks. Looking at the KafkaCluster.scala code (
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala#L253),
it seems a little hacky for me to alter and recompile Spark to expose those
methods, so I'll use the receiver API for the time being and watch for
changes as this API evolves to make those methods a bit more accessible.
Meanwhile, I'll look into incorporating a database, maybe Tachyon, to persist
offset and state data across redeployments.
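
A sketch of the offset-persistence side of that, using only the public
HasOffsetRanges interface; stream stands for the direct DStream, and
saveOffsets is a hypothetical helper that writes to whatever external store is
chosen (Redis, Tachyon, ...):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Capture each batch's offset ranges before any transformation, then record
// them only after the batch's work has completed successfully.
var offsetRanges = Array.empty[OffsetRange]

stream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  // ... process the batch here ...
  saveOffsets(offsetRanges)
}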

On Fri, Aug 14, 2015 at 3:21 PM Cody Koeninger  wrote:

> I don't entirely agree with that assessment.  Not paying for extra cores
> to run receivers was about as important as delivery semantics, as far as
> motivations for the API.
>
> As I said in the JIRA tickets on the topic, if you want to use the direct
> API and save offsets to ZK, you can.  The right way to make that easier is
> to expose the (currently private) methods that already exist in
> KafkaCluster.scala for committing offsets through Kafka's API.  I don't
> think adding another "do the wrong thing" option is beneficial.
>
> On Fri, Aug 14, 2015 at 11:34 AM, dutrow  wrote:
>
>> In summary, it appears that the use of the Direct API was intended
>> specifically to enable exactly-once semantics. This can be achieved for
>> idempotent transformations, and with transactional processing using the
>> database to guarantee an "onto" mapping of results based on inputs. For the
>> latter, you need to store your offsets in the database of record.
>>
>> If you as a developer do not necessarily need exactly-once semantics, then
>> you can probably get by fine using the receiver API.
>>
>> The hope is that one day the Direct API could be augmented with
>> Spark-abstracted offset storage (with ZooKeeper, Kafka, or something else
>> outside of the Spark checkpoint), since this would allow developers to
>> easily take advantage of the Direct API's performance benefits and
>> simplification of parallelism. I think it would be worth adding, even if it
>> were to come with some "buyer beware" caveats.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Maintaining-Kafka-Direct-API-Offsets-tp24246p24273.html