I am already doing that for all known messy data. Thanks, Cody, for all your
time and input.
On Mon, Aug 7, 2017 at 11:58 AM, Cody Koeninger wrote:
Yes
On Mon, Aug 7, 2017 at 12:32 PM, shyla deshpande
wrote:
Thanks again, Cody.
No. I am mapping the Kafka ConsumerRecord so that I can save it to the
Cassandra table. saveToCassandra is an action, and my data does get
saved into Cassandra. It works as expected 99% of the time, except that
when there is an exception, I did not want the offsets
If literally all you are doing is rdd.map, I wouldn't expect
saveToCassandra to happen at all, since map is not an action.
Filtering for unsuccessful attempts and collecting those back to the
driver would be one way for the driver to know whether it was safe to
commit.
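A minimal plain-Scala model of that suggestion: catch failures where the writes happen, bring only the failures back to the driver, and commit only when there are none. It does not use Spark; OffsetRange, writeRecord and processBatch are hypothetical stand-ins. In a real job the per-record map and filter would be RDD operations running on the executors, an action such as collect() would return the failures to the driver, and commit would be the stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) call.

```scala
import scala.util.Try

// Hypothetical stand-in for Kafka's OffsetRange: one partition's slice of a batch.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object CommitOnSuccess {
  // Hypothetical per-record write that fails for empty records, standing in
  // for a Cassandra insert that can throw.
  def writeRecord(record: String): Unit =
    if (record.isEmpty) throw new IllegalArgumentException("empty record")

  // Models one micro-batch. In Spark the map and filter would run on the
  // executors, and an action such as collect() would return only the failed
  // records to the driver, which then decides whether to commit.
  def processBatch(records: Seq[String],
                   offsetRanges: Seq[OffsetRange],
                   commit: Seq[OffsetRange] => Unit): Seq[String] = {
    val failed: Seq[String] = records
      .map(r => (r, Try(writeRecord(r))))
      .filter { case (_, result) => result.isFailure }
      .map { case (r, _) => r }
    // Commit only when every write succeeded; otherwise leave the offsets
    // untouched so the batch can be reprocessed.
    if (failed.isEmpty) commit(offsetRanges)
    failed
  }
}
```

Records that were written before a failure will be written again when the batch is reprocessed. A Cassandra INSERT with the same primary key overwrites the existing row, so this is usually harmless, but it is worth checking against your table design.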
On Mon, Aug 7, 2017 at 12:31
rdd.map { record => ()}.saveToCassandra("keyspace1", "table1") --> runs
on the executors
stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) -->
runs on the driver.
Is this the reason why Kafka offsets are committed even when an
exception is raised? If so, is there a way to commit
Thanks again, Cody.
My understanding is that all the code inside foreachRDD runs on the driver
except for
rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
When the exception is raised, I assumed I would not be committing the
offsets, but the offsets are committed all the time inde
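A plain-Scala sketch of that understanding, assuming (as in Spark) that the foreachRDD body runs on the driver and that an action blocks there until its job finishes, throwing if the job fails after its task retries. onBatch, save and commit are hypothetical names: save stands in for the saveToCassandra action and commit for commitAsync(offsetRanges). If the exception really propagates out of save, the commit line is never reached, so offsets that still get committed would suggest either that the exception is caught before it reaches the driver (for example inside the map closure) or that something other than this call is committing them.

```scala
import scala.util.Try

object ForeachRddModel {
  // Models the body of foreachRDD, which runs on the driver. save() stands in
  // for the action: it blocks until the job finishes and throws if it failed.
  // Try captures non-fatal failures so the caller can decide what to do next.
  def onBatch(save: () => Unit, commit: () => Unit): Try[Unit] =
    Try {
      save()   // throws if the write job failed...
      commit() // ...so this line only runs after a successful save
    }
}
```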
I mean that the Kafka consumers running on the executors should not be
automatically committing, because the fact that a message was read by
the consumer has no bearing on whether it was actually successfully
processed after reading.
It sounds to me like you're confused about where code is running
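To illustrate why reading a message says nothing about processing it, here is a deliberately simplified toy model in plain Scala; no Kafka client is involved, and Partition, readThenCommit, processThenCommit and the "bad" record are hypothetical. Kafka's real auto-commit runs periodically (auto.commit.interval.ms) rather than once per batch, so treat commit-on-read below as a rough approximation. When the offset is committed on read, it moves past a record that later fails, and a consumer restarting from the committed offset skips that record; committing only after processing leaves the failed record to be read again.

```scala
object CommitTiming {
  // Toy partition: committed is the offset a restarted consumer resumes from.
  final class Partition(val records: Vector[String]) {
    var committed: Long = 0L
  }

  // Processing fails for the record "bad", standing in for a failed write.
  def process(record: String): Boolean = record != "bad"

  // Commit-on-read: the offset advances as soon as the batch is read, before
  // any record is processed, so later failures cannot hold it back.
  def readThenCommit(p: Partition): Unit = {
    val batch = p.records.drop(p.committed.toInt)
    p.committed = p.records.length.toLong
    batch.foreach(process)
  }

  // Commit-after-processing: advance only past the leading records that
  // were processed successfully.
  def processThenCommit(p: Partition): Unit = {
    val batch = p.records.drop(p.committed.toInt)
    val ok = batch.takeWhile(process)
    p.committed += ok.length
  }
}
```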
Thanks, Cody, for your response.
All I want to do is commit the offsets only if I am able to successfully
write to the Cassandra database.
The line //save the rdd to Cassandra database is
rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")
What do you mean by Executors shouldn't be auto-co
If your complaint is about offsets being committed that you didn't
expect... auto commit being false on executors shouldn't have anything
to do with that. Executors shouldn't be auto-committing, that's why
it's being overridden.
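The override can be sketched as a plain Scala map transformation. In the spark-streaming-kafka-0-10 integration, the driver's consumer uses the kafkaParams passed to ConsumerStrategies.Subscribe, while the executors' consumers get a modified copy with auto-commit forced off. executorParams below is a hypothetical, simplified model of that copy (the real integration also adjusts other executor settings, which this sketch omits), and the server and group values are placeholders; deserializer settings are left out so the sketch needs no Kafka classes. The Spark integration guide's example also sets enable.auto.commit to false on the driver side when offsets are committed manually with commitAsync.

```scala
object KafkaParamsSketch {
  // Driver-side consumer parameters (placeholder values). Auto-commit is off
  // so offsets are committed only by the explicit commitAsync call.
  val driverParams: Map[String, Object] = Map(
    "bootstrap.servers"  -> "localhost:9092",
    "group.id"           -> "example-group",
    "auto.offset.reset"  -> "latest",
    "enable.auto.commit" -> java.lang.Boolean.FALSE
  )

  // Hypothetical, simplified model of the executor-side copy: whatever the
  // caller asked for, auto-commit is forced off on the executors.
  def executorParams(params: Map[String, Object]): Map[String, Object] =
    params.updated("enable.auto.commit", java.lang.Boolean.FALSE)
}
```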
What you've said and the code you posted isn't really enough to
expl