ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r391363596



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##########
@@ -132,47 +130,74 @@ private static String getTaskProducerClientId(final 
String threadClientId, final
                 partitions
             );
 
-            if (threadProducer == null) {
-                final String taskProducerClientId = 
getTaskProducerClientId(threadId, taskId);
-                final Map<String, Object> producerConfigs = 
config.getProducerConfigs(taskProducerClientId);
-                producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
applicationId + "-" + taskId);
-                log.info("Creating producer client for task {}", taskId);
-                taskProducers.put(taskId, 
clientSupplier.getProducer(producerConfigs));
-            }
-
-            final RecordCollector recordCollector = new RecordCollectorImpl(
-                logContext,
-                taskId,
-                consumer,
-                threadProducer != null ?
-                    new StreamsProducer(threadProducer, false, logContext, 
applicationId) :
-                    new StreamsProducer(taskProducers.get(taskId), true, 
logContext, applicationId),
-                config.defaultProductionExceptionHandler(),
-                
EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
-                streamsMetrics
-            );
-
-            final Task task = new StreamTask(
+            createdTasks.add(createStreamTask(
                 taskId,
                 partitions,
-                topology,
                 consumer,
-                config,
-                streamsMetrics,
-                stateDirectory,
-                cache,
-                time,
+                logContext,
                 stateManager,
-                recordCollector
-            );
-
-            log.trace("Created task {} with assigned partitions {}", taskId, 
partitions);
-            createdTasks.add(task);
-            createTaskSensor.record();
+                topology));
         }
         return createdTasks;
     }
 
+    private StreamTask createStreamTask(final TaskId taskId,
+                                        final Set<TopicPartition> partitions,
+                                        final Consumer<byte[], byte[]> 
consumer,
+                                        final LogContext logContext,
+                                        final ProcessorStateManager 
stateManager,
+                                        final ProcessorTopology topology) {
+        if (threadProducer == null) {
+            final String taskProducerClientId = 
getTaskProducerClientId(threadId, taskId);
+            final Map<String, Object> producerConfigs = 
config.getProducerConfigs(taskProducerClientId);
+            producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
applicationId + "-" + taskId);
+            log.info("Creating producer client for task {}", taskId);
+            taskProducers.put(taskId, 
clientSupplier.getProducer(producerConfigs));
+        }
+
+        final RecordCollector recordCollector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            consumer,
+            threadProducer != null ?
+                new StreamsProducer(threadProducer, false, logContext, 
applicationId) :
+                                                                               
           new StreamsProducer(taskProducers.get(taskId), true, logContext, 
applicationId),
+            config.defaultProductionExceptionHandler(),
+            
EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
+            streamsMetrics
+        );
+
+        final StreamTask task = new StreamTask(
+            taskId,
+            partitions,
+            topology,
+            consumer,
+            config,
+            streamsMetrics,
+            stateDirectory,
+            cache,
+            time,
+            stateManager,
+            recordCollector
+        );
+
+        log.trace("Created task {} with assigned partitions {}", taskId, 
partitions);
+        createTaskSensor.record();
+        return task;
+    }
+
+    StreamTask convertStandbyToActive(final StandbyTask standbyTask,
+                                      final Set<TopicPartition> partitions,
+                                      final Consumer<byte[], byte[]> consumer) 
{
+        return createStreamTask(
+            standbyTask.id,
+            partitions,
+            consumer,
+            getLogContext(standbyTask.id),
+            standbyTask.stateMgr,
+            standbyTask.topology);

Review comment:
       The `topology` is created but never initialized for a standby; therefore, 
we don't need to worry about closing it and can reuse it here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to