[
https://issues.apache.org/jira/browse/KAFKA-19960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18045167#comment-18045167
]
Mikael Carlstedt commented on KAFKA-19960:
------------------------------------------
Please find attached [^debug.log] for standby task 1_1 of the streams application,
covering the period from the start of shutdown until it hit this error:
{noformat}
2025-12-15 11:38:09,130 ERROR [messages.nobill.campaigns.delay-queue-9f0ebd13-5ebe-46a9-9904-7ebd9814763e-StreamThread-1] s.s.n.k.k.i.KafkaClient [KafkaClient.java:179] Unhandled exception in Kafka streams application messages.nobill.campaigns.delay-queue
org.apache.kafka.streams.errors.ProcessorStateException: Error opening store rate-limiterpriority-chronological-store-0 at location /Users/mikcar/git/nobill/sinchcampaigns/target/tmp/node2/messages.nobill.campaigns.delay-queue/1_1/rocksdb/rate-limiterpriority-chronological-store-0
at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:316)
at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:278)
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:241)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:162)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:63)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:46)
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:63)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$2(MeteredKeyValueStore.java:135)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:927)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:135)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:233)
at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
at org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:112)
at org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1016)
at org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:1001)
at org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:918)
at org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1433)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1239)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:926)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:886)
Caused by: org.rocksdb.RocksDBException: lock hold by current process, acquire time 1765795072 acquiring thread 12924661760: /Users/mikcar/git/nobill/sinchcampaigns/target/tmp/node2/messages.nobill.campaigns.delay-queue/1_1/rocksdb/rate-limiterpriority-chronological-store-0/LOCK: No locks available
at org.rocksdb.RocksDB.open(Native Method)
at org.rocksdb.RocksDB.open(RocksDB.java:307)
at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:310)
... 19 common frames omitted{noformat}
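For context, the RocksDBException above is the generic message RocksDB emits when the same process tries to open a directory whose LOCK file it already holds, i.e. a previous handle in the same JVM has not released the store's LOCK yet. A minimal sketch, unrelated to the Streams code path and using a made-up scratch directory, that reproduces the same wording:
{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbLockDemo {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        // Made-up scratch directory, not the Streams state dir.
        final String path = "/tmp/rocksdb-lock-demo";
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB first = RocksDB.open(options, path)) {
            // "first" holds <path>/LOCK, so a second open from the same process
            // fails with "lock hold by current process ...: <path>/LOCK: No locks
            // available" -- the same wording as in the "Caused by" above.
            try (RocksDB second = RocksDB.open(options, path)) {
                System.out.println("unexpected: second open succeeded");
            } catch (RocksDBException e) {
                System.out.println(e.getMessage());
            }
        }
    }
}
{code}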
This is the streams config:
{noformat}
acceptable.recovery.lag = 10000
application.id = messages.nobill.campaigns.delay-queue
application.server =
bootstrap.servers = [localhost:33029]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.client.supplier = class org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.dsl.store = rocksDB
default.key.serde = null
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = null
deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
dsl.store.suppliers.class = class org.apache.kafka.streams.state.BuiltInDslStoreSuppliers$RocksDBDslStoreSuppliers
enable.metrics.push = true
ensure.explicit.internal.resource.naming = false
group.protocol = classic
log.summary.interval.ms = 120000
max.task.idle.ms = 0
max.warmup.replicas = 2
metadata.max.age.ms = 300000
metadata.recovery.rebootstrap.trigger.ms = 300000
metadata.recovery.strategy = rebootstrap
metric.reporters = [org.apache.kafka.common.metrics.JmxReporter]
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 1
num.stream.threads = 1
poll.ms = 100
probing.rebalance.interval.ms = 600000
processing.exception.handler = class org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler
processing.guarantee = at_least_once
processor.wrapper.class = class org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper
production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
rack.aware.assignment.non_overlap_cost = null
rack.aware.assignment.strategy = none
rack.aware.assignment.tags = []
rack.aware.assignment.traffic_cost = null
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
repartition.purge.interval.ms = 30000
replication.factor = -1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = SSL
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /Users/mikcar/git/nobill/sinchcampaigns/target/tmp/node2
statestore.cache.max.bytes = 10485760
task.assignor.class = null
task.timeout.ms = 300000
topology.optimization = none
upgrade.from = null
window.size.ms = null
windowed.inner.class.serde = null
windowstore.changelog.additional.retention.ms = 3600000{noformat}
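A sketch of how the non-default values visible in the dump above (application.id, bootstrap.servers, num.standby.replicas, state.dir, security.protocol) could be supplied per instance; the class and method names are made up for illustration, and the state directory is the per-instance part (node1 vs node2):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.streams.StreamsConfig;

public final class InstanceConfig {
    // Illustrative helper only; values are copied from the config dump above.
    // Each instance gets its own state.dir (e.g. .../target/tmp/node1 vs node2).
    static Properties forStateDir(final String stateDir) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "messages.nobill.campaigns.delay-queue");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:33029");
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        return props;
    }
}
{code}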
> Spurious failure to close StateDirectory due to some task directories still locked
> ----------------------------------------------------------------------------------
>
> Key: KAFKA-19960
> URL: https://issues.apache.org/jira/browse/KAFKA-19960
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.8.1
> Reporter: Mikael Carlstedt
> Priority: Critical
> Attachments: debug-1.log, debug.log
>
>
> We are seeing random failures to close state directories while closing a
> kafka-streams application in a test environment.
> *Preconditions:*
> * Two state stores
> * Three input partitions
> * Stand-by replication enabled (NB: we have not been able to reproduce
> without stand-by replication)
> * Two instances running on a single host, each with its own state directory.
> The application instances are started before each test case is executed, and
> then closed when the test case has completed (a rough sketch of this lifecycle
> follows the log below). Most of the time it works well without any errors
> logged, but sometimes we see this error message when closing an application
> instance:
>
> {noformat}
> 25-12-04T13:01:18.711 ERROR o.a.k.s.p.i.StateDirectory:397 - Some task directories still locked while closing state, this indicates unclean shutdown: {0_2= StreamsThread threadId: messages.nobill.campaigns.delay-queue-78f61f55-4863-4c4f-913c-bd398eceed0e-StreamThread-1
> TaskManager
> MetadataState:
> Tasks:
> , 0_0= StreamsThread threadId: messages.nobill.campaigns.delay-queue-78f61f55-4863-4c4f-913c-bd398eceed0e-StreamThread-1
> TaskManager
> MetadataState:
> Tasks:
> }{noformat}
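> For reference, the start/close cycle mentioned above looks roughly like this; buildTopology() and propsFor() are made-up stand-ins for the real topology (two state stores) and for a per-instance streams config (same application.id, num.standby.replicas = 1, different state.dir):
> {code:java}
> import java.time.Duration;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.Topology;
>
> // Sketch of the per-test lifecycle: two instances on one host, same
> // application.id, different state.dir (/tmp/node1 vs /tmp/node2).
> Topology topology = buildTopology();
> KafkaStreams node1 = new KafkaStreams(topology, propsFor("/tmp/node1"));
> KafkaStreams node2 = new KafkaStreams(topology, propsFor("/tmp/node2"));
> node1.start();
> node2.start();
> // ... the test case runs against the two instances ...
> // Both instances are closed when the test case completes; the StateDirectory
> // error above is logged during this close on some runs.
> node1.close(Duration.ofSeconds(30));
> node2.close(Duration.ofSeconds(30));
> {code}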
>
> * This has a knock-on effect on all of the following test cases, which fail
> with this error message:
>
> {noformat}
> 25-12-04T13:01:28.684 ERROR s.s.n.k.k.i.KafkaClient:179 - Unhandled exception in Kafka streams application messages.nobill.campaigns.delay-queue
> org.apache.kafka.streams.errors.ProcessorStateException: Error opening store rate-limiterpriority-chronological-store-0 at location /tmp/node2/messages.nobill.campaigns.delay-queue/0_2/rocksdb/rate-limiterpriority-chronological-store-0
> at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:330)
> ...
> Caused by: org.rocksdb.RocksDBException: lock hold by current process, acquire time 1764849665 acquiring thread 14274883584: /tmp/node2/messages.nobill.campaigns.delay-queue/0_2/rocksdb/rate-limiterpriority-chronological-store-0/LOCK: No locks available
> at org.rocksdb.RocksDB.open(Native Method)
> at org.rocksdb.RocksDB.open(RocksDB.java:307)
> at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:324)
> ... 19 common frames omitted{noformat}
> *Observation:*
>
> * Prior to the error, the two stand-by tasks that fail to release their
> locks are "closed and recycled":
> {noformat}
> 25-12-04T13:01:18.080 INFO o.a.k.s.p.i.StandbyTask:149 - stream-thread [messages.nobill.campaigns.delay-queue-78f61f55-4863-4c4f-913c-bd398eceed0e-StreamThread-1] standby-task [0_2] Suspended running
> 25-12-04T13:01:18.080 DEBUG o.a.k.s.p.i.ProcessorStateManager:633 - stream-thread [messages.nobill.campaigns.delay-queue-78f61f55-4863-4c4f-913c-bd398eceed0e-StreamThread-1] standby-task [0_2] Recycling state for STANDBY task 0_2.
> 25-12-04T13:01:18.080 DEBUG o.a.k.s.p.i.ProcessorStateManager:644 - stream-thread [messages.nobill.campaigns.delay-queue-78f61f55-4863-4c4f-913c-bd398eceed0e-StreamThread-1] standby-task [0_2] Clearing all store caches registered in the state manager: ...
> 25-12-04T13:01:18.080 INFO o.a.k.s.p.i.StandbyTask:254 - stream-thread [messages.nobill.campaigns.delay-queue-78f61f55-4863-4c4f-913c-bd398eceed0e-StreamThread-1] standby-task [0_2] Closed and recycled state{noformat}
> * By comparison, the third task is "closed clean":
> {noformat}
> 25-12-04T13:01:17.024 INFO o.a.k.s.p.i.StandbyTask:149 - stream-thread [messages.nobill.campaigns.delay-queue-bff50025-9296-45b7-ab1a-43480aee6f66-StreamThread-1] standby-task [0_1] Suspended running
> 25-12-04T13:01:17.024 DEBUG o.a.k.s.p.i.ProcessorStateManager:585 - stream-thread [messages.nobill.campaigns.delay-queue-bff50025-9296-45b7-ab1a-43480aee6f66-StreamThread-1] standby-task [0_1] Closing its state manager and all the registered state stores: ...
> 25-12-04T13:01:17.028 DEBUG o.a.k.s.p.i.StateDirectory:377 - stream-thread [messages.nobill.campaigns.delay-queue-bff50025-9296-45b7-ab1a-43480aee6f66-StreamThread-1] Released state dir lock for task 0_1
> 25-12-04T13:01:17.028 INFO o.a.k.s.p.i.StandbyTask:232 - stream-thread [messages.nobill.campaigns.delay-queue-bff50025-9296-45b7-ab1a-43480aee6f66-StreamThread-1] standby-task [0_1] Closed clean{noformat}
> What is it that triggers this "recycling" of stand-by tasks?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)