Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread shyla deshpande
I am doing that already for all known messy data. Thanks, Cody, for all your time and input.


Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread Cody Koeninger
Yes


Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread shyla deshpande
Thanks Cody again.

No. I am mapping the Kafka ConsumerRecord so that I can save it in the Cassandra table; saveToCassandra is an action, and my data does get saved into Cassandra. It works as expected 99% of the time, except that when there is an exception I do not want the offsets to be committed.

By filtering for unsuccessful attempts, do you mean filtering out the bad records?

Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread Cody Koeninger
If literally all you are doing is rdd.map, I wouldn't expect
saveToCassandra to happen at all, since map is not an action.

Filtering for unsuccessful attempts and collecting those back to the
driver would be one way for the driver to know whether it was safe to
commit.
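
A minimal sketch of that suggestion, reusing the stream1 and offsetRanges names from the code quoted later in this thread. saveRecord is a hypothetical per-record write standing in for the Cassandra save, since the point here is only to report per-record success or failure back to the driver:

import scala.util.Try
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream1.foreachRDD { (rdd, time) =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Executors: attempt each write, keeping only the failure messages.
  val failures = rdd.flatMap { record =>
    Try(saveRecord(record)).failed.toOption.map(_.getMessage)  // saveRecord is hypothetical
  }.collect()  // collect() is the action: writes run here, failures come back to the driver

  // Driver: commit only if every record was written successfully.
  if (failures.isEmpty)
    stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}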


Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread shyla deshpande
rdd.map { record => () }.saveToCassandra("keyspace1", "table1")  --> is running on the executor.

stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)  --> is running on the driver.

Is this the reason why Kafka offsets are committed even when an exception is raised? If so, is there a way to commit the offsets only when there are no exceptions?
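
One way to confirm this empirically (a debugging sketch, not from the original thread): TaskContext is only available inside executor tasks, so it cleanly separates the two sides.

import org.apache.spark.TaskContext

stream1.foreachRDD { (rdd, time) =>
  // Runs on the driver: shows up in the driver's stdout.
  println(s"driver: batch $time, ${rdd.getNumPartitions} partitions")

  rdd.foreachPartition { _ =>
    // Runs on an executor: shows up in the executor logs.
    println(s"executor: partition ${TaskContext.get.partitionId}")
  }
}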





Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread shyla deshpande
Thanks again Cody,

My understanding is that all the code inside foreachRDD runs on the driver, except for
rdd.map { record => () }.saveToCassandra("keyspace1", "table1").

When the exception is raised, I was expecting the offsets not to be committed, but the offsets are committed every time, independent of whether an exception was raised or not.

It would be helpful if you could explain this behavior.




Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread Cody Koeninger
I mean that the kafka consumers running on the executors should not be
automatically committing, because the fact that a message was read by
the consumer has no bearing on whether it was actually successfully
processed after reading.

It sounds to me like you're confused about where code is running.
foreachRDD runs on the driver, not the executor.

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
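
To make the split concrete, here is the snippet from this thread annotated with where each line executes (annotations only; no behavior change):

stream1.foreachRDD { (rdd, time) =>
  // Driver: the body of foreachRDD itself.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Driver launches the job; the map closure and the Cassandra
  // writes execute as tasks on the executors.
  rdd.map { record => () }.saveToCassandra("keyspace1", "table1")

  // Driver: queues these offsets to be committed back to Kafka.
  stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}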




Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread shyla deshpande
Thanks Cody for your response.

All I want to do is commit the offsets only if I am successfully able to write to the Cassandra database.

The line // save the rdd to Cassandra database is
rdd.map { record => () }.saveToCassandra("keyspace1", "table1")

What do you mean by "Executors shouldn't be auto-committing, that's why it's being overridden"? It is the executors that do the mapping and saving to Cassandra. The status of success or failure of this operation is known only on the executor, and that's where I want to commit the Kafka offsets. If this is not what I should be doing, then what is the right way?



Re: kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread Cody Koeninger
If your complaint is about offsets being committed that you didn't
expect... auto-commit being false on executors shouldn't have anything
to do with that. Executors shouldn't be auto-committing; that's why
it's being overridden.

What you've said and the code you posted isn't really enough to
explain what your issue is. E.g.:

Is this line
// save the rdd to Cassandra database
a blocking call?

Are you sure that the rdd foreach isn't being retried and succeeding
the second time around, etc.?
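
One way to rule out the retry scenario while debugging (a sketch; spark.task.maxFailures is a standard Spark setting that defaults to 4):

import org.apache.spark.SparkConf

// With task retries disabled, a failed write fails the batch instead of
// possibly being retried and succeeding on the second attempt.
// For debugging only; not a production setting.
val conf = new SparkConf()
  .setAppName("kafka-to-cassandra")     // hypothetical app name
  .set("spark.task.maxFailures", "1")   // default is 4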




kafka setting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-05 Thread shyla deshpande
Hello All,
I am using Spark 2.0.2 and spark-streaming-kafka-0-10_2.11.

I am setting enable.auto.commit to false, and I want to commit the offsets manually after my output operation succeeds. So when an exception is raised during the processing, I do not want the offsets to be committed. But it looks like the offsets are automatically committed even when the exception is raised, and thereby I am losing data.
In my logs I see: WARN overriding enable.auto.commit to false for executor. But I don't want it to override. Please help.

My code looks like:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "Group1",
  "auto.offset.reset" -> offsetresetparameter,
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val myTopics = Array("topic1")
val stream1 = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](myTopics, kafkaParams)
)

stream1.foreachRDD { (rdd, time) =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  try {
    // save the rdd to Cassandra database

    stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  } catch {
    case ex: Exception =>
      println(ex.toString + "!! Bad Data, Unable to persist into table !" +
        errorOffsetRangesToString(offsetRanges))
  }
}

ssc.start()
ssc.awaitTermination()