Thanks Cody for your response.

All I want to do is, commit the offsets only if I am successfully able to
write to cassandra database.

The line //save the rdd to Cassandra database is
rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")

What do you mean by Executors shouldn't be auto-committing, that's why it's
being overridden. It is the executors that do the mapping and saving to
cassandra. The status of success or failure of this operation is known only
on the executor and thats where I want to commit the kafka offsets. If this
is not what I sould be doing, then  what is the right way?

On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger <c...@koeninger.org> wrote:

> If your complaint is about offsets being committed that you didn't
> expect... auto commit being false on executors shouldn't have anything
> to do with that.  Executors shouldn't be auto-committing, that's why
> it's being overridden.
>
> What you've said and the code you posted isn't really enough to
> explain what your issue is, e.g.
>
> is this line
> // save the rdd to Cassandra database
> a blocking call
>
> are you sure that the rdd foreach isn't being retried and succeeding
> the second time around, etc
>
> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
> <deshpandesh...@gmail.com> wrote:
> > Hello All,
> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
> >
> > I am setting enable.auto.commit to false, and manually want to commit the
> > offsets after my output operation is successful. So when a exception is
> > raised during during the processing I do not want the offsets to be
> > committed. But looks like the offsets are automatically committed even
> when
> > the exception is raised and thereby I am losing data.
> > In my logs I see,  WARN  overriding enable.auto.commit to false for
> > executor.  But I don't want it to override. Please help.
> >
> > My code looks like..
> >
> >     val kafkaParams = Map[String, Object](
> >       "bootstrap.servers" -> brokers,
> >       "key.deserializer" -> classOf[StringDeserializer],
> >       "value.deserializer" -> classOf[StringDeserializer],
> >       "group.id" -> "Group1",
> >       "auto.offset.reset" -> offsetresetparameter,
> >       "enable.auto.commit" -> (false: java.lang.Boolean)
> >     )
> >
> >     val myTopics = Array("topic1")
> >     val stream1 = KafkaUtils.createDirectStream[String, String](
> >       ssc,
> >       PreferConsistent,
> >       Subscribe[String, String](myTopics, kafkaParams)
> >     )
> >
> >     stream1.foreachRDD { (rdd, time) =>
> >         val offsetRanges = rdd.asInstanceOf[
> HasOffsetRanges].offsetRanges
> >         try {
> >             //save the rdd to Cassandra database
> >
> >           stream1.asInstanceOf[CanCommitOffsets].commitAsync(
> offsetRanges)
> >         } catch {
> >           case ex: Exception => {
> >             println(ex.toString + "!!!!!! Bad Data, Unable to persist
> into
> > table !!!!!" + errorOffsetRangesToString(offsetRanges))
> >           }
> >         }
> >     }
> >
> >     ssc.start()
> >     ssc.awaitTermination()
>

Reply via email to