Thanks a lot Bill for looking in to this. I would definitely attempt the
suggestions and let you know the outcome. I have gone through KIP-91, but
struggling to understand the behavior. So does it mean that these errors
are happening due to a failure in the broker? If so why would it kill all
the threads which causes the consumers to be unavailable? Also is there
anyway to handle these errors gracefully so that it does not kill the
threads and hence the consumers?

Thanks,
Tony

On Fri, Feb 23, 2018 at 1:15 AM, Bill Bejeck <b...@confluent.io> wrote:

> Hi Tony,
>
> Looks like you have a known issue that KIP-91(
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 91+Provide+Intuitive+User+Timeouts+in+The+Producer)
> will address.
>
> In the meantime, as a workaround, you could try
> setting REQUEST_TIMEOUT_MS_CONFIG to a large value (Integer.MAX_VALUE ?)
> other secondary configurations to consider changing would be increasing "
> max.block.ms" and "retries"
>
> Thanks,
> Bill
>
> On Thu, Feb 22, 2018 at 8:14 AM, Tony John <tonyjohnant...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I am running into an issue with my Kafka Streams application. The
> > application was running fine for almost 2 weeks, then it started throwing
> > the below exception which caused the threads to die. Now when I restart
> the
> > application, it dies quickly (1-2 hrs) when trying to catch up the lag.
> >
> > The application is running on an AWS EC2 instance with 8 core processor
> and
> > 16GB of memory. The streams config is given below and more logs are
> > available below (I have stripped of some logs which I though may not be
> > relevant). Towards the end of this thread you will be able to see lot of
> > exceptions similar to the below one + RocksDBExceptions
> > (org.apache.kafka.streams.errors.ProcessorStateException: Error opening
> > store vstore at location
> > /mnt/store/kafka-streams/pe-v1/1_10/rocksdb/vstore). Could you please
> take
> > a look at it and let me know what could be wrong here?
> >
> > INFO  2018-02-21 08:37:20.758 [Engine2-StreamThread-2]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-2] Committed all active tasks [7_12, 6_3, 1_27] and
> > standby tasks [4_0, 1_30] in 0ms
> > ERROR 2018-02-21 08:37:24.853 [kafka-producer-network-thread |
> > Engine2-StreamThread-6-producer]
> > org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task
> > [1_34] Error sending record to topic cv-v1-cv. No more offsets will be
> > recorded for this task and the exception will eventually be thrown
> > org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s)
> for
> > cv-v1-cv-2: 31439 ms has passed since last append
> > DEBUG 2018-02-21 08:37:24.859 [Engine2-StreamThread-3]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-3] processing latency 46282 > commit time 30000 for
> > 10000 records. Adjusting down recordsProcessedBeforeCommit=6482
> > ERROR 2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> > org.apache.kafka.streams.processor.internals.AssignedTasks -
> stream-thread
> > [Engine2-StreamThread-6] Failed to commit stream task 1_34 due to the
> > following error:
> > org.apache.kafka.streams.errors.StreamsException: task [1_34] exception
> > caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.flush(
> > RecordCollectorImpl.java:145)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> > StreamTask.java:296)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask$1.run(StreamTask.java:275)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > measureLatencyNs(StreamsMetricsImpl.java:201)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> > AssignedTasks.java:374)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.
> > applyToRunningTasks(AssignedTasks.java:420)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.
> > punctuateAndCommit(AssignedTasks.java:357)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.
> > processAndPunctuate(StreamThread.java:662)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> > StreamThread.java:513)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:482)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> > record(s) for cv-v1-cv-2: 31439 ms has passed since last append
> > INFO  2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-6] Shutting down
> > INFO  2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-6] State transition from PARTITIONS_ASSIGNED to
> > PENDING_SHUTDOWN.
> > DEBUG 2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-6] Shutting down all active tasks [6_12, 1_34,
> 7_15],
> > standby tasks [1_2, 1_7], suspended tasks [1_18, 7_15, 6_3, 1_27], and
> > suspended standby tasks [1_3, 1_30]
> > INFO  2018-02-21 08:37:24.879 [Engine2-StreamThread-3]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-3] Committed all active tasks [1_31, 1_24, 6_7] and
> > standby tasks [1_0, 1_6, 1_13] in 0ms
> > INFO  2018-02-21 08:37:24.954 [Engine2-StreamThread-6]
> > org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka
> > producer with timeoutMillis = 9223372036854775807 ms.
> > INFO  2018-02-21 08:37:24.957 [Engine2-StreamThread-6]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-6] Stream thread shutdown complete
> > INFO  2018-02-21 08:37:24.957 [Engine2-StreamThread-6]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-6] State transition from PENDING_SHUTDOWN to DEAD.
> >
> >
> > *Streams Config*
> > *----------------------*
> > INFO  2018-02-21 07:11:20.209 [main] org.apache.kafka.streams.
> > StreamsConfig
> > - StreamsConfig values:
> > application.id = cv-v1
> > application.server =
> > bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092]
> > buffered.records.per.partition = 1000
> > cache.max.bytes.buffering = 104857600
> > client.id = Engine2
> > commit.interval.ms = 30000
> > connections.max.idle.ms = 540000
> > default.key.serde = class org.apache.kafka.common.serialization.Serdes$
> > ByteArraySerde
> > default.timestamp.extractor = class org.apache.kafka.streams.processor.
> > FailOnInvalidTimestamp
> > default.value.serde = class org.apache.kafka.common.
> serialization.Serdes$
> > ByteArraySerde
> > key.serde = null
> > metadata.max.age.ms = 300000
> > metric.reporters = []
> > metrics.num.samples = 2
> > metrics.recording.level = DEBUG
> > metrics.sample.window.ms = 30000
> > num.standby.replicas = 1
> > num.stream.threads = 1
> > partition.grouper = class org.apache.kafka.streams.processor.
> > DefaultPartitionGrouper
> > poll.ms = 100
> > processing.guarantee = at_least_once
> > receive.buffer.bytes = 32768
> > reconnect.backoff.max.ms = 1000
> > reconnect.backoff.ms = 50
> > replication.factor = 2
> > request.timeout.ms = 40000
> > retry.backoff.ms = 100
> > rocksdb.config.setter = null
> > security.protocol = PLAINTEXT
> > send.buffer.bytes = 131072
> > state.cleanup.delay.ms = 600000
> > state.dir = /mnt/store/kafka-streams
> > timestamp.extractor = null
> > value.serde = null
> > windowstore.changelog.additional.retention.ms = 86400000
> > zookeeper.connect =
> >
> >
> > INFO  2018-02-21 07:11:20.398 [main] org.apache.kafka.streams.
> > StreamsConfig
> > - StreamsConfig values:
> > application.id = pe-v1
> > application.server = newengine101:8080
> > bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092]
> > buffered.records.per.partition = 1000
> > cache.max.bytes.buffering = 2147483648
> > client.id = Engine2
> > commit.interval.ms = 30000
> > connections.max.idle.ms = 540000
> > default.key.serde = class org.apache.kafka.common.serialization.Serdes$
> > ByteArraySerde
> > default.timestamp.extractor = class org.apache.kafka.streams.processor.
> > FailOnInvalidTimestamp
> > default.value.serde = class org.apache.kafka.common.
> serialization.Serdes$
> > ByteArraySerde
> > key.serde = null
> > metadata.max.age.ms = 300000
> > metric.reporters = []
> > metrics.num.samples = 2
> > metrics.recording.level = DEBUG
> > metrics.sample.window.ms = 30000
> > num.standby.replicas = 1
> > num.stream.threads = 6
> > partition.grouper = class org.apache.kafka.streams.processor.
> > DefaultPartitionGrouper
> > poll.ms = 100
> > processing.guarantee = at_least_once
> > receive.buffer.bytes = 32768
> > reconnect.backoff.max.ms = 1000
> > reconnect.backoff.ms = 50
> > replication.factor = 2
> > request.timeout.ms = 40000
> > retry.backoff.ms = 100
> > rocksdb.config.setter = null
> > security.protocol = PLAINTEXT
> > send.buffer.bytes = 131072
> > state.cleanup.delay.ms = 600000
> > state.dir = /mnt/store/kafka-streams
> > timestamp.extractor = null
> > value.serde = null
> > windowstore.changelog.additional.retention.ms = 86400000
> > zookeeper.connect =
> >
> > *More Logs*
> >
> > INFO  2018-02-21 13:33:19.083 [Engine2-StreamThread-1]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-1] Committed all active tasks [0_1, 1_0, 0_3, 2_2]
> > and standby tasks [] in 0ms
> > ERROR 2018-02-21 13:33:34.140 [kafka-producer-network-thread |
> > Engine2-StreamThread-2-producer] org.apache.kafka.streams.
> > processor.internals.RecordCollectorImpl - task [1_34] Error sending
> record
> > to topic cv-v1-cv. No more offsets will be recorded for this task and the
> > exception will eventually be thrown
> > org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s)
> for
> > cv-v1-cv-2: 31903 ms has passed since last append
> > ERROR 2018-02-21 13:33:34.140 [kafka-producer-network-thread |
> > Engine2-StreamThread-2-producer] org.apache.kafka.streams.
> > processor.internals.RecordCollectorImpl - task [1_10] Error sending
> record
> > to topic cv-v1-cv. No more offsets will be recorded for this task and the
> > exception will eventually be thrown
> > org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s)
> for
> > cv-v1-cv-2: 31903 ms has passed since last append
> > ERROR 2018-02-21 13:33:36.396 [kafka-producer-network-thread |
> > Engine2-StreamThread-4-producer] org.apache.kafka.streams.
> > processor.internals.RecordCollectorImpl - task [1_29] Error sending
> record
> > to topic cv-v1-cv. No more offsets will be recorded for this task and the
> > exception will eventually be thrown
> > org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s)
> for
> > cv-v1-cv-2: 34111 ms has passed since last append
> > ERROR 2018-02-21 13:33:36.396 [kafka-producer-network-thread |
> > Engine2-StreamThread-4-producer] org.apache.kafka.streams.
> > processor.internals.RecordCollectorImpl - task [1_18] Error sending
> record
> > to topic cv-v1-cv. No more offsets will be recorded for this task and the
> > exception will eventually be thrown
> > org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s)
> for
> > cv-v1-cv-2: 33431 ms has passed since last append
> > ERROR 2018-02-21 13:33:36.397 [kafka-producer-network-thread |
> > Engine2-StreamThread-4-producer] org.apache.kafka.streams.
> > processor.internals.RecordCollectorImpl - task [1_32] Error sending
> record
> > to topic cv-v1-cv. No more offsets will be recorded for this task and the
> > exception will eventually be thrown
> > org.apache.kafka.common.errors.TimeoutException: Expiring 73 record(s)
> for
> > cv-v1-cv-2: 32717 ms has passed since last append
> > ERROR 2018-02-21 13:33:40.770 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.AssignedTasks -
> stream-thread
> > [Engine2-StreamThread-4] Failed to commit stream task 1_32 due to the
> > following error:
> > org.apache.kafka.streams.errors.StreamsException: task [1_32] exception
> > caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.flush(
> > RecordCollectorImpl.java:145)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> > StreamTask.java:296)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask$1.run(StreamTask.java:275)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > measureLatencyNs(StreamsMetricsImpl.java:201)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> > AssignedTasks.java:374)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.
> > applyToRunningTasks(AssignedTasks.java:420)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.
> > punctuateAndCommit(AssignedTasks.java:357)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.
> > processAndPunctuate(StreamThread.java:662)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> > StreamThread.java:513)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:482)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> > record(s) for cv-v1-cv-2: 32717 ms has passed since last append
> > ERROR 2018-02-21 13:33:40.774 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.AssignedTasks -
> stream-thread
> > [Engine2-StreamThread-4] Failed to commit stream task 1_18 due to the
> > following error:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [1_18]
> > Failed
> > to flush state store subs
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.flush(
> > ProcessorStateManager.java:257) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(
> > AbstractTask.java:194)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> > StreamTask.java:295)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask$1.run(StreamTask.java:275)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > measureLatencyNs(StreamsMetricsImpl.java:201)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> > AssignedTasks.java:374)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.
> > applyToRunningTasks(AssignedTasks.java:420)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.
> > punctuateAndCommit(AssignedTasks.java:357)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.
> > processAndPunctuate(StreamThread.java:662)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> > StreamThread.java:513)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:482)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_18]
> > exception caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.send(
> > RecordCollectorImpl.java:87)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> > StoreChangeLogger.java:59)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:58)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:29)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> > execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> > measureLatency(MeteredKeyValueStore.java:218)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> > MeteredKeyValueStore.java:133)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> > putAndMaybeForward(CachingKeyValueStore.java:95)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> > 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> CachingKeyValueStore$1.apply(
> > CachingKeyValueStore.java:78)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:141)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:99)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.ThreadCache.
> > flush(ThreadCache.java:129)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> > CachingKeyValueStore.java:107)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.flush(
> > ProcessorStateManager.java:255) ~[kafka-streams-0.11.0.2.jar:?]
> > ... 13 more
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> > record(s) for cv-v1-cv-2: 33431 ms has passed since last append
> > ERROR 2018-02-21 13:33:40.774 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.AssignedTasks -
> stream-thread
> > [Engine2-StreamThread-4] Failed to commit stream task 1_29 due to the
> > following error:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [1_29]
> > Failed
> > to flush state store subs
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.flush(
> > ProcessorStateManager.java:257) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(
> > AbstractTask.java:194)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> > StreamTask.java:295)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask$1.run(StreamTask.java:275)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > measureLatencyNs(StreamsMetricsImpl.java:201)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> > AssignedTasks.java:374)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.
> > applyToRunningTasks(AssignedTasks.java:420)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.
> > punctuateAndCommit(AssignedTasks.java:357)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.
> > processAndPunctuate(StreamThread.java:662)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> > StreamThread.java:513)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:482)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_29]
> > exception caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.send(
> > RecordCollectorImpl.java:87)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> > StoreChangeLogger.java:59)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:58)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:29)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> > execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> > measureLatency(MeteredKeyValueStore.java:218)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> > MeteredKeyValueStore.java:133)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> > putAndMaybeForward(CachingKeyValueStore.java:95)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> > 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> CachingKeyValueStore$1.apply(
> > CachingKeyValueStore.java:78)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:141)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:99)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.ThreadCache.
> > flush(ThreadCache.java:129)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> > CachingKeyValueStore.java:107)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.flush(
> > ProcessorStateManager.java:255) ~[kafka-streams-0.11.0.2.jar:?]
> > ... 13 more
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> > record(s) for cv-v1-cv-2: 34111 ms has passed since last append
> > INFO  2018-02-21 13:33:40.775 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-4] Shutting down
> > INFO  2018-02-21 13:33:40.775 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-4] State transition from RUNNING to
> PENDING_SHUTDOWN.
> > DEBUG 2018-02-21 13:33:40.775 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-4] Shutting down all active tasks [1_32, 1_18,
> 7_15,
> > 1_24, 6_5, 6_7, 1_29], standby tasks [1_15, 1_3, 1_19, 1_8], suspended
> > tasks [6_12, 1_34, 6_14, 1_22, 1_10, 1_28], and suspended standby tasks
> > [1_32, 1_5, 1_23, 1_8]
> > ERROR 2018-02-21 13:33:40.876 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.ProcessorStateManager -
> task
> > [1_18] Failed to close state store vstore:
> > org.apache.kafka.streams.errors.StreamsException: task [1_18] exception
> > caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.send(
> > RecordCollectorImpl.java:87)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> > StoreChangeLogger.java:59)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:58)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:29)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> > execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> > measureLatency(MeteredKeyValueStore.java:218)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> > MeteredKeyValueStore.java:133)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> > putAndMaybeForward(CachingKeyValueStore.java:95)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> > 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> CachingKeyValueStore$1.apply(
> > CachingKeyValueStore.java:78)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:141)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:99)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.ThreadCache.
> > flush(ThreadCache.java:129)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> > CachingKeyValueStore.java:107)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> > CachingKeyValueStore.java:113)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.close(
> > ProcessorStateManager.java:278) [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AbstractTask.
> > closeStateManager(AbstractTask.java:232) [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> StreamTask.closeSuspended(
> > StreamTask.java:410)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.close(StreamTask.java:479)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.
> > shutdownTasksAndState(StreamThread.java:1009)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> > StreamThread.java:965)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> > record(s) for cv-v1-cv-2: 33431 ms has passed since last append
> > ERROR 2018-02-21 13:33:40.877 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.StreamTask - task [1_18]
> > Could
> > not close state manager due to the following error:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [1_18]
> > Failed
> > to close state store vstore
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.close(
> > ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AbstractTask.
> > closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> StreamTask.closeSuspended(
> > StreamTask.java:410)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.close(StreamTask.java:479)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.
> > shutdownTasksAndState(StreamThread.java:1009)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> > StreamThread.java:965)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_18]
> > exception caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.send(
> > RecordCollectorImpl.java:87)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> > StoreChangeLogger.java:59)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:58)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:29)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> > execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> > measureLatency(MeteredKeyValueStore.java:218)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> > MeteredKeyValueStore.java:133)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> > putAndMaybeForward(CachingKeyValueStore.java:95)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> > 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> CachingKeyValueStore$1.apply(
> > CachingKeyValueStore.java:78)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:141)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:99)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.ThreadCache.
> > flush(ThreadCache.java:129)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> > CachingKeyValueStore.java:107)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> > CachingKeyValueStore.java:113)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.close(
> > ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?]
> > ... 6 more
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> > record(s) for cv-v1-cv-2: 33431 ms has passed since last append
> > ERROR 2018-02-21 13:33:40.877 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-4] Failed while closing StreamTask 1_18 due to the
> > following error:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [1_18]
> > Failed
> > to close state store vstore
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.close(
> > ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AbstractTask.
> > closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> StreamTask.closeSuspended(
> > StreamTask.java:410)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.close(StreamTask.java:479)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.
> > shutdownTasksAndState(StreamThread.java:1009)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> > StreamThread.java:965)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_18]
> > exception caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.send(
> > RecordCollectorImpl.java:87)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> > StoreChangeLogger.java:59)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:58)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:29)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> > execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> > measureLatency(MeteredKeyValueStore.java:218)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> > MeteredKeyValueStore.java:133)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> > putAndMaybeForward(CachingKeyValueStore.java:95)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> > 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> CachingKeyValueStore$1.apply(
> > CachingKeyValueStore.java:78)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:141)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:99)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.ThreadCache.
> > flush(ThreadCache.java:129)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> > CachingKeyValueStore.java:107)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> > CachingKeyValueStore.java:113)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.close(
> > ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?]
> > ... 6 more
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
> > record(s) for cv-v1-cv-2: 33431 ms has passed since last append
> > ERROR 2018-02-21 13:33:40.986 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.ProcessorStateManager -
> task
> > [1_29] Failed to close state store vstore:
> > org.apache.kafka.streams.errors.StreamsException: task [1_29] exception
> > caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.send(
> > RecordCollectorImpl.java:87)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> > StoreChangeLogger.java:59)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:58)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:29)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> > execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> > measureLatency(MeteredKeyValueStore.java:218)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> > MeteredKeyValueStore.java:133)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> > putAndMaybeForward(CachingKeyValueStore.java:95)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> > 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> CachingKeyValueStore$1.apply(
> > CachingKeyValueStore.java:78)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:141)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:99)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.ThreadCache.
> > flush(ThreadCache.java:129)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> > CachingKeyValueStore.java:107)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> > CachingKeyValueStore.java:113)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.close(
> > ProcessorStateManager.java:278) [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AbstractTask.
> > closeStateManager(AbstractTask.java:232) [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> StreamTask.closeSuspended(
> > StreamTask.java:410)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.close(StreamTask.java:479)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.
> > shutdownTasksAndState(StreamThread.java:1009)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> > StreamThread.java:965)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> > record(s) for cv-v1-cv-2: 34111 ms has passed since last append
> > ERROR 2018-02-21 13:33:40.986 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.StreamTask - task [1_29]
> > Could
> > not close state manager due to the following error:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [1_29]
> > Failed
> > to close state store vstore
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.close(
> > ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AbstractTask.
> > closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> StreamTask.closeSuspended(
> > StreamTask.java:410)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.close(StreamTask.java:479)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.
> > shutdownTasksAndState(StreamThread.java:1009)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> > StreamThread.java:965)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_29]
> > exception caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.send(
> > RecordCollectorImpl.java:87)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> > StoreChangeLogger.java:59)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:58)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:29)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> > execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> > measureLatency(MeteredKeyValueStore.java:218)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> > MeteredKeyValueStore.java:133)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> > putAndMaybeForward(CachingKeyValueStore.java:95)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> > 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> CachingKeyValueStore$1.apply(
> > CachingKeyValueStore.java:78)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:141)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:99)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.ThreadCache.
> > flush(ThreadCache.java:129)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> > CachingKeyValueStore.java:107)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> > CachingKeyValueStore.java:113)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.close(
> > ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?]
> > ... 6 more
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> > record(s) for cv-v1-cv-2: 34111 ms has passed since last append
> > ERROR 2018-02-21 13:33:40.986 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-4] Failed while closing StreamTask 1_29 due to the
> > following error:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [1_29]
> > Failed
> > to close state store vstore
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.close(
> > ProcessorStateManager.java:281) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AbstractTask.
> > closeStateManager(AbstractTask.java:232) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> StreamTask.closeSuspended(
> > StreamTask.java:410)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.close(StreamTask.java:479)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.
> > shutdownTasksAndState(StreamThread.java:1009)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
> > StreamThread.java:965)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:470) [kafka-streams-0.11.0.2.jar:?]
> > Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_29]
> > exception caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.send(
> > RecordCollectorImpl.java:87)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> > StoreChangeLogger.java:59)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:58)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> ChangeLoggingKeyValueBytesStor
> > e.put(ChangeLoggingKeyValueBytesStore.java:29)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$3.
> > execute(MeteredKeyValueStore.java:136) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.
> > measureLatency(MeteredKeyValueStore.java:218)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(
> > MeteredKeyValueStore.java:133)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> > putAndMaybeForward(CachingKeyValueStore.java:95)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> > 000(CachingKeyValueStore.java:34) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.
> CachingKeyValueStore$1.apply(
> > CachingKeyValueStore.java:78)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:141)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:99)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.ThreadCache.
> > flush(ThreadCache.java:129)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(
> > CachingKeyValueStore.java:107)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(
> > CachingKeyValueStore.java:113)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.close(
> > ProcessorStateManager.java:278) ~[kafka-streams-0.11.0.2.jar:?]
> > ... 6 more
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> > record(s) for cv-v1-cv-2: 34111 ms has passed since last append
> > INFO  2018-02-21 13:33:41.052 [Engine2-StreamThread-4]
> > org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka
> > producer with timeoutMillis = 9223372036854775807 ms.
> > INFO  2018-02-21 13:33:41.054 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-4] Stream thread shutdown complete
> > INFO  2018-02-21 13:33:41.054 [Engine2-StreamThread-4]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-4] State transition from PENDING_SHUTDOWN to DEAD.
> > org.apache.kafka.streams.errors.StreamsException: task [1_32] exception
> > caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.flush(
> > RecordCollectorImpl.java:145)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> > StreamTask.java:296)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask$1.run(StreamTask.java:275)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > measureLatencyNs(StreamsMetricsImpl.java:201)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> > AssignedTasks.java:374)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.
> > applyToRunningTasks(AssignedTasks.java:420)
> ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.
> > punctuateAndCommit(AssignedTasks.java:357)
> ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.
> > processAndPunctuate(StreamThread.java:662)
> ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> > StreamThread.java:513)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:482)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:459) ~[kafka-streams-0.11.0.2.jar:?]
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> > record(s) for cv-v1-cv-2: 32717 ms has passed since last append
> > ERROR 2018-02-21 13:33:42.590 [Engine2-StreamThread-2]
> > org.apache.kafka.streams.processor.internals.AssignedTasks -
> stream-thread
> > [Engine2-StreamThread-2] Failed to commit stream task 1_34 due to the
> > following error:
> > org.apache.kafka.streams.errors.StreamsException: task [1_34] exception
> > caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.flush(
> > RecordCollectorImpl.java:145)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> > StreamTask.java:296)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask$1.run(StreamTask.java:275)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > measureLatencyNs(StreamsMetricsImpl.java:201)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> > AssignedTasks.java:374)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.
> > applyToRunningTasks(AssignedTasks.java:420)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.
> > punctuateAndCommit(AssignedTasks.java:357)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.
> > processAndPunctuate(StreamThread.java:662)
> [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> > StreamThread.java:513)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:482)
> > [kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 73
> > record(s) for cv-v1-cv-2: 31903 ms has passed since last append
> > ERROR 2018-02-21 13:33:42.591 [Engine2-StreamThread-2]
> > org.apache.kafka.streams.processor.internals.AssignedTasks -
> stream-thread
> > [Engine2-StreamThread-2] Failed to commit stream task 1_10 due to the
> > following error:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [1_10]
> > Failed
> > to flush state store subs
> > at org.apache.kafka.streams.processor.internals.
> > ProcessorStateManager.flush(
> > ProcessorStateManager.java:257) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(
> > AbstractTask.java:194)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> > StreamTask.java:295)
> > ~[kafka-streams-0.11.0.2.jar:?]
> >
> > Thanks,
> > Tony
> >
>

Reply via email to