[ 
https://issues.apache.org/jira/browse/KAFKA-20096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18057151#comment-18057151
 ] 

Arpit Goyal edited comment on KAFKA-20096 at 2/8/26 12:39 PM:
--------------------------------------------------------------

[~mjsax] While testing, I found that we never create a RocksDB store because 
of the topology defined in StreamsUpgradeTest. 
{code:java}
// No Materialized argument: no state store was created for this table.
final KTable<String, Integer> dataTable = builder.table(
    "data",
    Consumed.with(stringSerde, intSerde));
final KStream<String, Integer> dataStream = dataTable.toStream();
{code}
When a KTable is created without explicit materialization, Streams treats it 
as a table that is never queried and is only passed through to a stream, so no 
state store is created.

After adding explicit materialization:

 
{code:java}
// Naming the store via Materialized.as(...) makes Streams create the state store.
final KTable<String, Integer> dataTable = builder.table(
    "data",
    Consumed.with(stringSerde, intSerde),
    org.apache.kafka.streams.kstream.Materialized.as("data-store"));
final KStream<String, Integer> dataStream = dataTable.toStream();
{code}
 

I can now see the RocksDB store created under the 
/mnt/streams/StreamsUpgradeTest path. 
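
A minimal sketch of how this check could be automated, assuming the on-disk layout seen in the error log of this comment (`<application dir>/<task id>/rocksdb/<store name>`). The class and method names are hypothetical and not part of Kafka; the code uses only the JDK and lists the task directories that contain a RocksDB directory for a given store.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Kafka Streams.
final class RocksDbStoreDirs {

    // Returns the names of task directories under appDir that contain
    // a "rocksdb/<storeName>" directory, sorted by name.
    static List<String> tasksWithStore(final Path appDir, final String storeName) throws IOException {
        if (!Files.isDirectory(appDir)) {
            return List.of(); // nothing on disk yet
        }
        try (Stream<Path> taskDirs = Files.list(appDir)) {
            return taskDirs
                .filter(Files::isDirectory)
                .filter(taskDir -> Files.isDirectory(taskDir.resolve("rocksdb").resolve(storeName)))
                .map(taskDir -> taskDir.getFileName().toString())
                .sorted()
                .collect(Collectors.toList());
        }
    }

    public static void main(final String[] args) throws IOException {
        // Defaults mirror the path and store name used in this comment.
        final Path appDir = Paths.get(args.length > 0 ? args[0] : "/mnt/streams/StreamsUpgradeTest");
        final String storeName = args.length > 1 ? args[1] : "data-store";
        System.out.println(tasksWithStore(appDir, storeName));
    }
}
```

An empty result for the first topology and a non-empty one for the second would match what I observed manually.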

I wrote two tests:
 # Upgrade path [3.9 to 4.1]

        Worked successfully:
{code:java}
[2026-02-08 12:27:33,351] DEBUG stream-thread [StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1] Main Consumer poll completed in 100 ms and fetched 0 records from partitions [] (org.apache.kafka.streams.processor.internals.StreamThread)
[2026-02-08 12:27:33,352] DEBUG stream-thread [StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1] task [0_0] Acquired state directory lock (org.apache.kafka.streams.processor.internals.StreamTask)
[2026-02-08 12:27:33,352] DEBUG Registering metric MetricName [name=put-rate, group=stream-state-metrics, description=The average number of calls to put per second, tags={thread-id=StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1, task-id=0_0, rocksdb-state-id=data-store}] (org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter)
[2026-02-08 12:27:33,352] DEBUG Registering metric MetricName [name=put-latency-avg, group=stream-state-metrics, description=The average latency of calls to put, tags={thread-id=StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1, task-id=0_0, rocksdb-state-id=data-store}] (org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter)
[2026-02-08 12:27:33,352] DEBUG Registering metric MetricName [name=put-latency-max, group=stream-state-metrics, description=The maximum latency of calls to put, tags={thread-id=StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1, task-id=0_0, rocksdb-state-id=data-store}] (org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter)
[2026-02-08 12:27:33,352] DEBUG Registering metric MetricName [name=put-if-absent-rate, group=stream-state-metrics, description=The average number of calls to put-if-absent per second, tags={thread-id=StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1, task-id=0_0, rocksdb-state-id=data-store}] (org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter)
 {code}
    2. Downgrade path [4.1 to 3.9]

        Failed with the following error:
{code:java}
[2026-02-08 12:08:04,078] DEBUG stream-thread [StreamsUpgradeTest-3f7c5d37-8227-43f8-a0c6-5e9090529061-StreamThread-1] standby-task [0_2] Acquired state directory lock (org.apache.kafka.streams.processor.internals.StandbyTask)
[2026-02-08 12:08:04,203] ERROR stream-thread [StreamsUpgradeTest-3f7c5d37-8227-43f8-a0c6-5e9090529061-StreamThread-1] Get exceptions for the following tasks: {0_2=org.apache.kafka.streams.errors.ProcessorStateException: Error opening store data-store at location /mnt/streams/StreamsUpgradeTest/0_2/rocksdb/data-store} (org.apache.kafka.streams.processor.internals.TaskManager)
[2026-02-08 12:08:04,204] ERROR stream-client [StreamsUpgradeTest-3f7c5d37-8227-43f8-a0c6-5e9090529061] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.  (org.apache.kafka.streams.KafkaStreams)
org.apache.kafka.streams.errors.ProcessorStateException: Error opening store data-store at location /mnt/streams/StreamsUpgradeTest/0_2/rocksdb/data-store
        at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:330)
        at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:70)
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:255)
        at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:176)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
        at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:114)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:159)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:159)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:234)
        at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
        at org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:114)
        at org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1016)
        at org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:1001)
        at org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:915)
 {code}
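
If a test wants to assert on this failure rather than rely on reading the log by hand, one option is to scan the captured log text for the "Error opening store ... at location ..." message shown above. The following is only a sketch: the class name is hypothetical, it uses only the JDK, and it assumes the message wording stays as it appears in this log.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Streams.
final class StoreOpenFailure {

    // \s+ between words tolerates hard-wrapped log text; the location stops
    // at whitespace or at the '}' that closes the task-to-exception map.
    private static final Pattern ERROR_OPENING_STORE = Pattern.compile(
        "Error\\s+opening\\s+store\\s+(\\S+)\\s+at\\s+location\\s+([^\\s}]+)");

    final String storeName;
    final String location;

    private StoreOpenFailure(final String storeName, final String location) {
        this.storeName = storeName;
        this.location = location;
    }

    // Returns the first store-open failure found in the log text, if any.
    static Optional<StoreOpenFailure> parse(final String logText) {
        final Matcher matcher = ERROR_OPENING_STORE.matcher(logText);
        if (!matcher.find()) {
            return Optional.empty();
        }
        return Optional.of(new StoreOpenFailure(matcher.group(1), matcher.group(2)));
    }
}
```

For the downgrade log above, `StoreOpenFailure.parse(log)` would yield store name `data-store` and the `0_2` task's RocksDB directory as the location.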





> RocksDB compatibility not documented
> ------------------------------------
>
>                 Key: KAFKA-20096
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20096
>             Project: Kafka
>          Issue Type: Improvement
>          Components: docs, streams, system tests
>    Affects Versions: 4.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Arpit Goyal
>            Priority: Critical
>
> With the 4.0 release, we upgraded RocksDB from 7.9 to 9.7 
> (https://issues.apache.org/jira/browse/KAFKA-15443). However, we missed 
> that RocksDB 8.6 bumps the file format version from version 5 to version 6.
> While this does not impact the upgrade path, it does impact the downgrade 
> path. This limitation is not documented, though. We should also investigate 
> `RocksDBConfigSetter`, which seems to allow configuring the file format 
> version via `tableConfig.setFormatVersion`.
> We should also double-check system test coverage for the upgrade/downgrade 
> path with RocksDB. Ideally we should catch such issues; there seems to be a 
> testing gap.
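
For reference, a sketch of what such a `RocksDBConfigSetter` might look like. The class name is hypothetical, and whether pinning format version 5 on 4.x actually keeps stores readable after a downgrade is exactly what still needs to be investigated; this has not been verified. It needs the kafka-streams and rocksdbjni jars on the classpath.

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Hypothetical config setter; the effect on downgrade is unverified.
public class FormatVersionConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName,
                          final Options options,
                          final Map<String, Object> configs) {
        // Reuse the table config Streams already set up so its other
        // defaults are kept, and only override the format version.
        final BlockBasedTableConfig tableConfig =
            (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setFormatVersion(5); // format version used before RocksDB 8.6
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Nothing to release: no RocksDB objects are created in setConfig.
    }
}
```

It would be registered through the `rocksdb.config.setter` config, i.e. `StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG`.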



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
