vvcephei commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r414837565



##########
File path: checkstyle/suppressions.xml
##########
@@ -156,7 +156,7 @@
               
files="(TopologyBuilder|KafkaStreams|KStreamImpl|KTableImpl|StreamThread|StreamTask).java"/>
 
     <suppress checks="MethodLength"
-              files="(KTableImpl|StreamsPartitionAssignor.java)"/>
+              files="(KTableImpl|StreamsPartitionAssignor|TaskManager).java"/>

Review comment:
       Ok, it seems like this one at least is avoidable.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -247,6 +271,25 @@ void initializeStoreOffsetsFromCheckpoint(final boolean 
storeDirIsEmpty) {
         }
     }
 
+    private void registerStoreWithChangelogReader(final String storeName) {
+        // if the store name does not exist in the changelog map, it means the 
underlying store
+        // is not log enabled (including global stores), and hence it does not 
need to be restored

Review comment:
       This comment needs to move into `isLoggingEnabled`. It doesn't make 
sense in this context anymore.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -132,47 +130,74 @@ private static String getTaskProducerClientId(final 
String threadClientId, final
                 partitions
             );
 
-            if (threadProducer == null) {
-                final String taskProducerClientId = 
getTaskProducerClientId(threadId, taskId);
-                final Map<String, Object> producerConfigs = 
config.getProducerConfigs(taskProducerClientId);
-                producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
applicationId + "-" + taskId);
-                log.info("Creating producer client for task {}", taskId);
-                taskProducers.put(taskId, 
clientSupplier.getProducer(producerConfigs));
-            }
-
-            final RecordCollector recordCollector = new RecordCollectorImpl(
-                logContext,
-                taskId,
-                consumer,
-                threadProducer != null ?
-                    new StreamsProducer(threadProducer, false, logContext, 
applicationId) :
-                    new StreamsProducer(taskProducers.get(taskId), true, 
logContext, applicationId),
-                config.defaultProductionExceptionHandler(),
-                
EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
-                streamsMetrics
-            );
-
-            final Task task = new StreamTask(
+            createdTasks.add(createStreamTask(
                 taskId,
                 partitions,
-                topology,
                 consumer,
-                config,
-                streamsMetrics,
-                stateDirectory,
-                cache,
-                time,
+                logContext,
                 stateManager,
-                recordCollector
-            );
-
-            log.trace("Created task {} with assigned partitions {}", taskId, 
partitions);
-            createdTasks.add(task);
-            createTaskSensor.record();
+                topology));
         }
         return createdTasks;
     }
 
+    private StreamTask createStreamTask(final TaskId taskId,
+                                        final Set<TopicPartition> partitions,
+                                        final Consumer<byte[], byte[]> 
consumer,
+                                        final LogContext logContext,
+                                        final ProcessorStateManager 
stateManager,
+                                        final ProcessorTopology topology) {
+        if (threadProducer == null) {
+            final String taskProducerClientId = 
getTaskProducerClientId(threadId, taskId);
+            final Map<String, Object> producerConfigs = 
config.getProducerConfigs(taskProducerClientId);
+            producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
applicationId + "-" + taskId);
+            log.info("Creating producer client for task {}", taskId);
+            taskProducers.put(taskId, 
clientSupplier.getProducer(producerConfigs));
+        }
+
+        final RecordCollector recordCollector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            consumer,
+            threadProducer != null ?
+                new StreamsProducer(threadProducer, false, logContext, 
applicationId) :
+                                                                               
           new StreamsProducer(taskProducers.get(taskId), true, logContext, 
applicationId),
+            config.defaultProductionExceptionHandler(),
+            
EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
+            streamsMetrics
+        );
+
+        final StreamTask task = new StreamTask(
+            taskId,
+            partitions,
+            topology,
+            consumer,
+            config,
+            streamsMetrics,
+            stateDirectory,
+            cache,
+            time,
+            stateManager,
+            recordCollector
+        );
+
+        log.trace("Created task {} with assigned partitions {}", taskId, 
partitions);
+        createTaskSensor.record();

