bbejeck commented on code in PR #21814:
URL: https://github.com/apache/kafka/pull/21814#discussion_r2955734162


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##########
@@ -135,6 +136,9 @@ public Collection<StreamTask> createTasks(final 
Consumer<byte[], byte[]> consume
                                               final Map<TaskId, 
Set<TopicPartition>> tasksToBeCreated) {
         final List<StreamTask> createdTasks = new ArrayList<>();
 
+        final String upgradeFromStr = 
applicationConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);

Review Comment:
   This might be counter intuitive but I'm wondering if we want to rename this 
internally to `downgradeoOffsetManagement` since that's how we are using it.  
Or we can also maybe update the KIP to add a new config, although that might be 
a stretch and just go with this and introduce a minor KIP with the new config.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -181,6 +182,7 @@ public String toString() {
 
     private final StateDirectory stateDirectory;
     private final File baseDir;
+    private final UpgradeFromValues upgradeFrom;

Review Comment:
   Same here and below, we can rename according to our semantic use.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -161,6 +163,8 @@ public GlobalStateManagerImpl(final LogContext logContext,
         final boolean globalEnabled = 
config.getBoolean(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG);
         processingExceptionHandler = globalEnabled ? 
config.processingExceptionHandler() : null;
         eosEnabled = StreamsConfigUtils.eosEnabled(config);
+        final String upgradeFromStr = 
config.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
+        upgradeFrom = upgradeFromStr != null ? 
UpgradeFromValues.fromString(upgradeFromStr) : null;

Review Comment:
   Same here



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LegacyCheckpointingStateStore.java:
##########
@@ -77,6 +77,45 @@ public static StateStore maybeUnwrapStore(final StateStore 
store) {
                 : store;
     }
 
+    /**
+     * Writes a consolidated per-task {@code .checkpoint} file for downgrade 
support.
+     *
+     * When {@code upgradeFrom} is set to a version older than 4.3, this 
method writes the offsets into the legacy
+     * per-task checkpoint file so that an older Kafka Streams version can 
find its offsets after a downgrade.
+     *
+     * This is a no-op if {@code upgradeFrom} is {@code null} or refers to 
version 4.3 or later.
+     *
+     * @param logPrefix Log prefix to use for log messages.
+     * @param upgradeFrom The configured {@code upgrade.from} value, or {@code 
null} if not set.
+     * @param stateDirectory The singleton {@link StateDirectory} used for 
looking up state directories.
+     * @param taskId Either the task ID for regular stores, or {@code null} 
for global stores.
+     * @param offsets The offsets to write to the checkpoint file. Entries 
with {@code null} values are written as
+     *                {@link OffsetCheckpoint#OFFSET_UNKNOWN}.
+     */
+    public static void maybeDowngradeOffsets(final String logPrefix,
+                                             final UpgradeFromValues 
upgradeFrom,
+                                             final StateDirectory 
stateDirectory,
+                                             final TaskId taskId,
+                                             final Map<TopicPartition, Long> 
offsets) {
+        if (upgradeFrom == null || upgradeFrom.ordinal() > 
UpgradeFromValues.UPGRADE_FROM_42.ordinal()) {
+            return;
+        }
+
+        final Map<TopicPartition, Long> checkpointableOffsets = new 
HashMap<>();
+        for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) 
{
+            checkpointableOffsets.put(entry.getKey(), 
checkpointableOffsetFromChangelogOffset(entry.getValue()));
+        }
+
+        final File legacyCheckpointFile = checkpointFileFor(stateDirectory, 
taskId, null);
+        final OffsetCheckpoint checkpoint = new 
OffsetCheckpoint(legacyCheckpointFile);
+        try {
+            log.debug("{}Writing downgrade checkpoint file for task {} with 
offsets {}", logPrefix, taskId, checkpointableOffsets);
+            checkpoint.write(checkpointableOffsets);
+        } catch (final IOException e) {
+            log.warn("{}Failed to write downgrade checkpoint file for task 
{}", logPrefix, taskId, e);

Review Comment:
   I'm trying to think if we'd want to throw here, but we can re-visit later



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to