Copilot commented on code in PR #21882:
URL: https://github.com/apache/kafka/pull/21882#discussion_r3311278281


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -494,6 +526,44 @@ void restore(final StateStoreMetadata storeMetadata, final List<ConsumerRecord<b
         }
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private void reprocessRestore(final StateStoreMetadata storeMetadata,
+                                  final List<ConsumerRecord<byte[], byte[]>> restoreRecords,
+                                  final InternalTopologyBuilder.ReprocessFactory reprocessFactory) {
+        final String storeName = storeMetadata.store().name();
+        final Processor processor = reprocessorCache.computeIfAbsent(storeName, k -> {
+            final Processor p = reprocessFactory.processorSupplier().get();
+            p.init((ProcessorContext) processorContext);
+            return p;
+        });
+
+        for (final ConsumerRecord<byte[], byte[]> record : restoreRecords) {
+            final ConsumerRecord<byte[], byte[]> converted = storeMetadata.recordConverter.convert(record);
+            if (converted.key() != null) {
+                final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+                    converted.timestamp(),
+                    converted.offset(),
+                    converted.partition(),
+                    converted.topic(),
+                    converted.headers());
+                processorContext.setRecordContext(recordContext);
+
+                try {
+                    final Object key = reprocessFactory.keyDeserializer().deserialize(converted.topic(), converted.key());
+                    final Object value = reprocessFactory.valueDeserializer().deserialize(converted.topic(), converted.value());
+                    final long timestamp = Math.max(0L, converted.timestamp());
+                    processor.process(new Record<>(key, value, timestamp, converted.headers()));

Review Comment:
   `addReadOnlyStateStore` explicitly allows `keyDeserializer` and
   `valueDeserializer` to be null (meaning the defaults are used), but
   `reprocessRestore` unconditionally calls
   `reprocessFactory.keyDeserializer().deserialize(...)` and
   `reprocessFactory.valueDeserializer().deserialize(...)`, which throws a
   `NullPointerException` if either deserializer is null. Please resolve the
   effective deserializers from `processorContext` when the factory
   deserializers are null (and also ensure that wrapping nullable deserializers
   are initialized), so that restoration matches the normal source-node path.
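
A minimal sketch of the null-fallback idea from the comment above. It does not use Kafka's classes: the nested `Deserializer` interface is a simplified stand-in, and `resolve` is a hypothetical helper name, not an existing Kafka Streams method. In the real code the default would presumably come from the serdes exposed by `processorContext`.

```java
import java.nio.charset.StandardCharsets;

public class DeserializerFallbackSketch {

    // Hypothetical stand-in for Kafka's Deserializer; only the two-argument method is modeled.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Returns the configured deserializer, or the default one when none was configured.
    static <T> Deserializer<T> resolve(final Deserializer<T> configured,
                                       final Deserializer<T> defaultDeserializer) {
        return configured != null ? configured : defaultDeserializer;
    }

    public static void main(final String[] args) {
        final Deserializer<String> defaultDeserializer =
            (topic, data) -> data == null ? null : new String(data, StandardCharsets.UTF_8);

        // Simulates a factory that was created with a null deserializer.
        final Deserializer<String> fromFactory = null;

        // Resolve once, before the restore loop, instead of dereferencing the nullable field per record.
        final Deserializer<String> effective = resolve(fromFactory, defaultDeserializer);
        System.out.println(effective.deserialize("storeTopic", "foo".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Resolving the effective deserializer once up front keeps the per-record loop free of null checks.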



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -494,6 +526,44 @@ void restore(final StateStoreMetadata storeMetadata, final List<ConsumerRecord<b
         }
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private void reprocessRestore(final StateStoreMetadata storeMetadata,
+                                  final List<ConsumerRecord<byte[], byte[]>> restoreRecords,
+                                  final InternalTopologyBuilder.ReprocessFactory reprocessFactory) {
+        final String storeName = storeMetadata.store().name();
+        final Processor processor = reprocessorCache.computeIfAbsent(storeName, k -> {
+            final Processor p = reprocessFactory.processorSupplier().get();
+            p.init((ProcessorContext) processorContext);
+            return p;
+        });
+
+        for (final ConsumerRecord<byte[], byte[]> record : restoreRecords) {
+            final ConsumerRecord<byte[], byte[]> converted = storeMetadata.recordConverter.convert(record);
+            if (converted.key() != null) {
+                final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+                    converted.timestamp(),
+                    converted.offset(),
+                    converted.partition(),
+                    converted.topic(),
+                    converted.headers());
+                processorContext.setRecordContext(recordContext);
+
+                try {
+                    final Object key = reprocessFactory.keyDeserializer().deserialize(converted.topic(), converted.key());
+                    final Object value = reprocessFactory.valueDeserializer().deserialize(converted.topic(), converted.value());

Review Comment:
   The deserialization in `reprocessRestore` ignores record headers because it
   calls the two-argument `Deserializer#deserialize(String, byte[])`. Normal
   processing uses the headers-aware overload, so header-dependent
   deserializers will behave differently during restoration. Consider using the
   `deserialize(String, Headers, byte[])` overload here to keep restoration
   behavior consistent with runtime processing.
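
To illustrate why the overload matters, here is a small self-contained sketch. The nested `Deserializer` interface is a simplified stand-in for Kafka's (headers are modeled as a plain `Map`), and the `encoding` header and `EncodingAwareDeserializer` class are invented purely for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class HeaderAwareDeserializeSketch {

    // Hypothetical stand-in for Kafka's Deserializer; headers are modeled as a plain Map.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);

        // Same shape as the headers-aware overload: by default it ignores the headers.
        default T deserialize(final String topic, final Map<String, String> headers, final byte[] data) {
            return deserialize(topic, data);
        }
    }

    // A deserializer whose result depends on a made-up "encoding" header.
    static final class EncodingAwareDeserializer implements Deserializer<String> {
        @Override
        public String deserialize(final String topic, final byte[] data) {
            // Without headers there is nothing to go on, so fall back to UTF-8.
            return new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(final String topic, final Map<String, String> headers, final byte[] data) {
            final boolean latin1 = "latin1".equals(headers.get("encoding"));
            return new String(data, latin1 ? StandardCharsets.ISO_8859_1 : StandardCharsets.UTF_8);
        }
    }

    public static void main(final String[] args) {
        final Deserializer<String> deserializer = new EncodingAwareDeserializer();
        final byte[] data = {(byte) 0xE9}; // a single byte that is valid ISO-8859-1 but not valid UTF-8
        final Map<String, String> headers = Map.of("encoding", "latin1");

        final String viaRuntimePath = deserializer.deserialize("storeTopic", headers, data);
        final String viaRestorePath = deserializer.deserialize("storeTopic", data);
        System.out.println("same result on both paths: " + viaRuntimePath.equals(viaRestorePath));
    }
}
```

Deserializers that do not override the headers-aware overload are unaffected by the switch, since the default implementation delegates to the two-argument method.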
   



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -494,6 +526,44 @@ void restore(final StateStoreMetadata storeMetadata, final List<ConsumerRecord<b
         }
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private void reprocessRestore(final StateStoreMetadata storeMetadata,
+                                  final List<ConsumerRecord<byte[], byte[]>> restoreRecords,
+                                  final InternalTopologyBuilder.ReprocessFactory reprocessFactory) {
+        final String storeName = storeMetadata.store().name();
+        final Processor processor = reprocessorCache.computeIfAbsent(storeName, k -> {
+            final Processor p = reprocessFactory.processorSupplier().get();
+            p.init((ProcessorContext) processorContext);
+            return p;
+        });
+
+        for (final ConsumerRecord<byte[], byte[]> record : restoreRecords) {
+            final ConsumerRecord<byte[], byte[]> converted = storeMetadata.recordConverter.convert(record);
+            if (converted.key() != null) {
+                final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+                    converted.timestamp(),
+                    converted.offset(),
+                    converted.partition(),
+                    converted.topic(),
+                    converted.headers());
+                processorContext.setRecordContext(recordContext);
+
+                try {
+                    final Object key = reprocessFactory.keyDeserializer().deserialize(converted.topic(), converted.key());
+                    final Object value = reprocessFactory.valueDeserializer().deserialize(converted.topic(), converted.value());
+                    final long timestamp = Math.max(0L, converted.timestamp());
+                    processor.process(new Record<>(key, value, timestamp, converted.headers()));
+                } catch (final RuntimeException e) {
+                    throw new ProcessorStateException(
+                        format("%sException caught while trying to reprocess-restore state from %s",
+                            logPrefix, storeMetadata.changelogPartition),
+                        e
+                    );
+                }

Review Comment:
   `reprocessRestore` only catches `RuntimeException`, but the user code
   invoked here (deserializers and `processor.process`) can throw checked
   exceptions when it is written in a language that does not enforce checked
   exceptions (e.g. Kotlin or Scala). Such an exception would escape restore
   without being wrapped in a `ProcessorStateException`. Please broaden the
   catch to `Exception` (as done in `GlobalStateManagerImpl.reprocessState`)
   and wrap consistently.
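
The leak described above can be reproduced in plain Java with the well-known "sneaky throw" generics trick, which stands in here for Kotlin or Scala code throwing an undeclared checked exception. This is only a sketch: `WrappedException` is a hypothetical stand-in for `ProcessorStateException`, and `userCode`, `narrowCatch`, `broadCatch`, and `escaping` are names made up for the example.

```java
import java.io.IOException;

public class CheckedExceptionLeakSketch {

    // Throws any Throwable without declaring it, emulating what non-Java JVM languages permit.
    @SuppressWarnings("unchecked")
    static <E extends Throwable> void sneakyThrow(final Throwable t) throws E {
        throw (E) t;
    }

    // Stand-in for user code (a deserializer or processor) that throws a checked exception.
    static void userCode() {
        CheckedExceptionLeakSketch.<RuntimeException>sneakyThrow(new IOException("boom"));
    }

    // Hypothetical stand-in for ProcessorStateException.
    static final class WrappedException extends RuntimeException {
        WrappedException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Catches only RuntimeException: the IOException is not wrapped and propagates as-is.
    static void narrowCatch() {
        try {
            userCode();
        } catch (final RuntimeException e) {
            throw new WrappedException("restore failed", e);
        }
    }

    // Catches Exception: the IOException is wrapped like any other failure.
    static void broadCatch() {
        try {
            userCode();
        } catch (final Exception e) {
            throw new WrappedException("restore failed", e);
        }
    }

    // Reports the simple class name of whatever escapes the given action.
    static String escaping(final Runnable action) {
        try {
            action.run();
            return "nothing";
        } catch (final Throwable t) {
            return t.getClass().getSimpleName();
        }
    }

    public static void main(final String[] args) {
        System.out.println("narrow catch lets through: " + escaping(CheckedExceptionLeakSketch::narrowCatch));
        System.out.println("broad catch lets through:  " + escaping(CheckedExceptionLeakSketch::broadCatch));
    }
}
```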



##########
streams/src/test/java/org/apache/kafka/streams/processor/ReadOnlyStoreTest.java:
##########
@@ -130,4 +130,111 @@ public void process(final Record<Integer, String> record) {
             assertThat(output.readKeyValuesToList(), equalTo(expectedResult));
         }
     }
+
+    @Test
+    public void shouldUseCustomProcessorDuringRestorationWithTransformation() {
+        final java.util.concurrent.atomic.AtomicInteger processCallCount = new java.util.concurrent.atomic.AtomicInteger(0);
+
+        final Topology topology = new Topology();
+        topology.addReadOnlyStateStore(
+            Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("readOnlyStore"),
+                new Serdes.IntegerSerde(),
+                new Serdes.StringSerde()
+            ),
+            "readOnlySource",
+            new IntegerDeserializer(),
+            new StringDeserializer(),
+            "storeTopic",
+            "readOnlyProcessor",
+            () -> new Processor<>() {
+                KeyValueStore<Integer, String> store;
+
+                @Override
+                public void init(final ProcessorContext<Void, Void> context) {
+                    store = context.getStateStore("readOnlyStore");
+                }
+                @Override
+                public void process(final Record<Integer, String> record) {
+                    processCallCount.incrementAndGet();
+                    // Custom transformation: prepend "processed-" to the value
+                    store.put(record.key(), "processed-" + record.value());
+                }
+            }
+        );
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) {
+            final TestInputTopic<Integer, String> readOnlyStoreTopic =
+                driver.createInputTopic("storeTopic", new IntegerSerializer(), new StringSerializer());
+
+            readOnlyStoreTopic.pipeInput(1, "foo");
+            readOnlyStoreTopic.pipeInput(2, "bar");
+
+            final KeyValueStore<Integer, String> store = driver.getKeyValueStore("readOnlyStore");
+
+            try (final KeyValueIterator<Integer, String> it = store.all()) {
+                final List<KeyValue<Integer, String>> storeContent = new LinkedList<>();
+                it.forEachRemaining(storeContent::add);
+
+                // Values should have the "processed-" prefix from the custom processor
+                final List<KeyValue<Integer, String>> expectedResult = new LinkedList<>();
+                expectedResult.add(KeyValue.pair(1, "processed-foo"));
+                expectedResult.add(KeyValue.pair(2, "processed-bar"));
+
+                assertThat(storeContent, equalTo(expectedResult));
+            }
+
+            // Verify the processor was actually called
+            assertThat(processCallCount.get(), equalTo(2));
+        }
+    }
+
+    @Test
+    public void shouldHandleNullKeyRecordsDuringReprocessRestore() {
+        final java.util.concurrent.atomic.AtomicInteger processCallCount = new java.util.concurrent.atomic.AtomicInteger(0);
+
+        final Topology topology = new Topology();
+        topology.addReadOnlyStateStore(
+            Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("readOnlyStore"),
+                new Serdes.IntegerSerde(),
+                new Serdes.StringSerde()
+            ),
+            "readOnlySource",
+            new IntegerDeserializer(),
+            new StringDeserializer(),
+            "storeTopic",
+            "readOnlyProcessor",
+            () -> new Processor<>() {
+                KeyValueStore<Integer, String> store;
+
+                @Override
+                public void init(final ProcessorContext<Void, Void> context) {
+                    store = context.getStateStore("readOnlyStore");
+                }
+                @Override
+                public void process(final Record<Integer, String> record) {
+                    processCallCount.incrementAndGet();
+                    store.put(record.key(), record.value());
+                }
+            }
+        );
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) {
+            final TestInputTopic<Integer, String> readOnlyStoreTopic =
+                driver.createInputTopic("storeTopic", new IntegerSerializer(), new StringSerializer());
+
+            readOnlyStoreTopic.pipeInput(1, "value1");
+

Review Comment:
   The test name suggests that it validates null-key handling, but the test
   never pipes a record with a null key (it uses `pipeInput(1, "value1")`) and
   never asserts on `processCallCount`. Either update the test to actually
   exercise a null-key record and assert the expected behavior, or rename it
   to reflect what it really verifies.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
