smjn opened a new pull request, #22502: URL: https://github.com/apache/kafka/pull/22502
- In `SharePartition.maybeInitialize`, collect any batches in the `ARCHIVING` state while loading state batches. After initialization succeeds (partition `ACTIVE`, future completed, write lock released), resume the DLQ flow for each collected batch through the existing `initiateDLQAndArchive` driver. This runs outside the lock and asynchronously, matching the acknowledge and timeout paths.
- The DLQ cause is not persisted, so it is inferred from the delivery count: `deliveryCount >= maxDeliveryCount` ⇒ `DELIVERY_COUNT_EXCEEDED`, otherwise `CLIENT_REJECT`. A reject issued exactly at the max count is therefore labeled `DELIVERY_COUNT_EXCEEDED`; this is unavoidable without persisting the cause.
- Draining is unconditional (not gated on the current `isDLQEnabledForGroup()`): the archive decision was made in a prior epoch, and these records must reach `ARCHIVED` regardless, to avoid stalling the start offset. The DLQ enqueue remains best-effort: `initiateDLQAndArchive` proceeds to `ARCHIVED` even if the enqueue fails.
- `ARCHIVING` records are not counted in `deliveryCompleteCount` until they reach `ARCHIVED`, consistent with the live paths.
- `initiateDLQAndArchive` and the `DlqBatch` record now accept the archive transition as a function (a method reference such as `updateResult::archive` or `inFlightBatch::archiveBatch`) instead of a raw `InFlightState`.
- Five new tests cover the new code paths.
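A minimal sketch of the recovery flow, under stated assumptions: `DlqDrainSketch`, `PersistedBatch`, `RecordState`, `DlqCause`, `inferCause`, and the simplified `DlqBatch` and `initiateDLQAndArchive` below are hypothetical stand-ins that only borrow names from this PR; the real `SharePartition` locking, persister types, and DLQ client differ. It shows `ARCHIVING` batches collected under the write lock, the cause inferred from the delivery count, and each batch drained outside the lock with the archive transition passed as a method reference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-ins only; this is not Kafka's SharePartition API.
public class DlqDrainSketch {

    enum RecordState { AVAILABLE, ACQUIRED, ARCHIVING, ARCHIVED }

    enum DlqCause { DELIVERY_COUNT_EXCEEDED, CLIENT_REJECT }

    // A batch as loaded from the persister: offsets, state and delivery count, but no DLQ cause.
    static final class PersistedBatch {
        final long firstOffset;
        final long lastOffset;
        final int deliveryCount;
        volatile RecordState state;

        PersistedBatch(long firstOffset, long lastOffset, RecordState state, int deliveryCount) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.state = state;
            this.deliveryCount = deliveryCount;
        }

        // Terminal transition, handed to the driver as a method reference.
        void archiveBatch() {
            state = RecordState.ARCHIVED;
        }
    }

    // Pending DLQ work carries the transition as a callback instead of a raw state object.
    record DlqBatch(long firstOffset, long lastOffset, DlqCause cause, Runnable archive) {}

    // The cause is not persisted, so infer it: a reject at exactly max count maps to DELIVERY_COUNT_EXCEEDED.
    static DlqCause inferCause(int deliveryCount, int maxDeliveryCount) {
        return deliveryCount >= maxDeliveryCount
            ? DlqCause.DELIVERY_COUNT_EXCEEDED
            : DlqCause.CLIENT_REJECT;
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final boolean dlqAvailable;                        // simulates a working or failing DLQ
    final List<DlqCause> enqueued = new CopyOnWriteArrayList<>(); // what the DLQ received
    volatile boolean active;

    DlqDrainSketch(boolean dlqAvailable) {
        this.dlqAvailable = dlqAvailable;
    }

    // Returns the drain future only so the sketch can be observed.
    CompletableFuture<Void> maybeInitialize(List<PersistedBatch> loaded, int maxDeliveryCount) {
        List<DlqBatch> pending = new ArrayList<>();
        lock.writeLock().lock();
        try {
            for (PersistedBatch batch : loaded) {
                if (batch.state == RecordState.ARCHIVING) {
                    pending.add(new DlqBatch(batch.firstOffset, batch.lastOffset,
                        inferCause(batch.deliveryCount, maxDeliveryCount), batch::archiveBatch));
                }
            }
            active = true; // initialization succeeded
        } finally {
            lock.writeLock().unlock();
        }
        // Outside the lock: drain every ARCHIVING batch, not gated on any DLQ-enabled flag.
        CompletableFuture<?>[] drains = pending.stream()
            .map(this::initiateDLQAndArchive)
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(drains);
    }

    // Best effort: the archive callback runs whether the enqueue succeeds or throws.
    CompletableFuture<Void> initiateDLQAndArchive(DlqBatch batch) {
        return CompletableFuture.runAsync(() -> enqueue(batch))
            .handle((ignored, error) -> {
                batch.archive().run();
                return null;
            });
    }

    private void enqueue(DlqBatch batch) {
        if (!dlqAvailable) {
            throw new IllegalStateException("DLQ unavailable");
        }
        enqueued.add(batch.cause());
    }
}
```

Passing `batch::archiveBatch` rather than the state object lets one driver serve both the live paths and this recovery path. The `handle` stage is what makes the enqueue best-effort: it runs the archive callback whether the enqueue stage completed normally or exceptionally, so the batch cannot stall the start offset.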
