KAFKA-5754; Refactor Streams to use LogContext This PR utilizes `org.apache.kafka.common.utils.LogContext` for logging in `KafkaStreams`. hachikuji, ijuma please review this and let me know your thoughts.
Author: umesh chaudhary <umesh9...@gmail.com> Reviewers: Guozhang Wang <wangg...@gmail.com>, Damian Guy <damian....@gmail.com> Closes #3727 from umesh9794/KAFKA-5754 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f305dd68 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f305dd68 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f305dd68 Branch: refs/heads/trunk Commit: f305dd68f6524abc25c4ed88983f0e78b4e6c243 Parents: 6055c74 Author: umesh chaudhary <umesh9...@gmail.com> Authored: Mon Sep 18 09:53:27 2017 +0100 Committer: Damian Guy <damian....@gmail.com> Committed: Mon Sep 18 09:53:27 2017 +0100 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 43 +++--- .../processor/internals/AbstractTask.java | 31 ++-- .../processor/internals/AssignedTasks.java | 66 ++++----- .../processor/internals/GlobalStreamThread.java | 37 ++--- .../internals/ProcessorStateManager.java | 46 +++--- .../internals/RecordCollectorImpl.java | 29 ++-- .../processor/internals/StandbyContextImpl.java | 3 +- .../processor/internals/StandbyTask.java | 13 +- .../internals/StoreChangelogReader.java | 32 ++-- .../internals/StreamPartitionAssignor.java | 46 +++--- .../streams/processor/internals/StreamTask.java | 51 +++---- .../processor/internals/StreamThread.java | 145 +++++++++---------- .../processor/internals/TaskManager.java | 40 ++--- .../streams/state/internals/ThreadCache.java | 15 +- .../apache/kafka/streams/KafkaStreamsTest.java | 1 + ...reamSessionWindowAggregateProcessorTest.java | 3 +- .../internals/AbstractProcessorContextTest.java | 3 +- .../processor/internals/AbstractTaskTest.java | 3 +- .../processor/internals/AssignedTasksTest.java | 3 +- .../processor/internals/ProcessorNodeTest.java | 3 +- .../internals/ProcessorStateManagerTest.java | 51 +++++-- .../internals/RecordCollectorTest.java | 25 +++- .../processor/internals/RecordQueueTest.java | 3 +- .../processor/internals/SinkNodeTest.java | 3 +- .../processor/internals/StandbyTaskTest.java | 3 +- .../processor/internals/StateConsumerTest.java | 6 +- .../internals/StoreChangelogReaderTest.java | 6 +- .../processor/internals/StreamTaskTest.java | 11 +- .../streams/state/KeyValueStoreTestDriver.java | 5 +- .../internals/CachingKeyValueStoreTest.java | 3 +- .../internals/CachingSessionStoreTest.java | 3 +- .../state/internals/CachingWindowStoreTest.java | 3 +- .../ChangeLoggingKeyValueBytesStoreTest.java | 3 +- .../ChangeLoggingKeyValueStoreTest.java | 3 +- ...rtedCacheKeyValueBytesStoreIteratorTest.java | 5 +- ...rtedCacheWrappedWindowStoreIteratorTest.java | 3 +- .../state/internals/MeteredWindowStoreTest.java | 3 +- .../RocksDBKeyValueStoreSupplierTest.java | 3 +- .../RocksDBSegmentedBytesStoreTest.java | 3 +- .../RocksDBSessionStoreSupplierTest.java | 3 +- .../internals/RocksDBSessionStoreTest.java | 3 +- .../state/internals/RocksDBStoreTest.java | 5 +- .../RocksDBWindowStoreSupplierTest.java | 3 +- .../state/internals/RocksDBWindowStoreTest.java | 5 +- .../state/internals/SegmentIteratorTest.java | 3 +- .../streams/state/internals/SegmentsTest.java | 3 +- .../state/internals/StoreChangeLoggerTest.java | 3 +- .../StreamThreadStateStoreProviderTest.java | 3 +- .../state/internals/ThreadCacheTest.java | 54 +++---- .../apache/kafka/test/KStreamTestDriver.java | 8 +- .../kafka/test/ProcessorTopologyTestDriver.java | 6 +- 51 files changed, 466 insertions(+), 391 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 7698f39..b31a3e3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.errors.StreamsException; @@ -57,7 +58,6 @@ import org.apache.kafka.streams.state.internals.QueryableStoreProvider; import org.apache.kafka.streams.state.internals.StateStoreProvider; import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -124,7 +124,6 @@ import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG @InterfaceStability.Evolving public class KafkaStreams { - private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class); private static final String JMX_PREFIX = "kafka.streams"; private static final int DEFAULT_CLOSE_TIMEOUT = 0; @@ -132,10 +131,10 @@ public class KafkaStreams { // in userData of the subscription request to allow assignor be aware // of the co-location of stream thread's consumers. It is for internal // usage only and should not be exposed to users at all. + private final Logger log; + private final String logPrefix; private final UUID processId; - private final Metrics metrics; - private final String logPrefix; private final StreamsConfig config; private final StreamThread[] threads; private final StateDirectory stateDirectory; @@ -230,7 +229,7 @@ public class KafkaStreams { // it is ok: just move on to the next iteration } } else { - log.debug("{} Cannot transit to {} within {}ms", logPrefix, targetState, waitMs); + log.debug("Cannot transit to {} within {}ms", targetState, waitMs); return false; } elapsedMs = System.currentTimeMillis() - begin; @@ -256,10 +255,10 @@ public class KafkaStreams { // will be refused but we do not throw exception here, to allow idempotent close calls return false; } else if (!state.isValidTransition(newState)) { - log.error("{} Unexpected state transition from {} to {}", logPrefix, oldState, newState); - throw new IllegalStateException(logPrefix + " Unexpected state transition from " + oldState + " to " + newState); + log.error("Unexpected state transition from {} to {}", oldState, newState); + throw new IllegalStateException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState); } else { - log.info("{} State transition from {} to {}", logPrefix, oldState, newState); + log.info("State transition from {} to {}", oldState, newState); } state = newState; stateLock.notifyAll(); @@ -406,7 +405,7 @@ public class KafkaStreams { } if (setState(State.ERROR)) { - log.warn("{} All stream threads have died. The instance will be in error state and should be closed.", logPrefix); + log.warn("All stream threads have died. The instance will be in error state and should be closed."); } } @@ -453,7 +452,7 @@ public class KafkaStreams { // special case when global thread is dead if (newState == GlobalStreamThread.State.DEAD && state != State.ERROR && setState(State.ERROR)) { - log.warn("{} Global thread has died. The instance will be in error state and should be closed.", logPrefix); + log.warn("Global thread has died. The instance will be in error state and should be closed."); } } } @@ -542,7 +541,11 @@ public class KafkaStreams { if (clientId.length() <= 0) clientId = applicationId + "-" + processId; - this.logPrefix = String.format("stream-client [%s]", clientId); + this.logPrefix = String.format("stream-client [%s] ", clientId); + + final LogContext logContext = new LogContext(logPrefix); + + this.log = logContext.logger(getClass()); final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); @@ -565,7 +568,7 @@ public class KafkaStreams { final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) < 0) { - log.warn("{} Negative cache size passed in. Reverting to cache size of 0 bytes.", logPrefix); + log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes."); } final long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) / @@ -673,7 +676,7 @@ public class KafkaStreams { try { client.close(); } catch (final IOException e) { - log.warn("{} Could not close StreamKafkaClient.", logPrefix, e); + log.warn("Could not close StreamKafkaClient.", e); } } @@ -690,7 +693,7 @@ public class KafkaStreams { * @throws StreamsException if the Kafka brokers have version 0.10.0.x */ public synchronized void start() throws IllegalStateException, StreamsException { - log.debug("{} Starting Streams client", logPrefix); + log.debug("Starting Streams client"); // first set state to RUNNING before kicking off the threads, // making sure the state will always transit to RUNNING before REBALANCING @@ -715,12 +718,12 @@ public class KafkaStreams { } }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS); - log.info("{} Started Streams client", logPrefix); + log.info("Started Streams client"); } else { // if transition failed but no exception is thrown; currently it is not possible // since we do not allow calling start multiple times whether or not it is already shutdown. // TODO: In the future if we lift this restriction this code path could then be triggered and be updated - log.error("{} Already stopped, cannot re-start", logPrefix); + log.error("Already stopped, cannot re-start"); } } @@ -744,12 +747,12 @@ public class KafkaStreams { * Note that this method must not be called in the {@code onChange} callback of {@link StateListener}. */ public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { - log.debug("{} Stopping Streams client with timeoutMillis = {} ms.", logPrefix, timeUnit.toMillis(timeout)); + log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout)); if (!setState(State.PENDING_SHUTDOWN)) { // if transition failed, it means it was either in PENDING_SHUTDOWN // or NOT_RUNNING already; just check that all threads have been stopped - log.info("{} Already in the pending shutdown state, wait to complete shutdown", logPrefix); + log.info("Already in the pending shutdown state, wait to complete shutdown"); } else { stateDirCleaner.shutdownNow(); @@ -798,10 +801,10 @@ public class KafkaStreams { } if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) { - log.info("{} Streams client stopped completely", logPrefix); + log.info("Streams client stopped completely"); return true; } else { - log.info("{} Streams client cannot stop completely within the timeout", logPrefix); + log.info("Streams client cannot stop completely within the timeout"); return false; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index b4c8c16..5ed9aae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; @@ -30,7 +31,6 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; @@ -40,7 +40,6 @@ import java.util.Map; import java.util.Set; public abstract class AbstractTask implements Task { - private static final Logger log = LoggerFactory.getLogger(AbstractTask.class); final TaskId id; final String applicationId; @@ -50,6 +49,8 @@ public abstract class AbstractTask implements Task { final Consumer consumer; final String logPrefix; final boolean eosEnabled; + final Logger log; + final LogContext logContext; boolean taskInitialized; private final StateDirectory stateDirectory; @@ -75,7 +76,9 @@ public abstract class AbstractTask implements Task { this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)); this.stateDirectory = stateDirectory; - logPrefix = String.format("%s [%s]", isStandby ? "standby-task" : "task", id()); + this.logPrefix = String.format("%s [%s] ", isStandby ? "standby-task" : "task", id()); + this.logContext = new LogContext(logPrefix); + this.log = logContext.logger(getClass()); // create the processor state manager try { @@ -86,9 +89,10 @@ public abstract class AbstractTask implements Task { stateDirectory, topology.storeToChangelogTopic(), changelogReader, - eosEnabled); + eosEnabled, + logContext); } catch (final IOException e) { - throw new ProcessorStateException(String.format("%s Error while creating the state manager", logPrefix), e); + throw new ProcessorStateException(String.format("%sError while creating the state manager", logPrefix), e); } } @@ -173,7 +177,7 @@ public abstract class AbstractTask implements Task { stateMgr.putOffsetLimit(partition, offset); if (log.isTraceEnabled()) { - log.trace("{} Updating store offset limits {} for changelog {}", logPrefix, offset, partition); + log.trace("Updating store offset limits {} for changelog {}", offset, partition); } } catch (final AuthorizationException e) { throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partition), e); @@ -199,21 +203,20 @@ public abstract class AbstractTask implements Task { try { if (!stateDirectory.lock(id, 5)) { - throw new LockException(String.format("%s Failed to lock the state directory for task %s", + throw new LockException(String.format("%sFailed to lock the state directory for task %s", logPrefix, id)); } } catch (IOException e) { - throw new StreamsException(String.format("%s fatal error while trying to lock the state directory for task %s", - logPrefix, - id)); + throw new StreamsException(String.format("%sFatal error while trying to lock the state directory for task %s", + logPrefix, id)); } - log.trace("{} Initializing state stores", logPrefix); + log.trace("Initializing state stores"); // set initial offset limits updateOffsetLimits(); for (final StateStore store : topology.stateStores()) { - log.trace("{} Initializing store {}", logPrefix, store.name()); + log.trace("Initializing store {}", store.name()); store.init(processorContext, store); } } @@ -225,7 +228,7 @@ public abstract class AbstractTask implements Task { */ void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException { ProcessorStateException exception = null; - log.trace("{} Closing state manager", logPrefix); + log.trace("Closing state manager"); try { stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null); } catch (final ProcessorStateException e) { @@ -235,7 +238,7 @@ public abstract class AbstractTask implements Task { stateDirectory.unlock(id); } catch (IOException e) { if (exception == null) { - exception = new ProcessorStateException(String.format("%s Failed to release state dir lock", logPrefix), e); + exception = new ProcessorStateException(String.format("%sFailed to release state dir lock", logPrefix), e); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index a1966b1..2d886b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -20,10 +20,10 @@ import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; @@ -38,8 +38,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; class AssignedTasks { - private static final Logger log = LoggerFactory.getLogger(AssignedTasks.class); - private final String logPrefix; + private final Logger log; private final String taskTypeName; private final TaskAction maybeCommitAction; private final TaskAction commitAction; @@ -54,11 +53,12 @@ class AssignedTasks { private int committed = 0; - AssignedTasks(final String logPrefix, + AssignedTasks(final LogContext logContext, final String taskTypeName) { - this.logPrefix = logPrefix; this.taskTypeName = taskTypeName; + this.log = logContext.logger(getClass()); + maybeCommitAction = new TaskAction() { @Override public String name() { @@ -71,8 +71,7 @@ class AssignedTasks { committed++; task.commit(); if (log.isDebugEnabled()) { - log.debug("{} Committed active task {} per user request in", - logPrefix, task.id()); + log.debug("Committed active task {} per user request in", task.id()); } } } @@ -110,13 +109,13 @@ class AssignedTasks { void initializeNewTasks() { if (!created.isEmpty()) { - log.trace("{} Initializing {}s {}", logPrefix, taskTypeName, created.keySet()); + log.trace("Initializing {}s {}", taskTypeName, created.keySet()); } for (final Iterator<Map.Entry<TaskId, Task>> it = created.entrySet().iterator(); it.hasNext(); ) { final Map.Entry<TaskId, Task> entry = it.next(); try { if (!entry.getValue().initialize()) { - log.debug("{} transitioning {} {} to restoring", logPrefix, taskTypeName, entry.getKey()); + log.debug("transitioning {} {} to restoring", taskTypeName, entry.getKey()); restoring.put(entry.getKey(), entry.getValue()); } else { transitionToRunning(entry.getValue()); @@ -124,7 +123,7 @@ class AssignedTasks { it.remove(); } catch (final LockException e) { // made this trace as it will spam the logs in the poll loop. - log.trace("{} Could not create {} {} due to {}; will retry in the next run loop", logPrefix, taskTypeName, entry.getKey(), e.getMessage()); + log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage()); } } } @@ -133,7 +132,7 @@ class AssignedTasks { if (restored.isEmpty()) { return Collections.emptySet(); } - log.trace("{} {} partitions restored for {}", logPrefix, taskTypeName, restored); + log.trace("{} partitions restored for {}", taskTypeName, restored); final Set<TopicPartition> resume = new HashSet<>(); restoredPartitions.addAll(restored); for (final Iterator<Map.Entry<TaskId, Task>> it = restoring.entrySet().iterator(); it.hasNext(); ) { @@ -147,8 +146,7 @@ class AssignedTasks { if (log.isTraceEnabled()) { final HashSet<TopicPartition> outstandingPartitions = new HashSet<>(task.changelogPartitions()); outstandingPartitions.removeAll(restoredPartitions); - log.trace("{} partition restoration not complete for {} {} partitions: {}", - logPrefix, + log.trace("partition restoration not complete for {} {} partitions: {}", taskTypeName, task.id(), task.changelogPartitions()); @@ -173,11 +171,11 @@ class AssignedTasks { RuntimeException suspend() { final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null); - log.trace("{} Suspending running {} {}", logPrefix, taskTypeName, runningTaskIds()); + log.trace("Suspending running {} {}", taskTypeName, runningTaskIds()); firstException.compareAndSet(null, suspendTasks(running.values())); - log.trace("{} Close restoring {} {}", logPrefix, taskTypeName, restoring.keySet()); + log.trace("Close restoring {} {}", taskTypeName, restoring.keySet()); firstException.compareAndSet(null, closeNonRunningTasks(restoring.values())); - log.trace("{} Close created {} {}", logPrefix, taskTypeName, created.keySet()); + log.trace("Close created {} {}", taskTypeName, created.keySet()); firstException.compareAndSet(null, closeNonRunningTasks(created.values())); previousActiveTasks.clear(); previousActiveTasks.addAll(running.keySet()); @@ -194,7 +192,7 @@ class AssignedTasks { try { task.close(false, false); } catch (final RuntimeException e) { - log.error("{} Failed to close {}, {}", logPrefix, taskTypeName, task.id(), e); + log.error("Failed to close {}, {}", taskTypeName, task.id(), e); if (exception == null) { exception = e; } @@ -213,16 +211,16 @@ class AssignedTasks { } catch (final CommitFailedException e) { suspended.put(task.id(), task); // commit failed during suspension. Just log it. - log.warn("{} Failed to commit {} {} state when suspending due to CommitFailedException", logPrefix, taskTypeName, task.id()); + log.warn("Failed to commit {} {} state when suspending due to CommitFailedException", taskTypeName, task.id()); } catch (final ProducerFencedException e) { closeZombieTask(task); it.remove(); } catch (final RuntimeException e) { - log.error("{} Suspending {} {} failed due to the following error:", logPrefix, taskTypeName, task.id(), e); + log.error("Suspending {} {} failed due to the following error:", taskTypeName, task.id(), e); try { task.close(false, false); } catch (final Exception f) { - log.error("{} After suspending failed, closing the same {} {} failed again due to the following error:", logPrefix, taskTypeName, task.id(), f); + log.error("After suspending failed, closing the same {} {} failed again due to the following error:", taskTypeName, task.id(), f); } if (exception == null) { exception = e; @@ -233,11 +231,11 @@ class AssignedTasks { } private void closeZombieTask(final Task task) { - log.warn("{} Producer of task {} fenced; closing zombie task", logPrefix, task.id()); + log.warn("Producer of task {} fenced; closing zombie task", task.id()); try { task.close(false, true); } catch (final Exception e) { - log.warn("{} Failed to close zombie {} due to {}, ignore and proceed", taskTypeName, logPrefix, e); + log.warn("{} Failed to close zombie due to {}, ignore and proceed", taskTypeName, e); } } @@ -248,22 +246,22 @@ class AssignedTasks { boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) { if (suspended.containsKey(taskId)) { final Task task = suspended.get(taskId); - log.trace("{} found suspended {} {}", logPrefix, taskTypeName, taskId); + log.trace("found suspended {} {}", taskTypeName, taskId); if (task.partitions().equals(partitions)) { suspended.remove(taskId); task.resume(); transitionToRunning(task); - log.trace("{} resuming suspended {} {}", logPrefix, taskTypeName, task.id()); + log.trace("resuming suspended {} {}", taskTypeName, task.id()); return true; } else { - log.trace("{} couldn't resume task {} assigned partitions {}, task partitions", logPrefix, taskId, partitions, task.partitions()); + log.trace("couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions()); } } return false; } private void transitionToRunning(final Task task) { - log.debug("{} transitioning {} {} to running", logPrefix, taskTypeName, task.id()); + log.debug("transitioning {} {} to running", taskTypeName, task.id()); running.put(task.id(), task); for (TopicPartition topicPartition : task.partitions()) { runningByPartition.put(topicPartition, task); @@ -357,7 +355,7 @@ class AssignedTasks { processed++; } } catch (RuntimeException e) { - log.error("{} Failed to process {} {} due to the following error:", logPrefix, taskTypeName, task.id(), e); + log.error("Failed to process {} {} due to the following error:", taskTypeName, task.id(), e); throw e; } } @@ -375,7 +373,7 @@ class AssignedTasks { punctuated++; } } catch (KafkaException e) { - log.error("{} Failed to punctuate {} {} due to the following error:", logPrefix, taskTypeName, task.id(), e); + log.error("Failed to punctuate {} {} due to the following error:", taskTypeName, task.id(), e); throw e; } } @@ -391,13 +389,12 @@ class AssignedTasks { action.apply(task); } catch (final CommitFailedException e) { // commit failed. This is already logged inside the task as WARN and we can just log it again here. - log.warn("{} Failed to commit {} {} during {} state due to CommitFailedException; this task may be no longer owned by the thread", logPrefix, taskTypeName, task.id(), action.name()); + log.warn("Failed to commit {} {} during {} state due to CommitFailedException; this task may be no longer owned by the thread", taskTypeName, task.id(), action.name()); } catch (final ProducerFencedException e) { closeZombieTask(task); it.remove(); } catch (final RuntimeException t) { - log.error("{} Failed to {} {} {} due to the following error:", - logPrefix, + log.error("Failed to {} {} {} due to the following error:", action.name(), taskTypeName, task.id(), @@ -418,11 +415,11 @@ class AssignedTasks { while (standByTaskIterator.hasNext()) { final Task suspendedTask = standByTaskIterator.next(); if (!newAssignment.containsKey(suspendedTask.id()) || !suspendedTask.partitions().equals(newAssignment.get(suspendedTask.id()))) { - log.debug("{} Closing suspended and not re-assigned {} {}", logPrefix, taskTypeName, suspendedTask.id()); + log.debug("Closing suspended and not re-assigned {} {}", taskTypeName, suspendedTask.id()); try { suspendedTask.closeSuspended(true, false, null); } catch (final Exception e) { - log.error("{} Failed to remove suspended {} {} due to the following error:", logPrefix, taskTypeName, suspendedTask.id(), e); + log.error("Failed to remove suspended {} {} due to the following error:", taskTypeName, suspendedTask.id(), e); } finally { standByTaskIterator.remove(); } @@ -441,8 +438,7 @@ class AssignedTasks { try { task.close(clean, false); } catch (final Throwable t) { - log.error("{} Failed while closing {} {} due to the following error:", - logPrefix, + log.error("Failed while closing {} {} due to the following error:", task.getClass().getSimpleName(), task.id(), t); http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index 32e8330..41ebcca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -28,7 +29,6 @@ import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; @@ -46,8 +46,8 @@ import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.St */ public class GlobalStreamThread extends Thread { - private static final Logger log = LoggerFactory.getLogger(GlobalStreamThread.class); - + private final Logger log; + private final LogContext logContext; private final StreamsConfig config; private final Consumer<byte[], byte[]> consumer; private final StateDirectory stateDirectory; @@ -147,10 +147,10 @@ public class GlobalStreamThread extends Thread { // will be refused but we do not throw exception here return false; } else if (!state.isValidTransition(newState)) { - log.error("{} Unexpected state transition from {} to {}", logPrefix, oldState, newState); - throw new StreamsException(logPrefix + " Unexpected state transition from " + oldState + " to " + newState); + log.error("Unexpected state transition from {} to {}", oldState, newState); + throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState); } else { - log.info("{} State transition from {} to {}", logPrefix, oldState, newState); + log.info("State transition from {} to {}", oldState, newState); } state = newState; @@ -185,8 +185,11 @@ public class GlobalStreamThread extends Thread { long cacheSizeBytes = Math.max(0, config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) / (config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG) + 1)); this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId)); - this.cache = new ThreadCache(threadClientId, cacheSizeBytes, streamsMetrics); - this.logPrefix = String.format("global-stream-thread [%s]", threadClientId); + this.logPrefix = String.format("global-stream-thread [%s] ", threadClientId); + this.logContext = new LogContext(logPrefix); + this.log = logContext.logger(getClass()); + this.cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); + } static class StateConsumer { @@ -195,17 +198,17 @@ public class GlobalStreamThread extends Thread { private final Time time; private final long pollMs; private final long flushInterval; - private final String logPrefix; + private final Logger log; private long lastFlush; - StateConsumer(final String logPrefix, + StateConsumer(final LogContext logContext, final Consumer<byte[], byte[]> consumer, final GlobalStateMaintainer stateMaintainer, final Time time, final long pollMs, final long flushInterval) { - this.logPrefix = logPrefix; + this.log = logContext.logger(getClass()); this.consumer = consumer; this.stateMaintainer = stateMaintainer; this.time = time; @@ -240,7 +243,7 @@ public class GlobalStreamThread extends Thread { } catch (Exception e) { // just log an error if the consumer throws an exception during close // so we can always attempt to close the state stores. - log.error("{} Failed to close consumer due to the following error:", logPrefix, e); + log.error("Failed to close consumer due to the following error:", e); } stateMaintainer.close(); @@ -260,7 +263,7 @@ public class GlobalStreamThread extends Thread { setState(State.PENDING_SHUTDOWN); setState(State.DEAD); - log.warn("{} Error happened during initialization of the global state store; this thread has shutdown", logPrefix); + log.warn("Error happened during initialization of the global state store; this thread has shutdown"); return; } @@ -276,16 +279,16 @@ public class GlobalStreamThread extends Thread { // intentionally do not check the returned flag setState(State.PENDING_SHUTDOWN); - log.info("{} Shutting down", logPrefix); + log.info("Shutting down"); try { stateConsumer.close(); } catch (IOException e) { - log.error("{} Failed to close state maintainer due to the following error:", logPrefix, e); + log.error("Failed to close state maintainer due to the following error:", e); } setState(DEAD); - log.info("{} Shutdown complete", logPrefix); + log.info("Shutdown complete"); } } @@ -293,7 +296,7 @@ public class GlobalStreamThread extends Thread { try { final GlobalStateManager stateMgr = new GlobalStateManagerImpl(topology, consumer, stateDirectory); final StateConsumer stateConsumer - = new StateConsumer(logPrefix, + = new StateConsumer(this.logContext, consumer, new GlobalStateUpdateTask(topology, new GlobalProcessorContextImpl( http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index acd7674..942e41a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; @@ -28,7 +29,6 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -42,11 +42,11 @@ import java.util.Map; public class ProcessorStateManager implements StateManager { - private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class); public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog"; static final String CHECKPOINT_FILE_NAME = ".checkpoint"; + private final Logger log; private final File baseDir; private final TaskId taskId; private final String logPrefix; @@ -77,10 +77,12 @@ public class ProcessorStateManager implements StateManager { final StateDirectory stateDirectory, final Map<String, String> storeToChangelogTopic, final ChangelogReader changelogReader, - final boolean eosEnabled) throws IOException { + final boolean eosEnabled, + final LogContext logContext) throws IOException { this.taskId = taskId; this.changelogReader = changelogReader; - logPrefix = String.format("task [%s]", taskId); + logPrefix = String.format("task [%s] ", taskId); + this.log = logContext.logger(getClass()); partitionForTopic = new HashMap<>(); for (final TopicPartition source : sources) { @@ -100,7 +102,7 @@ public class ProcessorStateManager implements StateManager { try { baseDir = stateDirectory.directoryForTask(taskId); } catch (final ProcessorStateException e) { - throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s", + throw new LockException(String.format("%sFailed to get the directory for task %s. Exception %s", logPrefix, taskId, e)); } @@ -114,7 +116,7 @@ public class ProcessorStateManager implements StateManager { checkpoint = null; } - log.debug("{} Created state store manager for task {} with the acquired state dir lock", logPrefix, taskId); + log.debug("Created state store manager for task {} with the acquired state dir lock", taskId); } @@ -139,14 +141,14 @@ public class ProcessorStateManager implements StateManager { public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) { - log.debug("{} Registering state store {} to its state manager", logPrefix, store.name()); + log.debug("Registering state store {} to its state manager", store.name()); if (store.name().equals(CHECKPOINT_FILE_NAME)) { - throw new IllegalArgumentException(String.format("%s Illegal store name: %s", logPrefix, CHECKPOINT_FILE_NAME)); + throw new IllegalArgumentException(String.format("%sIllegal store name: %s", logPrefix, CHECKPOINT_FILE_NAME)); } if (stores.containsKey(store.name())) { - throw new IllegalArgumentException(String.format("%s Store %s has already been registered.", logPrefix, store.name())); + throw new IllegalArgumentException(String.format("%sStore %s has already been registered.", logPrefix, store.name())); } // check that the underlying change log topic exist or not @@ -160,12 +162,12 @@ public class ProcessorStateManager implements StateManager { if (isStandby) { if (store.persistent()) { - log.trace("{} Preparing standby replica of persistent state store {} with changelog topic {}", logPrefix, store.name(), topic); + log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", store.name(), topic); restoreCallbacks.put(topic, stateRestoreCallback); } } else { - log.trace("{} Restoring state store {} from changelog topic {}", logPrefix, store.name(), topic); + log.trace("Restoring state store {} from changelog topic {}", store.name(), topic); final StateRestorer restorer = new StateRestorer(storePartition, new CompositeRestoreListener(stateRestoreCallback), checkpointedOffsets.get(storePartition), @@ -227,7 +229,7 @@ public class ProcessorStateManager implements StateManager { try { restoreCallback.restoreAll(restoreRecords); } catch (final Exception e) { - throw new ProcessorStateException(String.format("%s exception caught while trying to restore state from %s", logPrefix, storePartition), e); + throw new ProcessorStateException(String.format("%sException caught while trying to restore state from %s", logPrefix, storePartition), e); } } @@ -238,7 +240,7 @@ public class ProcessorStateManager implements StateManager { } void putOffsetLimit(final TopicPartition partition, final long limit) { - log.trace("{} Updating store offset limit for partition {} to {}", logPrefix, partition, limit); + log.trace("Updating store offset limit for partition {} to {}", partition, limit); offsetLimits.put(partition, limit); } @@ -255,13 +257,13 @@ public class ProcessorStateManager implements StateManager { @Override public void flush() { if (!stores.isEmpty()) { - log.debug("{} Flushing all stores registered in the state manager", logPrefix); + log.debug("Flushing all stores registered in the state manager"); for (final StateStore store : stores.values()) { try { - log.trace("{} Flushing store={}", logPrefix, store.name()); + log.trace("Flushing store={}", store.name()); store.flush(); } catch (final Exception e) { - throw new ProcessorStateException(String.format("%s Failed to flush state store %s", logPrefix, store.name()), e); + throw new ProcessorStateException(String.format("%sFailed to flush state store %s", logPrefix, store.name()), e); } } } @@ -278,16 +280,16 @@ public class ProcessorStateManager implements StateManager { // attempting to close the stores, just in case they // are not closed by a ProcessorNode yet if (!stores.isEmpty()) { - log.debug("{} Closing its state manager and all the registered state stores", logPrefix); + log.debug("Closing its state manager and all the registered state stores"); for (final Map.Entry<String, StateStore> entry : stores.entrySet()) { - log.debug("{} Closing storage engine {}", logPrefix, entry.getKey()); + log.debug("Closing storage engine {}", entry.getKey()); try { entry.getValue().close(); } catch (final Exception e) { if (firstException == null) { - firstException = new ProcessorStateException(String.format("%s Failed to close state store %s", logPrefix, entry.getKey()), e); + firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", logPrefix, entry.getKey()), e); } - log.error("{} Failed to close state store {}: ", logPrefix, entry.getKey(), e); + log.error("Failed to close state store {}: ", entry.getKey(), e); } } @@ -305,7 +307,7 @@ public class ProcessorStateManager implements StateManager { // write the checkpoint @Override public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) { - log.trace("{} Writing checkpoint: {}", logPrefix, ackedOffsets); + log.trace("Writing checkpoint: {}", ackedOffsets); checkpointedOffsets.putAll(changelogReader.restoredOffsets()); for (final Map.Entry<String, StateStore> entry : stores.entrySet()) { final String storeName = entry.getKey(); @@ -339,7 +341,7 @@ public class ProcessorStateManager implements StateManager { } void registerGlobalStateStores(final List<StateStore> stateStores) { - log.debug("{} Register global stores {}", logPrefix, stateStores); + log.debug("Register global stores {}", stateStores); for (final StateStore stateStore : stateStores) { globalStores.put(stateStore.name(), stateStore); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 79e3350..4eec2d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -26,11 +26,11 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StreamPartitioner; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; @@ -40,18 +40,19 @@ public class RecordCollectorImpl implements RecordCollector { private static final int MAX_SEND_ATTEMPTS = 3; private static final long SEND_RETRY_BACKOFF = 100L; - private static final Logger log = LoggerFactory.getLogger(RecordCollectorImpl.class); - + + private final Logger log; private final Producer<byte[], byte[]> producer; private final Map<TopicPartition, Long> offsets; private final String logPrefix; private volatile KafkaException sendException; - public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId) { + public RecordCollectorImpl(final Producer<byte[], byte[]> producer, final String streamTaskId, final LogContext logContext) { this.producer = producer; - offsets = new HashMap<>(); - logPrefix = String.format("task [%s]", streamTaskId); + this.offsets = new HashMap<>(); + this.logPrefix = String.format("task [%s] ", streamTaskId); + this.log = logContext.logger(getClass()); } @Override @@ -107,14 +108,14 @@ public class RecordCollectorImpl implements RecordCollector { offsets.put(tp, metadata.offset()); } else { if (sendException == null) { - log.error("{} Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " + + log.error("Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " + "No more records will be sent and no more offsets will be recorded for this task.", - logPrefix, key, value, timestamp, topic, exception); + key, value, timestamp, topic, exception); if (exception instanceof ProducerFencedException) { - sendException = new ProducerFencedException(String.format("%s Abort sending since producer got fenced with a previous record (key %s value %s timestamp %d) to topic %s, error message: %s", + sendException = new ProducerFencedException(String.format("%sAbort sending since producer got fenced with a previous record (key %s value %s timestamp %d) to topic %s, error message: %s", logPrefix, key, value, timestamp, topic, exception.getMessage())); } else { - sendException = new StreamsException(String.format("%s Abort sending since an error caught with a previous record (key %s value %s timestamp %d) to topic %s due to %s.", + sendException = new StreamsException(String.format("%sAbort sending since an error caught with a previous record (key %s value %s timestamp %d) to topic %s due to %s.", logPrefix, key, value, timestamp, topic, exception), exception); } } @@ -124,9 +125,9 @@ public class RecordCollectorImpl implements RecordCollector { return; } catch (final TimeoutException e) { if (attempt == MAX_SEND_ATTEMPTS) { - throw new StreamsException(String.format("%s Failed to send record to topic %s due to timeout after %d attempts", logPrefix, topic, attempt)); + throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout after %d attempts", logPrefix, topic, attempt)); } - log.warn("{} Timeout exception caught when sending record to topic {}; retrying with {} attempt", logPrefix, topic, attempt); + log.warn("Timeout exception caught when sending record to topic {}; retrying with {} attempt", topic, attempt); Utils.sleep(SEND_RETRY_BACKOFF); } } @@ -140,14 +141,14 @@ public class RecordCollectorImpl implements RecordCollector { @Override public void flush() { - log.debug("{} Flushing producer", logPrefix); + log.debug("Flushing producer"); producer.flush(); checkForException(); } @Override public void close() { - log.debug("{} Closing producer", logPrefix); + log.debug("Closing producer"); producer.close(); checkForException(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 812a4ab..a9a03ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.Cancellable; @@ -70,7 +71,7 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle final StreamsConfig config, final ProcessorStateManager stateMgr, final StreamsMetrics metrics) { - super(id, applicationId, config, metrics, stateMgr, new ThreadCache("zeroCache", 0, metrics)); + super(id, applicationId, config, metrics, stateMgr, new ThreadCache(new LogContext("zeroCache "), 0, metrics)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 75151a8..033af24 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -22,8 +22,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Collections; @@ -36,7 +34,6 @@ import java.util.Map; */ public class StandbyTask extends AbstractTask { - private static final Logger log = LoggerFactory.getLogger(StandbyTask.class); private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>(); /** @@ -73,7 +70,7 @@ public class StandbyTask extends AbstractTask { */ @Override public void resume() { - log.debug("{} Resuming", logPrefix); + log.debug("Resuming"); updateOffsetLimits(); } @@ -86,7 +83,7 @@ public class StandbyTask extends AbstractTask { */ @Override public void commit() { - log.trace("{} Committing", logPrefix); + log.trace("Committing"); flushAndCheckpointState(); // reinitialize offset limits updateOffsetLimits(); @@ -100,7 +97,7 @@ public class StandbyTask extends AbstractTask { */ @Override public void suspend() { - log.debug("{} Suspending", logPrefix); + log.debug("Suspending"); flushAndCheckpointState(); } @@ -124,7 +121,7 @@ public class StandbyTask extends AbstractTask { if (!taskInitialized) { return; } - log.debug("{} Closing", logPrefix); + log.debug("Closing"); boolean committedSuccessfully = false; try { commit(); @@ -163,7 +160,7 @@ public class StandbyTask extends AbstractTask { */ public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partition, final List<ConsumerRecord<byte[], byte[]>> records) { - log.trace("{} Updating standby replicas of its state store for partition [{}]", logPrefix, partition); + log.trace("Updating standby replicas of its state store for partition [{}]", partition); return stateMgr.updateStandbyStates(partition, records); } http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 57dff64..8ecc7e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -22,10 +22,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateRestoreListener; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; @@ -38,9 +38,8 @@ import java.util.Map; import java.util.Set; public class StoreChangelogReader implements ChangelogReader { - private static final Logger log = LoggerFactory.getLogger(StoreChangelogReader.class); - private final String logPrefix; + private final Logger log; private final Consumer<byte[], byte[]> consumer; private final StateRestoreListener stateRestoreListener; private final Map<TopicPartition, Long> endOffsets = new HashMap<>(); @@ -51,16 +50,17 @@ public class StoreChangelogReader implements ChangelogReader { public StoreChangelogReader(final String threadId, final Consumer<byte[], byte[]> consumer, - final StateRestoreListener stateRestoreListener) { + final StateRestoreListener stateRestoreListener, + final LogContext logContext) { this.consumer = consumer; - - this.logPrefix = String.format("stream-thread [%s]", threadId); + this.log = logContext.logger(getClass()); this.stateRestoreListener = stateRestoreListener; } public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, - final StateRestoreListener stateRestoreListener) { - this("", consumer, stateRestoreListener); + final StateRestoreListener stateRestoreListener, + final LogContext logContext) { + this("", consumer, stateRestoreListener, logContext); } @Override @@ -116,7 +116,7 @@ public class StoreChangelogReader implements ChangelogReader { endOffsets.putAll(consumer.endOffsets(initializable.keySet())); } catch (final TimeoutException e) { // if timeout exception gets thrown we just give up this time and retry in the next run loop - log.debug("{} Could not fetch end offset for {}; will fall back to partition by partition fetching", logPrefix, initializable); + log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable); return; } @@ -140,7 +140,7 @@ public class StoreChangelogReader implements ChangelogReader { } needsInitializing.remove(topicPartition); } else { - log.info("{} End offset cannot be found form the returned metadata; removing this partition from the current run loop", logPrefix); + log.info("End offset cannot be found form the returned metadata; removing this partition from the current run loop"); iter.remove(); } } @@ -152,7 +152,7 @@ public class StoreChangelogReader implements ChangelogReader { } private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) { - log.debug("{} Start restoring state stores from changelog topics {}", logPrefix, initialized.keySet()); + log.debug("Start restoring state stores from changelog topics {}", initialized.keySet()); final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment()); assignment.addAll(initialized.keySet()); @@ -186,8 +186,7 @@ public class StoreChangelogReader implements ChangelogReader { } private void logRestoreOffsets(final TopicPartition partition, final long startingOffset, final Long endOffset) { - log.debug("{} Restoring partition {} from offset {} to endOffset {}", - logPrefix, + log.debug("Restoring partition {} from offset {} to endOffset {}", partition, startingOffset, endOffset); @@ -196,7 +195,7 @@ public class StoreChangelogReader implements ChangelogReader { private Collection<TopicPartition> completed() { final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet()); completed.removeAll(needsRestoring.keySet()); - log.debug("{} completed partitions {}", logPrefix, completed); + log.debug("completed partitions {}", completed); return completed; } @@ -204,7 +203,7 @@ public class StoreChangelogReader implements ChangelogReader { try { partitionInfo.putAll(consumer.listTopics()); } catch (final TimeoutException e) { - log.debug("{} Could not fetch topic metadata within the timeout, will retry in the next run loop", logPrefix); + log.debug("Could not fetch topic metadata within the timeout, will retry in the next run loop"); } } @@ -244,8 +243,7 @@ public class StoreChangelogReader implements ChangelogReader { pos)); } - log.debug("{} Completed restoring state from changelog {} with {} records ranging from offset {} to {}", - logPrefix, + log.debug("Completed restoring state from changelog {} with {} records ranging from offset {} to {}", topicPartition, restorer.restoredNumRecords(), restorer.startingOffset(), http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index d479a72..34e9e8a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -35,7 +36,6 @@ import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssigno import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; import org.apache.kafka.streams.state.HostInfo; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -55,11 +55,11 @@ import static org.apache.kafka.streams.processor.internals.InternalTopicManager. public class StreamPartitionAssignor implements PartitionAssignor, Configurable, ThreadMetadataProvider { - private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class); private Time time = Time.SYSTEM; private final static int UNKNOWN = -1; public final static int NOT_AVAILABLE = -2; + private Logger log; private String logPrefix; private static class AssignedPartition implements Comparable<AssignedPartition> { @@ -200,6 +200,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, public void configure(Map<String, ?> configs) { numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); + //Initializing the logger without threadDataProvider name because provider name is not known/verified at this point + logPrefix = String.format("stream-thread "); + LogContext logContext = new LogContext(logPrefix); + this.log = logContext.logger(getClass()); + Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE); if (o == null) { KafkaException ex = new KafkaException("StreamThread is not specified"); @@ -216,7 +221,10 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, threadDataProvider = (ThreadDataProvider) o; threadDataProvider.setThreadMetadataProvider(this); - logPrefix = String.format("stream-thread [%s]", threadDataProvider.name()); + //Reassigning the logger with threadDataProvider name + logPrefix = String.format("stream-thread [%s] ", threadDataProvider.name()); + logContext = new LogContext(logPrefix); + this.log = logContext.logger(getClass()); String userEndPoint = (String) configs.get(StreamsConfig.APPLICATION_SERVER_CONFIG); if (userEndPoint != null && !userEndPoint.isEmpty()) { @@ -273,7 +281,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, private void updateSubscribedTopics(Set<String> topics) { SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); - log.debug("{} found {} topics possibly matching regex", logPrefix, topics); + log.debug("found {} topics possibly matching regex", topics); // update the topic groups with the returned subscription set for regex pattern subscriptions subscriptionUpdates.updateTopics(topics); threadDataProvider.builder().updateSubscriptions(subscriptionUpdates, threadDataProvider.name()); @@ -322,7 +330,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, clientMetadata.addConsumer(consumerId, info); } - log.debug("{} Constructed client metadata {} from the member subscriptions.", logPrefix, clientsMetadata); + log.debug("Constructed client metadata {} from the member subscriptions.", clientsMetadata); // ---------------- Step Zero ---------------- // @@ -409,7 +417,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions); - log.debug("{} Created repartition topics {} from the parsed topology.", logPrefix, allRepartitionTopicPartitions.values()); + log.debug("Created repartition topics {} from the parsed topology.", allRepartitionTopicPartitions.values()); // ---------------- Step One ---------------- // @@ -431,7 +439,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, Set<TopicPartition> partitions = entry.getValue(); for (TopicPartition partition : partitions) { if (allAssignedPartitions.contains(partition)) { - log.warn("{} Partition {} is assigned to more than one tasks: {}", logPrefix, partition, partitionsForTask); + log.warn("Partition {} is assigned to more than one tasks: {}", partition, partitionsForTask); } } allAssignedPartitions.addAll(partitions); @@ -450,11 +458,11 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, for (PartitionInfo partitionInfo : partitionInfoList) { TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); if (!allAssignedPartitions.contains(partition)) { - log.warn("{} Partition {} is not assigned to any tasks: {}", logPrefix, partition, partitionsForTask); + log.warn("Partition {} is not assigned to any tasks: {}", partition, partitionsForTask); } } } else { - log.warn("{} No partitions found for topic {}", logPrefix, topic); + log.warn("No partitions found for topic {}", topic); } } @@ -477,14 +485,14 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, changelogTopicMetadata.put(topicConfig.name(), topicMetadata); } else { - log.debug("{} No tasks found for topic group {}", logPrefix, topicGroupId); + log.debug("No tasks found for topic group {}", topicGroupId); } } } prepareTopic(changelogTopicMetadata); - log.debug("{} Created state changelog topics {} from the parsed topology.", logPrefix, changelogTopicMetadata.values()); + log.debug("Created state changelog topics {} from the parsed topology.", changelogTopicMetadata.values()); // ---------------- Step Two ---------------- // @@ -494,13 +502,13 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, states.put(entry.getKey(), entry.getValue().state); } - log.debug("{} Assigning tasks {} to clients {} with number of replicas {}", + log.debug("Assigning tasks {} to clients {} with number of replicas {}", logPrefix, partitionsForTask.keySet(), states, numStandbyReplicas); final StickyTaskAssignor<UUID> taskAssignor = new StickyTaskAssignor<>(states, partitionsForTask.keySet()); taskAssignor.assign(numStandbyReplicas); - log.info("{} Assigned tasks to clients as {}.", logPrefix, states); + log.info("Assigned tasks to clients as {}.", states); // ---------------- Step Three ---------------- // @@ -591,7 +599,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, // could be duplicated if one task has more than one assigned partitions if (partitions.size() != info.activeTasks.size()) { throw new TaskAssignmentException( - String.format("%s Number of assigned partitions %d is not equal to the number of active taskIds %d" + + String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d" + ", assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks.size(), info.toString()) ); } @@ -646,7 +654,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, */ @SuppressWarnings("deprecation") private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions) { - log.debug("{} Starting to validate internal topics in partition assignor.", logPrefix); + log.debug("Starting to validate internal topics in partition assignor."); // first construct the topics to make ready Map<InternalTopicConfig, Integer> topicsToMakeReady = new HashMap<>(); @@ -660,7 +668,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, continue; } if (numPartitions < 0) { - throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topic [%s] number of partitions not defined", logPrefix, topic.name())); + throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name())); } topicsToMakeReady.put(topic, numPartitions); @@ -680,7 +688,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, } } - log.debug("{} Completed validating internal topics in partition assignor", logPrefix); + log.debug("Completed validating internal topics in partition assignor"); } private boolean allTopicsCreated(final Set<String> topicNamesToMakeReady, final Map<InternalTopicConfig, Integer> topicsToMakeReady) { @@ -785,7 +793,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, final Integer partitions = metadata.partitionCountForTopic(topic); if (partitions == null) { - throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topic not found: %s", logPrefix, topic)); + throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopic not found: %s", logPrefix, topic)); } if (numPartitions == UNKNOWN) { @@ -793,7 +801,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable, } else if (numPartitions != partitions) { final String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]); Arrays.sort(topics); - throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%s Topics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ","))); + throw new org.apache.kafka.streams.errors.TopologyBuilderException(String.format("%sTopics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ","))); } } else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) { numPartitions = NOT_AVAILABLE; http://git-wip-us.apache.org/repos/asf/kafka/blob/f305dd68/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 288a597..084a991 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; @@ -37,8 +38,6 @@ import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.state.internals.ThreadCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.HashMap; @@ -53,8 +52,6 @@ import static java.util.Collections.singleton; */ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator { - private static final Logger log = LoggerFactory.getLogger(StreamTask.class); - private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null); private final PartitionGroup partitionGroup; @@ -129,7 +126,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator consumedOffsets = new HashMap<>(); this.producer = producer; - recordCollector = createRecordCollector(); + recordCollector = createRecordCollector(logContext); // initialize the topology with its own context processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache); @@ -162,7 +159,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator */ @Override public void resume() { - log.debug("{} Resuming", logPrefix); + log.debug("Resuming"); if (eosEnabled) { producer.beginTransaction(); transactionInFlight = true; @@ -190,12 +187,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator final ProcessorNode currNode = recordInfo.node(); final TopicPartition partition = recordInfo.partition(); - log.trace("{} Start processing one record [{}]", logPrefix, record); + log.trace("Start processing one record [{}]", record); updateProcessorContext(record, currNode); currNode.process(record.key(), record.value()); - log.trace("{} Completed processing one record [{}]", logPrefix, record); + log.trace("Completed processing one record [{}]", record); // update the consumed offset map after processing is done consumedOffsets.put(partition, record.offset()); @@ -227,19 +224,19 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator @Override public void punctuate(final ProcessorNode node, final long timestamp, final PunctuationType type, final Punctuator punctuator) { if (processorContext.currentNode() != null) { - throw new IllegalStateException(String.format("%s Current node is not null", logPrefix)); + throw new IllegalStateException(String.format("%sCurrent node is not null", logPrefix)); } updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node); if (log.isTraceEnabled()) { - log.trace("{} Punctuating processor {} with timestamp {} and punctuation type {}", logPrefix, node.name(), timestamp, type); + log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", node.name(), timestamp, type); } try { node.punctuate(timestamp, punctuator); } catch (final KafkaException e) { - throw new StreamsException(String.format("%s Exception caught while punctuating processor '%s'", logPrefix, node.name()), e); + throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e); } finally { processorContext.setCurrentNode(null); } @@ -264,7 +261,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator // visible for testing void commit(final boolean startNewTransaction) { - log.debug("{} Committing", logPrefix); + log.debug("Committing"); metrics.metrics.measureLatencyNs( time, new Runnable() { @@ -289,14 +286,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator @Override protected void flushState() { - log.trace("{} Flushing state and producer", logPrefix); + log.trace("Flushing state and producer"); super.flushState(); recordCollector.flush(); } private void commitOffsets(final boolean startNewTransaction) { if (commitOffsetNeeded) { - log.trace("{} Committing offsets", logPrefix); + log.trace("Committing offsets"); final Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size()); for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) { final TopicPartition partition = entry.getKey(); @@ -317,7 +314,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator try { consumer.commitSync(consumedOffsetsAndMetadata); } catch (final CommitFailedException e) { - log.warn("{} Failed offset commits {} due to CommitFailedException", logPrefix, consumedOffsetsAndMetadata); + log.warn("Failed offset commits {} due to CommitFailedException", consumedOffsetsAndMetadata); throw e; } } @@ -330,7 +327,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator void initTopology() { // initialize the task by initializing all its processor nodes in the topology - log.trace("{} Initializing processor nodes of the topology", logPrefix); + log.trace("Initializing processor nodes of the topology"); for (final ProcessorNode node : topology.processors()) { processorContext.setCurrentNode(node); try { @@ -352,7 +349,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator */ @Override public void suspend() { - log.debug("{} Suspending", logPrefix); + log.debug("Suspending"); suspend(true); } @@ -374,7 +371,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } private void closeTopology() { - log.trace("{} Closing processor topology", logPrefix); + log.trace("Closing processor topology"); partitionGroup.clear(); @@ -411,7 +408,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator if (firstException == null) { firstException = e; } - log.error("{} Could not close state manager due to the following error:", logPrefix, e); + log.error("Could not close state manager due to the following error:", e); } try { @@ -434,7 +431,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator recordCollector.close(); } } catch (final Throwable e) { - log.error("{} Failed to close producer due to the following error:", logPrefix, e); + log.error("Failed to close producer due to the following error:", e); } } } @@ -467,7 +464,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator @Override public void close(boolean clean, final boolean isZombie) { - log.debug("{} Closing", logPrefix); + log.debug("Closing"); RuntimeException firstException = null; try { @@ -475,7 +472,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } catch (final RuntimeException e) { clean = false; firstException = e; - log.error("{} Could not close task due to the following error:", logPrefix, e); + log.error("Could not close task due to the following error:", e); } closeSuspended(clean, isZombie, firstException); @@ -495,7 +492,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator final int newQueueSize = partitionGroup.addRawRecords(partition, records); if (log.isTraceEnabled()) { - log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize); + log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, newQueueSize); } // if after adding these records, its partition queue's buffered size has been @@ -516,7 +513,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator */ public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) { if (processorContext.currentNode() == null) { - throw new IllegalStateException(String.format("%s Current node is null", logPrefix)); + throw new IllegalStateException(String.format("%sCurrent node is null", logPrefix)); } final PunctuationSchedule schedule = new PunctuationSchedule(processorContext.currentNode(), interval, punctuator); @@ -596,12 +593,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } // visible for testing only - RecordCollector createRecordCollector() { - return new RecordCollectorImpl(producer, id.toString()); + RecordCollector createRecordCollector(final LogContext logContext) { + return new RecordCollectorImpl(producer, id.toString(), logContext); } public boolean initialize() { - log.debug("{} Initializing", logPrefix); + log.debug("Initializing"); initializeStateStores(); initTopology(); processorContext.initialized();