Github user zsxwing commented on the issue:
https://github.com/apache/spark/pull/15102
Right now, since there are still some open questions about how to handle various
failures, I'm listing what I found via stress testing so we can discuss:
(1) Kafka APIs fail because we cannot connect to the Kafka cluster. Solution:
fail the query.
(2) `getOffset` fails transiently because some topics are deleted at the
same time: `consumer.position` throws an NPE due to this race condition.
Solution: retry (this is why `withRetries` is added). The failing sequence is:
```
consumer.poll(0)
val partitions = consumer.assignment()
partitions.asScala.foreach { p =>
  consumer.position(p)  // may throw an NPE if p's topic was deleted after assignment()
}
```
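A minimal sketch of what such a retry helper could look like (the actual `withRetries` added in this PR may differ in name, attempt count, and backoff; the sleep interval below is arbitrary):
```scala
import scala.collection.JavaConverters._

// Hypothetical helper: retry `body` a few times when the NPE race above is hit.
// The name, attempt count, and backoff are illustrative, not the PR's actual code.
def withRetries[T](attemptsLeft: Int = 3)(body: => T): T = {
  try {
    body
  } catch {
    case _: NullPointerException if attemptsLeft > 1 =>
      Thread.sleep(100)  // arbitrary pause before the next attempt
      withRetries(attemptsLeft - 1)(body)
  }
}

// The getOffset sequence above would then run inside the helper:
val latestOffsets = withRetries() {
  consumer.poll(0)
  consumer.assignment().asScala.map(p => p -> consumer.position(p)).toMap
}
```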
(3) In `getBatch`, some partitions are new (i.e. not in `fromOffsets`), so we
call `fetchNewPartitionEarliestOffsets` for them. However, some of these new
partitions may already have been deleted (topic deletion), in which case they
won't appear in `consumer.assignment()`. Solution: log a warning.
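Roughly, the warning for (3) could look like the sketch below (`untilOffsets`, `newPartitionOffsets`, and the `logWarning` call are assumptions about the surrounding code):
```scala
// Sketch only: partitions whose topics were deleted in the meantime
// won't come back from the earliest-offset fetch.
val newPartitions = untilOffsets.keySet -- fromOffsets.keySet
val newPartitionOffsets = fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
val deletedPartitions = newPartitions -- newPartitionOffsets.keySet
if (deletedPartitions.nonEmpty) {
  logWarning(s"Cannot find earliest offsets of $deletedPartitions; " +
    "their topics were probably deleted and some data may have been missed")
}
```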
(4) In `getBatch`, some partitions are new (i.e. not in `fromOffsets`), so we
call `fetchNewPartitionEarliestOffsets` for them. Similar to (2),
`consumer.position` may throw an NPE due to the same race condition.
Solution: retry.
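Reusing the retry sketch from (2), the new-partition fetch in (4) could simply run inside it (illustrative only):
```scala
// Illustrative: guard the new-partition offset fetch with the same retry helper.
val newPartitionOffsets = withRetries() {
  fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
}
```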
(5) Topics are deleted while a Spark job is running, which may cause
`OffsetOutOfRangeException`. (I'm not sure whether there are more exception
types; this may need more investigation.) Solution: log a warning.
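For (5), the executor-side handling could be roughly the following sketch (not the PR's actual code; `topicPartition`, `offset`, and `pollTimeoutMs` are placeholders for whatever the read path uses):
```scala
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException

// Sketch of the per-partition read loop: log and move on instead of failing the task.
try {
  consumer.seek(topicPartition, offset)
  consumer.poll(pollTimeoutMs)
} catch {
  case e: OffsetOutOfRangeException =>
    logWarning(s"Offsets out of range for $topicPartition, possibly because its topic " +
      "was deleted while the job was running; some data may have been missed", e)
}
```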
(6) A topic is deleted and then re-created. This may make `untilOffset` less
than `fromOffset`. Solution: log a warning.
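For (6), the per-partition sanity check could be as simple as the following sketch (variable names are assumptions):
```scala
// Sketch: detect a topic that was deleted and re-created, so the end offset
// went backwards relative to the recorded start offset.
if (untilOffset < fromOffset) {
  logWarning(s"Partition $topicPartition's offset was moved back from $fromOffset to " +
    s"$untilOffset; the topic was probably deleted and re-created. " +
    "Skipping this partition for the current batch.")
} else {
  // build the usual [fromOffset, untilOffset) range for this partition
}
```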