daguimu commented on PR #21882: URL: https://github.com/apache/kafka/pull/21882#issuecomment-4560373116
Thanks for the careful review! I've pushed a fix commit (2cd0815) that addresses all four points.

### 1. NPE on null key/value deserializers (`ProcessorStateManager.java:555`)

You were right. `Topology.addReadOnlyStateStore` permits null deserializers (its Javadoc lists only `storeBuilder`, `sourceName`, `topic`, `processorName`, and `stateUpdateSupplier` as non-null), and the previous `reprocessRestore` would throw an NPE on them. The fix resolves nulls to the configured default serdes via `WrappingNullableUtils.prepareKeyDeserializer` / `prepareValueDeserializer`, mirroring how `SourceNode` handles this for normal processing (see `SourceNode#init`, lines 79–89). The resolved deserializers are cached alongside the initialized processor in a new private `ReprocessState` holder, so the lookup happens only once per store.

New test: `ProcessorStateManagerTest#shouldFallBackToDefaultDeserializersWhenReprocessFactoryDeserializersAreNull`.

### 2. 2-arg `deserialize` ignored record headers (`ProcessorStateManager.java:553`)

Switched to the 3-arg `Deserializer#deserialize(String topic, Headers headers, byte[] data)` overload. `SourceNode#deserializeKey` / `deserializeValue` already use the headers-aware overload for normal processing, so this brings restore behavior in line with it for header-dependent deserializers.

(Side note: `GlobalStateManagerImpl#reprocessState` still uses the 2-arg form for the global-store path. That's a pre-existing inconsistency that I think is best fixed in a separate follow-up to keep this PR focused.)

### 3. Only caught `RuntimeException` (`ProcessorStateManager.java:562`)

Switched to `catch (final Exception e)` with the same explanatory comment that `GlobalStateManagerImpl#reprocessState` carries verbatim:

> while Java distinguishes checked vs unchecked exceptions, other languages like Scala or Kotlin do not, and thus we need to catch `Exception` (instead of `RuntimeException`) to work well with those languages

Applied the same change in the `close()` cleanup loop where cached reprocess processors are closed.

### 4. Misleading test name + missing `processCallCount` assertion (`ReadOnlyStoreTest.java:228`)

`shouldHandleNullKeyRecordsDuringReprocessRestore` was misnamed for a more fundamental reason than the missing null-key input: it used `TopologyTestDriver.pipeInput`, which exercises *normal source-node processing*, not the `reprocessRestore` path the test name implied. Simply renaming it or adding a null-key piped input therefore wouldn't have produced a test that actually covers the restore code path. Instead, I removed it and added two unit tests in `ProcessorStateManagerTest` that invoke `ProcessorStateManager#restore` directly, so the `reprocessRestore` path is genuinely exercised:

- `shouldSkipNullKeyRecordsDuringReprocessRestore`: verifies that the `if (converted.key() != null)` guard skips null-key records during restore and that the processor is invoked only for the valid record.
- `shouldFallBackToDefaultDeserializersWhenReprocessFactoryDeserializersAreNull`: locks in the NPE fix from point 1 above.

---

All impacted tests pass locally:

```
./gradlew :streams:test --tests "*ProcessorStateManagerTest" --tests "*ReadOnlyStoreTest"
```

and `:streams:checkstyleMain` / `:streams:checkstyleTest` are clean.

Happy to make further adjustments if anything looks off.
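For reference, here is a simplified, self-contained Java sketch of the restore-loop pattern described in points 1–4 (null-deserializer fallback, headers-aware deserialization, null-key skip, and catching `Exception`). It deliberately avoids the real Kafka Streams classes: `HeaderAwareDeserializer`, `RawRecord`, `ReprocessState` (only loosely modeled on the new private holder), `resolve`, `restore`, `sneakyThrow`, and the demo helpers are all hypothetical stand-ins for illustration, and the UTF-8 string default is an assumed "configured default serde". The actual change uses Kafka's `Deserializer`/`Headers` and `WrappingNullableUtils`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class ReprocessRestoreSketch {

    // Hypothetical stand-in for a headers-aware deserializer; NOT Kafka's Deserializer interface.
    interface HeaderAwareDeserializer<T> {
        T deserialize(String topic, Map<String, String> headers, byte[] data);
    }

    // Hypothetical raw record as read from the input topic during restore.
    record RawRecord(byte[] key, byte[] value, Map<String, String> headers) { }

    // Resolved deserializers cached next to the processor (a simplified take on the holder idea).
    record ReprocessState<K, V>(HeaderAwareDeserializer<K> keyDeserializer,
                                HeaderAwareDeserializer<V> valueDeserializer,
                                BiConsumer<K, V> processor) { }

    // Assumed "configured default": UTF-8 strings, with null bytes mapped to null.
    static final HeaderAwareDeserializer<String> DEFAULT_STRING =
        (topic, headers, data) -> data == null ? null : new String(data, StandardCharsets.UTF_8);

    // Point 1: resolve null deserializers to the defaults once, instead of hitting an NPE per record.
    static <K, V> ReprocessState<K, V> resolve(HeaderAwareDeserializer<K> key,
                                               HeaderAwareDeserializer<V> value,
                                               HeaderAwareDeserializer<K> defaultKey,
                                               HeaderAwareDeserializer<V> defaultValue,
                                               BiConsumer<K, V> processor) {
        return new ReprocessState<>(key != null ? key : defaultKey,
                                    value != null ? value : defaultValue, processor);
    }

    static <K, V> void restore(String topic, List<RawRecord> batch, ReprocessState<K, V> state) {
        for (RawRecord raw : batch) {
            // Point 2: pass the record headers through (3-arg form) so header-dependent deserializers work.
            K key = state.keyDeserializer().deserialize(topic, raw.headers(), raw.key());
            V value = state.valueDeserializer().deserialize(topic, raw.headers(), raw.value());
            if (key == null) {
                continue; // Point 4: null-key records are skipped, like the `converted.key() != null` guard.
            }
            try {
                state.processor().accept(key, value);
            } catch (final Exception e) {
                // Point 3: catch Exception rather than RuntimeException, because Scala/Kotlin code
                // can throw checked exceptions without declaring them.
                throw new IllegalStateException("Failed to reprocess record during restore", e);
            }
        }
    }

    // Throws any Throwable without declaring it, mimicking undeclared checked exceptions from Kotlin/Scala.
    @SuppressWarnings("unchecked")
    static <E extends Throwable> void sneakyThrow(Throwable t) throws E {
        throw (E) t;
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // No factory deserializers supplied (both null); one null-key record and one valid record.
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        ReprocessState<String, String> state =
            resolve(null, null, DEFAULT_STRING, DEFAULT_STRING, (k, v) -> seen.add(k + "=" + v));
        restore("input-topic", List.of(
            new RawRecord(null, bytes("dropped"), Map.of()),
            new RawRecord(bytes("a"), bytes("1"), Map.of("h", "x"))), state);
        return seen;
    }

    // Returns the cause type seen by the caller when the processor throws an undeclared IOException.
    static Class<?> demoCheckedException() {
        ReprocessState<String, String> state = resolve(null, null, DEFAULT_STRING, DEFAULT_STRING,
            (k, v) -> ReprocessRestoreSketch.<RuntimeException>sneakyThrow(new IOException("boom")));
        try {
            restore("input-topic", List.of(new RawRecord(bytes("a"), bytes("1"), Map.of())), state);
            return null;
        } catch (IllegalStateException e) {
            return e.getCause().getClass();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());                 // [a=1]
        System.out.println(demoCheckedException()); // class java.io.IOException
    }
}
```

Running `main` prints `[a=1]` (the null-key record is skipped and the null factory deserializers fall back to the default) and `class java.io.IOException` (the undeclared checked exception is caught and wrapped, whereas a `RuntimeException`-only catch would have let it escape unwrapped).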
