Nicolas Carlot created KAFKA-9880:
-------------------------------------

             Summary: Error while range compacting during bulk loading of FIFO compacted RocksDB Store
                 Key: KAFKA-9880
                 URL: https://issues.apache.org/jira/browse/KAFKA-9880
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.4.1
            Reporter: Nicolas Carlot
When restoring a non-empty RocksDB state store, if it is customized to use FIFO compaction, the following exception is thrown by the KStream process:

{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: Error while range compacting during restoring store merge_store
	at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615) ~[kafka-stream-router.jar:?]
	at org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398) ~[kafka-stream-router.jar:?]
	at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644) ~[kafka-stream-router.jar:?]
	at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59) ~[kafka-stream-router.jar:?]
	at org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76) ~[kafka-stream-router.jar:?]
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211) ~[kafka-stream-router.jar:?]
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:185) ~[kafka-stream-router.jar:?]
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:81) ~[kafka-stream-router.jar:?]
	at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:389) ~[kafka-stream-router.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) ~[kafka-stream-router.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) ~[kafka-stream-router.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-stream-router.jar:?]
Caused by: org.rocksdb.RocksDBException: Target level exceeds number of levels
	at org.rocksdb.RocksDB.compactRange(Native Method) ~[kafka-stream-router.jar:?]
	at org.rocksdb.RocksDB.compactRange(RocksDB.java:2636) ~[kafka-stream-router.jar:?]
	at org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:613) ~[kafka-stream-router.jar:?]
	... 11 more
{code}

Compaction is configured through an implementation of RocksDBConfigSetter. The exception is gone as soon as I remove:

{code:java}
CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
fifoOptions.setMaxTableFilesSize(maxSize);
fifoOptions.setAllowCompaction(true);
options.setCompactionOptionsFIFO(fifoOptions);
options.setCompactionStyle(CompactionStyle.FIFO);
{code}

Bulk loading works fine when the store is non-existent or empty; the error occurs only once the store contains a minimum amount of data. I guess it happens when the number of SST levels has increased.

As a workaround, I'm currently using a forked version of Kafka 2.4.1 that customizes the RocksDBStore class with this modification:

{code:java}
@Override
@SuppressWarnings("deprecation")
public void toggleDbForBulkLoading() {
    try {
        db.compactRange(columnFamily, true, 1, 0);
    } catch (final RocksDBException e) {
        try {
            if (columnFamily.getDescriptor().getOptions().compactionStyle() != CompactionStyle.FIFO) {
                throw new ProcessorStateException("Error while range compacting while restoring store " + name, e);
            } else {
                log.warn("Compaction of store " + name + " for bulk loading failed. Will continue without compacted store, which will be slower.", e);
            }
        } catch (final RocksDBException e1) {
            throw new ProcessorStateException("Error while range compacting during restoring store " + name, e);
        }
    }
}
{code}

I'm not very proud of this workaround, but it suits my use cases well.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
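For reference, a minimal sketch of the kind of RocksDBConfigSetter that triggers this (the class name and the {{MAX_SIZE}} value are assumptions for illustration; only the FIFO option calls are taken from the report above):

{code:java}
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

// Hypothetical config setter reproducing the FIFO compaction setup quoted above.
public class FifoCompactionConfigSetter implements RocksDBConfigSetter {

    // Assumed size cap; the report only refers to a "maxSize" variable.
    private static final long MAX_SIZE = 512 * 1024 * 1024L;

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
        fifoOptions.setMaxTableFilesSize(MAX_SIZE);
        fifoOptions.setAllowCompaction(true);
        options.setCompactionOptionsFIFO(fifoOptions);
        options.setCompactionStyle(CompactionStyle.FIFO);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // The Options object owns the native handles set on it; nothing extra to release here.
    }
}
{code}

Such a class would be registered via the {{rocksdb.config.setter}} ({{StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG}}) Streams property.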