DOJI45 commented on a change in pull request #9247: URL: https://github.com/apache/kafka/pull/9247#discussion_r487431539
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -662,4 +662,10 @@ public TopicPartition registeredChangelogPartitionFor(final String storeName) {
     public String changelogFor(final String storeName) {
         return storeToChangelogTopic.get(storeName);
     }
+
+    public void deleteCheckPointFile() throws IOException {

Review comment:
Hi @guozhangwang, I agree that the name `deleteCheckPointFile` is a bit misleading. I think we can find a better one (please suggest a better name :)).

I wrote a new method inside `ProcessorStateManager` for two reasons:
- Logically, deleting the checkpoint file belongs in `ProcessorStateManager`, since deleting a state file is part of state management.
- To write this inline, I would have to bring `checkpointFile` and `eosEnabled` into the `StreamTask` class. Both are already available in `ProcessorStateManager`, so I created the new method there.

Please suggest how to proceed.
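To make the second reason concrete, here is a minimal sketch of the shape such a method could take. It is not the actual Kafka code: the class `StateManagerSketch` and its constructor are hypothetical, `checkpointFile` is modeled as a plain `java.nio.file.Path`, and deleting only when `eosEnabled` is true is an assumption based on the two fields named above. Only the names `checkpointFile`, `eosEnabled`, and `deleteCheckPointFile` come from this discussion.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for ProcessorStateManager; not the real Kafka class.
class StateManagerSketch {
    // Assumption: the checkpoint file is represented here as a plain Path.
    private final Path checkpointFile;
    // Assumption: true when exactly-once semantics (EOS) is enabled.
    private final boolean eosEnabled;

    StateManagerSketch(final Path checkpointFile, final boolean eosEnabled) {
        this.checkpointFile = checkpointFile;
        this.eosEnabled = eosEnabled;
    }

    // Deletes the checkpoint file only when EOS is enabled (assumed behavior).
    // Does nothing if the file is already gone.
    public void deleteCheckPointFile() throws IOException {
        if (eosEnabled) {
            Files.deleteIfExists(checkpointFile);
        }
    }
}
```

With this shape, the caller in `StreamTask.resume()` only needs `stateMgr.deleteCheckPointFile()` and never has to see either field, which is the encapsulation argument made above.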
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -332,6 +332,15 @@ public void resume() {
             case SUSPENDED:
                 // just transit the state without any logical changes: suspended and restoring states
                 // are not actually any different for inner modules
+
+                // Deleting checkpoint file before transition to RESTORING state (KAFKA-10362)
+                try {
+                    stateMgr.deleteCheckPointFile();
+                    log.debug("Deleted check point file");

Review comment:
I will change the log message.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org