rdd.map { record => () }.saveToCassandra("keyspace1", "table1")  --> runs on
the executors.

stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)  --> runs on
the driver.

Is this the reason why the Kafka offsets are committed even when an
exception is raised? If so, is there a way to commit the offsets only
when there are no exceptions?
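
For what it's worth, this is the shape I have in mind, assuming that
saveToCassandra is a blocking action that throws on the driver if the write
job ultimately fails after task retries (Row1 and the key/value mapping below
are just placeholders for my real schema):

    import com.datastax.spark.connector._
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    // Hypothetical row type matching table1
    case class Row1(id: String, value: String)

    stream1.foreachRDD { (rdd, time) =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      try {
        // saveToCassandra is an action: it blocks here on the driver and
        // throws if the write job fails after the task retries are exhausted.
        rdd.map(record => Row1(record.key, record.value))
           .saveToCassandra("keyspace1", "table1")
        // Reached only when the write succeeded, so the commit is conditional.
        stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      } catch {
        case ex: Exception =>
          // No commit: this batch's offsets stay uncommitted and the batch
          // will be re-read after a restart.
          println("Write failed, not committing offsets: " + ex)
      }
    }

If saveToCassandra really does block and surface failures this way, the
commitAsync should only run for batches that were written successfully.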



On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> Thanks again Cody,
>
> My understanding is that all the code inside foreachRDD runs on the driver,
> except for rdd.map { record => () }.saveToCassandra("keyspace1", "table1").
>
> When an exception is raised, I expected the offsets not to be committed,
> but the offsets are committed every time, regardless of whether an
> exception was raised or not.
>
> It will be helpful if you can explain this behavior.
>
>
> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> I mean that the kafka consumers running on the executors should not be
>> automatically committing, because the fact that a message was read by
>> the consumer has no bearing on whether it was actually successfully
>> processed after reading.
>>
>> It sounds to me like you're confused about where code is running.
>> foreachRDD runs on the driver, not the executor.
>>
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>
>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
>> <deshpandesh...@gmail.com> wrote:
>> > Thanks Cody for your response.
>> >
>> > All I want to do is commit the offsets only if I am able to successfully
>> > write to the Cassandra database.
>> >
>> > The line //save the rdd to Cassandra database is
>> > rdd.map { record => () }.saveToCassandra("keyspace1", "table1").
>> >
>> > What do you mean by "Executors shouldn't be auto-committing, that's why
>> > it's being overridden"? It is the executors that do the mapping and saving
>> > to Cassandra. The status of success or failure of this operation is known
>> > only on the executor, and that's where I want to commit the Kafka offsets.
>> > If this is not what I should be doing, then what is the right way?
>> >
>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>> >>
>> >> If your complaint is about offsets being committed that you didn't
>> >> expect... auto commit being false on executors shouldn't have anything
>> >> to do with that.  Executors shouldn't be auto-committing, that's why
>> >> it's being overridden.
>> >>
>> >> What you've said and the code you posted aren't really enough to
>> >> explain what your issue is, e.g.
>> >>
>> >> is this line
>> >> // save the rdd to Cassandra database
>> >> a blocking call?
>> >>
>> >> are you sure that the rdd foreach isn't being retried and succeeding
>> >> the second time around, etc.?
>> >>
>> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
>> >> <deshpandesh...@gmail.com> wrote:
>> >> > Hello All,
>> >> > I am using Spark 2.0.2 and spark-streaming-kafka-0-10_2.11.
>> >> >
>> >> > I am setting enable.auto.commit to false, and I want to commit the
>> >> > offsets manually after my output operation is successful. So when an
>> >> > exception is raised during the processing, I do not want the offsets
>> >> > to be committed. But it looks like the offsets are automatically
>> >> > committed even when the exception is raised, and thereby I am losing
>> >> > data. In my logs I see "WARN overriding enable.auto.commit to false
>> >> > for executor", but I don't want it to be overridden. Please help.
>> >> >
>> >> > My code looks like this:
>> >> >
>> >> >     val kafkaParams = Map[String, Object](
>> >> >       "bootstrap.servers" -> brokers,
>> >> >       "key.deserializer" -> classOf[StringDeserializer],
>> >> >       "value.deserializer" -> classOf[StringDeserializer],
>> >> >       "group.id" -> "Group1",
>> >> >       "auto.offset.reset" -> offsetresetparameter,
>> >> >       "enable.auto.commit" -> (false: java.lang.Boolean)
>> >> >     )
>> >> >
>> >> >     val myTopics = Array("topic1")
>> >> >     val stream1 = KafkaUtils.createDirectStream[String, String](
>> >> >       ssc,
>> >> >       PreferConsistent,
>> >> >       Subscribe[String, String](myTopics, kafkaParams)
>> >> >     )
>> >> >
>> >> >     stream1.foreachRDD { (rdd, time) =>
>> >> >         val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >> >         try {
>> >> >             //save the rdd to Cassandra database
>> >> >             stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> >> >         } catch {
>> >> >           case ex: Exception => {
>> >> >             println(ex.toString + "!!!!!! Bad Data, Unable to persist into table !!!!!" + errorOffsetRangesToString(offsetRanges))
>> >> >           }
>> >> >         }
>> >> >     }
>> >> >
>> >> >     ssc.start()
>> >> >     ssc.awaitTermination()
>> >
>> >
>>
>
>
