[kafka] branch 1.1 updated: MINOR: Use large batches in metrics test for conversion time >= 1ms (#4681)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new 52c4dc0 MINOR: Use large batches in metrics test for conversion time >= 1ms (#4681) 52c4dc0 is described below commit 52c4dc0693053a493c43d379223c26320d26d90e Author: Rajini SivaramAuthorDate: Tue Mar 13 06:23:00 2018 + MINOR: Use large batches in metrics test for conversion time >= 1ms (#4681) --- .../test/scala/integration/kafka/api/MetricsTest.scala | 18 +++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala index 80cfeca..baadd66 100644 --- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala +++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala @@ -43,6 +43,8 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}" this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false") this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableDoc, "false") + this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10") + this.producerConfig.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "100") override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) @@ -90,7 +92,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { verifyClientVersionMetrics(this.producers.head.metrics, "Producer") val server = servers.head -verifyBrokerMessageConversionMetrics(server, recordSize) +verifyBrokerMessageConversionMetrics(server, recordSize, tp) verifyBrokerErrorMetrics(servers.head) verifyBrokerZkMetrics(server, topic) @@ -187,7 +189,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { verifyKafkaMetricRecorded("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics")) } - private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int): Unit = { + private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int, tp: TopicPartition): Unit = { val requestMetricsPrefix = "kafka.network:type=RequestMetrics" val requestBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Produce") val tempBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Produce") @@ -195,7 +197,17 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { tempBytes >= recordSize) verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec") - verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce", value => value > 0.0) + +// Conversion time less than 1 millisecond is reported as zero, so retry with larger batches until time > 0 +var iteration = 0 +TestUtils.retry(5000) { + val conversionTimeMs = yammerMetricValue(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce").asInstanceOf[Double] + if (conversionTimeMs <= 0.0) { +iteration += 1 +sendRecords(producers.head, 1000 * iteration, 100, tp) + } + assertTrue(s"Message conversion time not recorded $conversionTimeMs", conversionTimeMs > 0.0) +} verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch") verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value == 0.0) -- To stop receiving notification emails like this one, please contact j...@apache.org.
[kafka] branch trunk updated: KAFKA-6024; Move arg validation in KafkaConsumer ahead of `acquireAndEnsureOpen` (#4617)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0bb8e66 KAFKA-6024; Move arg validation in KafkaConsumer ahead of `acquireAndEnsureOpen` (#4617) 0bb8e66 is described below commit 0bb8e66184931e2f7830cb713d9260cc0f3383a9 Author: Siva SanthalingamAuthorDate: Mon Mar 12 23:03:32 2018 -0700 KAFKA-6024; Move arg validation in KafkaConsumer ahead of `acquireAndEnsureOpen` (#4617) --- .../kafka/clients/consumer/KafkaConsumer.java | 25 +++--- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3cd034e..81137f3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -966,13 +966,12 @@ public class KafkaConsumer implements Consumer { */ @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { +if (pattern == null) +throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null"); + acquireAndEnsureOpen(); try { -if (pattern == null) -throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null"); - throwIfNoAssignorsConfigured(); - log.debug("Subscribed to pattern: {}", pattern); this.subscriptions.subscribe(pattern, listener); this.metadata.needMetadataForAllTopics(true); @@ -1337,11 +1336,11 @@ public class KafkaConsumer implements Consumer { */ @Override public void seek(TopicPartition partition, long offset) { +if (offset < 0) +throw new IllegalArgumentException("seek offset must not be a negative number"); + acquireAndEnsureOpen(); try { -if (offset < 0) -throw new IllegalArgumentException("seek offset must not be a negative number"); - log.debug("Seeking to offset {} for partition {}", offset, partition); this.subscriptions.seek(partition, offset); } finally { @@ -1357,11 +1356,11 @@ public class KafkaConsumer implements Consumer { * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer */ public void seekToBeginning(Collection partitions) { +if (partitions == null) +throw new IllegalArgumentException("Partitions collection cannot be null"); + acquireAndEnsureOpen(); try { -if (partitions == null) { -throw new IllegalArgumentException("Partitions collection cannot be null"); -} Collection parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { log.debug("Seeking to beginning of partition {}", tp); @@ -1383,11 +1382,11 @@ public class KafkaConsumer implements Consumer { * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer */ public void seekToEnd(Collection partitions) { +if (partitions == null) +throw new IllegalArgumentException("Partitions collection cannot be null"); + acquireAndEnsureOpen(); try { -if (partitions == null) { -throw new IllegalArgumentException("Partitions collection cannot be null"); -} Collection parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { log.debug("Seeking to end of partition {}", tp); -- To stop receiving notification emails like this one, please contact j...@apache.org.
[kafka] branch 1.0 updated: MINOR: Streams doc example should not close store (#4667)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.0 by this push: new 6da326f MINOR: Streams doc example should not close store (#4667) 6da326f is described below commit 6da326f8ddec80f7c5eabb1e24d1621596e66520 Author: Matthias J. SaxAuthorDate: Tue Mar 13 00:42:40 2018 -0700 MINOR: Streams doc example should not close store (#4667) Author: Matthias J. Sax Reviewers: Bill Bejeck , Guozhang Wang --- docs/streams/developer-guide/processor-api.html | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html index 6719ad1..fdf6c86 100644 --- a/docs/streams/developer-guide/processor-api.html +++ b/docs/streams/developer-guide/processor-api.html @@ -140,8 +140,8 @@ @Override public void close() { - // close the key-value store - kvStore.close(); + // close any resources managed by this processor + // Note: Do not close any StateStores as these are managed by the library } } -- To stop receiving notification emails like this one, please contact mj...@apache.org.
[kafka] branch 1.1 updated: MINOR: Streams doc example should not close store (#4667)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new e35946a MINOR: Streams doc example should not close store (#4667) e35946a is described below commit e35946a92bc6f1535741bfd6421082a370bf7bf5 Author: Matthias J. SaxAuthorDate: Tue Mar 13 00:42:40 2018 -0700 MINOR: Streams doc example should not close store (#4667) Author: Matthias J. Sax Reviewers: Bill Bejeck , Guozhang Wang --- docs/streams/developer-guide/processor-api.html | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html index 6719ad1..fdf6c86 100644 --- a/docs/streams/developer-guide/processor-api.html +++ b/docs/streams/developer-guide/processor-api.html @@ -140,8 +140,8 @@ @Override public void close() { - // close the key-value store - kvStore.close(); + // close any resources managed by this processor + // Note: Do not close any StateStores as these are managed by the library } } -- To stop receiving notification emails like this one, please contact mj...@apache.org.
[kafka] branch trunk updated: MINOR: Streams doc example should not close store (#4667)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0c00aa0 MINOR: Streams doc example should not close store (#4667) 0c00aa0 is described below commit 0c00aa0983f963dae18232ca539cc0731f228c70 Author: Matthias J. SaxAuthorDate: Tue Mar 13 00:42:40 2018 -0700 MINOR: Streams doc example should not close store (#4667) Author: Matthias J. Sax Reviewers: Bill Bejeck , Guozhang Wang --- docs/streams/developer-guide/processor-api.html | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html index 6719ad1..fdf6c86 100644 --- a/docs/streams/developer-guide/processor-api.html +++ b/docs/streams/developer-guide/processor-api.html @@ -140,8 +140,8 @@ @Override public void close() { - // close the key-value store - kvStore.close(); + // close any resources managed by this processor + // Note: Do not close any StateStores as these are managed by the library } } -- To stop receiving notification emails like this one, please contact mj...@apache.org.
[kafka] branch trunk updated: MINOR: Use large batches in metrics test for conversion time >= 1ms (#4681)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2bf0689 MINOR: Use large batches in metrics test for conversion time >= 1ms (#4681) 2bf0689 is described below commit 2bf06890b9af62b0aa0ed0debc1b36232afe463a Author: Rajini SivaramAuthorDate: Tue Mar 13 06:23:00 2018 + MINOR: Use large batches in metrics test for conversion time >= 1ms (#4681) --- .../test/scala/integration/kafka/api/MetricsTest.scala | 18 +++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala index 80cfeca..baadd66 100644 --- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala +++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala @@ -43,6 +43,8 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}" this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false") this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableDoc, "false") + this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10") + this.producerConfig.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "100") override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) @@ -90,7 +92,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { verifyClientVersionMetrics(this.producers.head.metrics, "Producer") val server = servers.head -verifyBrokerMessageConversionMetrics(server, recordSize) +verifyBrokerMessageConversionMetrics(server, recordSize, tp) verifyBrokerErrorMetrics(servers.head) verifyBrokerZkMetrics(server, topic) @@ -187,7 +189,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { verifyKafkaMetricRecorded("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics")) } - private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int): Unit = { + private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int, tp: TopicPartition): Unit = { val requestMetricsPrefix = "kafka.network:type=RequestMetrics" val requestBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Produce") val tempBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Produce") @@ -195,7 +197,17 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { tempBytes >= recordSize) verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec") - verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce", value => value > 0.0) + +// Conversion time less than 1 millisecond is reported as zero, so retry with larger batches until time > 0 +var iteration = 0 +TestUtils.retry(5000) { + val conversionTimeMs = yammerMetricValue(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce").asInstanceOf[Double] + if (conversionTimeMs <= 0.0) { +iteration += 1 +sendRecords(producers.head, 1000 * iteration, 100, tp) + } + assertTrue(s"Message conversion time not recorded $conversionTimeMs", conversionTimeMs > 0.0) +} verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch") verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value == 0.0) -- To stop receiving notification emails like this one, please contact j...@apache.org.
[kafka] branch trunk updated: MINOR: add DEFAULT_PORT for Trogdor Agent and Coordinator (#4674)
This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ec9e811 MINOR: add DEFAULT_PORT for Trogdor Agent and Coordinator (#4674) ec9e811 is described below commit ec9e8110e3967c3ef38ed475a0cb40fc951a2aef Author: Colin Patrick McCabeAuthorDate: Tue Mar 13 06:15:44 2018 -0700 MINOR: add DEFAULT_PORT for Trogdor Agent and Coordinator (#4674) Add a DEFAULT_PORT constant for the Trogdor Agent and Coordinator. Reviewers: Rajini Sivaram --- tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java | 2 ++ tools/src/main/java/org/apache/kafka/trogdor/common/Node.java | 7 +-- .../java/org/apache/kafka/trogdor/coordinator/Coordinator.java | 2 ++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java index 43334a1..b0fea87 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java @@ -44,6 +44,8 @@ import static net.sourceforge.argparse4j.impl.Arguments.store; public final class Agent { private static final Logger log = LoggerFactory.getLogger(Agent.class); +public static final int DEFAULT_PORT = ; + /** * The time at which this server was started. */ diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/Node.java b/tools/src/main/java/org/apache/kafka/trogdor/common/Node.java index 24c8488..b0c63d8 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/Node.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/Node.java @@ -17,6 +17,9 @@ package org.apache.kafka.trogdor.common; +import org.apache.kafka.trogdor.agent.Agent; +import org.apache.kafka.trogdor.coordinator.Coordinator; + import java.util.Set; /** @@ -34,11 +37,11 @@ public interface Node { } public static int getTrogdorAgentPort(Node node) { -return getIntConfig(node, Platform.Config.TROGDOR_AGENT_PORT, 0); +return getIntConfig(node, Platform.Config.TROGDOR_AGENT_PORT, Agent.DEFAULT_PORT); } public static int getTrogdorCoordinatorPort(Node node) { -return getIntConfig(node, Platform.Config.TROGDOR_COORDINATOR_PORT, 0); +return getIntConfig(node, Platform.Config.TROGDOR_COORDINATOR_PORT, Coordinator.DEFAULT_PORT); } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java index b2dc474..545e3a3 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java @@ -45,6 +45,8 @@ import static net.sourceforge.argparse4j.impl.Arguments.store; public final class Coordinator { private static final Logger log = LoggerFactory.getLogger(Coordinator.class); +public static final int DEFAULT_PORT = 8889; + /** * The start time of the Coordinator in milliseconds. */ -- To stop receiving notification emails like this one, please contact rsiva...@apache.org.
[kafka] branch trunk updated: KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 95ad035 KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684) 95ad035 is described below commit 95ad03540f0d15ae47fd73bae935ab1cb3e8f4b9 Author: Guozhang WangAuthorDate: Tue Mar 13 08:43:58 2018 -0700 KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684) As titled, not starting new transaction since during restoration producer would have not activity and hence may cause txn expiration. Also delay starting new txn in resuming until initializing topology. Reviewers: Matthias J. Sax , Bill Bejeck --- .../streams/processor/internals/AssignedTasks.java | 11 -- .../streams/processor/internals/StreamTask.java| 41 -- .../streams/processor/internals/StreamThread.java | 6 .../streams/processor/internals/TaskManager.java | 12 +-- .../internals/AssignedStreamsTasksTest.java| 3 +- .../processor/internals/StreamTaskTest.java| 7 6 files changed, 41 insertions(+), 39 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 8529c9e..c806bfd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -90,6 +90,7 @@ abstract class AssignedTasks { * @return partitions that are ready to be resumed * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition + * @throws TaskMigratedException if the task producer got fenced (EOS only) */ Set initializeNewTasks() { final Set readyPartitions = new HashSet<>(); @@ -240,18 +241,21 @@ abstract class AssignedTasks { log.trace("found suspended {} {}", taskTypeName, taskId); if (task.partitions().equals(partitions)) { suspended.remove(taskId); +task.resume(); try { -task.resume(); +transitionToRunning(task, new HashSet()); } catch (final TaskMigratedException e) { +// we need to catch migration exception internally since this function +// is triggered in the rebalance callback log.info("Failed to resume {} {} since it got migrated to another thread already. " + "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id()); final RuntimeException fatalException = closeZombieTask(task); +running.remove(task.id()); if (fatalException != null) { throw fatalException; } throw e; } -transitionToRunning(task, new HashSet()); log.trace("resuming suspended {} {}", taskTypeName, task.id()); return true; } else { @@ -271,6 +275,9 @@ abstract class AssignedTasks { } } +/** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ private void transitionToRunning(final T task, final Set readyPartitions) { log.debug("transitioning {} {} to running", taskTypeName, task.id()); running.put(task.id(), task); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index b8777ad..8d6e56a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -100,7 +100,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * @param cache the {@link ThreadCache} created by the thread * @param time the system {@link Time} of the thread * @param producer the instance of {@link Producer} used to produce records - * @throws TaskMigratedException if the task producer got fenced (EOS only) */ public StreamTask(final TaskId id, final Collection partitions, @@ -149,14 +148,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator partitionGroup = new PartitionGroup(partitionQueues);
[kafka] branch 1.1 updated: KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new c7bdec7 KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684) c7bdec7 is described below commit c7bdec74bad366f485d055a68e910dd55cc65728 Author: Guozhang WangAuthorDate: Tue Mar 13 08:43:58 2018 -0700 KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684) As titled, not starting new transaction since during restoration producer would have not activity and hence may cause txn expiration. Also delay starting new txn in resuming until initializing topology. Reviewers: Matthias J. Sax , Bill Bejeck --- .../streams/processor/internals/AssignedTasks.java | 13 +-- .../streams/processor/internals/StreamTask.java| 41 -- .../streams/processor/internals/StreamThread.java | 6 .../streams/processor/internals/TaskManager.java | 12 +-- .../internals/AssignedStreamsTasksTest.java| 3 +- .../processor/internals/StreamTaskTest.java| 7 6 files changed, 43 insertions(+), 39 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 2cd82f4..029f745 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -90,6 +90,7 @@ abstract class AssignedTasks { * @return partitions that are ready to be resumed * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition + * @throws TaskMigratedException if the task producer got fenced (EOS only) */ Set initializeNewTasks() { final Set readyPartitions = new HashSet<>(); @@ -239,17 +240,22 @@ abstract class AssignedTasks { log.trace("found suspended {} {}", taskTypeName, taskId); if (task.partitions().equals(partitions)) { suspended.remove(taskId); +task.resume(); try { -task.resume(); +transitionToRunning(task, new HashSet()); } catch (final TaskMigratedException e) { +// we need to catch migration exception internally since this function +// is triggered in the rebalance callback +log.info("Failed to resume {} {} since it got migrated to another thread already. " + +"Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id()); final RuntimeException fatalException = closeZombieTask(task); +running.remove(task.id()); if (fatalException != null) { throw fatalException; } suspended.remove(taskId); throw e; } -transitionToRunning(task, new HashSet()); log.trace("resuming suspended {} {}", taskTypeName, task.id()); return true; } else { @@ -269,6 +275,9 @@ abstract class AssignedTasks { } } +/** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ private void transitionToRunning(final T task, final Set readyPartitions) { log.debug("transitioning {} {} to running", taskTypeName, task.id()); running.put(task.id(), task); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 6bca02a..d04be04 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -100,7 +100,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator * @param cache the {@link ThreadCache} created by the thread * @param time the system {@link Time} of the thread * @param producer the instance of {@link Producer} used to produce records - * @throws TaskMigratedException if the task producer got fenced (EOS only) */ public StreamTask(final TaskId id, final Collection partitions, @@ -149,14 +148,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
[kafka] branch trunk updated: MINOR: Remove kafka-consumer-offset-checker.bat for KAFKA-3356 (#4703)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new aa14ec0 MINOR: Remove kafka-consumer-offset-checker.bat for KAFKA-3356 (#4703) aa14ec0 is described below commit aa14ec08b76a1d9e85e39c4aa48e0554b7e17301 Author: hmclAuthorDate: Tue Mar 13 16:41:54 2018 -0700 MINOR: Remove kafka-consumer-offset-checker.bat for KAFKA-3356 (#4703) --- bin/windows/kafka-consumer-offset-checker.bat | 17 - 1 file changed, 17 deletions(-) diff --git a/bin/windows/kafka-consumer-offset-checker.bat b/bin/windows/kafka-consumer-offset-checker.bat deleted file mode 100644 index 2baa1b8..000 --- a/bin/windows/kafka-consumer-offset-checker.bat +++ /dev/null @@ -1,17 +0,0 @@ -@echo off -rem Licensed to the Apache Software Foundation (ASF) under one or more -rem contributor license agreements. See the NOTICE file distributed with -rem this work for additional information regarding copyright ownership. -rem The ASF licenses this file to You under the Apache License, Version 2.0 -rem (the "License"); you may not use this file except in compliance with -rem the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software -rem distributed under the License is distributed on an "AS IS" BASIS, -rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -rem See the License for the specific language governing permissions and -rem limitations under the License. - -"%~dp0kafka-run-class.bat" kafka.tools.ConsumerOffsetChecker %* -- To stop receiving notification emails like this one, please contact j...@apache.org.
[kafka] branch trunk updated: MINOR: Remove unused server exceptions (#4701)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ad35529 MINOR: Remove unused server exceptions (#4701) ad35529 is described below commit ad355298c65c2c00131480ab2ed59d79539ea80f Author: Manikumar Reddy OAuthorDate: Wed Mar 14 02:29:13 2018 +0530 MINOR: Remove unused server exceptions (#4701) --- .../common/LeaderElectionNotNeededException.scala | 27 .../kafka/common/NoReplicaOnlineException.scala| 29 -- .../kafka/common/NotAssignedReplicaException.scala | 23 - 3 files changed, 79 deletions(-) diff --git a/core/src/main/scala/kafka/common/LeaderElectionNotNeededException.scala b/core/src/main/scala/kafka/common/LeaderElectionNotNeededException.scala deleted file mode 100644 index ca89d25..000 --- a/core/src/main/scala/kafka/common/LeaderElectionNotNeededException.scala +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package kafka.common - -/** - * This exception is thrown when new leader election is not necessary. - */ -class LeaderElectionNotNeededException(message: String, cause: Throwable) extends RuntimeException(message, cause) { - def this(message: String) = this(message, null) - def this() = this(null, null) -} - - diff --git a/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala b/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala deleted file mode 100644 index b66c8fc..000 --- a/core/src/main/scala/kafka/common/NoReplicaOnlineException.scala +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - - -/** - * This exception is thrown by the leader elector in the controller when leader election fails for a partition since - * all the leader candidate replicas for a partition are offline; the set of candidates may or may not be limited - * to just the in sync replicas depending upon whether unclean leader election is allowed to occur. - */ -class NoReplicaOnlineException(message: String, cause: Throwable) extends RuntimeException(message, cause) { - def this(message: String) = this(message, null) - def this() = this(null, null) -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala b/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala deleted file mode 100644 index 409d112..000 --- a/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the
[kafka] branch 1.1 updated: KAFKA-3978; Ensure high watermark is always positive (#4695)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new 77137e9 KAFKA-3978; Ensure high watermark is always positive (#4695) 77137e9 is described below commit 77137e993b15ad4272972f05f61e7faee36a1914 Author: Dong LinAuthorDate: Tue Mar 13 22:52:59 2018 -0700 KAFKA-3978; Ensure high watermark is always positive (#4695) Partition high watermark may become -1 if the initial value is out of range. This situation can occur during partition reassignment, for example. The bug was fixed and validated with unit test in this patch. Reviewers: Ismael Juma , Jason Gustafson --- core/src/main/scala/kafka/cluster/Partition.scala | 6 - core/src/main/scala/kafka/cluster/Replica.scala| 14 ++-- core/src/main/scala/kafka/log/Log.scala| 8 +++ .../admin/ReassignPartitionsClusterTest.scala | 26 ++ 4 files changed, 47 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 3b97671..68faf00 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -460,7 +460,11 @@ class Partition(val topic: String, }.map(_.logEndOffset) val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering) val oldHighWatermark = leaderReplica.highWatermark -if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) { + +// Ensure that the high watermark increases monotonically. We also update the high watermark when the new +// offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment. +if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || + (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) { leaderReplica.highWatermark = newHighWatermark debug(s"High watermark updated to $newHighWatermark") true diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index e41e389..030e5b7 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -138,6 +138,9 @@ class Replica(val brokerId: Int, def highWatermark_=(newHighWatermark: LogOffsetMetadata) { if (isLocal) { + if (newHighWatermark.messageOffset < 0) +throw new IllegalArgumentException("High watermark offset should be non-negative") + highWatermarkMetadata = newHighWatermark log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset)) trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]") @@ -165,9 +168,16 @@ class Replica(val brokerId: Int, s"non-local replica $brokerId")) } - def convertHWToLocalOffsetMetadata() = { + /* + * Convert hw to local offset metadata by reading the log at the hw offset. + * If the hw offset is out of range, return the first offset of the first log segment as the offset metadata. + */ + def convertHWToLocalOffsetMetadata() { if (isLocal) { - highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset) + highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse { +val firstOffset = log.get.logSegments.head.baseOffset +new LogOffsetMetadata(firstOffset, firstOffset, 0) + } } else { throw new KafkaException(s"Should not construct complete high watermark on partition $topicPartition's non-local replica $brokerId") } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 257dd8f..f0050f5 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1126,14 +1126,14 @@ class Log(@volatile var dir: File, /** * Given a message offset, find its corresponding offset metadata in the log. - * If the message offset is out of range, return unknown offset metadata + * If the message offset is out of range, return None to the caller. */ - def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = { + def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = { try { val fetchDataInfo = readUncommitted(offset, 1) - fetchDataInfo.fetchOffsetMetadata + Some(fetchDataInfo.fetchOffsetMetadata) } catch { - case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata + case _:
[kafka] branch trunk updated: KAFKA-3978; Ensure high watermark is always positive (#4695)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 6b08905 KAFKA-3978; Ensure high watermark is always positive (#4695) 6b08905 is described below commit 6b08905dfb40923317745b5e2237b2310b71 Author: Dong LinAuthorDate: Tue Mar 13 22:52:59 2018 -0700 KAFKA-3978; Ensure high watermark is always positive (#4695) Partition high watermark may become -1 if the initial value is out of range. This situation can occur during partition reassignment, for example. The bug was fixed and validated with unit test in this patch. Reviewers: Ismael Juma , Jason Gustafson --- core/src/main/scala/kafka/cluster/Partition.scala | 6 - core/src/main/scala/kafka/cluster/Replica.scala| 14 ++-- core/src/main/scala/kafka/log/Log.scala| 8 +++ .../admin/ReassignPartitionsClusterTest.scala | 26 ++ 4 files changed, 47 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 3b97671..68faf00 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -460,7 +460,11 @@ class Partition(val topic: String, }.map(_.logEndOffset) val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering) val oldHighWatermark = leaderReplica.highWatermark -if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) { + +// Ensure that the high watermark increases monotonically. We also update the high watermark when the new +// offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment. +if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || + (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) { leaderReplica.highWatermark = newHighWatermark debug(s"High watermark updated to $newHighWatermark") true diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index e41e389..030e5b7 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -138,6 +138,9 @@ class Replica(val brokerId: Int, def highWatermark_=(newHighWatermark: LogOffsetMetadata) { if (isLocal) { + if (newHighWatermark.messageOffset < 0) +throw new IllegalArgumentException("High watermark offset should be non-negative") + highWatermarkMetadata = newHighWatermark log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset)) trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]") @@ -165,9 +168,16 @@ class Replica(val brokerId: Int, s"non-local replica $brokerId")) } - def convertHWToLocalOffsetMetadata() = { + /* + * Convert hw to local offset metadata by reading the log at the hw offset. + * If the hw offset is out of range, return the first offset of the first log segment as the offset metadata. + */ + def convertHWToLocalOffsetMetadata() { if (isLocal) { - highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset) + highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse { +val firstOffset = log.get.logSegments.head.baseOffset +new LogOffsetMetadata(firstOffset, firstOffset, 0) + } } else { throw new KafkaException(s"Should not construct complete high watermark on partition $topicPartition's non-local replica $brokerId") } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 257dd8f..f0050f5 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1126,14 +1126,14 @@ class Log(@volatile var dir: File, /** * Given a message offset, find its corresponding offset metadata in the log. - * If the message offset is out of range, return unknown offset metadata + * If the message offset is out of range, return None to the caller. */ - def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = { + def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = { try { val fetchDataInfo = readUncommitted(offset, 1) - fetchDataInfo.fetchOffsetMetadata + Some(fetchDataInfo.fetchOffsetMetadata) } catch { - case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata + case _:
[kafka] branch 1.1 updated: MINOR: Remove kafka-consumer-offset-checker.bat for KAFKA-3356 (#4703)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new d3860b5 MINOR: Remove kafka-consumer-offset-checker.bat for KAFKA-3356 (#4703) d3860b5 is described below commit d3860b535eff16ca3b0ab36964118b36ad0b0c8c Author: hmclAuthorDate: Tue Mar 13 16:41:54 2018 -0700 MINOR: Remove kafka-consumer-offset-checker.bat for KAFKA-3356 (#4703) --- bin/windows/kafka-consumer-offset-checker.bat | 17 - 1 file changed, 17 deletions(-) diff --git a/bin/windows/kafka-consumer-offset-checker.bat b/bin/windows/kafka-consumer-offset-checker.bat deleted file mode 100644 index 2baa1b8..000 --- a/bin/windows/kafka-consumer-offset-checker.bat +++ /dev/null @@ -1,17 +0,0 @@ -@echo off -rem Licensed to the Apache Software Foundation (ASF) under one or more -rem contributor license agreements. See the NOTICE file distributed with -rem this work for additional information regarding copyright ownership. -rem The ASF licenses this file to You under the Apache License, Version 2.0 -rem (the "License"); you may not use this file except in compliance with -rem the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software -rem distributed under the License is distributed on an "AS IS" BASIS, -rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -rem See the License for the specific language governing permissions and -rem limitations under the License. - -"%~dp0kafka-run-class.bat" kafka.tools.ConsumerOffsetChecker %* -- To stop receiving notification emails like this one, please contact j...@apache.org.
[kafka] branch 1.0 updated: MINOR: Remove kafka-consumer-offset-checker.bat for KAFKA-3356 (#4703)
This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.0 by this push: new cc0881d MINOR: Remove kafka-consumer-offset-checker.bat for KAFKA-3356 (#4703) cc0881d is described below commit cc0881d7b6e171b8b65d67826eeba39333883de0 Author: hmclAuthorDate: Tue Mar 13 16:41:54 2018 -0700 MINOR: Remove kafka-consumer-offset-checker.bat for KAFKA-3356 (#4703) --- bin/windows/kafka-consumer-offset-checker.bat | 17 - 1 file changed, 17 deletions(-) diff --git a/bin/windows/kafka-consumer-offset-checker.bat b/bin/windows/kafka-consumer-offset-checker.bat deleted file mode 100644 index 2baa1b8..000 --- a/bin/windows/kafka-consumer-offset-checker.bat +++ /dev/null @@ -1,17 +0,0 @@ -@echo off -rem Licensed to the Apache Software Foundation (ASF) under one or more -rem contributor license agreements. See the NOTICE file distributed with -rem this work for additional information regarding copyright ownership. -rem The ASF licenses this file to You under the Apache License, Version 2.0 -rem (the "License"); you may not use this file except in compliance with -rem the License. You may obtain a copy of the License at -rem -rem http://www.apache.org/licenses/LICENSE-2.0 -rem -rem Unless required by applicable law or agreed to in writing, software -rem distributed under the License is distributed on an "AS IS" BASIS, -rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -rem See the License for the specific language governing permissions and -rem limitations under the License. - -"%~dp0kafka-run-class.bat" kafka.tools.ConsumerOffsetChecker %* -- To stop receiving notification emails like this one, please contact j...@apache.org.