[
https://issues.apache.org/jira/browse/KAFKA-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16985429#comment-16985429
]
John Roesler commented on KAFKA-9231:
-------------------------------------
In addition to the error I listed above (ProducerFencedException), I have also
observed the following recoverable exceptions leading to thread deaths:
UnknownProducerIdException:
{noformat}
org.apache.kafka.streams.errors.StreamsException: task [1_0] Abort sending
since an error caught with a previous record (timestamp 1574960233670) to topic
stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog due to
org.apache.kafka.common.errors.UnknownProducerIdException: This exception is
raised by the broker if it could not locate the producer metadata associated
with the producerId in question. This could happen if, for instance, the
producer's records were deleted because their retention time had elapsed. Once
the last records of the producerId are removed, the producer's metadata is
removed from the broker, and future appends by the producer will return this
exception.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:143)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:51)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:202)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:562)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:554)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This
exception is raised by the broker if it could not locate the producer metadata
associated with the producerId in question. This could happen if, for instance,
the producer's records were deleted because their retention time had elapsed.
Once the last records of the producerId are removed, the producer's metadata is
removed from the broker, and future appends by the producer will return this
exception.
{noformat}
and an internal assertion:
{noformat}
java.lang.IllegalStateException: RocksDB metrics recorder for store
"KSTREAM-AGGREGATE-STATE-STORE-0000000049" of task 3_1 has already been added.
This is a bug in Kafka Streams.
    at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger.addMetricsRecorder(RocksDBMetricsRecordingTrigger.java:30)
    at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder.addStatistics(RocksDBMetricsRecorder.java:93)
    at org.apache.kafka.streams.state.internals.RocksDBStore.maybeSetUpMetricsRecorder(RocksDBStore.java:205)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:191)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:227)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:44)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:203)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:201)
    at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:211)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:323)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:76)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:385)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
[2019-11-29 08:27:08,883] INFO
[stream-soak-test-a0363f9f-ff9c-4dbd-b38a-8e898d77a22e-StreamThread-2]
stream-thread
[stream-soak-test-a0363f9f-ff9c-4dbd-b38a-8e898d77a22e-StreamThread-2] State
transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN
(org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}
This one is an internal assertion. What happened there is that the RocksDBStore
initializer registers the store with the metrics recorder before the store has
been fully created. When store creation subsequently failed, that registration
was never undone, so every later attempt to re-initialize the store tripped the
"already been added" check, and the store could no longer be created at all.
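To illustrate the ordering problem, here is a minimal sketch using simplified,
hypothetical stand-ins for RocksDBStore and RocksDBMetricsRecordingTrigger (not
the actual Streams code, and not the change in my PR):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for RocksDBMetricsRecordingTrigger.
class RecordingTrigger {
    private final Map<String, Object> recorders = new HashMap<>();

    void addMetricsRecorder(final String storeName, final Object recorder) {
        if (recorders.containsKey(storeName)) {
            // The assertion that fires on the second initialization attempt.
            throw new IllegalStateException("RocksDB metrics recorder for store \""
                + storeName + "\" has already been added. This is a bug in Kafka Streams.");
        }
        recorders.put(storeName, recorder);
    }
}

// Hypothetical, simplified stand-in for RocksDBStore.
class Store {
    private final String name;
    private final RecordingTrigger trigger;

    Store(final String name, final RecordingTrigger trigger) {
        this.name = name;
        this.trigger = trigger;
    }

    // Buggy ordering: register the recorder *before* the DB is open. If openRocksDB()
    // throws, the registration is never undone and every retry hits the assertion.
    void openBuggy() {
        trigger.addMetricsRecorder(name, new Object());
        openRocksDB();
    }

    // One possible fix (illustrative only): register once the DB opened successfully.
    void openFixed() {
        openRocksDB();
        trigger.addMetricsRecorder(name, new Object());
    }

    private void openRocksDB() {
        // Opening RocksDB can fail transiently, e.g. while the previous owner of the
        // task is still releasing the state directory during a rebalance.
        throw new RuntimeException("simulated failure while opening RocksDB");
    }
}
{code}
With openBuggy(), a failed first attempt leaves the recorder registered, so the
retry dies with the IllegalStateException above; with openFixed(), the retry can
succeed once the underlying failure clears.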
My testing setup is to run a Streams application processing data at a modest
rate (~20K records/sec) in a three-instance configuration, each instance running
three StreamThreads. I'm injecting network partitions that separate one instance
at a time from the brokers, at a rate of roughly one partition per hour, each
lasting up to 5 minutes.
I've observed each of these exceptions kill Streams threads within a few
occurrences of network partitioning. Note that Streams is configured with
timeouts and retries sufficient to tolerate network interruptions lasting longer
than 5 minutes, so any thread death is unexpected. All the thread deaths I've
observed are the result of bugs in the Streams exception-handling code.
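For concreteness, here is a minimal sketch of the kind of configuration I mean.
The broker address and the specific values are illustrative assumptions, not the
settings of my soak cluster:
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class SoakTestConfig {

    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-soak-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);

        // EOS is what exposes the thread deaths described in this ticket.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // Retry transient failures instead of treating them as fatal.
        props.put(StreamsConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 1000);

        // Let the embedded clients ride out outages longer than five minutes
        // (illustrative values; tune request/transaction timeouts to match).
        final int tenMinutes = 10 * 60 * 1000;
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), tenMinutes);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), tenMinutes);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), tenMinutes);
        return props;
    }
}
{code}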
There are two main categories of exception:
1. ProducerFencedException and UnknownProducerIdException. While they reflect
different root causes, both of these are expected with EOS enabled if the
producer is silent for too long and ceases to be considered a valid member by
the broker. Streams is supposed to handle this situation by rejoining the group
(which includes discarding and re-creating its producers); see the sketch after
this list. These were uncovered by repeated rebalances ultimately caused by the
injected network partitions.
2. IllegalStateException. A specific internal assertion revealed buggy store
initialization logic. This was also uncovered by repeated rebalances ultimately
caused by the injected network partitions.
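As a sketch of the intended handling for the first category: the send-completion
path should classify these exceptions as a signal to migrate the task and rejoin
the group, rather than as a fatal error. This is a simplified, hypothetical
illustration (TaskNeedsMigrationException is a made-up marker type), not the
actual RecordCollectorImpl code or the change in my PR; in the real code the
exception recorded in the callback is rethrown from the stream thread later.
{code:java}
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.UnknownProducerIdException;

final class SendErrorClassifier {

    // Hypothetical marker exception: the stream thread catches it, closes its tasks
    // as zombies, rejoins the group, and re-creates its producers.
    static final class TaskNeedsMigrationException extends KafkaException {
        TaskNeedsMigrationException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static void handleSendException(final String taskId, final Exception sendException) {
        if (sendException instanceof ProducerFencedException
                || sendException instanceof UnknownProducerIdException) {
            // Recoverable under EOS: a newer producer owns the transactional id, or the
            // broker expired this producer's state. Rejoin instead of killing the thread.
            throw new TaskNeedsMigrationException(
                "task " + taskId + " was fenced or lost its producer state; "
                    + "triggering a rebalance instead of dying",
                sendException);
        }
        // Anything else really is fatal for the thread.
        throw new KafkaException("unrecoverable send error for task " + taskId, sendException);
    }
}
{code}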
I have addressed all of these bugs in my PR
https://github.com/apache/kafka/pull/7748
I'm proposing to consider this a Blocker for the 2.4.0 release. It is both
severe (the exceptions above have reliably caused my Streams cluster to die
completely within about 12-24 hours) and a regression.
To support the latter claim, I ran the exact same workload and scenario using
Kafka Streams 2.3. I still observed threads dying on that branch, but *only* due
to the UnknownProducerId exception. So there is some overlap with what I'm
seeing on 2.4, but aside from that one cause, the fact that Streams is losing
threads at a high rate from both ProducerFencedExceptions and its own internal
assertion (IllegalStateException) leads me to think that Streams would be less
stable in production using EOS on 2.4 than it was on 2.3.
For completeness, note that I've run the same test on 2.4 and 2.3 _without_
EOS, and Streams is quite stable. Also note that the exceptions killing my
applications appear to be directly caused by frequent rebalances and network
interruptions; I do not expect users running EOS Streams under reliable and
stable conditions to suffer these thread deaths.
I know that everyone is waiting for the much-delayed 2.4.0 release, so I'm not
taking a hard stance on it, but from where I'm sitting, the situation seems to
warrant a new RC once the fix is ready. Also note that a new RC was just
announced this morning, so assuming I can merge my PR on Monday, we're only
setting the release back by a couple of extra days.
> Streams Threads may die from recoverable errors with EOS enabled
> ----------------------------------------------------------------
>
> Key: KAFKA-9231
> URL: https://issues.apache.org/jira/browse/KAFKA-9231
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.4.0
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
>
> While testing Streams in EOS mode under frequent and heavy network
> partitions, I've encountered the following error, leading to thread death:
> {noformat}
> [2019-11-26 04:54:02,650] ERROR
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2]
> stream-thread
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2]
> Encountered the following unexpected Kafka exception during processing, this
> usually indicate Streams internal errors:
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: stream-thread
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] Failed
> to rebalance.
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:739)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] failed
> to suspend stream tasks
>     at org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:253)
>     at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:116)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073)
>     at org.apache.kafka.streams.processor.internals.StreamThread.enforceRebalance(StreamThread.java:716)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:710)
> ... 1 more
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task
> [1_1] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000007
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:279)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:175)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:581)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:535)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:660)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:628)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendRunningTasks(AssignedStreamsTasks.java:145)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendOrCloseTasks(AssignedStreamsTasks.java:128)
>     at org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:246)
> ... 7 more
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer
> attempted an operation with an old epoch. Either there is a newer producer
> with the same transactionalId, or the producer's transaction has been expired
> by the broker.
> [2019-11-26 04:54:02,650] INFO
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2]
> stream-thread
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] State
> transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2019-11-26 04:54:02,650] INFO
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2]
> stream-thread
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2]
> Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)
> [2019-11-26 04:54:02,650] INFO
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2]
> [Consumer
> clientId=stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2-restore-consumer,
> groupId=null] Unsubscribed all topics or patterns and assigned partitions
> (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2019-11-26 04:54:02,653] INFO
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2]
> stream-thread
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] State
> transition from PENDING_SHUTDOWN to DEAD
> (org.apache.kafka.streams.processor.internals.StreamThread)
> {noformat}
> Elsewhere in the code, we catch ProducerFencedExceptions and trigger a
> rebalance instead of killing the thread. It seems like one possible avenue
> has slipped through the cracks.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)