Review comment:
       I agree.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -235,9 +264,16 @@ void closeAndRemoveTaskProducerIfNeeded(final TaskId id) {
             return Collections.singleton(getThreadProducerClientId(threadId));
         } else {
             return taskProducers.keySet()
-                                .stream()
-                                .map(taskId -> 
getTaskProducerClientId(threadId, taskId))
-                                .collect(Collectors.toSet());
+                .stream()
+                .map(taskId -> getTaskProducerClientId(threadId, taskId))
+                .collect(Collectors.toSet());

Review comment:
       It seems like duelling formatters here and above. Do you want to propose 
that these it's better this way?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -155,9 +155,6 @@ void handleCorruption(final Map<TaskId, 
Collection<TopicPartition>> taskWithChan
             final TaskId taskId = entry.getKey();
             final Task task = tasks.get(taskId);
 
-            // this call is idempotent so even if the task is only CREATED we 
can still call it
-            changelogReader.remove(task.changelogPartitions());

Review comment:
       I lost track of how this is now handled. Can you enlighten me?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -70,18 +69,16 @@ static void registerStateStores(final Logger log,
                 e
             );
         }
+
         log.debug("Acquired state directory lock");
 
         final boolean storeDirsEmpty = 
stateDirectory.directoryForTaskIsEmpty(id);
 
         // We should only load checkpoint AFTER the corresponding state 
directory lock has been acquired and
         // the state stores have been registered; we should not try to load at 
the state manager construction time.
         // See https://issues.apache.org/jira/browse/KAFKA-8574

Review comment:
       Seems like this comment has also become displaced from its intended 
location before L83, 
`stateMgr.initializeStoreOffsetsFromCheckpoint(storeDirsEmpty)`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -505,4 +550,35 @@ private StateStoreMetadata findStore(final TopicPartition 
changelogPartition) {
 
         return found.isEmpty() ? null : found.get(0);
     }
+
+    void prepareForRecycle() {
+        log.debug("Preparing to recycle state for {} task {}.", taskType, 
taskId);
+
+        if (recyclingState) {
+            throw new IllegalStateException("Attempted to re-recycle state 
without completing first recycle");
+        }
+        recyclingState = true;
+    }
+
+    void recycleState() {
+        log.debug("Completed recycling state for formerly {} task {}.", 
taskType, taskId);
+
+        if (!recyclingState) {
+            throw new IllegalStateException("Attempted to complete recycle but 
state is not currently being recycled");
+        }
+        recyclingState = false;
+
+        if (taskType == TaskType.ACTIVE) {
+            taskType = TaskType.STANDBY;
+        } else {
+            taskType = TaskType.ACTIVE;
+        }

Review comment:
       It seems a bit "bold" here to assume that we want to flip the 
active/standby state. The TaskManager objectively knows what type it wants the 
task to become, so it seems it should just inform us of the desired task type 
in `prepareForRecycle`.
   
   Or, better yet, just null it out and we can set it during 
`createStandbyTaskFromActive` and `createActiveTaskFromStandby`. This kind of 
thing might also be nice, since we could have a more explicit lifecycle for 
this object:
   * I'm ready (created, valid, good to go)
   * I'm getting recycled
   * I'm closed
   
   Then, we can ensure that these "createXFromY" methods cleanly take the state 
manager from "closed" to "ready".
   I'm not saying to add _another_ state enum, but having a clearly defined 
lifecycle will help us later in maintenance. 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -247,6 +271,25 @@ void initializeStoreOffsetsFromCheckpoint(final boolean 
storeDirIsEmpty) {
         }
     }
 
+    private void registerStoreWithChangelogReader(final String storeName) {
+        // if the store name does not exist in the changelog map, it means the 
underlying store
+        // is not log enabled (including global stores), and hence it does not 
need to be restored
+        if (isLoggingEnabled(storeName)) {
+            // NOTE we assume the partition of the topic can always be 
inferred from the task id;
+            // if user ever use a custom partition grouper (deprecated in 
KIP-528) this would break and
+            // it is not a regression (it would always break anyways)

Review comment:
       And this comment should move into `getStorePartition`

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -70,18 +69,16 @@ static void registerStateStores(final Logger log,
                 e
             );
         }
+
         log.debug("Acquired state directory lock");
 
         final boolean storeDirsEmpty = 
stateDirectory.directoryForTaskIsEmpty(id);
 
         // We should only load checkpoint AFTER the corresponding state 
directory lock has been acquired and
         // the state stores have been registered; we should not try to load at 
the state manager construction time.
         // See https://issues.apache.org/jira/browse/KAFKA-8574
-        for (final StateStore store : topology.stateStores()) {
-            processorContext.uninitialize();
-            store.init(processorContext, store);
-            log.trace("Registered state store {}", store.name());
-        }
+        stateMgr.registerStateStores(topology.stateStores(), processorContext);

Review comment:
       It seems like the intent here is to skip initializing the store again if 
it's already been initialized, but still register it to the changelog reader. 
Which is ok, since we `unregisterAllStoresWithChangelogReader` in the 
recycleState, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to