eduwercamacaro commented on code in PR #21578:
URL: https://github.com/apache/kafka/pull/21578#discussion_r2896852363


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractColumnFamilyAccessor.java:
##########
@@ -59,14 +60,14 @@ public final void commit(final RocksDBStore.DBAccessor accessor, final Map<Topic
     }
 
     @Override
-    public void open(final RocksDBStore.DBAccessor accessor) throws RocksDBException {
+    public final void open(final RocksDBStore.DBAccessor accessor, final boolean ignoreInvalidState) throws RocksDBException {
         final byte[] valueBytes = accessor.get(offsetColumnFamilyHandle, statusKey);
-        if (valueBytes == null || Arrays.equals(valueBytes, closedState)) {
+        if (ignoreInvalidState || (valueBytes == null || Arrays.equals(valueBytes, closedState))) {
             // If the status key is not present, we initialize it to "OPEN"
             accessor.put(offsetColumnFamilyHandle, statusKey, openState);
             open = true;
         } else {
-            throw new RocksDBException("Invalid state");
+            throw new StreamsException("Invalid state during store open. Expected state to be either empty or closed");

Review Comment:
   The `RocksDBStore` class wraps this `StreamsException` in a 
`ProcessorStateException` in its `init` method, since it has more context about 
the configured processing guarantee. I think `StreamsException` is the better 
choice at the CF accessor level because it is a general exception that is not 
necessarily tied to the processing context.
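
   To illustrate the layering I have in mind, here is a minimal, self-contained 
sketch. It is not the code in this PR: the exception classes are local 
stand-ins for Kafka's `StreamsException` and `ProcessorStateException`, and 
`openAccessor`, `initStore`, the state byte values, and the error message in 
`initStore` are hypothetical names chosen for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StoreOpenSketch {

    // Local stand-ins for the Kafka exception types (assumption: the real
    // classes are not on the classpath in this sketch).
    static class StreamsException extends RuntimeException {
        StreamsException(final String message) {
            super(message);
        }

        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static class ProcessorStateException extends StreamsException {
        ProcessorStateException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical status markers; the real byte values may differ.
    static final byte[] OPEN_STATE = "open".getBytes(StandardCharsets.UTF_8);
    static final byte[] CLOSED_STATE = "closed".getBytes(StandardCharsets.UTF_8);

    // Accessor level: knows nothing about processing context, so it throws
    // the general StreamsException when the stored status is unexpected.
    static byte[] openAccessor(final byte[] storedStatus, final boolean ignoreInvalidState) {
        if (ignoreInvalidState || storedStatus == null || Arrays.equals(storedStatus, CLOSED_STATE)) {
            return OPEN_STATE;
        }
        throw new StreamsException(
            "Invalid state during store open. Expected state to be either empty or closed");
    }

    // Store level: has the extra context (store name, configuration), so it
    // wraps the accessor failure in the more specific exception.
    static byte[] initStore(final String storeName, final byte[] storedStatus, final boolean ignoreInvalidState) {
        try {
            return openAccessor(storedStatus, ignoreInvalidState);
        } catch (final StreamsException e) {
            throw new ProcessorStateException("Error opening store " + storeName, e);
        }
    }

    public static void main(final String[] args) {
        // A status left at "open" is treated as invalid unless ignored.
        try {
            initStore("example-store", OPEN_STATE, false);
        } catch (final ProcessorStateException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
        }
    }
}
```

   The caller only ever sees the store-level exception, while the original 
accessor-level message stays available through `getCause()`.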



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
