This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new a4f7675  KAFKA-8285: enable localized thread IDs in Kafka Streams (#6632)
a4f7675 is described below

commit a4f7675db1a928e73c7a69eb906dd1e9ecd4a22a
Author: Boyang Chen <boy...@confluent.io>
AuthorDate: Thu May 2 17:16:17 2019 -0700

    KAFKA-8285: enable localized thread IDs in Kafka Streams (#6632)

    Details in the JIRA: https://issues.apache.org/jira/browse/KAFKA-8285
    Basically we want to avoid sharing of atomic updates for thread id with multiple stream instances on one JVM.

    Reviewers: Raoul de Haard, Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java   | 3 ++-
 .../org/apache/kafka/streams/processor/internals/StreamThread.java | 7 +++----
 .../apache/kafka/streams/processor/internals/StreamThreadTest.java | 5 ++++-
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 315a6bb..c69d927 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -726,7 +726,8 @@ public class KafkaStreams implements AutoCloseable {
                                              streamsMetadataState,
                                              cacheSizePerThread,
                                              stateDirectory,
-                                             delegatingStateRestoreListener);
+                                             delegatingStateRestoreListener,
+                                             i + 1);
             threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cb4629f..46612e5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -69,8 +69,6 @@ import static java.util.Collections.singleton;
 
 public class StreamThread extends Thread {
 
-    private final static AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
-
     /**
      * Stream thread states are the possible states that a stream thread can be in.
      * A thread must only be in one state at a time
@@ -600,8 +598,9 @@ public class StreamThread extends Thread {
                                       final StreamsMetadataState streamsMetadataState,
                                       final long cacheSizeBytes,
                                       final StateDirectory stateDirectory,
-                                      final StateRestoreListener userStateRestoreListener) {
-        final String threadClientId = clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement();
+                                      final StateRestoreListener userStateRestoreListener,
+                                      final int threadIdx) {
+        final String threadClientId = clientId + "-StreamThread-" + threadIdx;
 
         final String logPrefix = String.format("stream-thread [%s] ", threadClientId);
         final LogContext logContext = new LogContext(logPrefix);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 11485e4..1de39d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -104,6 +104,7 @@ public class StreamThreadTest {
 
     private final String clientId = "clientId";
     private final String applicationId = "stream-thread-test";
+    private final int threadIdx = 1;
     private final MockTime mockTime = new MockTime();
     private final Metrics metrics = new Metrics();
     private final MockClientSupplier clientSupplier = new MockClientSupplier();
@@ -244,7 +245,8 @@ public class StreamThreadTest {
                                    streamsMetadataState,
                                    0,
                                    stateDirectory,
-                                   new MockStateRestoreListener());
+                                   new MockStateRestoreListener(),
+                                   threadIdx);
     }
 
     @Test
@@ -278,6 +280,7 @@ public class StreamThreadTest {
 
         final JmxReporter reporter = new JmxReporter("kafka.streams");
         metrics.addReporter(reporter);
+        assertEquals(clientId + "-StreamThread-1", thread.getName());
         assertTrue(reporter.containsMbean(String.format("kafka.streams:type=%s,client-id=%s", defaultGroupName, thread.getName())));
     }
 