Copilot commented on code in PR #21882:
URL: https://github.com/apache/kafka/pull/21882#discussion_r3311278281
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -494,6 +526,44 @@ void restore(final StateStoreMetadata storeMetadata, final List<ConsumerRecord<b
}
}
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private void reprocessRestore(final StateStoreMetadata storeMetadata,
+ final List<ConsumerRecord<byte[], byte[]>> restoreRecords,
+ final InternalTopologyBuilder.ReprocessFactory reprocessFactory) {
+ final String storeName = storeMetadata.store().name();
+ final Processor processor = reprocessorCache.computeIfAbsent(storeName, k -> {
+ final Processor p = reprocessFactory.processorSupplier().get();
+ p.init((ProcessorContext) processorContext);
+ return p;
+ });
+
+ for (final ConsumerRecord<byte[], byte[]> record : restoreRecords) {
+ final ConsumerRecord<byte[], byte[]> converted = storeMetadata.recordConverter.convert(record);
+ if (converted.key() != null) {
+ final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+ converted.timestamp(),
+ converted.offset(),
+ converted.partition(),
+ converted.topic(),
+ converted.headers());
+ processorContext.setRecordContext(recordContext);
+
+ try {
+ final Object key = reprocessFactory.keyDeserializer().deserialize(converted.topic(), converted.key());
+ final Object value = reprocessFactory.valueDeserializer().deserialize(converted.topic(), converted.value());
+ final long timestamp = Math.max(0L, converted.timestamp());
+ processor.process(new Record<>(key, value, timestamp, converted.headers()));
Review Comment:
`addReadOnlyStateStore` explicitly allows `keyDeserializer`/`valueDeserializer`
to be null (meaning "use the configured defaults"), but `reprocessRestore`
unconditionally calls `reprocessFactory.keyDeserializer().deserialize(...)` /
`valueDeserializer().deserialize(...)`, which throws a `NullPointerException`
if either deserializer is null. Please resolve the effective deserializers from
`processorContext` when the factory deserializers are null (and also make sure
any wrapping nullable deserializers are initialized), so that restoration
matches the normal source-node path.
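A minimal sketch of the suggested fallback, written against a simplified stand-in interface instead of Kafka's real `Deserializer` so it runs without Kafka on the classpath. `SimpleDeserializer` and `effective` are hypothetical names. In the actual fix the default would presumably come from something like `processorContext.keySerde().deserializer()`, but that wiring is an assumption here.

```java
import java.nio.charset.StandardCharsets;

final class DeserializerFallbackSketch {

    // Hypothetical stand-in for org.apache.kafka.common.serialization.Deserializer.
    interface SimpleDeserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    private DeserializerFallbackSketch() {
    }

    // Returns the deserializer passed to addReadOnlyStateStore if there is one;
    // otherwise falls back to the context default, which is what passing null
    // is meant to select. Fails fast instead of hitting a bare NPE per record.
    static <T> SimpleDeserializer<T> effective(final SimpleDeserializer<T> configured,
                                               final SimpleDeserializer<T> contextDefault) {
        if (configured != null) {
            return configured;
        }
        if (contextDefault == null) {
            throw new IllegalStateException(
                "No deserializer was configured for the store and no default is available");
        }
        return contextDefault;
    }

    public static void main(final String[] args) {
        final SimpleDeserializer<String> contextDefault =
            (topic, data) -> new String(data, StandardCharsets.UTF_8);
        // A null store-specific deserializer resolves to the default one.
        final SimpleDeserializer<String> keyDeserializer = effective(null, contextDefault);
        System.out.println(keyDeserializer.deserialize("storeTopic",
            "foo".getBytes(StandardCharsets.UTF_8))); // prints "foo"
    }
}
```

Resolving the deserializers once, before the record loop, would also keep the per-record path free of null checks.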
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -494,6 +526,44 @@ void restore(final StateStoreMetadata storeMetadata, final List<ConsumerRecord<b
}
}
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private void reprocessRestore(final StateStoreMetadata storeMetadata,
+ final List<ConsumerRecord<byte[], byte[]>> restoreRecords,
+ final InternalTopologyBuilder.ReprocessFactory reprocessFactory) {
+ final String storeName = storeMetadata.store().name();
+ final Processor processor = reprocessorCache.computeIfAbsent(storeName, k -> {
+ final Processor p = reprocessFactory.processorSupplier().get();
+ p.init((ProcessorContext) processorContext);
+ return p;
+ });
+
+ for (final ConsumerRecord<byte[], byte[]> record : restoreRecords) {
+ final ConsumerRecord<byte[], byte[]> converted = storeMetadata.recordConverter.convert(record);
+ if (converted.key() != null) {
+ final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+ converted.timestamp(),
+ converted.offset(),
+ converted.partition(),
+ converted.topic(),
+ converted.headers());
+ processorContext.setRecordContext(recordContext);
+
+ try {
+ final Object key = reprocessFactory.keyDeserializer().deserialize(converted.topic(), converted.key());
+ final Object value = reprocessFactory.valueDeserializer().deserialize(converted.topic(), converted.value());
Review Comment:
The deserialization in `reprocessRestore` ignores record headers by calling
the 2-arg `Deserializer#deserialize(String, byte[])`. Normal processing uses
the headers-aware overload, so header-dependent deserializers will behave
differently during restoration. Consider using the `deserialize(String,
Headers, byte[])` overload here to keep restoration behavior consistent with
runtime processing.
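To illustrate why the overload matters, here is a sketch that mimics the shape of Kafka's `Deserializer`, where the headers-aware method defaults to the two-argument one. The interface, the `Map<String, String>` used in place of `Headers`, and the `format` header are all hypothetical; the point is only that a deserializer overriding the three-argument method returns different results depending on which overload the caller invokes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

final class HeaderAwareDeserializeSketch {

    // Hypothetical stand-in for Kafka's Deserializer. As in Kafka, the
    // headers-aware overload defaults to the two-argument one, so only
    // implementations that override it ever look at headers.
    interface SimpleDeserializer<T> {
        T deserialize(String topic, byte[] data);

        default T deserialize(final String topic, final Map<String, String> headers, final byte[] data) {
            return deserialize(topic, data);
        }
    }

    // A header-dependent deserializer: a hypothetical "format" header asks for
    // the decoded string to be upper-cased.
    static final class FormatAwareStringDeserializer implements SimpleDeserializer<String> {
        @Override
        public String deserialize(final String topic, final byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(final String topic, final Map<String, String> headers, final byte[] data) {
            final String plain = deserialize(topic, data);
            return "upper".equals(headers.get("format")) ? plain.toUpperCase(Locale.ROOT) : plain;
        }
    }

    // Restore path as suggested in the review: forward the record's headers so
    // the result matches what normal processing produces.
    static <T> T deserializeForRestore(final SimpleDeserializer<T> deserializer, final String topic,
                                       final Map<String, String> headers, final byte[] data) {
        return deserializer.deserialize(topic, headers, data);
    }

    public static void main(final String[] args) {
        final SimpleDeserializer<String> deserializer = new FormatAwareStringDeserializer();
        final Map<String, String> headers = Collections.singletonMap("format", "upper");
        final byte[] payload = "foo".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserializer.deserialize("storeTopic", payload)); // prints "foo"
        System.out.println(deserializeForRestore(deserializer, "storeTopic", headers, payload)); // prints "FOO"
    }
}
```

Because the three-argument overload falls back to the two-argument one by default, switching the restore path to it should be harmless for deserializers that ignore headers.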
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -494,6 +526,44 @@ void restore(final StateStoreMetadata storeMetadata, final List<ConsumerRecord<b
}
}
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private void reprocessRestore(final StateStoreMetadata storeMetadata,
+ final List<ConsumerRecord<byte[], byte[]>> restoreRecords,
+ final InternalTopologyBuilder.ReprocessFactory reprocessFactory) {
+ final String storeName = storeMetadata.store().name();
+ final Processor processor = reprocessorCache.computeIfAbsent(storeName, k -> {
+ final Processor p = reprocessFactory.processorSupplier().get();
+ p.init((ProcessorContext) processorContext);
+ return p;
+ });
+
+ for (final ConsumerRecord<byte[], byte[]> record : restoreRecords) {
+ final ConsumerRecord<byte[], byte[]> converted = storeMetadata.recordConverter.convert(record);
+ if (converted.key() != null) {
+ final ProcessorRecordContext recordContext = new ProcessorRecordContext(
+ converted.timestamp(),
+ converted.offset(),
+ converted.partition(),
+ converted.topic(),
+ converted.headers());
+ processorContext.setRecordContext(recordContext);
+
+ try {
+ final Object key = reprocessFactory.keyDeserializer().deserialize(converted.topic(), converted.key());
+ final Object value = reprocessFactory.valueDeserializer().deserialize(converted.topic(), converted.value());
+ final long timestamp = Math.max(0L, converted.timestamp());
+ processor.process(new Record<>(key, value, timestamp, converted.headers()));
+ } catch (final RuntimeException e) {
+ throw new ProcessorStateException(
+ format("%sException caught while trying to reprocess-restore state from %s",
+ logPrefix, storeMetadata.changelogPartition),
+ e
+ );
+ }
Review Comment:
`reprocessRestore` only catches `RuntimeException`, but the user code invoked
here (deserializers and `processor.process`) may be written in languages that
do not enforce checked exceptions (e.g., Kotlin or Scala) and can therefore
throw checked `Exception`s. Such exceptions escape restore without being
wrapped in a `ProcessorStateException`. Please broaden the catch to `Exception`
(as done in `GlobalStateManagerImpl.reprocessState`) and wrap consistently.
##########
streams/src/test/java/org/apache/kafka/streams/processor/ReadOnlyStoreTest.java:
##########
@@ -130,4 +130,111 @@ public void process(final Record<Integer, String> record) {
assertThat(output.readKeyValuesToList(), equalTo(expectedResult));
}
}
+
+ @Test
+ public void shouldUseCustomProcessorDuringRestorationWithTransformation() {
+ final java.util.concurrent.atomic.AtomicInteger processCallCount = new java.util.concurrent.atomic.AtomicInteger(0);
+
+ final Topology topology = new Topology();
+ topology.addReadOnlyStateStore(
+ Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("readOnlyStore"),
+ new Serdes.IntegerSerde(),
+ new Serdes.StringSerde()
+ ),
+ "readOnlySource",
+ new IntegerDeserializer(),
+ new StringDeserializer(),
+ "storeTopic",
+ "readOnlyProcessor",
+ () -> new Processor<>() {
+ KeyValueStore<Integer, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore("readOnlyStore");
+ }
+ @Override
+ public void process(final Record<Integer, String> record) {
+ processCallCount.incrementAndGet();
+ // Custom transformation: prepend "processed-" to the value
+ store.put(record.key(), "processed-" + record.value());
+ }
+ }
+ );
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) {
+ final TestInputTopic<Integer, String> readOnlyStoreTopic =
+ driver.createInputTopic("storeTopic", new IntegerSerializer(), new StringSerializer());
+
+ readOnlyStoreTopic.pipeInput(1, "foo");
+ readOnlyStoreTopic.pipeInput(2, "bar");
+
+ final KeyValueStore<Integer, String> store = driver.getKeyValueStore("readOnlyStore");
+
+ try (final KeyValueIterator<Integer, String> it = store.all()) {
+ final List<KeyValue<Integer, String>> storeContent = new LinkedList<>();
+ it.forEachRemaining(storeContent::add);
+
+ // Values should have the "processed-" prefix from the custom processor
+ final List<KeyValue<Integer, String>> expectedResult = new LinkedList<>();
+ expectedResult.add(KeyValue.pair(1, "processed-foo"));
+ expectedResult.add(KeyValue.pair(2, "processed-bar"));
+
+ assertThat(storeContent, equalTo(expectedResult));
+ }
+
+ // Verify the processor was actually called
+ assertThat(processCallCount.get(), equalTo(2));
+ }
+ }
+
+ @Test
+ public void shouldHandleNullKeyRecordsDuringReprocessRestore() {
+ final java.util.concurrent.atomic.AtomicInteger processCallCount = new java.util.concurrent.atomic.AtomicInteger(0);
+
+ final Topology topology = new Topology();
+ topology.addReadOnlyStateStore(
+ Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("readOnlyStore"),
+ new Serdes.IntegerSerde(),
+ new Serdes.StringSerde()
+ ),
+ "readOnlySource",
+ new IntegerDeserializer(),
+ new StringDeserializer(),
+ "storeTopic",
+ "readOnlyProcessor",
+ () -> new Processor<>() {
+ KeyValueStore<Integer, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore("readOnlyStore");
+ }
+ @Override
+ public void process(final Record<Integer, String> record) {
+ processCallCount.incrementAndGet();
+ store.put(record.key(), record.value());
+ }
+ }
+ );
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) {
+ final TestInputTopic<Integer, String> readOnlyStoreTopic =
+ driver.createInputTopic("storeTopic", new IntegerSerializer(), new StringSerializer());
+
+ readOnlyStoreTopic.pipeInput(1, "value1");
+
Review Comment:
This test name suggests it validates null-key handling, but the test never
pipes a null key (it uses `pipeInput(1, "value1")`) and also never asserts
`processCallCount`. Either update the test to actually exercise a null-key
record and assert the expected behavior, or rename it to reflect what it really
verifies.
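If the intent is that null-key records are skipped during reprocess-restore (as the `converted.key() != null` guard suggests), the test could pipe a mix of keyed and null-key records and assert that `processCallCount` counts only the keyed ones. The plain-Java sketch below models that guard and those assertions with hypothetical stand-ins (`ChangelogRecord`, `reprocess`) instead of `TopologyTestDriver`; in the real test the null-key record would presumably be sent with something like `readOnlyStoreTopic.pipeInput(null, "ignored")`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

final class NullKeySkipSketch {

    // Hypothetical stand-in for a changelog record; the key may be null.
    static final class ChangelogRecord {
        final Integer key;
        final String value;

        ChangelogRecord(final Integer key, final String value) {
            this.key = key;
            this.value = value;
        }
    }

    private NullKeySkipSketch() {
    }

    // Mirrors the `converted.key() != null` guard in reprocessRestore: null-key
    // records are skipped and everything else is handed to the processor.
    static void reprocess(final List<ChangelogRecord> records,
                          final BiConsumer<Integer, String> processor) {
        for (final ChangelogRecord record : records) {
            if (record.key != null) {
                processor.accept(record.key, record.value);
            }
        }
    }

    public static void main(final String[] args) {
        final AtomicInteger processCallCount = new AtomicInteger(0);
        final Map<Integer, String> store = new HashMap<>();
        final List<ChangelogRecord> records = new ArrayList<>();
        records.add(new ChangelogRecord(1, "value1"));
        records.add(new ChangelogRecord(null, "ignored"));
        records.add(new ChangelogRecord(2, "value2"));

        reprocess(records, (key, value) -> {
            processCallCount.incrementAndGet();
            store.put(key, value);
        });

        // Only keyed records reach the processor; the null key never lands in the store.
        System.out.println(processCallCount.get()); // prints 2
        System.out.println(store.containsKey(null)); // prints false
    }
}
```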
--
This is an automated message from the Apache Git Service.