daguimu commented on PR #21882:
URL: https://github.com/apache/kafka/pull/21882#issuecomment-4560373116

   Thanks for the careful review! I've pushed a fix commit (2cd0815) that 
addresses all four points.
   
   ### 1. NPE on null key/value deserializers (`ProcessorStateManager.java:555`)
   
   You were right — `Topology.addReadOnlyStateStore` permits null deserializers 
(its Javadoc lists only `storeBuilder`, `sourceName`, `topic`, `processorName`, 
and `stateUpdateSupplier` as non-null), and the previous `reprocessRestore` 
would NPE on them. The fix resolves nulls to the configured default serdes via 
`WrappingNullableUtils.prepareKeyDeserializer` / `prepareValueDeserializer`, 
mirroring how `SourceNode` handles this for normal processing (see 
`SourceNode#init` lines 79–89). The resolved deserializers are cached alongside 
the initialized processor in a new private `ReprocessState` holder so the 
lookup happens once per store.
   
   New test: 
`ProcessorStateManagerTest#shouldFallBackToDefaultDeserializersWhenReprocessFactoryDeserializersAreNull`.
   
   ### 2. 2-arg `deserialize` ignored record headers (`ProcessorStateManager.java:553`)
   
   Switched to the 3-arg `Deserializer#deserialize(String topic, Headers 
headers, byte[] data)` overload. `SourceNode#deserializeKey` / 
`deserializeValue` already use the headers-aware overload for normal 
processing, so this brings restore behavior into line for header-dependent 
deserializers. (Side note: `GlobalStateManagerImpl#reprocessState` still uses 
the 2-arg form for the global-store path — that's a pre-existing inconsistency 
that I think is best fixed in a separate follow-up to keep this PR focused.)
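
To illustrate why the overload matters, here is a small self-contained sketch. It does not use Kafka's classes: `SimpleDeserializer` is a hypothetical stand-in that mirrors the shape of `Deserializer` (a 2-arg method plus a headers-aware overload that delegates to it by default), a plain `Map` stands in for `Headers`, and the `charset` header is invented for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

final class HeaderAwareDeserializeSketch {

    /** Hypothetical stand-in; headers are modelled as a plain map. */
    interface SimpleDeserializer<T> {
        T deserialize(String topic, byte[] data);

        // By default the headers-aware overload just drops the headers.
        default T deserialize(final String topic, final Map<String, String> headers, final byte[] data) {
            return deserialize(topic, data);
        }
    }

    /** Picks the charset from an invented "charset" header, UTF-8 otherwise. */
    static final class CharsetFromHeaderDeserializer implements SimpleDeserializer<String> {
        @Override
        public String deserialize(final String topic, final byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(final String topic, final Map<String, String> headers, final byte[] data) {
            if ("latin1".equals(headers.get("charset"))) {
                return new String(data, StandardCharsets.ISO_8859_1);
            }
            return deserialize(topic, data);
        }
    }

    public static void main(final String[] args) {
        final SimpleDeserializer<String> deserializer = new CharsetFromHeaderDeserializer();
        final byte[] latin1Bytes = {(byte) 0xE9}; // "é" encoded as ISO-8859-1
        final Map<String, String> headers = Map.of("charset", "latin1");

        // The caller decides which overload runs; only one of them sees the headers.
        System.out.println(deserializer.deserialize("topic", headers, latin1Bytes));
        System.out.println(deserializer.deserialize("topic", latin1Bytes));
    }
}
```

A caller that uses the 2-arg form never gives such a deserializer the chance to look at the headers, which is the gap the restore path had.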
   
   ### 3. Only caught `RuntimeException` (`ProcessorStateManager.java:562`)
   
   Switched to `catch (final Exception e)` with the same explanatory comment 
that `GlobalStateManagerImpl#reprocessState` carries verbatim:
   
   > while Java distinguishes checked vs unchecked exceptions, other languages 
like Scala or Kotlin do not, and thus we need to catch `Exception` (instead 
of `RuntimeException`) to work well with those languages
   
   Applied the same change in the `close()` cleanup loop where cached reprocess 
processors are closed.
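
As a rough demonstration of the failure mode the comment describes, the sketch below simulates from plain Java what a Kotlin or Scala processor can do: throw a checked exception without declaring it. The `sneakyThrow` helper and the method names are purely illustrative and are not part of the PR.

```java
import java.io.IOException;

final class CatchExceptionSketch {

    // Generic trick that lets Java code throw a checked exception undeclared,
    // standing in for a processor written in a language without checked exceptions.
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> void sneakyThrow(final Throwable t) throws E {
        throw (E) t;
    }

    /** Hypothetical processor call that throws an undeclared IOException. */
    static void processLikeKotlin() {
        CatchExceptionSketch.<RuntimeException>sneakyThrow(new IOException("undeclared checked exception"));
    }

    /** The narrow catch misses the IOException, so it reaches the outer handler. */
    static String withRuntimeExceptionCatch() {
        try {
            try {
                processLikeKotlin();
                return "ok";
            } catch (final RuntimeException e) {
                return "handled";
            }
        } catch (final Exception e) {
            return "escaped";
        }
    }

    /** The broad catch handles the exception at the point of the call. */
    static String withExceptionCatch() {
        try {
            processLikeKotlin();
            return "ok";
        } catch (final Exception e) {
            return "handled";
        }
    }

    public static void main(final String[] args) {
        System.out.println("catch RuntimeException: " + withRuntimeExceptionCatch());
        System.out.println("catch Exception: " + withExceptionCatch());
    }
}
```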
   
   ### 4. Misleading test name + missing `processCallCount` assertion (`ReadOnlyStoreTest.java:228`)
   
   `shouldHandleNullKeyRecordsDuringReprocessRestore` had a more fundamental 
problem than the missing null-key input: it used 
`TopologyTestDriver.pipeInput`, which exercises *normal source-node 
processing*, not the `reprocessRestore` path the test name implied. Renaming 
it or piping in a null-key record would therefore still not have covered the 
restore code path.
   
   Instead I removed it and added two unit tests in `ProcessorStateManagerTest` 
that invoke `ProcessorStateManager#restore` directly, so the `reprocessRestore` 
path is genuinely exercised:
   
   - `shouldSkipNullKeyRecordsDuringReprocessRestore`: verifies that the 
`if (converted.key() != null)` guard skips null-key records during restore and 
that the processor is invoked only for the valid record.
   - `shouldFallBackToDefaultDeserializersWhenReprocessFactoryDeserializersAreNull`: 
locks in the NPE fix from point 1 above.
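
The behavior the first test pins down can be pictured with a minimal sketch like the following. It is not the `ProcessorStateManager` code: `RawRecord` and `restore` are hypothetical names, and the "processor" is reduced to appending to a list so the number of invocations is easy to check.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class NullKeyGuardSketch {

    /** Hypothetical raw changelog record with nullable key and value bytes. */
    static final class RawRecord {
        final byte[] key;
        final byte[] value;

        RawRecord(final String key, final String value) {
            this.key = encode(key);
            this.value = encode(value);
        }
    }

    private static byte[] encode(final String s) {
        return s == null ? null : s.getBytes(StandardCharsets.UTF_8);
    }

    private static String decode(final byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    /** Deserialize each record and hand it on only when the key is non-null. */
    static List<String> restore(final List<RawRecord> batch) {
        final List<String> processed = new ArrayList<>();
        for (final RawRecord raw : batch) {
            final String key = decode(raw.key);
            final String value = decode(raw.value);
            if (key != null) { // the guard: null-key records are skipped
                processed.add(key + "=" + value);
            }
        }
        return processed;
    }

    public static void main(final String[] args) {
        final List<RawRecord> batch = Arrays.asList(
            new RawRecord(null, "orphan"),
            new RawRecord("k1", "v1"));
        System.out.println(restore(batch));
    }
}
```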
   
   ---
   
   All impacted tests pass locally:
   
   ```
   ./gradlew :streams:test --tests "*ProcessorStateManagerTest" --tests "*ReadOnlyStoreTest"
   ```
   
   and `:streams:checkstyleMain` / `:streams:checkstyleTest` are clean. Happy 
to make further adjustments if anything looks off.

