I'm running Spark Streaming using the Kafka direct stream, expecting
exactly-once semantics using checkpoints (which are stored on HDFS).
My job is really simple: it opens 6 Kafka streams (6 topics with 4 partitions
each) and stores every row to Elasticsearch using the ES-Spark integration.
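For reference, the job looks roughly like this (a simplified sketch; the
broker list, checkpoint path, topic names and ES index below are placeholders,
not my real config):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.elasticsearch.spark._

object KafkaToEsJob {

  // Placeholder checkpoint location on HDFS
  val checkpointDir = "hdfs:///user/spark/checkpoints/kafka-to-es"

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("kafka-to-es")
      .set("es.nodes", "es-host:9200") // ES-Spark connection setting

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092,broker3:9092")
    val topics = Seq("some-topic" /* , ...the other 5 topics */)

    // One direct stream per topic; offsets are tracked by Spark in the
    // checkpoint, not in ZooKeeper
    topics.foreach { topic =>
      val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set(topic))

      // Index every record into Elasticsearch
      stream.foreachRDD { rdd =>
        rdd.map { case (_, value) => Map("message" -> value) }
          .saveToEs("some-index/some-type")
      }
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if one exists, otherwise build a fresh context
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}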

This job was working without issues in a different environment, but in this
new environment I've started seeing these assertion errors:

"Job aborted due to stage failure: Task 7 in stage 79.0 failed 4 times,
most recent failure: Lost task 7.3 in stage 79.0 (TID 762, 192.168.18.219):
java.lang.AssertionError: assertion failed: Beginning offset 20 is after
the ending offset 14 for topic some-topic partition 1. You either provided
an invalid fromOffset, or the Kafka topic has been damaged"

and also

"Job aborted due to stage failure: Task 12 in stage 83.0 failed 4 times,
most recent failure: Lost task 12.3 in stage 83.0 (TID 815,
192.168.18.219): java.lang.AssertionError: assertion failed: Ran out of
messages before reaching ending offset 20 for topic some-topic partition 1
start 14. This should not happen, and indicates that messages may have been
lost"

When querying my Kafka cluster (3 brokers, 7 topics, 4 partitions per topic,
3 ZooKeeper nodes, no other consumers), I see offset info that appears to
differ from what Spark reports:

*running kafka.tools.GetOffsetShell --time -1* (latest offsets)
some-topic:0:20
some-topic:1:20
some-topic:2:19
some-topic:3:20
*running kafka.tools.GetOffsetShell --time -2* (earliest offsets)
some-topic:0:0
some-topic:1:0
some-topic:2:0
some-topic:3:0

*running kafka-simple-consumer-shell* I can see all stored messages up to
offset 20, with the final output: "Terminating. Reached the end of partition
(some-topic, 1) at offset 20"

I tried removing the whole checkpoint dir and starting over, but it keeps
failing.

It looks like these tasks get retried endlessly. On the Spark UI streaming
tab I see "Active Batches" increasing, with a confusing "Input size" value of
-19 (a negative size?).

Any pointers will help
Thanks

Roman
