[kafka] branch trunk updated: KAFKA-7429: Enable key/truststore update with same filename/password (#5699)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 8fb5e63 KAFKA-7429: Enable key/truststore update with same filename/password (#5699) 8fb5e63 is described below commit 8fb5e63aa88019216e95fdbe0b6874c723b64bb4 Author: Rajini Sivaram AuthorDate: Tue Oct 2 20:57:31 2018 +0100 KAFKA-7429: Enable key/truststore update with same filename/password (#5699) --- .../kafka/common/security/ssl/SslFactory.java | 31 ++- .../kafka/common/security/ssl/SslFactoryTest.java | 46 .../src/main/scala/kafka/server/AdminManager.scala | 2 + .../scala/kafka/server/DynamicBrokerConfig.scala | 64 ++ .../server/DynamicBrokerReconfigurationTest.scala | 9 +++ 5 files changed, 141 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index b1f7df8..b9b5203 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; @@ -47,6 +49,7 @@ import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.Enumeration; import java.util.List; import java.util.Map; @@ -54,8 +57,9 @@ import java.util.Objects; import java.util.Set; import java.util.HashSet; - public class SslFactory implements Reconfigurable { 
+private static final Logger log = LoggerFactory.getLogger(SslFactory.class); + private final Mode mode; private final String clientAuthConfigOverride; private final boolean keystoreVerifiableUsingTruststore; @@ -183,6 +187,9 @@ public class SslFactory implements Reconfigurable { !Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), keystore.password) || !Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), keystore.keyPassword); +if (!keystoreChanged) { +keystoreChanged = keystore.modified(); +} if (keystoreChanged) { return createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), @@ -197,6 +204,9 @@ public class SslFactory implements Reconfigurable { !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), truststore.path) || !Objects.equals(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), truststore.password); +if (!truststoreChanged) { +truststoreChanged = truststore.modified(); +} if (truststoreChanged) { return createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), (String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), @@ -306,6 +316,7 @@ public class SslFactory implements Reconfigurable { private final String path; private final Password password; private final Password keyPassword; +private Long fileLastModifiedMs; SecurityStore(String type, String path, Password password, Password keyPassword) { Objects.requireNonNull(type, "type must not be null"); @@ -327,11 +338,29 @@ public class SslFactory implements Reconfigurable { // If a password is not set access to the truststore is still available, but integrity checking is disabled. char[] passwordChars = password != null ? password.value().toCharArray() : null; ks.load(in, passwordChars); +fileLastModifiedMs = lastModifiedMs(path); + +log.debug("Loaded key store with path {} modification time {}", path, +fileLastModifiedMs == null ? 
null : new Date(fileLastModifiedMs)); return ks; } catch (GeneralSecurityException | IOException e) { throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e); } } + +private Long lastModifiedMs(String path) { +try { +return Files.getLastModifiedTime(Paths.get(path)).toMillis(); +} catch (IOException e) { +
[kafka] branch trunk updated: MINOR: Increase timeout for starting JMX tool (#5735)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 09f3205 MINOR: Increase timeout for starting JMX tool (#5735) 09f3205 is described below commit 09f3205e44a0b1251e88edb73faa04b0c73521a2 Author: Randall Hauch AuthorDate: Wed Oct 3 10:56:44 2018 -0500 MINOR: Increase timeout for starting JMX tool (#5735) In some tests, the check monitoring the JMX tool log output doesn’t quite wait long enough before failing. Increasing the timeout from 10 to 20 seconds. --- tests/kafkatest/services/monitor/jmx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index 542d3a5..cf8cbc3 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -83,7 +83,7 @@ class JmxMixin(object): self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd)) node.account.ssh(cmd, allow_fail=False) -wait_until(lambda: self._jmx_has_output(node), timeout_sec=10, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) +wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account) self.started[idx-1] = True def _jmx_has_output(self, node):
[kafka] branch trunk updated: KAFKA-6123: Give client MetricsReporter auto-generated client.id (#5383)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a5335e7 KAFKA-6123: Give client MetricsReporter auto-generated client.id (#5383) a5335e7 is described below commit a5335e7cbd1e53790b4bb5e95b6f9027e17fbf2b Author: Kevin Lu AuthorDate: Wed Oct 3 09:56:22 2018 -0700 KAFKA-6123: Give client MetricsReporter auto-generated client.id (#5383) --- .../org/apache/kafka/clients/admin/KafkaAdminClient.java | 3 ++- .../org/apache/kafka/clients/consumer/KafkaConsumer.java | 7 ++- .../org/apache/kafka/clients/producer/KafkaProducer.java | 8 +++- .../apache/kafka/clients/consumer/KafkaConsumerTest.java | 14 ++ .../apache/kafka/clients/producer/KafkaProducerTest.java | 14 ++ .../java/org/apache/kafka/test/MockMetricsReporter.java| 3 +++ .../connect/runtime/distributed/WorkerGroupMember.java | 4 +++- .../main/java/org/apache/kafka/streams/KafkaStreams.java | 3 ++- 8 files changed, 51 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 7abe7ef..ceebc58 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -337,7 +337,8 @@ public class KafkaAdminClient extends AdminClient { config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG)); List reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, -MetricsReporter.class); +MetricsReporter.class, +Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId)); Map metricTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new 
MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 4cdc4f8..04b8ec2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -677,7 +677,8 @@ public class KafkaConsumer implements Consumer { .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricsTags); List reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, -MetricsReporter.class); +MetricsReporter.class, +Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); @@ -2211,4 +2212,8 @@ public class KafkaConsumer implements Consumer { ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property"); } +// Visible for testing +String getClientId() { +return clientId; +} } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b3f76ee..e249c12 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -352,7 +352,8 @@ public class KafkaProducer implements Producer { .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricTags); List reporters = 
config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, -MetricsReporter.class); +MetricsReporter.class, +Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics); @@ -1202,6 +1203,11 @@ public class KafkaProducer implements Producer {
[kafka] branch trunk updated: KAFKA-6764: Improve the whitelist command-line option for console-consumer (#5637)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5681309 KAFKA-6764: Improve the whitelist command-line option for console-consumer (#5637) 5681309 is described below commit 5681309094b114c65018c6951f23eca88c329e03 Author: Suman AuthorDate: Tue Oct 16 08:44:17 2018 +0530 KAFKA-6764: Improve the whitelist command-line option for console-consumer (#5637) --- .../main/scala/kafka/tools/ConsoleConsumer.scala | 8 ++- .../unit/kafka/tools/ConsoleConsumerTest.scala | 63 ++ 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 365652a..06705d5 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -65,6 +65,7 @@ object ConsoleConsumer extends Logging { def run(conf: ConsumerConfig) { val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue val consumer = new KafkaConsumer(consumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer) + val consumerWrapper = if (conf.partitionArg.isDefined) new ConsumerWrapper(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, consumer, timeoutMs) @@ -194,7 +195,7 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) -val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.") +val whitelistOpt = parser.accepts("whitelist", "Regular expression specifying whitelist of topics to include for consumption.") .withRequiredArg .describedAs("whitelist") .ofType(classOf[String]) @@ -355,7 +356,7 @@ object ConsoleConsumer extends Logging { val groupIdsProvided = Set( Option(options.valueOf(groupIdOpt)), // via --group 
Option(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)), // via --consumer-property - Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --cosumer.config + Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --consumer.config ).flatten if (groupIdsProvided.size > 1) { @@ -376,6 +377,9 @@ object ConsoleConsumer extends Logging { groupIdPassed = false } +if (groupIdPassed && partitionArg.isDefined) + CommandLineUtils.printUsageAndDie(parser, "Options group and partition cannot be specified together.") + def tryParse(parser: OptionParser, args: Array[String]): OptionSet = { try parser.parse(args: _*) diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index 47b7fae..cdc146f 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -432,4 +432,67 @@ class ConsoleConsumerTest { assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey) } + @Test + def shouldParseGroupIdFromBeginningGivenTogether() { +// Start from earliest +var args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--from-beginning") + +var config = new ConsoleConsumer.ConsumerConfig(args) +assertEquals("localhost:9092", config.bootstrapServer) +assertEquals("test", config.topicArg) +assertEquals(-2, config.offsetArg) +assertEquals(true, config.fromBeginning) + +// Start from latest +args = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group" +) + +config = new ConsoleConsumer.ConsumerConfig(args) +assertEquals("localhost:9092", config.bootstrapServer) +assertEquals("test", config.topicArg) +assertEquals(-1, config.offsetArg) +assertEquals(false, config.fromBeginning) + } + + @Test(expected = classOf[IllegalArgumentException]) + def 
shouldExitOnGroupIdAndPartitionGivenTogether() { +Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) +//Given +val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--partition", "0") + +//When +try { +
[kafka] branch trunk updated: KAFKA-7515: Trogdor - Add Consumer Group Benchmark Specification (#5810)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new d28c534 KAFKA-7515: Trogdor - Add Consumer Group Benchmark Specification (#5810) d28c534 is described below commit d28c5348197256db09b59d1ebbfe7db9d3934f47 Author: Stanislav Kozlovski AuthorDate: Mon Oct 29 19:51:07 2018 +0200 KAFKA-7515: Trogdor - Add Consumer Group Benchmark Specification (#5810) This ConsumeBenchWorker now supports using consumer groups. The groups may be either used to store offsets, or as subscriptions. --- TROGDOR.md | 3 +- tests/bin/trogdor-run-consume-bench.sh | 7 +- ...bench_workload.py => consume_bench_workload.py} | 27 +++-- .../services/trogdor/produce_bench_workload.py | 6 +- tests/kafkatest/tests/core/consume_bench_test.py | 132 + tests/kafkatest/tests/core/produce_bench_test.py | 2 + .../kafka/trogdor/workload/ConsumeBenchSpec.java | 122 +-- .../kafka/trogdor/workload/ConsumeBenchWorker.java | 94 +++ .../apache/kafka/trogdor/workload/TopicsSpec.java | 7 +- .../trogdor/workload/ConsumeBenchSpecTest.java | 78 10 files changed, 419 insertions(+), 59 deletions(-) diff --git a/TROGDOR.md b/TROGDOR.md index 3783d7e..d71455a 100644 --- a/TROGDOR.md +++ b/TROGDOR.md @@ -141,7 +141,8 @@ ProduceBench starts a Kafka producer on a single agent node, producing to severa RoundTripWorkload tests both production and consumption. The workload starts a Kafka producer and consumer on a single node. The consumer will read back the messages that were produced by the producer. ### ConsumeBench -ConsumeBench starts a Kafka consumer on a single agent node. The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency. +ConsumeBench starts a Kafka consumer on a single agent node. 
Depending on the passed in configuration (see ConsumeBenchSpec), the consumer either subscribes to a set of topics (leveraging consumer group functionality) or manually assigns partitions to itself. +The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency. Faults diff --git a/tests/bin/trogdor-run-consume-bench.sh b/tests/bin/trogdor-run-consume-bench.sh index 2e0239e..be9a2f1 100755 --- a/tests/bin/trogdor-run-consume-bench.sh +++ b/tests/bin/trogdor-run-consume-bench.sh @@ -26,12 +26,7 @@ cat <http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from ducktape.mark import parametrize +from ducktape.tests.test import Test +from kafkatest.services.kafka import KafkaService +from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec +from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec +from kafkatest.services.trogdor.task_spec import TaskSpec +from kafkatest.services.trogdor.trogdor import TrogdorService +from kafkatest.services.zookeeper import ZookeeperService + + +class ConsumeBenchTest(Test): +def __init__(self, test_context): +""":type test_context: ducktape.tests.test.TestContext""" +super(ConsumeBenchTest, self).__init__(test_context) +self.zk = ZookeeperService(test_context, num_nodes=3) +self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk) +self.producer_workload_service = ProduceBenchWorkloadService(test_context, self.kafka) +self.consumer_workload_service = ConsumeBenchWorkloadService(test_context, self.kafka) 
+self.consumer_workload_service_2 = ConsumeBenchWorkloadService(test_context, self.kafka) +self.active_topics = {"consume_bench_topic[0-5]": {"numPartitions": 5, "replicationFactor": 3}} +self.trogdor = TrogdorService(context=self.test_context, + client_services=[self.kafka, self.producer_workload_service, + self.consumer_workload_service, + self.consumer_workload_service_2]) + +def setUp(self): +self.trogdor.start() +self.zk.start() +self.kafka.start() + +def teardown(self): +self.trogdor.stop() +self.k
[kafka] branch trunk updated: MINOR: KStreams SuppressionDurabilityIntegrationTest should set StreamsConfig.STATE_CONFIG_DIR. (#5870)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 872df55 MINOR: KStreams SuppressionDurabilityIntegrationTest should set StreamsConfig.STATE_CONFIG_DIR. (#5870) 872df55 is described below commit 872df553fc03557d87dc43f4762641da9f874c2e Author: Lucas Bradstreet AuthorDate: Mon Nov 5 11:22:56 2018 -0800 MINOR: KStreams SuppressionDurabilityIntegrationTest should set StreamsConfig.STATE_CONFIG_DIR. (#5870) --- .../streams/integration/SuppressionDurabilityIntegrationTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java index c26b52f..6f759bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java @@ -41,6 +41,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -141,7 +142,8 @@ public class SuppressionDurabilityIntegrationTest { mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), -mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? 
EXACTLY_ONCE : AT_LEAST_ONCE) +mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? EXACTLY_ONCE : AT_LEAST_ONCE), +mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) )); KafkaStreams driver = getStartedStreams(streamsConfig, builder, true);
[kafka] branch trunk updated: KAFKA-7564: Expose single task details in Trogdor (#5852)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ecb71cf KAFKA-7564: Expose single task details in Trogdor (#5852) ecb71cf is described below commit ecb71cf4719e6d22d6738f8df2fd9e16dad33295 Author: Stanislav Kozlovski AuthorDate: Fri Nov 9 18:31:04 2018 +0000 KAFKA-7564: Expose single task details in Trogdor (#5852) This commit adds a new "/coordinator/tasks/{taskId}" endpoint which fetches details for a single task. --- .../kafka/trogdor/coordinator/Coordinator.java | 6 .../trogdor/coordinator/CoordinatorClient.java | 25 ++ .../coordinator/CoordinatorRestResource.java | 14 .../kafka/trogdor/coordinator/TaskManager.java | 31 ++ .../org/apache/kafka/trogdor/rest/TaskRequest.java | 38 ++ .../apache/kafka/trogdor/common/ExpectedTasks.java | 2 +- .../kafka/trogdor/coordinator/CoordinatorTest.java | 37 + 7 files changed, 152 insertions(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java index c3271c9..cd3da90 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java @@ -30,7 +30,9 @@ import org.apache.kafka.trogdor.rest.CreateTaskRequest; import org.apache.kafka.trogdor.rest.DestroyTaskRequest; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.StopTaskRequest; +import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TasksRequest; +import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TasksResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,6 +106,10 @@ public final class Coordinator { return taskManager.tasks(request); } +public
TaskState task(TaskRequest request) throws Exception { +return taskManager.task(request); +} + public void beginShutdown(boolean stopAgents) throws Exception { restServer.beginShutdown(); taskManager.beginShutdown(stopAgents); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java index 780ae73..80937a8 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java @@ -32,11 +32,14 @@ import org.apache.kafka.trogdor.rest.Empty; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse; import org.apache.kafka.trogdor.rest.StopTaskRequest; +import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TasksRequest; +import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TasksResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.NotFoundException; import javax.ws.rs.core.UriBuilder; import static net.sourceforge.argparse4j.impl.Arguments.store; @@ -151,6 +154,13 @@ public class CoordinatorClient { return resp.body(); } +public TaskState task(TaskRequest request) throws Exception { +String uri = UriBuilder.fromPath(url("/coordinator/tasks/{taskId}")).build(request.taskId()).toString(); +HttpResponse resp = JsonRestServer.httpRequest(log, uri, "GET", +null, new TypeReference() { }, maxTries); +return resp.body(); +} + public void shutdown() throws Exception { HttpResponse resp = JsonRestServer.httpRequest(log, url("/coordinator/shutdown"), "PUT", @@ -181,6 +191,12 @@ public class CoordinatorClient { .type(Boolean.class) .dest("show_tasks") .help("Show coordinator tasks."); +actions.addArgument("--show-task") +.action(store()) +.type(String.class) +.dest("show_task") +.metavar("TASK_ID") 
+.help("Show a specific coordinator task."); actions.addArgument("--create-task") .action(store()) .type(String.class) @@ -229,6 +245,15 @@ public class CoordinatorClient { System.out.println("Got coordinator tasks: " + JsonUtil.toPrettyJsonString(client.tasks( new TasksRequest(null, 0, 0, 0, 0)))); +} else if (res.getString("show_task")
[kafka] branch trunk updated: Trogdor: Fix /coordinator/tasks parameters to accept long values (#5905)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new d00938f Trogdor: Fix /coordinator/tasks parameters to accept long values (#5905) d00938f is described below commit d00938fdf862b70923528ceff87ed0fc081b13d1 Author: Stanislav Kozlovski AuthorDate: Tue Nov 13 16:35:03 2018 +0000 Trogdor: Fix /coordinator/tasks parameters to accept long values (#5905) Reviewers: Colin McCabe --- .../apache/kafka/trogdor/coordinator/CoordinatorRestResource.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java index 9163720..c0e7fc9 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java @@ -98,10 +98,10 @@ public class CoordinatorRestResource { @GET @Path("/tasks") public TasksResponse tasks(@QueryParam("taskId") List taskId, -@DefaultValue("0") @QueryParam("firstStartMs") int firstStartMs, -@DefaultValue("0") @QueryParam("lastStartMs") int lastStartMs, -@DefaultValue("0") @QueryParam("firstEndMs") int firstEndMs, -@DefaultValue("0") @QueryParam("lastEndMs") int lastEndMs) throws Throwable { +@DefaultValue("0") @QueryParam("firstStartMs") long firstStartMs, +@DefaultValue("0") @QueryParam("lastStartMs") long lastStartMs, +@DefaultValue("0") @QueryParam("firstEndMs") long firstEndMs, +@DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs) throws Throwable { return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs)); }
[kafka] branch trunk updated: KAFKA-7402: Implement KIP-376 AutoCloseable additions
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9646602 KAFKA-7402: Implement KIP-376 AutoCloseable additions 9646602 is described below commit 9646602d6832ad0a5f2e9b65af5df1a80a571691 Author: Yishun Guan AuthorDate: Fri Nov 16 15:58:47 2018 -0800 KAFKA-7402: Implement KIP-376 AutoCloseable additions --- .../java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java | 2 +- .../apache/kafka/clients/consumer/internals/AbstractCoordinator.java| 2 +- .../src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java | 2 +- clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java | 2 +- clients/src/main/java/org/apache/kafka/common/network/Selector.java | 2 +- .../main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java | 2 +- .../internals/expiring/ExpiringCredentialRefreshingLogin.java | 2 +- .../src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java | 2 +- .../src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java | 2 +- .../main/java/org/apache/kafka/connect/transforms/TimestampRouter.java | 2 +- streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java| 2 +- .../org/apache/kafka/streams/processor/internals/RecordCollector.java | 2 +- tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java | 2 +- 13 files changed, 13 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java index 763fe51..6af4705 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java @@ -40,7 +40,7 @@ import java.util.Map; * * Implement {@link 
org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information. */ -public interface ConsumerInterceptor extends Configurable { +public interface ConsumerInterceptor extends Configurable, AutoCloseable { /** * This is called just before the records are returned by diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 335e0f2..fb710f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -933,7 +933,7 @@ public abstract class AbstractCoordinator implements Closeable { } } -private class HeartbeatThread extends KafkaThread { +private class HeartbeatThread extends KafkaThread implements AutoCloseable { private boolean enabled = false; private boolean closed = false; private AtomicReference failed = new AtomicReference<>(null); diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java index 995bdaa..55d6b25 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java @@ -25,7 +25,7 @@ import org.apache.kafka.common.Configurable; * * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information. 
*/ -public interface MetricsReporter extends Configurable { +public interface MetricsReporter extends Configurable, AutoCloseable { /** * This is called when the reporter is first registered to initially register all existing metrics diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index 47b1375..3bca276 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -63,7 +63,7 @@ import java.util.function.Supplier; * to memory pressure or other reasons * */ -public class KafkaChannel { +public class KafkaChannel implements AutoCloseable { private static final long MIN_REAUTH_INTERVAL_ONE_SECOND_NANOS = 1000 * 1000 * 1000; /** diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index b960fcb..843d
[kafka] branch trunk updated: Trogdor: Add Task State filter to /coordinator/tasks endpoint (#5907)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7fadf0a Trogdor: Add Task State filter to /coordinator/tasks endpoint (#5907) 7fadf0a is described below commit 7fadf0a11ddb2451adc66e1b179b84b050ad3f4f Author: Stanislav Kozlovski AuthorDate: Mon Nov 26 16:07:15 2018 -0800 Trogdor: Add Task State filter to /coordinator/tasks endpoint (#5907) Reviewers: Colin McCabe --- checkstyle/suppressions.xml| 2 +- .../trogdor/coordinator/CoordinatorClient.java | 4 +- .../coordinator/CoordinatorRestResource.java | 29 +++--- .../kafka/trogdor/coordinator/TaskManager.java | 38 ++--- .../org/apache/kafka/trogdor/rest/TaskState.java | 8 +-- .../apache/kafka/trogdor/rest/TaskStateType.java | 42 +++ .../apache/kafka/trogdor/rest/TasksRequest.java| 23 +++- .../apache/kafka/trogdor/common/ExpectedTasks.java | 3 +- .../kafka/trogdor/coordinator/CoordinatorTest.java | 62 -- 9 files changed, 146 insertions(+), 65 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index f3ab7ec..75ad799 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -57,7 +57,7 @@ files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java"/> + files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest).java"/> taskId, +@Path("/tasks/") +public Response tasks(@QueryParam("taskId") List taskId, @DefaultValue("0") @QueryParam("firstStartMs") long firstStartMs, @DefaultValue("0") @QueryParam("lastStartMs") long lastStartMs, @DefaultValue("0") @QueryParam("firstEndMs") long firstEndMs, -@DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs) throws Throwable { -return 
coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs)); +@DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs, +@DefaultValue("") @QueryParam("state") String state) throws Throwable { +boolean isEmptyState = state.equals(""); +if (!isEmptyState && !TaskStateType.Constants.VALUES.contains(state)) { +return Response.status(400).entity( +String.format("State %s is invalid. Must be one of %s", +state, TaskStateType.Constants.VALUES) +).build(); +} + +Optional givenState = Optional.ofNullable(isEmptyState ? null : TaskStateType.valueOf(state)); +TasksResponse resp = coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs, givenState)); + +return Response.status(200).entity(resp).build(); } @GET diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index 934acd3..18ff9cb 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -32,11 +32,12 @@ import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.rest.RequestConflictException; import org.apache.kafka.trogdor.rest.TaskDone; import org.apache.kafka.trogdor.rest.TaskPending; +import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TaskRunning; import org.apache.kafka.trogdor.rest.TaskState; +import org.apache.kafka.trogdor.rest.TaskStateType; import org.apache.kafka.trogdor.rest.TaskStopping; import org.apache.kafka.trogdor.rest.TasksRequest; -import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerReceiving; @@ -142,13 +143,6 @@ public final class TaskManager { Utils.join(nodeManagers.keySet(), ", ")); } -enum ManagedTaskState 
{ -PENDING, -RUNNING, -STOPPING, -DONE; -} - class ManagedTask { /** * The task id. @@ -168,7 +162,7 @@ public final class TaskManager { /** * The task state. */ -private ManagedTaskState state; +private TaskStateType state; /** * The
[kafka] branch trunk updated: KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9368743 KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885) 9368743 is described below commit 9368743b8fd2b42a41b44860ea0f3588bb273cc8 Author: Stanislav Kozlovski AuthorDate: Tue Nov 27 20:49:53 2018 + KAFKA-7597: Add transaction support to ProduceBenchWorker (#5885) KAFKA-7597: Add configurable transaction support to ProduceBenchWorker. In order to get support for serializing Optional<> types to JSON, add a new library: jackson-datatype-jdk8. Once Jackson 3 comes out, this library will not be needed. Reviewers: Colin McCabe , Ismael Juma --- build.gradle | 5 + gradle/dependencies.gradle | 1 + .../bin/trogdor-run-transactional-produce-bench.sh | 51 ++ .../services/trogdor/produce_bench_workload.py | 4 +- tests/kafkatest/tests/core/produce_bench_test.py | 29 +- .../org/apache/kafka/trogdor/common/JsonUtil.java | 2 + .../kafka/trogdor/workload/ProduceBenchSpec.java | 37 +++ .../kafka/trogdor/workload/ProduceBenchWorker.java | 112 + .../trogdor/workload/TransactionGenerator.java | 43 .../workload/UniformTransactionsGenerator.java | 57 +++ .../trogdor/common/JsonSerializationTest.java | 3 +- 11 files changed, 318 insertions(+), 26 deletions(-) diff --git a/build.gradle b/build.gradle index 4d514df..5ce648a 100644 --- a/build.gradle +++ b/build.gradle @@ -565,6 +565,7 @@ project(':core') { dependencies { compile project(':clients') compile libs.jacksonDatabind +compile libs.jacksonJDK8Datatypes compile libs.joptSimple compile libs.metrics compile libs.scalaLibrary @@ -830,6 +831,7 @@ project(':clients') { compile libs.snappy compile libs.slf4jApi compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing +compileOnly libs.jacksonJDK8Datatypes jacksonDatabindConfig libs.jacksonDatabind // to publish as 
provided scope dependency. @@ -839,6 +841,7 @@ project(':clients') { testRuntime libs.slf4jlog4j testRuntime libs.jacksonDatabind +testRuntime libs.jacksonJDK8Datatypes } task determineCommitId { @@ -918,6 +921,7 @@ project(':tools') { compile project(':log4j-appender') compile libs.argparse4j compile libs.jacksonDatabind +compile libs.jacksonJDK8Datatypes compile libs.slf4jApi compile libs.jacksonJaxrsJsonProvider @@ -1347,6 +1351,7 @@ project(':connect:json') { dependencies { compile project(':connect:api') compile libs.jacksonDatabind +compile libs.jacksonJDK8Datatypes compile libs.slf4jApi testCompile libs.easymock diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 7dd3604..59f56fc 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -103,6 +103,7 @@ libs += [ bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix", easymock: "org.easymock:easymock:$versions.easymock", jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:$versions.jackson", + jacksonJDK8Datatypes: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson", jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson", jaxbApi: "javax.xml.bind:jaxb-api:$versions.jaxb", jaxrsApi: "javax.ws.rs:javax.ws.rs-api:$versions.jaxrs", diff --git a/tests/bin/trogdor-run-transactional-produce-bench.sh b/tests/bin/trogdor-run-transactional-produce-bench.sh new file mode 100755 index 000..fd5ff0a --- /dev/null +++ b/tests/bin/trogdor-run-transactional-produce-bench.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +COORDINATOR_ENDPOINT="localhost:
[kafka] branch trunk updated: MINOR: Support long maxMessages in Trogdor consume/produce bench workers (#5957)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ec66818 MINOR: Support long maxMessages in Trogdor consume/produce bench workers (#5957) ec66818 is described below commit ec668180d797c4d08ea899de61c344438621ced7 Author: Stanislav Kozlovski AuthorDate: Wed Nov 28 17:13:21 2018 + MINOR: Support long maxMessages in Trogdor consume/produce bench workers (#5957) Reviewers: Colin McCabe --- .../java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java| 6 +++--- .../java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java | 2 +- .../java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java| 6 +++--- .../java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java index 0e239b0..b7e0172 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java @@ -91,7 +91,7 @@ public class ConsumeBenchSpec extends TaskSpec { private final String consumerNode; private final String bootstrapServers; private final int targetMessagesPerSec; -private final int maxMessages; +private final long maxMessages; private final Map consumerConf; private final Map adminClientConf; private final Map commonClientConf; @@ -105,7 +105,7 @@ public class ConsumeBenchSpec extends TaskSpec { @JsonProperty("consumerNode") String consumerNode, @JsonProperty("bootstrapServers") String bootstrapServers, @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec, -@JsonProperty("maxMessages") int maxMessages, +@JsonProperty("maxMessages") long maxMessages, 
@JsonProperty("consumerGroup") String consumerGroup, @JsonProperty("consumerConf") Map consumerConf, @JsonProperty("commonClientConf") Map commonClientConf, @@ -146,7 +146,7 @@ public class ConsumeBenchSpec extends TaskSpec { } @JsonProperty -public int maxMessages() { +public long maxMessages() { return maxMessages; } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java index a44d521..1e80209 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java @@ -233,7 +233,7 @@ public class ConsumeBenchWorker implements TaskWorker { long bytesConsumed = 0; long startTimeMs = Time.SYSTEM.milliseconds(); long startBatchMs = startTimeMs; -int maxMessages = spec.maxMessages(); +long maxMessages = spec.maxMessages(); try { while (messagesConsumed < maxMessages) { ConsumerRecords records = consumer.poll(); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java index c0bbd7e..d15172f 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java @@ -64,7 +64,7 @@ public class ProduceBenchSpec extends TaskSpec { private final String producerNode; private final String bootstrapServers; private final int targetMessagesPerSec; -private final int maxMessages; +private final long maxMessages; private final PayloadGenerator keyGenerator; private final PayloadGenerator valueGenerator; private final Optional transactionGenerator; @@ -80,7 +80,7 @@ public class ProduceBenchSpec extends TaskSpec { @JsonProperty("producerNode") String producerNode, @JsonProperty("bootstrapServers") String bootstrapServers, @JsonProperty("targetMessagesPerSec") 
int targetMessagesPerSec, - @JsonProperty("maxMessages") int maxMessages, + @JsonProperty("maxMessages") long maxMessages, @JsonProperty("keyGenerator") PayloadGenerator keyGenerator,
svn commit: r1848068 - in /kafka/site: committers.html images/cmccabe.jpg
Author: cmccabe Date: Mon Dec 3 16:51:26 2018 New Revision: 1848068 URL: http://svn.apache.org/viewvc?rev=1848068&view=rev Log: Add cmccabe as committer Added: kafka/site/images/cmccabe.jpg (with props) Modified: kafka/site/committers.html Modified: kafka/site/committers.html URL: http://svn.apache.org/viewvc/kafka/site/committers.html?rev=1848068&r1=1848067&r2=1848068&view=diff == --- kafka/site/committers.html (original) +++ kafka/site/committers.html Mon Dec 3 16:51:26 2018 @@ -111,9 +111,15 @@ Committer https://www.linkedin.com/in/gwenshapira";>/in/gwenshapira https://twitter.com/gwenshap";>@gwenshap + + + + + Colin P. McCabe + Committer + http://www.linkedin.com/in/mccabecolin";>/in/mccabecolin + http://www.club.cc.cmu.edu/~cmccabe/";>cmccabe - - Added: kafka/site/images/cmccabe.jpg URL: http://svn.apache.org/viewvc/kafka/site/images/cmccabe.jpg?rev=1848068&view=auto == Binary file - no diff available. Propchange: kafka/site/images/cmccabe.jpg -- svn:mime-type = image/jpeg
[kafka-site] branch asf-site updated: Add cmccabe to committers (#173)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 9083ba9 Add cmccabe to committers (#173) 9083ba9 is described below commit 9083ba9c8d30d02bf3ceec4b1a7e39a4c1203d55 Author: Colin Patrick McCabe AuthorDate: Mon Dec 3 09:48:06 2018 -0800 Add cmccabe to committers (#173) --- committers.html| 9 + images/cmccabe.jpg | Bin 0 -> 67284 bytes 2 files changed, 9 insertions(+) diff --git a/committers.html b/committers.html index ea5fb88..26ac1e5 100644 --- a/committers.html +++ b/committers.html @@ -221,6 +221,15 @@ + + + + Colin P. McCabe + Committer + http://www.linkedin.com/in/mccabecolin";>/in/mccabecolin + http://www.club.cc.cmu.edu/~cmccabe/";>cmccabe + + diff --git a/images/cmccabe.jpg b/images/cmccabe.jpg new file mode 100644 index 000..38e600f Binary files /dev/null and b/images/cmccabe.jpg differ
[kafka] branch trunk updated: KAFKA-7051: Improve the efficiency of ReplicaManager (#5206)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 145cad7 KAFKA-7051: Improve the efficiency of ReplicaManager (#5206) 145cad7 is described below commit 145cad752d720c6ea735a5e2f977d10ace5bcc0f Author: Colin Patrick McCabe AuthorDate: Mon Jan 7 13:20:07 2019 -0800 KAFKA-7051: Improve the efficiency of ReplicaManager (#5206) Reviewers: Jun Rao , Ismael Juma , Dong Lin --- .../main/scala/kafka/server/ReplicaManager.scala | 24 ++ 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 4cc3feb..33c844f 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -830,10 +830,15 @@ class ReplicaManager(val config: KafkaConfig, val logReadResults = readFromLog() // check if this fetch request can be satisfied right away -val logReadResultValues = logReadResults.map { case (_, v) => v } -val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum -val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) => - errorIncurred || (readResult.error != Errors.NONE)) +var bytesReadable: Long = 0 +var errorReadingData = false +val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult] +logReadResults.foreach { case (topicPartition, logReadResult) => + if (logReadResult.error != Errors.NONE) +errorReadingData = true + bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes + logReadResultMap.put(topicPartition, logReadResult) +} // respond immediately if 1) fetch request does not want to wait //2) fetch request does not require any data @@ -847,11 +852,12 @@ class ReplicaManager(val config: KafkaConfig, 
responseCallback(fetchPartitionData) } else { // construct the fetch results from the read results - val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) => -val fetchInfo = fetchInfos.collectFirst { - case (tp, v) if tp == topicPartition => v -}.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos")) -(topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo)) + val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)] + fetchInfos.foreach { case (topicPartition, partitionData) => +logReadResultMap.get(topicPartition).map(logReadResult => { + val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata + fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) +}) } val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
[kafka] branch trunk updated: KAFKA-7051: Improve the efficiency of ReplicaManager (fixup)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new aff0294 KAFKA-7051: Improve the efficiency of ReplicaManager (fixup) aff0294 is described below commit aff02944fee9dd79e58a6dd31291d5058bc786bb Author: Colin P. Mccabe AuthorDate: Mon Jan 7 13:21:14 2019 -0800 KAFKA-7051: Improve the efficiency of ReplicaManager (fixup) --- core/src/main/scala/kafka/server/ReplicaManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 33c844f..5dfb2e6 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -854,7 +854,7 @@ class ReplicaManager(val config: KafkaConfig, // construct the fetch results from the read results val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)] fetchInfos.foreach { case (topicPartition, partitionData) => -logReadResultMap.get(topicPartition).map(logReadResult => { +logReadResultMap.get(topicPartition).foreach(logReadResult => { val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) })
[kafka] branch trunk updated: KAFKA-7790: Fix Bugs in Trogdor Task Expiration (#6103)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 625e0d8 KAFKA-7790: Fix Bugs in Trogdor Task Expiration (#6103) 625e0d8 is described below commit 625e0d882944f74568464037c3a8aafb585079f2 Author: Stanislav Kozlovski AuthorDate: Fri Jan 11 23:38:00 2019 +0200 KAFKA-7790: Fix Bugs in Trogdor Task Expiration (#6103) The Trogdor Coordinator now overwrites a task's startMs to the time it received it if startMs is in the past. The Trogdor Agent now correctly expires a task after the expiry time (startMs + durationMs) passes. Previously, it would ignore startMs and expire after durationMs milliseconds of local start of the task. Reviewed-by: Colin P. McCabe --- .../apache/kafka/trogdor/agent/WorkerManager.java | 9 +- .../kafka/trogdor/coordinator/NodeManager.java | 84 -- .../kafka/trogdor/coordinator/TaskManager.java | 34 -- .../org/apache/kafka/trogdor/task/TaskSpec.java| 7 ++ .../org/apache/kafka/trogdor/agent/AgentTest.java | 71 +--- .../kafka/trogdor/common/MiniTrogdorCluster.java | 23 +++- .../kafka/trogdor/coordinator/CoordinatorTest.java | 125 +++-- 7 files changed, 282 insertions(+), 71 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java index ef02716..bf3d293 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java @@ -280,7 +280,8 @@ public final class WorkerManager { void transitionToRunning() { state = State.RUNNING; timeoutFuture = scheduler.schedule(stateChangeExecutor, -new StopWorker(workerId, false), spec.durationMs()); +new StopWorker(workerId, false), +Math.max(0, spec.endMs() - time.milliseconds())); } void transitionToStopping() { @@ -316,6 +317,12 
@@ public final class WorkerManager { "a worker with that id.", nodeName, workerId); return; } +if (worker.spec.endMs() <= time.milliseconds()) { +log.info("{}: Will not run worker {} as it has expired.", nodeName, worker); +stateChangeExecutor.submit(new HandleWorkerHalting(worker, +"worker expired", true)); +return; +} KafkaFutureImpl haltFuture = new KafkaFutureImpl<>(); haltFuture.thenApply((KafkaFuture.BaseFunction) errorString -> { if (errorString == null) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java index 3f0075e..97ad4ae 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java @@ -49,10 +49,12 @@ import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.CreateWorkerRequest; import org.apache.kafka.trogdor.rest.StopWorkerRequest; +import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerReceiving; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.rest.WorkerStarting; import org.apache.kafka.trogdor.rest.WorkerState; +import org.apache.kafka.trogdor.rest.WorkerStopping; import org.apache.kafka.trogdor.task.TaskSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -202,46 +204,58 @@ public final class NodeManager { if (log.isTraceEnabled()) { log.trace("{}: got heartbeat status {}", node.name(), agentStatus); } -// Identify workers which we think should be running, but which do not appear -// in the agent's response. We need to send startWorker requests for these. 
-for (Map.Entry entry : workers.entrySet()) { -Long workerId = entry.getKey(); -if (!agentStatus.workers().containsKey(workerId)) { -ManagedWorker worker = entry.getValue(); -if (worker.shouldRun) { -worker.tryCreate(); -} +handleMissingWorkers(agentStatus); +handlePresentWorkers(agentStatus); +} catch (Throwable e) { +log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e); +
[kafka] branch trunk updated: KAFKA-7808: AdminClient#describeTopics should not throw InvalidTopic if topic name is not found (#6124)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7df3e8c KAFKA-7808: AdminClient#describeTopics should not throw InvalidTopic if topic name is not found (#6124) 7df3e8c is described below commit 7df3e8cd38083ae78ec1e26a259cfb3bb1fbe2d2 Author: Lee Dongjin AuthorDate: Sat Jan 12 07:56:06 2019 +0900 KAFKA-7808: AdminClient#describeTopics should not throw InvalidTopic if topic name is not found (#6124) * Update KafkaAdminClient#describeTopics to throw UnknownTopicOrPartitionException. * Remove unused method: WorkerUtils#getMatchingTopicPartitions. * Add some JavaDoc. Reviewed-by: Colin P. McCabe , Ryanne Dolan --- .../kafka/clients/admin/KafkaAdminClient.java | 3 +- .../kafka/clients/admin/MockAdminClient.java | 3 +- .../apache/kafka/trogdor/common/WorkerUtils.java | 32 -- 3 files changed, 9 insertions(+), 29 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 39817df..ce51eee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -56,6 +56,7 @@ import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.JmxReporter; @@ -1461,7 +1462,7 @@ public class KafkaAdminClient extends AdminClient { continue; } if 
(!cluster.topics().contains(topicName)) { -future.completeExceptionally(new InvalidTopicException("Topic " + topicName + " not found.")); +future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + topicName + " not found.")); continue; } boolean isInternal = cluster.internalTopics().contains(topicName); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 9fe1ba4..ed7efd5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -235,8 +235,7 @@ public class MockAdminClient extends AdminClient { } if (!topicDescriptions.containsKey(requestedTopic)) { KafkaFutureImpl future = new KafkaFutureImpl<>(); -future.completeExceptionally(new UnknownTopicOrPartitionException( -String.format("Topic %s unknown.", requestedTopic))); +future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found.")); topicDescriptions.put(requestedTopic, future); } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index 3d4871a..ef6e275 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -135,34 +136,11 @@ public final class WorkerUtils { } 
/** - * Returns a list of all existing topic partitions that match the following criteria: topic - * name matches give regular expression 'topicRegex', topic is not internal, partitions are - * in range [startPartition, endPartition] - * - * @param logThe logger to use. - * @param bootstrapServers The bootstrap server list. - * @param topicRegex Topic name regular expression - * @param startPartition Starting partition of
[kafka] branch trunk updated: MINOR: Update Trogdor StringExpander regex to handle an epilogue (#6123)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 13f6790 MINOR: Update Trogdor StringExpander regex to handle an epilogue (#6123) 13f6790 is described below commit 13f679013aaa93198d30df8dada4aba5614790ea Author: Stanislav Kozlovski AuthorDate: Tue Jan 15 06:49:24 2019 +0200 MINOR: Update Trogdor StringExpander regex to handle an epilogue (#6123) Update the Trogdor StringExpander regex to handle an epilogue. Previously the regex would use a lazy quantifier at the end, which meant it would not catch anything after the range expression. Add a unit test. Reviewed-by: Colin P. McCabe --- .../org/apache/kafka/trogdor/common/StringExpander.java | 2 +- .../apache/kafka/trogdor/common/StringExpanderTest.java | 15 +++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java b/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java index 82f5003..3082a17 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java @@ -29,7 +29,7 @@ import java.util.regex.Pattern; */ public class StringExpander { private final static Pattern NUMERIC_RANGE_PATTERN = -Pattern.compile("(.*?)\\[([0-9]*)\\-([0-9]*)\\](.*?)"); +Pattern.compile("(.*)\\[([0-9]*)\\-([0-9]*)\\](.*)"); public static HashSet expand(String val) { HashSet set = new HashSet<>(); diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java index 72e1c20..6fbb347 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java @@ -58,5 
+58,20 @@ public class StringExpanderTest { "[[ wow52 ]]" )); assertEquals(expected3, StringExpander.expand("[[ wow[50-52] ]]")); + +HashSet expected4 = new HashSet<>(Arrays.asList( +"foo1bar", +"foo2bar", +"foo3bar" +)); +assertEquals(expected4, StringExpander.expand("foo[1-3]bar")); + +// should expand latest range first +HashSet expected5 = new HashSet<>(Arrays.asList( +"start[1-3]middle1epilogue", +"start[1-3]middle2epilogue", +"start[1-3]middle3epilogue" +)); +assertEquals(expected5, StringExpander.expand("start[1-3]middle[1-3]epilogue")); } }
[kafka] branch trunk updated: KAFKA-7792: Add simple /agent/uptime and /coordinator/uptime health check endpoints (#6130)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2e53fa0 KAFKA-7792: Add simple /agent/uptime and /coordinator/uptime health check endpoints (#6130) 2e53fa0 is described below commit 2e53fa08af6aba8200f4c7846a8e8e568b56560d Author: Stanislav Kozlovski AuthorDate: Tue Jan 15 21:52:48 2019 +0200 KAFKA-7792: Add simple /agent/uptime and /coordinator/uptime health check endpoints (#6130) Reviewed-by: Colin P. McCabe --- .../java/org/apache/kafka/trogdor/agent/Agent.java | 11 - .../apache/kafka/trogdor/agent/AgentClient.java| 16 .../kafka/trogdor/agent/AgentRestResource.java | 7 .../kafka/trogdor/coordinator/Coordinator.java | 13 +- .../trogdor/coordinator/CoordinatorClient.java | 16 .../coordinator/CoordinatorRestResource.java | 7 .../apache/kafka/trogdor/rest/UptimeResponse.java | 47 ++ .../org/apache/kafka/trogdor/agent/AgentTest.java | 23 ++- .../kafka/trogdor/coordinator/CoordinatorTest.java | 25 ++-- 9 files changed, 156 insertions(+), 9 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java index c76ef26..699e14b 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java @@ -23,6 +23,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.Namespace; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Scheduler; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.rest.AgentStatusResponse; @@ -30,6 +31,7 @@ import org.apache.kafka.trogdor.rest.CreateWorkerRequest; import 
org.apache.kafka.trogdor.rest.DestroyWorkerRequest; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.StopWorkerRequest; +import org.apache.kafka.trogdor.rest.UptimeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +62,8 @@ public final class Agent { */ private final JsonRestServer restServer; +private final Time time; + /** * Create a new Agent. * @@ -70,7 +74,8 @@ public final class Agent { */ public Agent(Platform platform, Scheduler scheduler, JsonRestServer restServer, AgentRestResource resource) { -this.serverStartMs = scheduler.time().milliseconds(); +this.time = scheduler.time(); +this.serverStartMs = time.milliseconds(); this.workerManager = new WorkerManager(platform, scheduler); this.restServer = restServer; resource.setAgent(this); @@ -94,6 +99,10 @@ public final class Agent { return new AgentStatusResponse(serverStartMs, workerManager.workerStates()); } +public UptimeResponse uptime() { +return new UptimeResponse(serverStartMs, time.milliseconds()); +} + public void createWorker(CreateWorkerRequest req) throws Throwable { workerManager.createWorker(req.workerId(), req.taskId(), req.spec()); } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java index c89011b..55c3e7b 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java @@ -32,6 +32,7 @@ import org.apache.kafka.trogdor.rest.Empty; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse; import org.apache.kafka.trogdor.rest.StopWorkerRequest; +import org.apache.kafka.trogdor.rest.UptimeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,6 +118,13 @@ public class AgentClient { return resp.body(); } +public UptimeResponse uptime() throws Exception { 
+HttpResponse resp = +JsonRestServer.httpRequest(url("/agent/uptime"), "GET", +null, new TypeReference() { }, maxTries); +return resp.body(); +} + public void createWorker(CreateWorkerRequest request) throws Exception { HttpResponse resp = JsonRestServer.httpRequest( @@ -168,6 +176,11 @@ public class AgentClient { .type(Boolean.class) .dest("status") .help("Get agent status."); +actions.addArgument("--uptime") +.action(storeTrue()) +.type(Boolean.class) +.dest("uptime") +
[kafka] branch trunk updated: MINOR: ducker-ak: add down -f, avoid using a terminal in ducker test
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f8e8d62 MINOR: ducker-ak: add down -f, avoid using a terminal in ducker test f8e8d62 is described below commit f8e8d62f56c902f2445ab0af910decc3d5f01114 Author: Kan Li AuthorDate: Wed Jan 23 13:39:47 2019 -0800 MINOR: ducker-ak: add down -f, avoid using a terminal in ducker test When using ./ducker-ak test on Jenkins, the script complains that there is no TTY. To fix this, we should skip passing -t to docker exec. We do not need a pseudo-TTY to run the tests. Similarly, we should skip passing -i, since we do not need to keep stdin open. The down command should have a force option, specified as -f or --force. Reviewed-by: Colin P. McCabe --- tests/docker/ducker-ak | 13 - 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/docker/ducker-ak b/tests/docker/ducker-ak index a54bd30..04581fb 100755 --- a/tests/docker/ducker-ak +++ b/tests/docker/ducker-ak @@ -83,9 +83,10 @@ ssh [node-name|user-name@node-name] [command] is specified, we will run that command. Otherwise, we will provide a login shell. -down [-q|--quiet] +down [-q|--quiet] [-f|--force] Tear down all the currently active ducker-ak nodes. If --quiet is specified, -only error messages are printed. +only error messages are printed. If --force or -f is specified, "docker rm -f" +will be used to remove the nodes, which kills currently running ducker-ak test. purge [--f|--force] Purge Docker images created by ducker-ak. This will free disk space. 
@@ -421,8 +422,8 @@ ducker_test() { (test -f ./gradlew || gradle) && ./gradlew systemTestLibs must_popd cmd="cd /opt/kafka-dev && ducktape --cluster-file /opt/kafka-dev/tests/docker/build/cluster.json $args" -echo "docker exec -it ducker01 bash -c \"${cmd}\"" -exec docker exec --user=ducker -it ducker01 bash -c "${cmd}" +echo "docker exec ducker01 bash -c \"${cmd}\"" +exec docker exec --user=ducker ducker01 bash -c "${cmd}" } ducker_ssh() { @@ -476,9 +477,11 @@ echo_running_container_names() { ducker_down() { require_commands docker local verbose=1 +local force_str="" while [[ $# -ge 1 ]]; do case "${1}" in -q|--quiet) verbose=0; shift;; +-f|--force) force_str="-f"; shift;; *) die "ducker_down: unexpected command-line argument ${1}";; esac done @@ -499,7 +502,7 @@ ducker_down() { if [[ -n "${running_containers}" ]]; then must_do ${verbose_flag} docker kill "${running_containers}" fi -must_do ${verbose_flag} docker rm "${all_containers}" +must_do ${verbose_flag} docker rm ${force_str} "${all_containers}" must_do ${verbose_flag} -o rm -f -- "${ducker_dir}/build/node_hosts" "${ducker_dir}/build/cluster.json" if docker network inspect ducknet &>/dev/null; then must_do -v docker network rm ducknet
[kafka] branch trunk updated: KAFKA-7793: Improve the Trogdor command line. (#6133)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a79d6dc KAFKA-7793: Improve the Trogdor command line. (#6133) a79d6dc is described below commit a79d6dcdb687b1df911af024946880a5d9d5490a Author: Colin Patrick McCabe AuthorDate: Thu Jan 24 09:26:51 2019 -0800 KAFKA-7793: Improve the Trogdor command line. (#6133) * Allow the Trogdor agent to be started in "exec mode", where it simply runs a single task and exits after it is complete. * For AgentClient and CoordinatorClient, allow the user to pass the path to a file containing JSON, instead of specifying the JSON object in the command-line text itself. This means that we can get rid of the bash scripts whose only function was to load task specs into a bash string and run a Trogdor command. * Print dates and times in a human-readable way, rather than as numbers of milliseconds. * When listing tasks or workers, output human-readable tables of information. * Allow the user to filter on task ID name, task ID pattern, or task state. * Support a --json flag to provide raw JSON output if desired. 
Reviewed-by: David Arthur , Stanislav Kozlovski --- TROGDOR.md | 87 ++--- checkstyle/suppressions.xml| 3 + tests/.gitignore | 5 - tests/bin/trogdor-run-consume-bench.sh | 39 --- tests/bin/trogdor-run-produce-bench.sh | 47 --- tests/bin/trogdor-run-round-trip.sh| 42 --- .../bin/trogdor-run-transactional-produce-bench.sh | 51 --- tests/spec/round_trip.json | 34 ++ tests/spec/simple_consume_bench_spec.json | 31 ++ tests/spec/simple_produce_bench.json | 40 +++ tests/spec/transactional-produce-bench.json| 44 +++ .../java/org/apache/kafka/trogdor/agent/Agent.java | 105 ++ .../apache/kafka/trogdor/agent/AgentClient.java| 224 .../apache/kafka/trogdor/agent/WorkerManager.java | 18 +- .../org/apache/kafka/trogdor/common/JsonUtil.java | 41 +++ .../kafka/trogdor/common/StringFormatter.java | 114 ++ .../trogdor/coordinator/CoordinatorClient.java | 389 - .../org/apache/kafka/trogdor/rest/TaskDone.java| 5 + .../org/apache/kafka/trogdor/rest/TaskPending.java | 5 + .../org/apache/kafka/trogdor/rest/TaskRunning.java | 5 + .../org/apache/kafka/trogdor/rest/TaskState.java | 2 + .../apache/kafka/trogdor/rest/TaskStopping.java| 5 + .../org/apache/kafka/trogdor/agent/AgentTest.java | 45 +++ .../apache/kafka/trogdor/common/JsonUtilTest.java | 79 + .../kafka/trogdor/common/StringFormatterTest.java | 68 .../trogdor/coordinator/CoordinatorClientTest.java | 83 + 26 files changed, 1204 insertions(+), 407 deletions(-) diff --git a/TROGDOR.md b/TROGDOR.md index 168acfb..b551773 100644 --- a/TROGDOR.md +++ b/TROGDOR.md @@ -35,61 +35,26 @@ Let's confirm that all of the daemons are running: 115420 Kafka 115694 Agent -Now, we can submit a test job to Trogdor. Here's an example of a short bash script which makes it easier. 
- -> ./tests/bin/trogdor-run-produce-bench.sh -Sent CreateTaskRequest for task produce_bench_21634.$TASK_ID = produce_bench_21634 - -To get the test results, we run --show-tasks: - -./bin/trogdor.sh client --show-tasks localhost:8889 -Got coordinator tasks: { - "tasks" : { -"produce_bench_21634" : { - "state" : "DONE", - "spec" : { -"class" : "org.apache.kafka.trogdor.workload.ProduceBenchSpec", -"startMs" : 0, -"durationMs" : 1000, -"producerNode" : "node0", -"bootstrapServers" : "localhost:9092", -"targetMessagesPerSec" : 1, -"maxMessages" : 5, -"keyGenerator" : { - "type" : "sequential", - "size" : 4, - "startOffset" : 0 -}, -"valueGenerator" : { - "type" : "constant", - "size" : 512, - "value" : "
[kafka] branch trunk updated: MINOR: update copyright year in the NOTICE file. (#6196)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ab4deb1 MINOR: update copyright year in the NOTICE file. (#6196) ab4deb1 is described below commit ab4deb1bcbd41fac5e2ec14a78d5b3ca35955002 Author: Colin Patrick McCabe AuthorDate: Thu Jan 24 14:30:44 2019 -0800 MINOR: update copyright year in the NOTICE file. (#6196) --- NOTICE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NOTICE b/NOTICE index c97f407..8a9b248 100644 --- a/NOTICE +++ b/NOTICE @@ -1,5 +1,5 @@ Apache Kafka -Copyright 2018 The Apache Software Foundation. +Copyright 2019 The Apache Software Foundation. This product includes software developed at The Apache Software Foundation (http://www.apache.org/).
[kafka] branch 2.1 updated: MINOR: update copyright year in the NOTICE file. (#6196)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 33cda67 MINOR: update copyright year in the NOTICE file. (#6196) 33cda67 is described below commit 33cda6798e95f8ae59bf67e08f161af67f59e867 Author: Colin Patrick McCabe AuthorDate: Thu Jan 24 14:30:44 2019 -0800 MINOR: update copyright year in the NOTICE file. (#6196) (cherry picked from commit ab4deb1bcbd41fac5e2ec14a78d5b3ca35955002) --- NOTICE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NOTICE b/NOTICE index c97f407..8a9b248 100644 --- a/NOTICE +++ b/NOTICE @@ -1,5 +1,5 @@ Apache Kafka -Copyright 2018 The Apache Software Foundation. +Copyright 2019 The Apache Software Foundation. This product includes software developed at The Apache Software Foundation (http://www.apache.org/).
[kafka] branch 2.1 updated: MINOR: bump Jackson version to 2.9.8 (#6199)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new c1f9f76 MINOR: bump Jackson version to 2.9.8 (#6199) c1f9f76 is described below commit c1f9f76b3c3482eb13e1a0baac11d692cbd82d96 Author: Colin Patrick McCabe AuthorDate: Fri Jan 25 09:15:07 2019 -0800 MINOR: bump Jackson version to 2.9.8 (#6199) --- gradle/dependencies.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 7dd3604..7c8c5b9 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -53,7 +53,7 @@ versions += [ argparse4j: "0.7.0", bcpkix: "1.60", easymock: "4.0.1", - jackson: "2.9.7", + jackson: "2.9.8", jetty: "9.4.12.v20180830", jersey: "2.27", jmh: "1.21",
[kafka] branch trunk updated: MINOR: Upgrade ducktape to 0.7.5 (#6197)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0dbb064 MINOR: Upgrade ducktape to 0.7.5 (#6197) 0dbb064 is described below commit 0dbb064963cff16195b21b85ff117d9a91e23c34 Author: Konstantine Karantasis AuthorDate: Fri Jan 25 11:14:19 2019 -0800 MINOR: Upgrade ducktape to 0.7.5 (#6197) Reviewed-by: Colin P. McCabe --- tests/docker/Dockerfile | 2 +- tests/setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 68efaee..eee4879 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -34,7 +34,7 @@ LABEL ducker.creator=$ducker_creator # Update Linux and install necessary utilities. RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev && apt-get -y clean RUN python -m pip install -U pip==9.0.3; -RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.1 +RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.5 # Set up ssh COPY ./ssh-config /root/.ssh/config diff --git a/tests/setup.py b/tests/setup.py index 7d7c4a4..a0de1d4 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -51,7 +51,7 @@ setup(name="kafkatest", license="apache2.0", packages=find_packages(), include_package_data=True, - install_requires=["ducktape==0.7.1", "requests>=2.5.0"], + install_requires=["ducktape==0.7.5", "requests==2.20.0"], tests_require=["pytest", "mock"], cmdclass={'test': PyTest}, )
[kafka] 01/01: Bump version to 2.1.1
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to annotated tag 2.1.1-rc0 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 476a8058a91a0ad73d902a58c81c4b0edfa54ced Author: Colin P. Mccabe AuthorDate: Fri Jan 25 11:59:55 2019 -0800 Bump version to 2.1.1 --- gradle.properties | 2 +- streams/quickstart/java/pom.xml| 2 +- streams/quickstart/java/src/main/resources/archetype-resources/pom.xml | 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py| 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/gradle.properties b/gradle.properties index 837601a..2abe9d1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -19,7 +19,7 @@ group=org.apache.kafka # - tests/kafkatest/__init__.py, # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py. -version=2.1.1-SNAPSHOT +version=2.1.1 scalaVersion=2.11.12 task=build org.gradle.jvmargs=-Xmx1024m -Xss2m diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index 0b4c235..44754e6 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart -2.1.1-SNAPSHOT +2.1.1 .. 
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index 06acec9..37a7459 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 -2.1.1-SNAPSHOT +2.1.1 1.7.7 1.2.17 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index 4dd3406..7c2d48f 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom -2.1.1-SNAPSHOT +2.1.1 Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index eb6e26b..92a8cb8 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -22,4 +22,4 @@ # Instead, in development branches, the version should have a suffix of the form ".devN" # # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '2.1.1.dev0' +__version__ = '2.1.1'
[kafka] annotated tag 2.1.1-rc0 created (now d046b51)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to annotated tag 2.1.1-rc0 in repository https://gitbox.apache.org/repos/asf/kafka.git. at d046b51 (tag) tagging 476a8058a91a0ad73d902a58c81c4b0edfa54ced (commit) replaces 2.1.0 by Colin P. Mccabe on Fri Jan 25 11:59:55 2019 -0800 - Log - 2.1.1-rc0 --- This annotated tag includes the following new commits: new 476a805 Bump version to 2.1.1 The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[kafka] branch trunk updated: MINOR: In the MetadataResponse schema, ignorable should be a boolean
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 1a47feb MINOR: In the MetadataResponse schema, ignorable should be a boolean 1a47feb is described below commit 1a47febd5ba6a58b651482619e7ea1cfe161c066 Author: Gardner Vickers AuthorDate: Fri Jan 25 17:27:35 2019 -0800 MINOR: In the MetadataResponse schema, ignorable should be a boolean --- clients/src/main/resources/common/message/MetadataResponse.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/resources/common/message/MetadataResponse.json b/clients/src/main/resources/common/message/MetadataResponse.json index 0d68c83..1e199df 100644 --- a/clients/src/main/resources/common/message/MetadataResponse.json +++ b/clients/src/main/resources/common/message/MetadataResponse.json @@ -49,7 +49,7 @@ ]}, { "name": "ClusterId", "type": "string", "nullableVersions": "2+", "versions": "2+", "ignorable": true, "about": "The cluster ID that responding broker belongs to." }, -{ "name": "ControllerId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": "true", +{ "name": "ControllerId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, "about": "The ID of the controller broker." }, { "name": "Topics", "type": "[]MetadataResponseTopic", "versions": "0+", "about": "Each topic in the response.", "fields": [
[kafka-site] branch asf-site updated: Add Colin McCabe's public key (#184)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new d8cb44c Add Colin McCabe's public key (#184) d8cb44c is described below commit d8cb44cf935e5690cd29c96a40e9a20bff32e5e6 Author: Colin Patrick McCabe AuthorDate: Fri Jan 25 17:30:57 2019 -0800 Add Colin McCabe's public key (#184) --- KEYS | 57 + 1 file changed, 57 insertions(+) diff --git a/KEYS b/KEYS index 9d01873..00bd7ee 100644 --- a/KEYS +++ b/KEYS @@ -1180,3 +1180,60 @@ ox8m5PehrDchoIm0Ff9JPO+K/yMYf21ZVTVRay+ku09DPbDCllq/WSR0lyXbxCLe JuAqBBwRqiZdtf268B55j37n9akx7aSuXU1xQgE= =Z/L8 -END PGP PUBLIC KEY BLOCK- +pub rsa4096 2015-09-04 [SC] + 6B259C61F399AD22D4A4EBC0DE78987A9CD4D9D3 +uid [ unknown] Colin McCabe +sig 3DE78987A9CD4D9D3 2015-09-04 Colin McCabe +sub rsa4096 2015-09-04 [E] +sig DE78987A9CD4D9D3 2015-09-04 Colin McCabe + +-BEGIN PGP PUBLIC KEY BLOCK- + +mQINBFXpCOwBEACjyZSplW9lbNHce8fxcbcw5DaXz86SiOcINaMHhjfcTm7C7WeA +CZ0B2CFlgwJu6cakmnaixOjGLdh/n35CrL048v1Lb7Ys2HxSFXqlJKgc7as9CNph +lr/aSx+Ryyco1wJ3ALsuMXLQgNpOLiRqyApzXTBsWiMqQkqkSShINVEpECNIJDvY +AfhRP7R/1y4WuOwkk12xTjVnUdZ4oAz4fUfJHKh9gsG5U4dB6kkRaJ2wbGicNQD1 +8+S3V/Mr2LVOOn3evaD7K9UcdCk2wnxhwiNLvAD62glWBV866J8BzypZLaA5nkE9 +PVNQdtkYLz9+ub7iYhF83Ot6vznrkalFgHzuHslp5iJcN7f4nCbUaPATYuG8sLpU +AIixceDN9047T6T2M/gI+aJkmbu6fAdubLdarp//Mq9idkF1toNIg/3KFixXcFKo +2HAn1IK3VqmrQoprYKWC+UyQCigxSiuAgJuWO73fd2G62u/epS24QTbgm9dVtzUK +rdbpbGeiH8wRXR/OVQK0MJDKTn4Y9HJ/K9MP4LrUvfTEiCEYlhgif6esarS2KOAf +JgYrusxLhiaPrlLBJr12f6IH5FKXRaaz2hYajWN9wPzrmIAVX75tD7Z6Jxl8FiA3 +GrLp/9J/spAV71IZIvhb4txsQ4TyR53jvGZmOzQ+T39cwi/lI0izKGFjdwARAQAB +tCFDb2xpbiBNY0NhYmUgPGNtY2NhYmVAYXBhY2hlLm9yZz6JAjcEEwEKACEFAlXp +COwCGwMFCwkIBwMFFQoJCAsFFgIDAQACHgECF4AACgkQ3niYepzU2dMafw/9HPrz +om9UfBG6OgEjseLqMHI1djRFwhc9s4qG5vo2JeC18Amkz2oopsVhe8Ei7DcSW5Xa 
+azH8BA9hOyoa79ROWNq108vMKx14n2Q1rz1JAmvXGL2Fr/dxVVNtzz5j+h5/RFFb +y2LfnNGufezmOOoa3pd8grT7AuMFTxs6lxGe4t1kL8kwV4kHqvW1zoDb0wzjK5cP +TzanG7KvRRpKi58RGja40FsDYGwk5t2tzV7YyXuWGQySrc5eplrtRlNKNw+1oELC +d16BgTTtfymtQF3oNqOsY1fmKUt3Ov9GdhuKBlCKn+eIpZ2trAVMJYd75BeOnEzc +rc/Cxg3tB1q0vIKiTOuWcNT9K1UIbU1IrZ6FyPucqRXpVdBfxNSkIRyNb4sbcRhy +Le7/tl/IPi3dUBLw5WtshT5Sh2G3xeiCn8XGMkrrzBqRFuZnpGJcEyp7j09mP8bR +JQ8aJ5j/75eCvsQiik90fvLGzsAxRy3zqyBzaT9BcNyITnc45dxAYzQwXJvoLJSQ +HC0Mot9ofTjgI5y5fy5Sq62OoqDU8iL4Z41QG91Xy3RR/QfO/EFHSqcdce+EwO7m +5uJ59eTHdgihb0Afpo6x8+fDUs7BUA4JsntA5bP1UuCgvp15mNw1KMxhZN3K8n5C +02x8gJ6kflU+iLJJFCdecDOf9Onlf7xJWNwkqJa5Ag0EVekI7AEQAOCoQncdWRz+ +23meghCklGh1ObW6CpuEDQvb7WcMI2rHuaYIh1WOkSbsClQxPg6RIjl6YUx8nis0 +n8DAMI97i1o3Efdl8cHBmcDy/sGyZmxquM6yCyM8O9jGIbykwOVcV9ID8ig3F4/V +o+g4COrBQ68jB2b6yEH955QDF2DK15xki9UiYup8ULE/cSjf+voLmb5srLkN9lIN +aouHZcVgBgOPQ1ptFAekepSlnZ7NiIPDXxJe+/8pKYVdlG2S/T+r+R5niLZWkLG0 +BWSjB60IR//1fUnNxxjhcox8WJZT/o1QDiwIH/R77UmpT6hlETrUYN3NZpzHuZW2 +KXOvOZ1qdrKk4br4dzHSsIWsWF4mklyVXHYLOYFd9wRn6eAXGlHdXff/uGFjqmaS +VwDc1qQ8Yc9DQXYU4G4R3gBibXo9Q3wDo07WlguG6v81QNyuAIrk/H/+6YjZep5G +TuB/IVikWoJQYkriB8yzeikj+vh4k42qL2mdotDCJI37mZgOU7xvzH3J8sYRlM1q +HbnZGRezFKnr1JZUnrlqiqa7L2nPCVs8DUpmi5NuMECOGs6LmQ4Adwaqn3FGKSC5 +hReF51V/ceEfdtMkY3ZzCiZ7xJ3ht1ppHckKHnjPLR6Qi8LBWRpuMQzAPGlEBdx3 +PAIgQe5vYY9LkkjRLS7u33ICOHk+T0JXABEBAAGJAh8EGAEKAAkFAlXpCOwCGwwA +CgkQ3niYepzU2dOyDA/+O0ClOeFDxmJMoXUYogGoAjR5sIsAEwqN9hH1Vhrh+icu +SLSWazSSmCNcTGgwHy180VzPc+ZOhDesBtdq7h4HPEPV1ksmkP07kTjwi0bIqeUK +LfGe3ughVnRUtY3dvonZA48kirrOJBqYnAvXrfKxmAup4JFD2v1sy9VMrRM/kGYk +K14lwSfVbWo3916Tst0II9tmf6w1m89IK5/FUAyxjiWyDyt+PRT80BJLL5oU+gWj +9elZLTbAyCroOKSVvgXPIFue8OyG8+GALO89RFegAO/inePV7tJLQ5H1XdSVlFMu +rriJneagLTMk6xUB9yUeEBE9wAU41fONHzfeYxX8JOGaTKArfzqgEwtHx3J7gtFg +gm+FU3WdD9oqC3/dDKZ7pFizUhijUZo+nfedhj+7+ZGPFrPX11TS+N3p2E863DGG +/sGct95FDu5kW+DYty/Jn8IniL6PO25lO2+QzkTxqBc7/tDMEiOKdrst5FkyiBJ/ +4gbL9t5wQBhKd6mvVAW5rdRx3h5v8nRRJiRoBvffCDel88kMaTTdgsYYiP8v7yWB 
+3ksenxgHlhdFXBBnNIoVpYAwvo5Quj42bUt5nkfTqK2p8tKEBSSa2Cr9JEgNmqc/ +gGDok4uiZw4HdrV8d3/JvahBXfQWH12KeoFmZi/pY/XiPZszEyPrdRVrNbRag/0= +=V7qY +-END PGP PUBLIC KEY BLOCK-
[kafka] branch 2.1 updated: MINOR: Upgrade ducktape to 0.7.5 (#6197)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 3fae5eb MINOR: Upgrade ducktape to 0.7.5 (#6197) 3fae5eb is described below commit 3fae5eb7daf55b9e6e81c7a59640ea1ab4ec5a66 Author: Konstantine Karantasis AuthorDate: Fri Jan 25 11:14:19 2019 -0800 MINOR: Upgrade ducktape to 0.7.5 (#6197) Reviewed-by: Colin P. McCabe (cherry picked from commit 0dbb064963cff16195b21b85ff117d9a91e23c34) --- tests/docker/Dockerfile | 2 +- tests/setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index e7961e4..dc08ebe 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -34,7 +34,7 @@ LABEL ducker.creator=$ducker_creator # Update Linux and install necessary utilities. RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev && apt-get -y clean RUN python -m pip install -U pip==9.0.3; -RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.1 +RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.5 # Set up ssh COPY ./ssh-config /root/.ssh/config diff --git a/tests/setup.py b/tests/setup.py index 7d7c4a4..a0de1d4 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -51,7 +51,7 @@ setup(name="kafkatest", license="apache2.0", packages=find_packages(), include_package_data=True, - install_requires=["ducktape==0.7.1", "requests>=2.5.0"], + install_requires=["ducktape==0.7.5", "requests==2.20.0"], tests_require=["pytest", "mock"], cmdclass={'test': PyTest}, )
[kafka] branch 2.1 updated: KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (#6202)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 01fe2d4 KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (#6202) 01fe2d4 is described below commit 01fe2d48eae8b8a47a90ab742d4c0b7fd07e4ce4 Author: Dhruvil Shah AuthorDate: Fri Jan 25 19:40:50 2019 -0800 KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (#6202) Check if a partition is offline while iterating all partitions. Reviewers: Jun Rao (cherry picked from commit 646ec948794c927e4ffa5f96d60b5b9f7fe8f228) --- core/src/main/scala/kafka/server/ReplicaManager.scala | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 84b2d48..efdde13 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -400,9 +400,13 @@ class ReplicaManager(val config: KafkaConfig, def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] = getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition) + // An iterator over all non offline partitions. This is a weakly consistent iterator; a partition made offline after + // the iterator has been constructed could still be returned by this iterator. private def nonOfflinePartitionsIterator: Iterator[Partition] = allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition) + // An iterator over all offline partitions. This is a weakly consistent iterator; a partition made offline after the + // iterator has been constructed may not be visible. 
private def offlinePartitionsIterator: Iterator[Partition] = allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition) @@ -1339,7 +1343,11 @@ class ReplicaManager(val config: KafkaConfig, private def maybeShrinkIsr(): Unit = { trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR") - nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs)) + +// Shrink ISRs for non offline partitions +allPartitions.keys.foreach { topicPartition => + nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs)) +} } /**
[kafka] 01/01: Bump version to 2.1.1
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to annotated tag 2.1.1-rc1 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 6679800433c329deb7d82fa51e94f8b5f65919b9 Author: Colin P. Mccabe AuthorDate: Sat Jan 26 17:52:41 2019 -0800 Bump version to 2.1.1 --- gradle.properties | 2 +- streams/quickstart/java/pom.xml| 2 +- .../java/src/main/resources/archetype-resources/pom.xml| 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py| 10 +- tests/kafkatest/version.py | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/gradle.properties b/gradle.properties index 837601a..2abe9d1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -19,7 +19,7 @@ group=org.apache.kafka # - tests/kafkatest/__init__.py, # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py. -version=2.1.1-SNAPSHOT +version=2.1.1 scalaVersion=2.11.12 task=build org.gradle.jvmargs=-Xmx1024m -Xss2m diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index 0b4c235..44754e6 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart -2.1.1-SNAPSHOT +2.1.1 .. 
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index 06acec9..37a7459 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 -2.1.1-SNAPSHOT +2.1.1 1.7.7 1.2.17 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index 4dd3406..7c2d48f 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom -2.1.1-SNAPSHOT +2.1.1 Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index eb6e26b..c729d56 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -15,11 +15,11 @@ # This determines the version of kafkatest that can be published to PyPi and installed with pip # -# Note that in development, this version name can't follow Kafka's convention of having a trailing "-SNAPSHOT" +# Note that in # due to python version naming restrictions, which are enforced by python packaging tools -# (see https://www.python.org/dev/peps/pep-0440/) +# (see https://www.python.org # -# Instead, in development branches, the version should have a suffix of the form ".devN" +# Instead, in # -# For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '2.1.1.dev0' +# For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0 +__version__ = '2.1.1' diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 676ab96..629dec0 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -60,7 +60,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("2.1.0-SNAPSHOT") +DEV_VERSION = KafkaVersion("2.1.1-SNAPSHOT") # 0.8.2.x versions V_0_8_2_1 
= KafkaVersion("0.8.2.1")
[kafka] annotated tag 2.1.1-rc1 created (now a61df10)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to annotated tag 2.1.1-rc1 in repository https://gitbox.apache.org/repos/asf/kafka.git. at a61df10 (tag) tagging 6679800433c329deb7d82fa51e94f8b5f65919b9 (commit) replaces 2.1.0 by Colin P. Mccabe on Sat Jan 26 17:52:41 2019 -0800 - Log - 2.1.1-rc1 --- This annotated tag includes the following new commits: new 6679800 Bump version to 2.1.1 The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[kafka] branch trunk updated: KAFKA-7859: Use automatic RPC generation in LeaveGroups (#6188)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new dc634f1 KAFKA-7859: Use automatic RPC generation in LeaveGroups (#6188) dc634f1 is described below commit dc634f18f7ea2ef24d202d6a2380365754005b60 Author: Boyang Chen AuthorDate: Thu Jan 31 10:25:18 2019 -0800 KAFKA-7859: Use automatic RPC generation in LeaveGroups (#6188) Reviewed-by: Colin P. McCabe --- .../consumer/internals/AbstractCoordinator.java| 5 +- .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../kafka/common/requests/AbstractResponse.java| 2 +- .../requests/ElectPreferredLeadersResponse.java| 5 ++ .../kafka/common/requests/LeaveGroupRequest.java | 87 ++ .../kafka/common/requests/LeaveGroupResponse.java | 63 +--- .../kafka/clients/consumer/KafkaConsumerTest.java | 3 +- .../internals/ConsumerCoordinatorTest.java | 19 ++--- .../kafka/common/requests/RequestResponseTest.java | 6 +- core/src/main/scala/kafka/server/KafkaApis.scala | 13 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 3 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 6 +- 12 files changed, 82 insertions(+), 136 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index c33d017..486e34b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.MemberIdRequiredException; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnknownMemberIdException; +import 
org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -803,8 +804,8 @@ public abstract class AbstractCoordinator implements Closeable { // this is a minimal effort attempt to leave the group. we do not // attempt any resending if the request fails or times out. log.info("Sending LeaveGroup request to coordinator {}", coordinator); -LeaveGroupRequest.Builder request = -new LeaveGroupRequest.Builder(groupId, generation.memberId); +LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(new LeaveGroupRequestData() +.setGroupId(groupId).setMemberId(generation.memberId)); client.send(coordinator, request) .compose(new LeaveGroupResponseHandler()); client.pollNoWakeup(); diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 80b118b..13e1483 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -18,6 +18,8 @@ package org.apache.kafka.common.protocol; import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; +import org.apache.kafka.common.message.LeaveGroupRequestData; +import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.Struct; @@ -77,8 +79,6 @@ import org.apache.kafka.common.requests.JoinGroupRequest; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.LeaderAndIsrRequest; import org.apache.kafka.common.requests.LeaderAndIsrResponse; -import org.apache.kafka.common.requests.LeaveGroupRequest; -import 
org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.ListGroupsRequest; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.ListOffsetRequest; @@ -137,7 +137,7 @@ public enum ApiKeys { FindCoordinatorResponse.schemaVersions()), JOIN_GROUP(11, "JoinGroup", JoinGroupRequest.schemaVersions(), JoinGroupResponse.schemaVersions()), HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()), -LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequest.schemaVersions(), LeaveGroupResponse.schemaVersions()), +LEAVE_GROUP(13, &qu
[kafka] branch trunk updated: MINOR: fix checkstyle suppressions for generated RPC code to work on Windows
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 3ebf058 MINOR: fix checkstyle suppressions for generated RPC code to work on Windows 3ebf058 is described below commit 3ebf058123b230d7c2c6686a8c9dd7af1565bd99 Author: Chia-Ping Tsai AuthorDate: Fri Feb 1 02:29:55 2019 +0800 MINOR: fix checkstyle suppressions for generated RPC code to work on Windows Reviewed-by: Colin P. McCabe --- checkstyle/suppressions.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index dd78ba4..f306aaa 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -224,7 +224,7 @@ files="RequestResponseTest.java"/> +files="clients[\\/]src[\\/]generated[\\/].+.java$"/>
[kafka] branch trunk updated: MINOR: Fix some field definitions for ListOffsetReponse (#6214)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2ca54cf MINOR: Fix some field definitions for ListOffsetReponse (#6214) 2ca54cf is described below commit 2ca54cfd33a98d7f2d1834a7fbe55525776efa7e Author: Colin Patrick McCabe AuthorDate: Fri Feb 1 14:21:35 2019 -0800 MINOR: Fix some field definitions for ListOffsetReponse (#6214) Reviewers: Jun Rao --- clients/src/main/resources/common/message/ListOffsetResponse.json | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clients/src/main/resources/common/message/ListOffsetResponse.json b/clients/src/main/resources/common/message/ListOffsetResponse.json index 36935da..9476a19 100644 --- a/clients/src/main/resources/common/message/ListOffsetResponse.json +++ b/clients/src/main/resources/common/message/ListOffsetResponse.json @@ -39,9 +39,10 @@ "about": "The partition error code, or 0 if there was no error." }, { "name": "OldStyleOffsets", "type": "[]int64", "versions": "0", "ignorable": false, "about": "The result offsets." }, -{ "name": "Timestamp", "type": "int64", "versions": "1+" }, -{ "name": "Offset", "type": "int64", "versions": "1+", +{ "name": "Timestamp", "type": "int64", "versions": "1+", "default": "-1", "ignorable": false, "about": "The timestamp associated with the returned offset." }, +{ "name": "Offset", "type": "int64", "versions": "1+", "default": "-1", "ignorable": false, + "about": "The returned offset." }, { "name": "LeaderEpoch", "type": "int32", "versions": "4+" } ]} ]}
[kafka] branch trunk updated: KAFKA-7832: Use automatic RPC generation in CreateTopics (#5972)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e2e8bdb KAFKA-7832: Use automatic RPC generation in CreateTopics (#5972) e2e8bdb is described below commit e2e8bdbd8cb6ca2ac962c72147d21a9e8b9ba2c0 Author: Colin Patrick McCabe AuthorDate: Mon Feb 4 10:39:43 2019 -0800 KAFKA-7832: Use automatic RPC generation in CreateTopics (#5972) Reviewers: Jun Rao , Tom Bentley , Boyang Chen --- .../kafka/clients/admin/KafkaAdminClient.java | 29 +- .../org/apache/kafka/clients/admin/NewTopic.java | 33 ++- .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../kafka/common/requests/AbstractResponse.java| 2 +- .../kafka/common/requests/CreateTopicsRequest.java | 315 ++--- .../common/requests/CreateTopicsResponse.java | 100 ++- .../common/message/CreateTopicsRequest.json| 4 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 29 +- .../apache/kafka/common/message/MessageTest.java | 3 +- .../kafka/common/requests/RequestContextTest.java | 2 +- .../kafka/common/requests/RequestResponseTest.java | 54 ++-- .../apache/kafka/connect/util/TopicAdminTest.java | 12 +- .../src/main/scala/kafka/server/AdminManager.scala | 97 --- core/src/main/scala/kafka/server/KafkaApis.scala | 89 +++--- .../kafka/api/AuthorizerIntegrationTest.scala | 10 +- .../server/AbstractCreateTopicsRequestTest.scala | 116 ++-- .../kafka/server/CreateTopicsRequestTest.scala | 142 -- .../server/CreateTopicsRequestWithPolicyTest.scala | 78 ++--- .../scala/unit/kafka/server/RequestQuotaTest.scala | 14 +- 19 files changed, 461 insertions(+), 674 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 58baab7..1567d90 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -59,6 +59,9 @@ import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.message.CreateTopicsRequestData; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet; +import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -1252,7 +1255,7 @@ public class KafkaAdminClient extends AdminClient { public CreateTopicsResult createTopics(final Collection newTopics, final CreateTopicsOptions options) { final Map> topicFutures = new HashMap<>(newTopics.size()); -final Map topicsMap = new HashMap<>(newTopics.size()); +final CreatableTopicSet topics = new CreatableTopicSet(); for (NewTopic newTopic : newTopics) { if (topicNameIsUnrepresentable(newTopic.name())) { KafkaFutureImpl future = new KafkaFutureImpl<>(); @@ -1261,7 +1264,7 @@ public class KafkaAdminClient extends AdminClient { topicFutures.put(newTopic.name(), future); } else if (!topicFutures.containsKey(newTopic.name())) { topicFutures.put(newTopic.name(), new KafkaFutureImpl<>()); -topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails()); +topics.add(newTopic.convertToCreatableTopic()); } } final long now = time.milliseconds(); @@ -1270,27 +1273,33 @@ public class KafkaAdminClient extends AdminClient { @Override public AbstractRequest.Builder createRequest(int timeoutMs) { -return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.shouldValidateOnly()); +return new CreateTopicsRequest.Builder( +new CreateTopicsRequestData(). +setTopics(topics). +setTimeoutMs(timeoutMs). 
+setValidateOnly(options.shouldValidateOnly())); } @Override public void handleResponse(AbstractResponse abstractResponse) { CreateTopicsResponse response = (CreateTopicsResponse) abstractResponse; // Check for controller change -for (ApiError error : response.errors().values()) { -if (error.error() == Errors.NOT_CONTROLLER) { +
[kafka] branch 2.1 updated: MINOR: Fix more places where the version should be bumped from 2.1.0 -> 2.1.1 (#6230)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 1a401dd MINOR: Fix more places where the version should be bumped from 2.1.0 -> 2.1.1 (#6230) 1a401dd is described below commit 1a401ddfac49db0e7f0e7d38714e133ebc132031 Author: Colin Patrick McCabe AuthorDate: Tue Feb 5 09:36:22 2019 -0800 MINOR: Fix more places where the version should be bumped from 2.1.0 -> 2.1.1 (#6230) --- docs/js/templateData.js | 2 +- kafka-merge-pr.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/js/templateData.js b/docs/js/templateData.js index eae0b30..f65e2af 100644 --- a/docs/js/templateData.js +++ b/docs/js/templateData.js @@ -19,6 +19,6 @@ limitations under the License. var context={ "version": "21", "dotVersion": "2.1", -"fullDotVersion": "2.1.0", +"fullDotVersion": "2.1.1", "scalaVersion": "2.11" }; diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py index e6b9a55..925b646 100755 --- a/kafka-merge-pr.py +++ b/kafka-merge-pr.py @@ -70,7 +70,7 @@ TEMP_BRANCH_PREFIX = "PR_TOOL" DEV_BRANCH_NAME = "trunk" -DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "2.1.0") +DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "2.1.1") def get_json(url): try:
[kafka] branch trunk updated: KAFKA-7828: Add ExternalCommandWorker to Trogdor (#6219)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 4be68c5 KAFKA-7828: Add ExternalCommandWorker to Trogdor (#6219) 4be68c5 is described below commit 4be68c58da6bbd16cb5682dbf1c9b5b1c93500a7 Author: Colin Patrick McCabe AuthorDate: Wed Feb 6 16:42:02 2019 -0800 KAFKA-7828: Add ExternalCommandWorker to Trogdor (#6219) Allow the Trogdor agent to execute external commands. The agent communicates with the external commands via stdin, stdout, and stderr. Based on a patch by Xi Yang Reviewers: David Arthur --- tests/bin/external_trogdor_command_example.py | 38 ++ tests/spec/external_command.json | 33 ++ .../trogdor/workload/ExternalCommandSpec.java | 115 ++ .../trogdor/workload/ExternalCommandWorker.java| 398 + .../workload/ExternalCommandWorkerTest.java| 196 ++ 5 files changed, 780 insertions(+) diff --git a/tests/bin/external_trogdor_command_example.py b/tests/bin/external_trogdor_command_example.py new file mode 100755 index 000..0e53557 --- /dev/null +++ b/tests/bin/external_trogdor_command_example.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import sys +import time + +# +# This is an example of an external script which can be run through Trogdor's +# ExternalCommandWorker. +# + +if __name__ == '__main__': +# Read the ExternalCommandWorker start message. +line = sys.stdin.readline() +start_message = json.loads(line) +workload = start_message["workload"] +print("Starting external_trogdor_command_example with task id %s, workload %s" \ +% (start_message["id"], workload)) +sys.stdout.flush() + `print(json.dumps({"status": "running"}))` +sys.stdout.flush() +time.sleep(0.001 * workload["delayMs"]) + `print(json.dumps({"status": "exiting after %s delayMs" % workload["delayMs"]}))` +sys.stdout.flush() diff --git a/tests/spec/external_command.json b/tests/spec/external_command.json new file mode 100755 index 000..d432938 --- /dev/null +++ b/tests/spec/external_command.json @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +// An example task specification for running an external command in Trogdor. +// Note that this task spec uses a relative path, so make sure you launch +// Trogdor from the project root directory when using it. 
+// See TROGDOR.md for details. +// + +{ + "class": "org.apache.kafka.trogdor.workload.ExternalCommandSpec", + "command": ["./tests/bin/external_trogdor_command_example.py"], + "durationMs": 1000, + "commandNode": "node0", + "workload":{ +"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec", +"message": "Hello, world", +"delayMs": 2000 + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ExternalCommandSpec.java new file mode 100644 index 000..4947aed ---
[kafka] branch 2.1 updated: MINOR: release.py: fix some compatibility problems.
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 70974ff MINOR: release.py: fix some compatibility problems. 70974ff is described below commit 70974fff637e1765e8ce43a0d9884f6ea1e2ec96 Author: Colin P. Mccabe AuthorDate: Mon Feb 4 14:27:50 2019 -0800 MINOR: release.py: fix some compatibility problems. Rather than using sed, use built-in Python regular expressions to strip the SNAPSHOT expression from the pom.xml files. Sed has different flags on different platforms, such as Linux. Using Python directly here is more compatible, as well as being more efficient, and not requiring an rm command afterwards. When running release_notes.py, use the current Python interpreter. This is needed to prevent attempting to run release_notes.py with Python 3 on some systems. release_notes.py will not (yet) work with Python 3. Author: Colin P. 
Mccabe Reviewers: Magnus Edenhill , David Arthur , Manikumar Reddy Closes #6198 from cmccabe/release_py (cherry picked from commit e942e294f0a700cdc2f94bb7f967c2e9bcb1a271) --- release.py | 29 ++--- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/release.py b/release.py index 802c9de..fe15f7e 100755 --- a/release.py +++ b/release.py @@ -141,6 +141,16 @@ def replace(path, pattern, replacement): for line in updated: f.write(line) +def regexReplace(path, pattern, replacement): +updated = [] +with open(path, 'r') as f: +for line in f: +updated.append(re.sub(pattern, replacement, line)) + +with open(path, 'w') as f: +for line in updated: +f.write(line) + def user_ok(msg): ok = raw_input(msg) return ok.lower() == 'y' @@ -522,12 +532,17 @@ cmd("Checking out current development branch", "git checkout -b %s %s" % (releas print("Updating version numbers") replace("gradle.properties", "version", "version=%s" % release_version) replace("tests/kafkatest/__init__.py", "__version__", "__version__ = '%s'" % release_version) -cmd("update streams quickstart pom", ["sed", "-i", ".orig"," s/-SNAPSHOT//", "streams/quickstart/pom.xml"]) -cmd("update streams quickstart java pom", ["sed", "-i", ".orig", "s/-SNAPSHOT//", "streams/quickstart/java/pom.xml"]) -cmd("update streams quickstart java pom", ["sed", "-i", ".orig", "s/-SNAPSHOT//", "streams/quickstart/java/src/main/resources/archetype-resources/pom.xml"]) -cmd("remove backup pom.xml", "rm streams/quickstart/pom.xml.orig") -cmd("remove backup java pom.xml", "rm streams/quickstart/java/pom.xml.orig") -cmd("remove backup java pom.xml", "rm streams/quickstart/java/src/main/resources/archetype-resources/pom.xml.orig") +print("updating streams quickstart pom") +regexReplace("streams/quickstart/pom.xml", "-SNAPSHOT", "") +print("updating streams quickstart java pom") +regexReplace("streams/quickstart/java/pom.xml", "-SNAPSHOT", "") +print("updating streams quickstart archetype pom") 
+regexReplace("streams/quickstart/java/src/main/resources/archetype-resources/pom.xml", "-SNAPSHOT", "") +print("updating ducktape version.py") +regexReplace("./tests/kafkatest/version.py", "^DEV_VERSION =.*", +"DEV_VERSION = KafkaVersion(\"%s-SNAPSHOT\")" % release_version) +print("updating ducktape __init__.py") +regexReplace("./tests/kafkatest/__init__.py", ".dev.*", "") # Command in explicit list due to messages with spaces cmd("Commiting version number updates", ["git", "commit", "-a", "-m", "Bump version to %s" % release_version]) # Command in explicit list due to messages with spaces @@ -555,7 +570,7 @@ cmd("Verifying the correct year in NOTICE", "grep %s NOTICE" % current_year, cwd with open(os.path.join(artifacts_dir, "RELEASE_NOTES.html"), 'w') as f: print("Generating release notes") try: -subprocess.check_call(["./release_notes.py", release_version], stdout=f) +subprocess.check_call([sys.executable, "./release_notes.py", release_version], stdout=f) except subprocess.CalledProcessError as e: print_output(e.output)
[kafka] 01/01: Bump version to 2.1.1
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to annotated tag 2.1.1-rc2 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 21234bee31165527859b46ea48c46b76532f7a37 Author: Colin P. Mccabe AuthorDate: Fri Feb 8 10:30:47 2019 -0800 Bump version to 2.1.1 --- gradle.properties | 2 +- streams/quickstart/java/pom.xml| 2 +- .../java/src/main/resources/archetype-resources/pom.xml| 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py| 10 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/gradle.properties b/gradle.properties index 837601a..2abe9d1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -19,7 +19,7 @@ group=org.apache.kafka # - tests/kafkatest/__init__.py, # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py. -version=2.1.1-SNAPSHOT +version=2.1.1 scalaVersion=2.11.12 task=build org.gradle.jvmargs=-Xmx1024m -Xss2m diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index 0b4c235..44754e6 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart -2.1.1-SNAPSHOT +2.1.1 .. 
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index 06acec9..37a7459 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 -2.1.1-SNAPSHOT +2.1.1 1.7.7 1.2.17 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index 4dd3406..7c2d48f 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom -2.1.1-SNAPSHOT +2.1.1 Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index eb6e26b..c729d56 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -15,11 +15,11 @@ # This determines the version of kafkatest that can be published to PyPi and installed with pip # -# Note that in development, this version name can't follow Kafka's convention of having a trailing "-SNAPSHOT" +# Note that in # due to python version naming restrictions, which are enforced by python packaging tools -# (see https://www.python.org/dev/peps/pep-0440/) +# (see https://www.python.org # -# Instead, in development branches, the version should have a suffix of the form ".devN" +# Instead, in # -# For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '2.1.1.dev0' +# For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0 +__version__ = '2.1.1'
[kafka] annotated tag 2.1.1-rc2 created (now 935a75e)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to annotated tag 2.1.1-rc2 in repository https://gitbox.apache.org/repos/asf/kafka.git. at 935a75e (tag) tagging 21234bee31165527859b46ea48c46b76532f7a37 (commit) replaces 2.1.0 by Colin P. Mccabe on Fri Feb 8 10:30:47 2019 -0800 - Log - 2.1.1-rc2 --- This annotated tag includes the following new commits: new 21234be Bump version to 2.1.1 The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[kafka] branch foo created (now 2fb9b03)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch foo in repository https://gitbox.apache.org/repos/asf/kafka.git. at 2fb9b03 MINOR: improve stabilty of ProcessorStateManagerTest (#6240) No new revisions were added by this update.
[kafka] branch foo deleted (was 2fb9b03)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch foo in repository https://gitbox.apache.org/repos/asf/kafka.git. was 2fb9b03 MINOR: improve stabilty of ProcessorStateManagerTest (#6240) The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[kafka] annotated tag 2.1.1 created (now a29fd84)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to annotated tag 2.1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git. at a29fd84 (tag) tagging 21234bee31165527859b46ea48c46b76532f7a37 (commit) replaces 2.1.0 by Colin P. Mccabe on Fri Feb 15 01:04:52 2019 -0800 - Log - Apache Kafka 2.1.1 release --- No new revisions were added by this update.
[kafka] 01/02: Merge tag '2.1.1' into 2.1
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git commit d69e03e176f49cee91a0322afcda816a6984543e Merge: f15678e 21234be Author: Colin P. Mccabe AuthorDate: Fri Feb 15 01:08:17 2019 -0800 Merge tag '2.1.1' into 2.1 Apache Kafka 2.1.1 release gradle.properties | 2 +- streams/quickstart/java/pom.xml| 2 +- .../java/src/main/resources/archetype-resources/pom.xml| 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py| 10 +- 5 files changed, 9 insertions(+), 9 deletions(-)
[kafka] branch 2.1 updated (f15678e -> 9729a34)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git. from f15678e MINOR: Make MockClient#poll() more thread-safe (#5942) add 21234be Bump version to 2.1.1 new d69e03e Merge tag '2.1.1' into 2.1 new 9729a34 Update versions to 2.1.2-SNAPSHOT The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docs/js/templateData.js| 2 +- gradle.properties | 2 +- kafka-merge-pr.py | 2 +- streams/quickstart/java/pom.xml| 2 +- .../java/src/main/resources/archetype-resources/pom.xml| 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py| 10 +- tests/kafkatest/version.py | 2 +- 8 files changed, 12 insertions(+), 12 deletions(-)
[kafka] 02/02: Update versions to 2.1.2-SNAPSHOT
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 9729a34f5050d6365ab0493bb7356a8757e97818 Author: Colin P. Mccabe AuthorDate: Fri Feb 15 01:12:44 2019 -0800 Update versions to 2.1.2-SNAPSHOT --- docs/js/templateData.js| 2 +- gradle.properties | 2 +- kafka-merge-pr.py | 2 +- streams/quickstart/java/pom.xml| 2 +- streams/quickstart/java/src/main/resources/archetype-resources/pom.xml | 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py| 2 +- tests/kafkatest/version.py | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/js/templateData.js b/docs/js/templateData.js index f65e2af..c457546 100644 --- a/docs/js/templateData.js +++ b/docs/js/templateData.js @@ -19,6 +19,6 @@ limitations under the License. var context={ "version": "21", "dotVersion": "2.1", -"fullDotVersion": "2.1.1", +"fullDotVersion": "2.1.2", "scalaVersion": "2.11" }; diff --git a/gradle.properties b/gradle.properties index 2abe9d1..a97dc7b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -19,7 +19,7 @@ group=org.apache.kafka # - tests/kafkatest/__init__.py, # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py. 
-version=2.1.1 +version=2.1.2-SNAPSHOT scalaVersion=2.11.12 task=build org.gradle.jvmargs=-Xmx1024m -Xss2m diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py index 925b646..00e8f72 100755 --- a/kafka-merge-pr.py +++ b/kafka-merge-pr.py @@ -70,7 +70,7 @@ TEMP_BRANCH_PREFIX = "PR_TOOL" DEV_BRANCH_NAME = "trunk" -DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "2.1.1") +DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "2.1.2") def get_json(url): try: diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index 44754e6..7c58b84 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart -2.1.1 +2.1.2-SNAPSHOT .. diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index 37a7459..8c8c655 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 -2.1.1 +2.1.2-SNAPSHOT 1.7.7 1.2.17 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index 7c2d48f..b6ee257 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom -2.1.1 +2.1.2-SNAPSHOT Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index c729d56..257ef1e 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -22,4 +22,4 @@ # Instead, in # # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0 -__version__ = '2.1.1' +__version__ = '2.1.2.dev0' diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 629dec0..d880ebb 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -60,7 +60,7 @@ def get_version(node=None): return DEV_BRANCH 
DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("2.1.1-SNAPSHOT") +DEV_VERSION = KafkaVersion("2.1.2-SNAPSHOT") # 0.8.2.x versions V_0_8_2_1 = KafkaVersion("0.8.2.1")
[kafka-site] branch 2.1.1-announce created (now ebfaca5)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch 2.1.1-announce in repository https://gitbox.apache.org/repos/asf/kafka-site.git. at ebfaca5 Update html links for 2.1.1 release This branch includes the following new commits: new ba6c994 KAFKA-2425: Initial upload of Kafka documentation to git repository with intent to replace SVN new f21c50a Add Kafka 0.8.2.2 release to downloads page new 91714d9 Add Harsha as Committer. new e4d9849 HOTFIX: update contributing.html for web docs new 8c4a140 add 0.9.0 docs new e8630d3 KAFKA-2346: typo in api.html new e047c4b update 0.9.0 docs new 1e91e25 minor 0.9.0 doc changes new 2e81256 trivial 0.9.0 doc changes new 0362585 trivial 0.9.0 doc changes for AclCommand new fdd6433 trivial 0.9.0 doc changes to fix the links new 24d8d66 upload latest 0.9.0 docs new dbcd9fd trivial 0.9.0 doc fixes new 0b980bf trivial 0.9.0 doc fixes new 0e79d90 trivial change to 0.9.0 docs to fix incorrect ssl.key.password new 7cf41c3 trivial change: revert incorrect change to ssl.key.password new 56d48ce Add 0.9.0.0 to downloads page new 05d29ab minor fix to link doc to 0.9.0 new 497322d minor fix to link doc to 0.9.0 release note link new 346f93e mino change to add link to quotas section new 53fa90a merge in changes from KAFKA-2881 new 7083337 Update JVM version and tuning settings new 97a7d56 Add Ewen as committer. 
new d0ddbb4 Minor updates to api, design, security and upgrade pages new 2fb26e0 Update consumer_config, kafka_config and producer_config new 7f47d19 updating docs for 0.9.0.1 release new 1bc5cd9 trivial fix to authorization CLI table new 14ffd37 Add protocol guide new 7b2f7b7 Adding Gwen's keys to KEYS file new 7f95fb8 adding 0.10.0 documentation new 4ed4dd1 fixing misversion in 0.10.0 docs new 46fccbe Docs for 0.10.0.0, release candidate 2 new 87f504b Add Ismael to committers page new fb7c900 Docs for 0.10.0.0, release candidate 3 new 35b3bbb Merge branch 'asf-site' of https://git-wip-us.apache.org/repos/asf/kafka-site into asf-site new 1ad8525 Docs for 0.10.0.0, fourth release candidate new 76217f0 additional improvements to 0.10.0 docs new 1d077ee doc changes included as part of 0.10.0.0-rc5 new f53c519 MINOR: Fix typos and formatting in coding-guide.html new 811faf5 MINOR: Replace style guide link with scala-lang.org link new b91f790 Doc refresh for 0.10.0.0-rc6 and getting the javadoc for Java clients, not the old Java-API in core new af5a933 Update javadoc for 0.10 to include connect and streams new 02bea1d KAFKA-3709: Create a project security page. 
new 8622aca Rename `project_security.html` to `project-security.html` new d412353 Fix includes in `project-security.html` new 1a18a45 Update docs and javadoc with latest minor changes new de10867 Fix table of contents and minor fix in `block.on.buffer.full` doc new 90c74a3 Update links for Kafka 0.10.0.0 new aafac9a Change site_name from `sna-projects` to `Apache Kafka` new 11d27b0 Fix typo in site_name new 92c15a5 Update 0.10.0 docs from Kafka's 0.10.0 branch new dc14da4 Add Ismael's keys new aaad0d2 Documentation updates from kafka's 0.10.0 branch new 37e8deb Update javadoc for 0.10.0.1 release new 8f55a94 Minor security and streams documentation updates from Kafka's 0.10.0 branch new d5cac74 Add download links for Kafka 0.10.0.1 new 633b9f5 Use archive instead of mirror link for release notes for convenience new 6f9652c Add Jason to committers page new 2cf87df Try to include streams and connect better. Removing messaging centric terminology. Rewrite introduction page. Make a linked doc sections stand alone. new da58207 Fix typo. new 0ba7502 Fix another typo on the homepage. new 5c5b7f8 Add Jason's PGP key new 1b8cdf4 new design new ed0bb0d Add 0.10.1 docs new 2d621da Add 0.10.1 javadocs new 6e3ea2d Merge branch 'derrickdoo-newTheme2' into asf-site new 590fd70 Add new key for Jason new aa684e5 theme enhancements for mobile and branding guidelines new e2348dc fix issue with rewrite rules blocking access to javadocs new 8cac98c Update 0.10.1 docs for RC1 new 72f3f9a Merge formatting fixes into 0.10.1 docs new 234f865 Additional doc fixes for 0.10.1 new 20f2876 update htaccess to load images nested inside of doc folders new b2433ce Resync 0.10.1 documentation new 181e712 Add missing images for 0.10.0 new a6986e0 Resync 0.10.1 docs for RC2 new 230d32e Resync 0.10.1 docs for RC3 new 9bad5a3 Resync 0.10.1 for 0.10.1.0 release new ac2567c En
[kafka-site] branch asf-site updated: Update html links for 2.1.1 release (#186)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 2c8b0f6 Update html links for 2.1.1 release (#186) 2c8b0f6 is described below commit 2c8b0f6a5749a2bc8c368c17fc917e5aecf009b5 Author: Colin Patrick McCabe AuthorDate: Tue Feb 19 15:32:32 2019 -0800 Update html links for 2.1.1 release (#186) Reviewers: Gwen Shapira --- downloads.html | 24 +++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/downloads.html b/downloads.html index e8fd68b..78b2de8 100644 --- a/downloads.html +++ b/downloads.html @@ -5,12 +5,34 @@ Download -2.1.0 is the latest release. The current stable version is 2.1.0. +2.1.1 is the latest release. The current stable version is 2.1.1. You can verify your download by following these https://www.apache.org/info/verification.html";>procedures and using these https://www.apache.org/dist/kafka/KEYS";>KEYS. 
+ +2.1.1 + + +Released Feb 15, 2019 + + +https://www.apache.org/dist/kafka/2.1.1/RELEASE_NOTES.html";>Release Notes + + +Source download: https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.1/kafka-2.1.1-src.tgz";>kafka-2.1.1-src.tgz (https://www.apache.org/dist/kafka/2.1.1/kafka-2.1.1-src.tgz.asc";>asc, https://www.apache.org/dist/kafka/2.1.1/kafka-2.1.1-src.tgz.sha512";>sha512) + + +Binary downloads: + +Scala 2.11 - https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.1/kafka_2.11-2.1.1.tgz";>kafka_2.11-2.1.1.tgz (https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz.asc";>asc, https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz.sha512";>sha512) +Scala 2.12 - https://www.apache.org/dyn/closer.cgi?path=/kafka/2.1.1/kafka_2.12-2.1.1.tgz";>kafka_2.12-2.1.1.tgz (https://www.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz.asc";>asc, https://www.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz.sha512";>sha512) + +We build for multiple versions of Scala. This only matters if you are using Scala and you want a version built for the same Scala version you use. Otherwise any version should work (2.12 is recommended). + + + 2.1.0
[kafka] branch trunk updated: Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable (#6305)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7f6bf95 Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable (#6305) 7f6bf95 is described below commit 7f6bf95c1e4875b1042746e3d73240496073f081 Author: Nicholas Parker AuthorDate: Thu Mar 7 21:07:23 2019 +1300 Fix for KAFKA-7974: Avoid zombie AdminClient when node host isn't resolvable (#6305) * Fix for KAFKA-7974: Avoid calling disconnect() when not connecting * Resolve host only when currentAddress() is called Moves away from automatically resolving the host when the connection entry is constructed, which can leave ClusterConnectionStates in a confused state. Instead, resolution is done on demand, ensuring that the entry in the connection list is present even if the resolution failed. * Add Javadoc to ClusterConnectionStates.connecting() --- .../kafka/clients/ClusterConnectionStates.java | 68 +++--- .../org/apache/kafka/clients/NetworkClient.java| 10 ++-- .../kafka/clients/ClusterConnectionStatesTest.java | 14 ++--- .../apache/kafka/clients/NetworkClientTest.java| 8 ++- 4 files changed, 66 insertions(+), 34 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index 376b35d..e9bd971 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -108,16 +109,18 @@ final class ClusterConnectionStates { } /** - * Enter the connecting state for the given connection. 
+ * Enter the connecting state for the given connection, moving to a new resolved address if necessary. * @param id the id of the connection - * @param now the current time - * @throws UnknownHostException + * @param now the current time in ms + * @param host the host of the connection, to be resolved internally if needed + * @param clientDnsLookup the mode of DNS lookup to use when resolving the {@code host} */ -public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) throws UnknownHostException { +public void connecting(String id, long now, String host, ClientDnsLookup clientDnsLookup) { NodeConnectionState connectionState = nodeState.get(id); if (connectionState != null && connectionState.host().equals(host)) { connectionState.lastConnectAttemptMs = now; connectionState.state = ConnectionState.CONNECTING; +// Move to next resolved address, or if addresses are exhausted, mark node to be re-resolved connectionState.moveToNextAddress(); return; } else if (connectionState != null) { @@ -130,14 +133,19 @@ final class ClusterConnectionStates { this.reconnectBackoffInitMs, host, clientDnsLookup)); } -public InetAddress currentAddress(String id) { -return nodeState.get(id).currentAddress(); +/** + * Returns a resolved address for the given connection, resolving it if necessary. + * @param id the id of the connection + * @throws UnknownHostException if the address was not resolvable + */ +public InetAddress currentAddress(String id) throws UnknownHostException { +return nodeState(id).currentAddress(); } /** * Enter the disconnected state for the given node. * @param id the connection we have disconnected - * @param now the current time + * @param now the current time in ms */ public void disconnected(String id, long now) { NodeConnectionState nodeState = nodeState(id); @@ -212,7 +220,7 @@ final class ClusterConnectionStates { /** * Enter the authentication failed state for the given node. 
* @param id the connection identifier - * @param now the current time + * @param now the current time in ms * @param exception the authentication exception */ public void authenticationFailed(String id, long now, AuthenticationException exception) { @@ -227,7 +235,7 @@ final class ClusterConnectionStates { * Return true if the connection is in the READY state and currently not throttled. * * @param id the connection identifier - * @param now the current time + * @param now the current time in ms */ public boolean isReady(String id, long now) { return isR
[kafka] branch trunk updated: MINOR: Update Trogdor ConnectionStressWorker status at the end of execution (#6445)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f20f3c1 MINOR: Update Trogdor ConnectionStressWorker status at the end of execution (#6445) f20f3c1 is described below commit f20f3c1a97b8a31a2f211cd66506fb823f420c55 Author: Stanislav Kozlovski AuthorDate: Fri Mar 15 18:53:21 2019 +0200 MINOR: Update Trogdor ConnectionStressWorker status at the end of execution (#6445) Reviewers: Colin P. McCabe --- tests/spec/connection_stress_test.json | 29 + .../trogdor/workload/ConnectionStressWorker.java | 38 +- 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/tests/spec/connection_stress_test.json b/tests/spec/connection_stress_test.json new file mode 100644 index 000..7b66985 --- /dev/null +++ b/tests/spec/connection_stress_test.json @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +// An example task specification for running a connection stress test in Trogdor. +// See TROGDOR.md for details. 
+// + +{ + "class": "org.apache.kafka.trogdor.workload.ConnectionStressSpec", + "durationMs": 6, + "clientNode": "node0", + "bootstrapServers": "localhost:9092", + "targetConnectionsPerSec": 100, + "numThreads": 10, + "action": "CONNECT" +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java index d85effc..9a9439a 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java @@ -99,12 +99,14 @@ public class ConnectionStressWorker implements TaskWorker { log.info("{}: Activating ConnectionStressWorker with {}", id, spec); this.doneFuture = doneFuture; this.status = status; -this.totalConnections = 0; -this.totalFailedConnections = 0; -this.startTimeMs = TIME.milliseconds(); +synchronized (ConnectionStressWorker.this) { +this.totalConnections = 0; +this.totalFailedConnections = 0; +this.nextReportTime = 0; +this.startTimeMs = TIME.milliseconds(); +} this.throttle = new ConnectStressThrottle(WorkerUtils. perSecToPerPeriod(spec.targetConnectionsPerSec(), THROTTLE_PERIOD_MS)); -this.nextReportTime = 0; this.workerExecutor = Executors.newFixedThreadPool(spec.numThreads(), ThreadUtils.createThreadFactory("ConnectionStressWorkerThread%d", false)); for (int i = 0; i < spec.numThreads(); i++) { @@ -112,6 +114,17 @@ public class ConnectionStressWorker implements TaskWorker { } } +/** + * Update the worker's status and next status report time. 
+ */ +private synchronized void updateStatus(long lastTimeMs) { +status.update(JsonUtil.JSON_SERDE.valueToTree( +new StatusData(totalConnections, +totalFailedConnections, +(totalConnections * 1000.0) / (lastTimeMs - startTimeMs; +nextReportTime = lastTimeMs + REPORT_INTERVAL_MS; +} + private static class ConnectStressThrottle extends Throttle { ConnectStressThrottle(int maxPerPeriod) { super(maxPerPeriod, THROTTLE_PERIOD_MS); @@ -130,10 +143,7 @@ public class ConnectionStressWorker implements TaskWorker { conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)); ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes()); -while (true) { -if (doneFuture.isDone()) { -
[kafka] branch trunk updated: KAFKA-7858: Automatically generate JoinGroup request/response
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 8406f36 KAFKA-7858: Automatically generate JoinGroup request/response 8406f36 is described below commit 8406f3624d8f99b614eb7171b71fae8b0e663dcb Author: Boyang Chen AuthorDate: Mon Mar 18 13:26:09 2019 -0700 KAFKA-7858: Automatically generate JoinGroup request/response Reviewers: Colin P. McCabe --- .../consumer/internals/AbstractCoordinator.java| 34 +-- .../consumer/internals/ConsumerCoordinator.java| 23 +- .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../kafka/common/requests/AbstractResponse.java| 2 +- .../kafka/common/requests/JoinGroupRequest.java| 252 - .../kafka/common/requests/JoinGroupResponse.java | 197 ++-- .../common/message/JoinGroupResponse.json | 2 +- .../kafka/clients/consumer/KafkaConsumerTest.java | 37 ++- .../internals/AbstractCoordinatorTest.java | 32 ++- .../internals/ConsumerCoordinatorTest.java | 79 +++ .../kafka/common/requests/RequestResponseTest.java | 56 - .../runtime/distributed/WorkerCoordinator.java | 18 +- .../runtime/distributed/WorkerCoordinatorTest.java | 85 +-- core/src/main/scala/kafka/server/KafkaApis.scala | 63 -- .../kafka/api/AuthorizerIntegrationTest.scala | 20 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 20 +- 16 files changed, 393 insertions(+), 533 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index fd7431b..4ff4e19 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -29,6 +29,8 @@ import org.apache.kafka.common.errors.MemberIdRequiredException; import 
org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; @@ -44,7 +46,6 @@ import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.HeartbeatRequest; import org.apache.kafka.common.requests.HeartbeatResponse; import org.apache.kafka.common.requests.JoinGroupRequest; -import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata; import org.apache.kafka.common.requests.JoinGroupResponse; import org.apache.kafka.common.requests.LeaveGroupRequest; import org.apache.kafka.common.requests.LeaveGroupResponse; @@ -86,7 +87,7 @@ import java.util.concurrent.atomic.AtomicReference; * * To leverage this protocol, an implementation must define the format of metadata provided by each * member for group registration in {@link #metadata()} and the format of the state assignment provided - * by the leader in {@link #performAssignment(String, String, Map)} and becomes available to members in + * by the leader in {@link #performAssignment(String, String, List)} and becomes available to members in * {@link #onJoinComplete(int, String, String, ByteBuffer)}. * * Note on locking: this class shares state between the caller and a background thread which is @@ -183,7 +184,7 @@ public abstract class AbstractCoordinator implements Closeable { * on the preference). * @return Non-empty map of supported protocols and metadata */ -protected abstract List metadata(); +protected abstract JoinGroupRequestData.JoinGroupRequestProtocolSet metadata(); /** * Invoked prior to each group join or rejoin. 
This is typically used to perform any @@ -202,7 +203,7 @@ public abstract class AbstractCoordinator implements Closeable { */ protected abstract Map performAssignment(String leaderId, String protocol, - Map allMemberMetadata); + List allMemberMetadata); /** * Invoked when a group member has successfully joined a group. If this call fails with an exception, @@ -476,7 +477,7 @@ public abstract class AbstractCoordinator implements Closeable { /** * Join the group and return the assignment for the next generation. This function handles both
[kafka] branch trunk updated: KAFKA-7819: Improve RoundTripWorker (#6187)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 6217178 KAFKA-7819: Improve RoundTripWorker (#6187) 6217178 is described below commit 62171781396b613b6be8e13a2541ab0895b9bb6b Author: Stanislav Kozlovski AuthorDate: Thu Mar 21 19:03:09 2019 +0200 KAFKA-7819: Improve RoundTripWorker (#6187) RoundTripWorker should use a long field for maxMessages rather than an int. The consumer group used should be unique as well. Reviewers: Colin P. McCabe --- .../kafka/trogdor/workload/RoundTripWorker.java| 90 ++ .../trogdor/workload/RoundTripWorkloadSpec.java| 6 +- 2 files changed, 62 insertions(+), 34 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java index b22292a..d08d807 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java @@ -57,11 +57,13 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.TreeSet; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class RoundTripWorker implements TaskWorker { private static final int THROTTLE_PERIOD_MS = 100; @@ -82,6 +84,10 @@ public class RoundTripWorker implements TaskWorker { private final AtomicBoolean running = new AtomicBoolean(false); +private final Lock lock = new ReentrantLock(); + +private final Condition unackedSendsAreZero = lock.newCondition(); + 
private ScheduledExecutorService executor; private WorkerStatusTracker status; @@ -92,7 +98,7 @@ public class RoundTripWorker implements TaskWorker { private KafkaConsumer consumer; -private CountDownLatch unackedSends; +private Long unackedSends; private ToSendTracker toSendTracker; @@ -114,7 +120,7 @@ public class RoundTripWorker implements TaskWorker { this.doneFuture = doneFuture; this.producer = null; this.consumer = null; -this.unackedSends = new CountDownLatch(spec.maxMessages()); +this.unackedSends = spec.maxMessages(); executor.submit(new Prepare()); } @@ -157,29 +163,29 @@ public class RoundTripWorker implements TaskWorker { } private static class ToSendTrackerResult { -final int index; +final long index; final boolean firstSend; -ToSendTrackerResult(int index, boolean firstSend) { +ToSendTrackerResult(long index, boolean firstSend) { this.index = index; this.firstSend = firstSend; } } private static class ToSendTracker { -private final int maxMessages; -private final List failed = new ArrayList<>(); -private int frontier = 0; +private final long maxMessages; +private final List failed = new ArrayList<>(); +private long frontier = 0; -ToSendTracker(int maxMessages) { +ToSendTracker(long maxMessages) { this.maxMessages = maxMessages; } -synchronized void addFailed(int index) { +synchronized void addFailed(long index) { failed.add(index); } -synchronized int frontier() { +synchronized long frontier() { return frontier; } @@ -232,7 +238,7 @@ public class RoundTripWorker implements TaskWorker { break; } throttle.increment(); -final int messageIndex = result.index; +final long messageIndex = result.index; if (result.firstSend) { toReceiveTracker.addPending(messageIndex); uniqueMessagesSent++; @@ -248,7 +254,14 @@ public class RoundTripWorker implements TaskWorker { spec.valueGenerator().generate(messageIndex)); producer.send(record, (metadata, exception) -> { if (exception == null) { -unackedSends.countDown(); +try { +lock.lock(); +unackedSends -= 1; +if 
(unackedSends <= 0) +unackedSendsAreZero.signalAll(); +
[kafka] branch trunk updated: MINOR: fix message protocol help text for ElectPreferredLeadersResult (#6479)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f304667 MINOR: fix message protocol help text for ElectPreferredLeadersResult (#6479) f304667 is described below commit f304667df089b9e39850fe401ddfd0c457b1bb67 Author: Colin Patrick McCabe AuthorDate: Sat Mar 23 12:14:35 2019 -0700 MINOR: fix message protocol help text for ElectPreferredLeadersResult (#6479) Reviewers: Jun Rao --- .../apache/kafka/clients/admin/ElectPreferredLeadersResult.java | 8 +--- .../resources/common/message/ElectPreferredLeadersResponse.json | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java index c76336a..963c5f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java @@ -62,10 +62,12 @@ public class ElectPreferredLeadersResult { result.completeExceptionally(new UnknownTopicOrPartitionException( "Preferred leader election for partition \"" + partition + "\" was not attempted")); +} else if (partitions == null && topicPartitions.isEmpty()) { +// If partitions is null, we requested information about all partitions. In +// that case, if topicPartitions is empty, that indicates a +// CLUSTER_AUTHORIZATION_FAILED error. 
+ result.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception()); } else { -if (partitions == null && topicPartitions.isEmpty()) { - result.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception()); -} ApiException exception = topicPartitions.get(partition).exception(); if (exception == null) { result.complete(null); diff --git a/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json b/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json index f34599c..491bd03 100644 --- a/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json +++ b/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json @@ -22,7 +22,7 @@ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ReplicaElectionResults", "type": "[]ReplicaElectionResult", "versions": "0+", - "about": "The error code, or 0 if there was no error.", "fields": [ + "about": "The election results, or an empty array if the requester did not have permission and the request asks for all partitions.", "fields": [ { "name": "Topic", "type": "string", "versions": "0+", "about": "The topic name" }, { "name": "PartitionResult", "type": "[]PartitionResult", "versions": "0+", @@ -36,4 +36,4 @@ ]} ]} ] -} \ No newline at end of file +}
[kafka] branch trunk updated: KAFKA-8150: Fix bugs in handling null arrays in generated RPC code (#6489)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e0d028b KAFKA-8150: Fix bugs in handling null arrays in generated RPC code (#6489) e0d028b is described below commit e0d028bf6cbf140c72706247c40bded7bfabcb0c Author: Colin Patrick McCabe AuthorDate: Mon Mar 25 09:43:44 2019 -0700 KAFKA-8150: Fix bugs in handling null arrays in generated RPC code (#6489) ToString functions must not get a NullPointException. read() functions must properly translate a negative array length to a null field. Reviewers: Manikumar Reddy --- .../java/org/apache/kafka/common/message/MessageTest.java | 7 +-- .../org/apache/kafka/message/MessageDataGenerator.java| 15 ++- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index 93a0930..d573b3b 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -38,7 +38,6 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic; import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicSet; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -47,7 +46,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -@Ignore public final class MessageTest { @Rule final public Timeout globalTimeout = Timeout.millis(12); @@ -87,6 +85,11 @@ public final class MessageTest { setHostFilter(null). setOperation((byte) 0). 
setPermissionType((byte) 0), (short) 0); +testMessageRoundTrips(new MetadataRequestData(). +setTopics(null). +setAllowAutoTopicCreation(false). +setIncludeClusterAuthorizedOperations(false). +setIncludeTopicAuthorizedOperations(false)); } private void testMessageRoundTrips(Message message) throws Exception { diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java index 76029f4..c8e70bb 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java @@ -416,9 +416,8 @@ public final class MessageDataGenerator { buffer.printf("int arrayLength = readable.readInt();%n"); buffer.printf("if (arrayLength < 0) {%n"); buffer.incrementIndent(); -buffer.printf("this.%s.clear(%s);%n", -field.camelCaseName(), -hasKeys ? "0" : ""); +buffer.printf("this.%s = null;%n", +field.camelCaseName()); buffer.decrementIndent(); buffer.printf("} else {%n"); buffer.incrementIndent(); @@ -1069,8 +1068,14 @@ public final class MessageDataGenerator { prefix, field.camelCaseName(), field.camelCaseName()); } else if (field.type().isArray()) { headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS); -buffer.printf("+ \"%s%s=\" + MessageUtil.deepToString(%s.iterator())%n", -prefix, field.camelCaseName(), field.camelCaseName()); +if (field.nullableVersions().empty()) { +buffer.printf("+ \"%s%s=\" + MessageUtil.deepToString(%s.iterator())%n", +prefix, field.camelCaseName(), field.camelCaseName()); +} else { +buffer.printf("+ \"%s%s=\" + ((%s == null) ? \"null\" : " + +"MessageUtil.deepToString(%s.iterator()))%n", +prefix, field.camelCaseName(), field.camelCaseName(), field.camelCaseName()); +} } else { throw new RuntimeException("Unsupported field type " + field.type()); }
[kafka] branch 2.2 updated: KAFKA-8150: Fix bugs in handling null arrays in generated RPC code (#6489)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new f5ba275 KAFKA-8150: Fix bugs in handling null arrays in generated RPC code (#6489) f5ba275 is described below commit f5ba275c03009487cc4c10e099d50564d183862f Author: Colin Patrick McCabe AuthorDate: Mon Mar 25 09:43:44 2019 -0700 KAFKA-8150: Fix bugs in handling null arrays in generated RPC code (#6489) ToString functions must not get a NullPointException. read() functions must properly translate a negative array length to a null field. Reviewers: Manikumar Reddy --- .../org/apache/kafka/message/MessageDataGenerator.java| 15 ++- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java index 1af0ce28..3c21c38 100644 --- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java +++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java @@ -416,9 +416,8 @@ public final class MessageDataGenerator { buffer.printf("int arrayLength = readable.readInt();%n"); buffer.printf("if (arrayLength < 0) {%n"); buffer.incrementIndent(); -buffer.printf("this.%s.clear(%s);%n", -field.camelCaseName(), -hasKeys ? 
"0" : ""); +buffer.printf("this.%s = null;%n", +field.camelCaseName()); buffer.decrementIndent(); buffer.printf("} else {%n"); buffer.incrementIndent(); @@ -1069,8 +1068,14 @@ public final class MessageDataGenerator { prefix, field.camelCaseName(), field.camelCaseName()); } else if (field.type().isArray()) { headerGenerator.addImport(MessageGenerator.MESSAGE_UTIL_CLASS); -buffer.printf("+ \"%s%s=\" + MessageUtil.deepToString(%s.iterator())%n", -prefix, field.camelCaseName(), field.camelCaseName()); +if (field.nullableVersions().empty()) { +buffer.printf("+ \"%s%s=\" + MessageUtil.deepToString(%s.iterator())%n", +prefix, field.camelCaseName(), field.camelCaseName()); +} else { +buffer.printf("+ \"%s%s=\" + ((%s == null) ? \"null\" : " + +"MessageUtil.deepToString(%s.iterator()))%n", +prefix, field.camelCaseName(), field.camelCaseName(), field.camelCaseName()); +} } else { throw new RuntimeException("Unsupported field type " + field.type()); }
[kafka] branch trunk updated: KAFKA-8102: Add an interval-based Trogdor transaction generator (#6444)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0d55f0f KAFKA-8102: Add an interval-based Trogdor transaction generator (#6444) 0d55f0f is described below commit 0d55f0f3ec8f97bc250b325481f6f2fa70f52a5c Author: Stanislav Kozlovski AuthorDate: Mon Mar 25 09:58:11 2019 -0700 KAFKA-8102: Add an interval-based Trogdor transaction generator (#6444) This patch adds a TimeIntervalTransactionsGenerator class which enables the Trogdor ProduceBench worker to commit transactions based on a configurable millisecond time interval. Also, we now handle 409 create task responses in the coordinator command-line client by printing a more informative message Reviewers: Colin P. McCabe --- TROGDOR.md | 1 + tests/spec/transactional-produce-bench.json| 2 +- .../trogdor/coordinator/CoordinatorClient.java | 12 +++- .../kafka/trogdor/coordinator/TaskManager.java | 1 + .../TimeIntervalTransactionsGenerator.java | 67 ++ .../trogdor/workload/TransactionGenerator.java | 1 + .../TimeIntervalTransactionsGeneratorTest.java | 42 ++ 7 files changed, 123 insertions(+), 3 deletions(-) diff --git a/TROGDOR.md b/TROGDOR.md index b551773..8b446b3 100644 --- a/TROGDOR.md +++ b/TROGDOR.md @@ -105,6 +105,7 @@ Trogdor can run several workloads. Workloads perform operations on the cluster ### ProduceBench ProduceBench starts a Kafka producer on a single agent node, producing to several partitions. The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency. +It can be configured to use a transactional producer which can commit transactions based on a set time interval or number of messages. ### RoundTripWorkload RoundTripWorkload tests both production and consumption. The workload starts a Kafka producer and consumer on a single node. 
The consumer will read back the messages that were produced by the producer. diff --git a/tests/spec/transactional-produce-bench.json b/tests/spec/transactional-produce-bench.json index 40f008b..bf1b6ca 100644 --- a/tests/spec/transactional-produce-bench.json +++ b/tests/spec/transactional-produce-bench.json @@ -15,7 +15,7 @@ // // An example task specification for running a transactional producer benchmark -in Trogdor. See TROGDOR.md for details. +// in Trogdor. See TROGDOR.md for details. // { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java index 476c32d..ba40e7b 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java @@ -44,6 +44,7 @@ import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.task.TaskSpec; +import org.apache.kafka.trogdor.rest.RequestConflictException; import org.apache.kafka.trogdor.rest.UptimeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -426,8 +427,15 @@ public class CoordinatorClient { TaskSpec taskSpec = JsonUtil. 
objectFromCommandLineArgument(res.getString("taskSpec"), TaskSpec.class); CreateTaskRequest req = new CreateTaskRequest(taskId, taskSpec); -client.createTask(req); -System.out.printf("Sent CreateTaskRequest for task %s.%n", req.id()); +try { +client.createTask(req); +System.out.printf("Sent CreateTaskRequest for task %s.%n", req.id()); +} catch (RequestConflictException rce) { +System.out.printf("CreateTaskRequest for task %s got a 409 status code - " + +"a task with the same ID but a different specification already exists.%nException: %s%n", +req.id(), rce.getMessage()); +Exit.exit(1); +} break; } case "stopTask": { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index 941656e..60e2b1e 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -302,6 +302,7 @@ public final class TaskManager { * * @param idThe ID of
[kafka] branch trunk updated: MINOR: Fix misspelling in protocol documentation
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 1baba1b MINOR: Fix misspelling in protocol documentation 1baba1b is described below commit 1baba1b347a9db51514777dba07263e285e8237b Author: pierDipi <33736985+pierd...@users.noreply.github.com> AuthorDate: Mon Mar 25 17:54:17 2019 +0100 MINOR: Fix misspelling in protocol documentation Reviewers: Colin P. McCabe --- .../src/main/java/org/apache/kafka/common/protocol/CommonFields.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java index 708500c..5fdc37f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java @@ -51,7 +51,7 @@ public class CommonFields { public static final Field.Int8 RESOURCE_TYPE = new Field.Int8("resource_type", "The resource type"); public static final Field.Str RESOURCE_NAME = new Field.Str("resource_name", "The resource name"); public static final Field.NullableStr RESOURCE_NAME_FILTER = new Field.NullableStr("resource_name", "The resource name filter"); -public static final Field.Int8 RESOURCE_PATTERN_TYPE = new Field.Int8("resource_pattten_type", "The resource pattern type", PatternType.LITERAL.code()); +public static final Field.Int8 RESOURCE_PATTERN_TYPE = new Field.Int8("resource_pattern_type", "The resource pattern type", PatternType.LITERAL.code()); public static final Field.Int8 RESOURCE_PATTERN_TYPE_FILTER = new Field.Int8("resource_pattern_type_filter", "The resource pattern type filter", PatternType.LITERAL.code()); public static final Field.Str PRINCIPAL = new Field.Str("principal", "The ACL principal"); public static final 
Field.NullableStr PRINCIPAL_FILTER = new Field.NullableStr("principal", "The ACL principal filter");
[kafka] branch trunk updated: MINOR: WorkerUtils#abort: fix bug in abort logic (#6516)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b25974c MINOR: WorkerUtils#abort: fix bug in abort logic (#6516) b25974c is described below commit b25974c387571b77fbe1e5010dbdd3f40a60a61d Author: Colin Patrick McCabe AuthorDate: Thu Mar 28 14:47:37 2019 -0700 MINOR: WorkerUtils#abort: fix bug in abort logic (#6516) doneFuture is supposed to be completed with an empty string (meaning success) or a non-empty string which is the error message. Currently, due to exception.getMessage sometimes returning null or an empty string, this is not working correctly. This patch fixes that. Reviewers: David Arthur --- .../main/java/org/apache/kafka/trogdor/common/WorkerUtils.java| 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index ef6e275..ad54c06 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -62,8 +62,12 @@ public final class WorkerUtils { */ public static void abort(Logger log, String what, Throwable exception, KafkaFutureImpl doneFuture) throws KafkaException { -log.warn("{} caught an exception: ", what, exception); -doneFuture.complete(exception.getMessage()); +log.warn("{} caught an exception", what, exception); +if (exception.getMessage() == null || exception.getMessage().isEmpty()) { +doneFuture.complete(exception.getClass().getCanonicalName()); +} else { +doneFuture.complete(exception.getMessage()); +} throw new KafkaException(exception); }
[kafka] branch trunk updated: KAFKA-8034: Use automatic RPC generation in DeleteTopics
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 981815c KAFKA-8034: Use automatic RPC generation in DeleteTopics 981815c is described below commit 981815c8d14daf1042e06f8fa9cb355187719b1d Author: Mickael Maison AuthorDate: Fri Mar 29 21:32:36 2019 +0000 KAFKA-8034: Use automatic RPC generation in DeleteTopics Reviewers: Colin P. McCabe --- .../kafka/clients/admin/KafkaAdminClient.java | 16 ++-- .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../kafka/common/requests/AbstractResponse.java| 2 +- .../kafka/common/requests/DeleteTopicsRequest.java | 86 +++- .../common/requests/DeleteTopicsResponse.java | 93 +- .../common/message/DeleteTopicsResponse.json | 6 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 16 +++- .../kafka/common/requests/RequestResponseTest.java | 20 +++-- core/src/main/scala/kafka/server/KafkaApis.scala | 75 ++--- .../kafka/api/AuthorizerIntegrationTest.scala | 18 +++-- .../kafka/server/DeleteTopicsRequestTest.scala | 55 - ...leteTopicsRequestWithDeletionDisabledTest.scala | 17 ++-- .../scala/unit/kafka/server/RequestQuotaTest.scala | 10 ++- 13 files changed, 206 insertions(+), 214 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 317cd7c..336597f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -63,6 +63,8 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; 
+import org.apache.kafka.common.message.DeleteTopicsRequestData; +import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; @@ -1365,14 +1367,16 @@ public class KafkaAdminClient extends AdminClient { @Override AbstractRequest.Builder createRequest(int timeoutMs) { -return new DeleteTopicsRequest.Builder(new HashSet<>(validTopicNames), timeoutMs); +return new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData() +.setTopicNames(validTopicNames) +.setTimeoutMs(timeoutMs)); } @Override void handleResponse(AbstractResponse abstractResponse) { DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse; // Check for controller change -for (Errors error : response.errors().values()) { +for (Errors error : response.errorCounts().keySet()) { if (error == Errors.NOT_CONTROLLER) { metadataManager.clearController(); metadataManager.requestUpdate(); @@ -1380,12 +1384,12 @@ public class KafkaAdminClient extends AdminClient { } } // Handle server responses for particular topics. 
-for (Map.Entry entry : response.errors().entrySet()) { -KafkaFutureImpl future = topicFutures.get(entry.getKey()); +for (DeletableTopicResult result : response.data().responses()) { +KafkaFutureImpl future = topicFutures.get(result.name()); if (future == null) { -log.warn("Server response mentioned unknown topic {}", entry.getKey()); +log.warn("Server response mentioned unknown topic {}", result.name()); } else { -ApiException exception = entry.getValue().exception(); +ApiException exception = Errors.forCode(result.errorCode()).exception(); if (exception != null) { future.completeExceptionally(exception); } else { diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 3f8d80d..ed9787f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/ap
[kafka] branch trunk updated: KAFKA-8183: Add retries to WorkerUtils#verifyTopics (#6532)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 4825bc4 KAFKA-8183: Add retries to WorkerUtils#verifyTopics (#6532) 4825bc4 is described below commit 4825bc47a0341076009bdcb15d9a195f206a1f32 Author: Stanislav Kozlovski AuthorDate: Tue Apr 2 17:09:40 2019 -0700 KAFKA-8183: Add retries to WorkerUtils#verifyTopics (#6532) Reviewers: Colin P. McCabe --- .../apache/kafka/trogdor/common/WorkerUtils.java | 34 ++ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java index ad54c06..adce304 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java @@ -44,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.regex.Pattern; @@ -160,7 +161,7 @@ public final class WorkerUtils { log.warn("Topic(s) {} already exist.", topicsExists); throw new TopicExistsException("One or more topics already exist."); } else { -verifyTopics(log, adminClient, topicsExists, topics); +verifyTopics(log, adminClient, topicsExists, topics, 3, 2500); } } } @@ -240,17 +241,20 @@ public final class WorkerUtils { * @param topicsToVerify List of topics to verify * @param topicsInfo Map of topic name to topic description, which includes topics in * 'topicsToVerify' list. 
+ * @param retryCount The number of times to retry the fetching of the topics + * @param retryBackoffMs The amount of time, in milliseconds, to wait in between retries * @throws UnknownTopicOrPartitionException If at least one topic contained in 'topicsInfo' - * does not exist + * does not exist after retrying. * @throws RuntimeException If one or more topics have different number of partitions than * described in 'topicsInfo' */ private static void verifyTopics( Logger log, AdminClient adminClient, -Collection topicsToVerify, Map topicsInfo) throws Throwable { -DescribeTopicsResult topicsResult = adminClient.describeTopics( -topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); -Map topicDescriptionMap = topicsResult.all().get(); +Collection topicsToVerify, Map topicsInfo, int retryCount, long retryBackoffMs) throws Throwable { + +Map topicDescriptionMap = topicDescriptions(topicsToVerify, adminClient, +retryCount, retryBackoffMs); + for (TopicDescription desc: topicDescriptionMap.values()) { // map will always contain the topic since all topics in 'topicsExists' are in given // 'topics' map @@ -265,6 +269,24 @@ public final class WorkerUtils { } } +private static Map topicDescriptions(Collection topicsToVerify, + AdminClient adminClient, + int retryCount, long retryBackoffMs) +throws ExecutionException, InterruptedException { +UnknownTopicOrPartitionException lastException = null; +for (int i = 0; i < retryCount; i++) { +try { +DescribeTopicsResult topicsResult = adminClient.describeTopics( +topicsToVerify, new DescribeTopicsOptions().timeoutMs(ADMIN_REQUEST_TIMEOUT)); +return topicsResult.all().get(); +} catch (UnknownTopicOrPartitionException exception) { +lastException = exception; +Thread.sleep(retryBackoffMs); +} +} +throw lastException; +} + /** * Returns list of existing, not internal, topics/partitions that match given pattern and * where partitions are in range [startPartition, endPartition]
[kafka] branch trunk updated: MINOR: ConnectionStressWorker: add missing executor shutdown (#6558)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 04a023e MINOR: ConnectionStressWorker: add missing executor shutdown (#6558) 04a023e is described below commit 04a023e302d1c02c32bba1653303bc0d40140f18 Author: Colin Patrick McCabe AuthorDate: Thu Apr 11 11:16:06 2019 -0700 MINOR: ConnectionStressWorker: add missing executor shutdown (#6558) --- .../java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java index e9bbf07..5c5db48 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java @@ -309,6 +309,7 @@ public class ConnectionStressWorker implements TaskWorker { // Otherwise, if some threads take a while to terminate, this could lead // to a misleading rate getting reported. this.statusUpdaterFuture.cancel(false); +this.statusUpdaterExecutor.shutdown(); this.statusUpdaterExecutor.awaitTermination(1, TimeUnit.DAYS); this.statusUpdaterExecutor = null; new StatusUpdater().run();
[kafka] branch trunk updated: KAFKA-7466: Add IncrementalAlterConfigs API (KIP-339) (#6247)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 3b1524c KAFKA-7466: Add IncrementalAlterConfigs API (KIP-339) (#6247) 3b1524c is described below commit 3b1524c5dfd2a94f3fb919dad0de70984963772b Author: Manikumar Reddy AuthorDate: Wed Apr 17 04:56:33 2019 +0530 KAFKA-7466: Add IncrementalAlterConfigs API (KIP-339) (#6247) Reviewers: Colin P. McCabe , Viktor Somogyi , Stanislav Kozlovski , Rajini Sivaram , Ismael Juma --- checkstyle/suppressions.xml| 2 +- .../apache/kafka/clients/admin/AdminClient.java| 46 ++ .../apache/kafka/clients/admin/AlterConfigOp.java | 96 +++ .../kafka/clients/admin/KafkaAdminClient.java | 90 +++ .../org/apache/kafka/common/config/ConfigDef.java | 4 + .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../kafka/common/requests/AbstractRequest.java | 2 + .../kafka/common/requests/AbstractResponse.java| 2 + .../requests/IncrementalAlterConfigsRequest.java | 91 +++ .../requests/IncrementalAlterConfigsResponse.java | 99 .../message/IncrementalAlterConfigsRequest.json| 41 + .../message/IncrementalAlterConfigsResponse.json | 36 + .../kafka/clients/admin/KafkaAdminClientTest.java | 56 +++ .../kafka/clients/admin/MockAdminClient.java | 7 + .../kafka/common/requests/RequestResponseTest.java | 35 core/src/main/scala/kafka/log/LogConfig.scala | 2 + .../src/main/scala/kafka/server/AdminManager.scala | 180 +++-- core/src/main/scala/kafka/server/KafkaApis.scala | 31 .../kafka/api/AdminClientIntegrationTest.scala | 178 .../kafka/api/AuthorizerIntegrationTest.scala | 33 +++- .../server/DynamicBrokerReconfigurationTest.scala | 16 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 10 +- .../test/scala/unit/kafka/utils/TestUtils.scala| 28 +++- 23 files changed, 1023 insertions(+), 68 deletions(-) diff --git a/checkstyle/suppressions.xml 
b/checkstyle/suppressions.xml index ce2706d..ca103a2 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -43,7 +43,7 @@ files="Sender.java"/> + files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|AdminClient|KafkaAdminClient).java"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index b823cdc..8826f83 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -368,7 +368,9 @@ public abstract class AdminClient implements AutoCloseable { * @param configs The resources with their configs (topic is the only resource type with configs that can *be updated currently) * @returnThe AlterConfigsResult + * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map)}. */ +@Deprecated public AlterConfigsResult alterConfigs(Map configs) { return alterConfigs(configs, new AlterConfigsOptions()); } @@ -385,10 +387,54 @@ public abstract class AdminClient implements AutoCloseable { *be updated currently) * @param options The options to use when describing configs * @returnThe AlterConfigsResult + * @deprecated Since 2.3. Use {@link #incrementalAlterConfigs(Map, AlterConfigsOptions)}. */ +@Deprecated public abstract AlterConfigsResult alterConfigs(Map configs, AlterConfigsOptions options); /** + * Incrementally updates the configuration for the specified resources with default options. + * + * This is a convenience method for #{@link AdminClient#incrementalAlterConfigs(Map, AlterConfigsOptions)} with default options. + * See the overload for more details.* + * + * This operation is supported by brokers with version 2.3.0 or higher. 
+ * + * @param configs The resources with their configs + * @returnThe IncrementalAlterConfigsResult + */ +public AlterConfigsResult incrementalAlterConfigs(Map> configs) { +return incrementalAlterConfigs(configs, new AlterConfigsOptions()); +} + + +/** + * Incrementally update the configuration for the specified resources. + * + * Updates are not transactional so they may succeed for some resources while fail for others. The configs for + * a particular resource are updated atomically. + * + * The following exceptions ca
[kafka] branch trunk updated: MINOR: Remove errant lock.unlock() call from RoundTripWorker (#6612)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 964e90a MINOR: Remove errant lock.unlock() call from RoundTripWorker (#6612) 964e90a is described below commit 964e90a725f7a0bcaf03735f81e525388868b4d9 Author: Bob Barrett AuthorDate: Sat Apr 20 13:43:35 2019 -0400 MINOR: Remove errant lock.unlock() call from RoundTripWorker (#6612) Reviewers: Colin P. McCabe --- .../main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java| 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java index d08d807..7eab2de 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java @@ -385,8 +385,6 @@ public class RoundTripWorker implements TaskWorker { log.debug("{}: Consumer got WakeupException", id, e); } catch (TimeoutException e) { log.debug("{}: Consumer got TimeoutException", id, e); -} finally { -lock.unlock(); } } } catch (Throwable e) {
[kafka] branch trunk updated: MINOR: log which signals are handled on startup (#6620)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new c2bee98 MINOR: log which signals are handled on startup (#6620) c2bee98 is described below commit c2bee988faa3c338fdc275aac9f638bb652b38a0 Author: Colin Patrick McCabe AuthorDate: Mon Apr 22 13:08:27 2019 -0700 MINOR: log which signals are handled on startup (#6620) Reviewers: Gwen Shapira --- .../org/apache/kafka/common/utils/LoggingSignalHandler.java | 12 +--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java b/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java index fbe6736..112d7fd 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/LoggingSignalHandler.java @@ -23,6 +23,8 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -30,6 +32,8 @@ public class LoggingSignalHandler { private static final Logger log = LoggerFactory.getLogger(LoggingSignalHandler.class); +private static final List SIGNALS = Arrays.asList("TERM", "INT", "HUP"); + private final Constructor signalConstructor; private final Class signalHandlerClass; private final Class signalClass; @@ -61,9 +65,11 @@ public class LoggingSignalHandler { */ public void register() throws ReflectiveOperationException { Map jvmSignalHandlers = new ConcurrentHashMap<>(); -register("TERM", jvmSignalHandlers); -register("INT", jvmSignalHandlers); -register("HUP", jvmSignalHandlers); + +for (String signal : SIGNALS) { +register(signal, jvmSignalHandlers); +} 
+log.info("Registered signal handlers for " + String.join(", ", SIGNALS)); } private Object createSignalHandler(final Map jvmSignalHandlers) {
[kafka] branch trunk updated: MINOR: reformat settings.gradle to be more readable (#6621)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9e10916 MINOR: reformat settings.gradle to be more readable (#6621) 9e10916 is described below commit 9e10916abda2f2a3911b8ff50ad7f76bf1f45030 Author: Colin Patrick McCabe AuthorDate: Tue Apr 23 16:16:25 2019 -0700 MINOR: reformat settings.gradle to be more readable (#6621) Reviewers: Jason Gustafson --- build.gradle| 21 +++-- settings.gradle | 30 +- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index ba674e6..776af9d 100644 --- a/build.gradle +++ b/build.gradle @@ -604,8 +604,25 @@ for ( sv in availableScalaVersions ) { } } -def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file', 'connect:basic-auth-extension'] -def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples'] + connectPkgs +def connectPkgs = [ +'connect:api', +'connect:basic-auth-extension', +'connect:file', +'connect:json', +'connect:runtime', +'connect:transforms' +] + +def pkgs = [ +'clients', +'examples', +'log4j-appender', +'streams', +'streams:examples', +'streams:streams-scala', +'streams:test-utils', +'tools' +] + connectPkgs /** Create one task per default Scala version */ def withDefScalaVersions(taskName) { diff --git a/settings.gradle b/settings.gradle index 5ae78e4..b2814b1 100644 --- a/settings.gradle +++ b/settings.gradle @@ -13,8 +13,28 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples', -'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102', -'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11', 'streams:upgrade-system-tests-20', -'streams:upgrade-system-tests-21' , 'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', -'connect:basic-auth-extension', 'jmh-benchmarks', 'generator' +include 'clients', +'connect:api', +'connect:basic-auth-extension', +'connect:file', +'connect:json', +'connect:runtime', +'connect:transforms', +'core', +'examples', +'generator', +'jmh-benchmarks', +'log4j-appender', +'streams', +'streams:examples', +'streams:streams-scala', +'streams:test-utils', +'streams:upgrade-system-tests-0100', +'streams:upgrade-system-tests-0101', +'streams:upgrade-system-tests-0102', +'streams:upgrade-system-tests-0110', +'streams:upgrade-system-tests-10', +'streams:upgrade-system-tests-11', +'streams:upgrade-system-tests-20', +'streams:upgrade-system-tests-21', +'tools'
[kafka] branch trunk updated: KAFKA-7903: automatically generate OffsetCommitRequest (#6583)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0c2d829 KAFKA-7903: automatically generate OffsetCommitRequest (#6583) 0c2d829 is described below commit 0c2d829249567c6a87140ae25f631a2abad11d00 Author: Boyang Chen AuthorDate: Thu Apr 25 16:42:54 2019 -0700 KAFKA-7903: automatically generate OffsetCommitRequest (#6583) Reviewers: Colin P. McCabe --- .../consumer/internals/ConsumerCoordinator.java| 121 +--- .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../kafka/common/requests/OffsetCommitRequest.java | 339 - .../common/requests/OffsetCommitResponse.java | 143 - .../kafka/clients/consumer/KafkaConsumerTest.java | 5 +- .../internals/ConsumerCoordinatorTest.java | 48 ++- .../kafka/common/requests/RequestResponseTest.java | 44 ++- core/src/main/scala/kafka/server/KafkaApis.scala | 66 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 25 +- .../scala/unit/kafka/server/KafkaApisTest.scala| 25 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 34 ++- 11 files changed, 366 insertions(+), 490 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 68e98ba..4d40070 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -35,6 +35,8 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import 
org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -42,6 +44,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.OffsetFetchRequest; @@ -770,14 +773,27 @@ public final class ConsumerCoordinator extends AbstractCoordinator { return RequestFuture.coordinatorNotAvailable(); // create the offset commit request -Map offsetData = new HashMap<>(offsets.size()); +Map requestTopicDataMap = new HashMap<>(); for (Map.Entry entry : offsets.entrySet()) { +TopicPartition topicPartition = entry.getKey(); OffsetAndMetadata offsetAndMetadata = entry.getValue(); if (offsetAndMetadata.offset() < 0) { return RequestFuture.failure(new IllegalArgumentException("Invalid offset: " + offsetAndMetadata.offset())); } -offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(offsetAndMetadata.offset(), -offsetAndMetadata.leaderEpoch(), offsetAndMetadata.metadata())); + +OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap +.getOrDefault(topicPartition.topic(), +new OffsetCommitRequestData.OffsetCommitRequestTopic() +.setName(topicPartition.topic()) +); + +topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition() +.setPartitionIndex(topicPartition.partition()) +.setCommittedOffset(offsetAndMetadata.offset()) + .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH)) +.setCommittedMetadata(offsetAndMetadata.metadata()) +); +requestTopicDataMap.put(topicPartition.topic(), 
topic); } final Generation generation; @@ -791,9 +807,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (generation == null) return RequestFuture.failure(new CommitFailedException()); -OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, offsetData). -setGenerationId(generation.generationId). -setMem
[kafka] branch trunk updated: KAFKA-8134: `linger.ms` must be a long
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b4532a6 KAFKA-8134: `linger.ms` must be a long b4532a6 is described below commit b4532a65f758448c763b65b2fdde1405db2f9d9d Author: Dhruvil Shah AuthorDate: Mon Apr 29 08:59:18 2019 -0700 KAFKA-8134: `linger.ms` must be a long Reviewers: Ismael Juma , Colin P. McCabe --- .../kafka/clients/producer/KafkaProducer.java | 13 -- .../kafka/clients/producer/ProducerConfig.java | 2 +- .../producer/internals/RecordAccumulator.java | 8 ++-- .../producer/internals/RecordAccumulatorTest.java | 51 ++ .../clients/producer/internals/SenderTest.java | 8 ++-- .../producer/internals/TransactionManagerTest.java | 4 +- .../kafka/api/BaseProducerSendTest.scala | 13 +++--- .../kafka/api/PlaintextProducerSendTest.scala | 6 ++- .../scala/unit/kafka/server/FetchRequestTest.scala | 1 + 9 files changed, 57 insertions(+), 49 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 83a4d51..b83d98e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -395,7 +395,7 @@ public class KafkaProducer implements Producer { this.accumulator = new RecordAccumulator(logContext, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.compressionType, -config.getInt(ProducerConfig.LINGER_MS_CONFIG), +lingerMs(config), retryBackoffMs, deliveryTimeoutMs, metrics, @@ -475,12 +475,17 @@ public class KafkaProducer implements Producer { apiVersions); } +private static int lingerMs(ProducerConfig config) { +return (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE); +} + private static int 
configureDeliveryTimeout(ProducerConfig config, Logger log) { int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG); -int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG); +int lingerMs = lingerMs(config); int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); +int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE); -if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerMs + requestTimeoutMs) { +if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerAndRequestTimeoutMs) { if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) { // throw an exception if the user explicitly set an inconsistent value throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG @@ -488,7 +493,7 @@ public class KafkaProducer implements Producer { + " + " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); } else { // override deliveryTimeoutMs default value to lingerMs + requestTimeoutMs for backward compatibility -deliveryTimeoutMs = lingerMs + requestTimeoutMs; +deliveryTimeoutMs = lingerAndRequestTimeoutMs; log.warn("{} should be equal to or larger than {} + {}. 
Setting it to {}.", ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, ProducerConfig.LINGER_MS_CONFIG, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, deliveryTimeoutMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 9324b9e..758f858 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -260,7 +260,7 @@ public class ProducerConfig extends AbstractConfig { ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) -.define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) +.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
[kafka] branch 2.2 updated: KAFKA-8134: `linger.ms` must be a long
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.2 by this push: new 43e6504 KAFKA-8134: `linger.ms` must be a long 43e6504 is described below commit 43e650469412afd93faf18f176881a0b6f4d0716 Author: Dhruvil Shah AuthorDate: Mon Apr 29 08:59:18 2019 -0700 KAFKA-8134: `linger.ms` must be a long Reviewers: Ismael Juma , Colin P. McCabe --- .../kafka/clients/producer/KafkaProducer.java | 13 -- .../kafka/clients/producer/ProducerConfig.java | 2 +- .../producer/internals/RecordAccumulator.java | 8 ++-- .../producer/internals/RecordAccumulatorTest.java | 51 ++ .../clients/producer/internals/SenderTest.java | 8 ++-- .../producer/internals/TransactionManagerTest.java | 4 +- .../kafka/api/BaseProducerSendTest.scala | 13 +++--- .../kafka/api/PlaintextProducerSendTest.scala | 6 ++- .../scala/unit/kafka/server/FetchRequestTest.scala | 1 + 9 files changed, 57 insertions(+), 49 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index bfd6bf3..8b99a9a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -396,7 +396,7 @@ public class KafkaProducer implements Producer { this.accumulator = new RecordAccumulator(logContext, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.compressionType, -config.getInt(ProducerConfig.LINGER_MS_CONFIG), +lingerMs(config), retryBackoffMs, deliveryTimeoutMs, metrics, @@ -473,12 +473,17 @@ public class KafkaProducer implements Producer { apiVersions); } +private static int lingerMs(ProducerConfig config) { +return (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE); +} + private static int 
configureDeliveryTimeout(ProducerConfig config, Logger log) { int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG); -int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG); +int lingerMs = lingerMs(config); int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); +int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE); -if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerMs + requestTimeoutMs) { +if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerAndRequestTimeoutMs) { if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) { // throw an exception if the user explicitly set an inconsistent value throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG @@ -486,7 +491,7 @@ public class KafkaProducer implements Producer { + " + " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); } else { // override deliveryTimeoutMs default value to lingerMs + requestTimeoutMs for backward compatibility -deliveryTimeoutMs = lingerMs + requestTimeoutMs; +deliveryTimeoutMs = lingerAndRequestTimeoutMs; log.warn("{} should be equal to or larger than {} + {}. 
Setting it to {}.", ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, ProducerConfig.LINGER_MS_CONFIG, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, deliveryTimeoutMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index c63477d..19f0ce9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -260,7 +260,7 @@ public class ProducerConfig extends AbstractConfig { ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) -.define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) +.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
[kafka] branch 2.1 updated: KAFKA-8134: `linger.ms` must be a long
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.1 by this push: new 18de41a KAFKA-8134: `linger.ms` must be a long 18de41a is described below commit 18de41a2907d7f50fb291a7e57c300f5d94c167b Author: Dhruvil Shah AuthorDate: Mon Apr 29 08:59:18 2019 -0700 KAFKA-8134: `linger.ms` must be a long Reviewers: Ismael Juma , Colin P. McCabe --- .../kafka/clients/producer/KafkaProducer.java | 13 -- .../kafka/clients/producer/ProducerConfig.java | 2 +- .../producer/internals/RecordAccumulator.java | 8 ++-- .../producer/internals/RecordAccumulatorTest.java | 51 ++ .../clients/producer/internals/SenderTest.java | 14 +++--- .../producer/internals/TransactionManagerTest.java | 4 +- .../kafka/api/BaseProducerSendTest.scala | 13 +++--- .../kafka/api/PlaintextProducerSendTest.scala | 6 ++- .../scala/unit/kafka/server/FetchRequestTest.scala | 1 + 9 files changed, 60 insertions(+), 52 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b2bccb7..8528bba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -395,7 +395,7 @@ public class KafkaProducer implements Producer { this.accumulator = new RecordAccumulator(logContext, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.compressionType, -config.getInt(ProducerConfig.LINGER_MS_CONFIG), +lingerMs(config), retryBackoffMs, deliveryTimeoutMs, metrics, @@ -472,12 +472,17 @@ public class KafkaProducer implements Producer { apiVersions); } +private static int lingerMs(ProducerConfig config) { +return (int) Math.min(config.getLong(ProducerConfig.LINGER_MS_CONFIG), Integer.MAX_VALUE); +} + private static int 
configureDeliveryTimeout(ProducerConfig config, Logger log) { int deliveryTimeoutMs = config.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG); -int lingerMs = config.getInt(ProducerConfig.LINGER_MS_CONFIG); +int lingerMs = lingerMs(config); int requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); +int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, Integer.MAX_VALUE); -if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerMs + requestTimeoutMs) { +if (deliveryTimeoutMs < Integer.MAX_VALUE && deliveryTimeoutMs < lingerAndRequestTimeoutMs) { if (config.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) { // throw an exception if the user explicitly set an inconsistent value throw new ConfigException(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG @@ -485,7 +490,7 @@ public class KafkaProducer implements Producer { + " + " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); } else { // override deliveryTimeoutMs default value to lingerMs + requestTimeoutMs for backward compatibility -deliveryTimeoutMs = lingerMs + requestTimeoutMs; +deliveryTimeoutMs = lingerAndRequestTimeoutMs; log.warn("{} should be equal to or larger than {} + {}. 
Setting it to {}.", ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, ProducerConfig.LINGER_MS_CONFIG, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, deliveryTimeoutMs); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index c63477d..19f0ce9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -260,7 +260,7 @@ public class ProducerConfig extends AbstractConfig { ACKS_DOC) .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC) .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC) -.define(LINGER_MS_CONFIG, Type.INT, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC) +.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0), Importance.MEDIUM, LINGER_MS_DOC)
[kafka] branch trunk updated: KAFKA-7992: Introduce start-time-ms metric (#6318)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 191f2fa KAFKA-7992: Introduce start-time-ms metric (#6318) 191f2fa is described below commit 191f2faae07b6608b0601dc2caf204b196a8fc47 Author: Stanislav Kozlovski AuthorDate: Wed May 1 16:58:02 2019 +0100 KAFKA-7992: Introduce start-time-ms metric (#6318) Reviewers: Colin P. McCabe , Ismael Juma --- .../kafka/clients/admin/KafkaAdminClient.java | 2 +- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../apache/kafka/common/utils/AppInfoParser.java | 32 +--- .../kafka/common/utils/AppInfoParserTest.java | 88 ++ .../kafka/connect/runtime/ConnectMetrics.java | 2 +- .../runtime/distributed/WorkerGroupMember.java | 2 +- core/src/main/scala/kafka/common/AppInfo.scala | 54 - core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- 9 files changed, 117 insertions(+), 69 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 23d7fd5..ffe24ca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -451,7 +451,7 @@ public class KafkaAdminClient extends AdminClient { this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG); this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG); config.logUnused(); -AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics); +AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); log.debug("Kafka admin client initialized"); thread.start(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index c73a028..a59e857 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -811,7 +811,7 @@ public class KafkaConsumer implements Consumer { isolationLevel); config.logUnused(); -AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics); +AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); log.debug("Kafka consumer initialized"); } catch (Throwable t) { // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121 diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b83d98e..06a0fc1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -423,7 +423,7 @@ public class KafkaProducer implements Producer { this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start(); config.logUnused(); -AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics); +AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); log.debug("Kafka producer started"); } catch (Throwable t) { // call close methods if internal objects are already constructed this is to prevent resource leak. 
see KAFKA-2121 diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java index 8a12fbc..3ceca99 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java @@ -36,6 +36,8 @@ public class AppInfoParser { private static final String VERSION; private static final String COMMIT_ID; +protected static final String DEFAULT_VALUE = "unknown"; + static { Properties props = new Properties(); try (InputStream resourceStream = AppInfoParser.class.getResourceAsStream("/kafka/kafka-version.properties")) { @@ -43,8 +45,8 @@ public class AppInfoParser { } catch (Exception e) { log.warn("Error while loading kafka-version.properties: {}", e.getMessage()); } -VERSION = props.getProperty("version", "unknown").trim(); -
[kafka] branch trunk updated: KAFKA-8158: Add EntityType for Kafka RPC fields (#6503)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5144660 KAFKA-8158: Add EntityType for Kafka RPC fields (#6503) 5144660 is described below commit 5144660040839cee6e213b5146c1ae44340eddb0 Author: Colin Patrick McCabe AuthorDate: Tue May 7 21:35:17 2019 -0700 KAFKA-8158: Add EntityType for Kafka RPC fields (#6503) Reviewers: Jason Gustafson --- .../common/message/AddOffsetsToTxnRequest.json | 6 +- .../common/message/AddPartitionsToTxnRequest.json | 6 +- .../common/message/AddPartitionsToTxnResponse.json | 2 +- .../common/message/AlterReplicaLogDirsRequest.json | 2 +- .../message/AlterReplicaLogDirsResponse.json | 2 +- .../common/message/ControlledShutdownRequest.json | 2 +- .../common/message/ControlledShutdownResponse.json | 2 +- .../common/message/CreatePartitionsRequest.json| 4 +- .../common/message/CreatePartitionsResponse.json | 2 +- .../common/message/CreateTopicsRequest.json| 4 +- .../common/message/CreateTopicsResponse.json | 2 +- .../common/message/DeleteGroupsRequest.json| 2 +- .../common/message/DeleteGroupsResponse.json | 2 +- .../common/message/DeleteRecordsRequest.json | 2 +- .../common/message/DeleteRecordsResponse.json | 2 +- .../common/message/DeleteTopicsRequest.json| 2 +- .../common/message/DeleteTopicsResponse.json | 2 +- .../common/message/DescribeConfigsResponse.json| 2 +- .../common/message/DescribeGroupsRequest.json | 2 +- .../common/message/DescribeGroupsResponse.json | 2 +- .../common/message/DescribeLogDirsRequest.json | 2 +- .../common/message/DescribeLogDirsResponse.json| 2 +- .../message/ElectPreferredLeadersRequest.json | 4 +- .../message/ElectPreferredLeadersResponse.json | 2 +- .../resources/common/message/EndTxnRequest.json| 4 +- .../resources/common/message/FetchRequest.json | 4 +- .../resources/common/message/FetchResponse.json| 4 +- 
.../common/message/FindCoordinatorResponse.json| 2 +- .../resources/common/message/HeartbeatRequest.json | 2 +- .../common/message/InitProducerIdRequest.json | 2 +- .../common/message/InitProducerIdResponse.json | 2 +- .../resources/common/message/JoinGroupRequest.json | 2 +- .../common/message/LeaderAndIsrRequest.json| 12 +-- .../common/message/LeaderAndIsrResponse.json | 2 +- .../common/message/LeaveGroupRequest.json | 2 +- .../common/message/ListGroupsResponse.json | 2 +- .../common/message/ListOffsetRequest.json | 4 +- .../common/message/ListOffsetResponse.json | 2 +- .../resources/common/message/MetadataRequest.json | 2 +- .../resources/common/message/MetadataResponse.json | 10 +-- .../common/message/OffsetCommitRequest.json| 4 +- .../common/message/OffsetCommitResponse.json | 2 +- .../common/message/OffsetFetchRequest.json | 5 +- .../common/message/OffsetFetchResponse.json| 2 +- .../message/OffsetForLeaderEpochRequest.json | 2 +- .../message/OffsetForLeaderEpochResponse.json | 2 +- .../resources/common/message/ProduceRequest.json | 4 +- .../resources/common/message/ProduceResponse.json | 2 +- .../common/message/StopReplicaRequest.json | 6 +- .../common/message/StopReplicaResponse.json| 2 +- .../resources/common/message/SyncGroupRequest.json | 2 +- .../common/message/TxnOffsetCommitRequest.json | 6 +- .../common/message/TxnOffsetCommitResponse.json| 2 +- .../common/message/UpdateMetadataRequest.json | 23 +++--- .../common/message/WriteTxnMarkersRequest.json | 4 +- .../common/message/WriteTxnMarkersResponse.json| 4 +- .../java/org/apache/kafka/message/EntityType.java | 62 +++ .../java/org/apache/kafka/message/FieldSpec.java | 6 ++ .../java/org/apache/kafka/message/FieldType.java | 14 ++-- .../org/apache/kafka/message/EntityTypeTest.java | 89 ++ 60 files changed, 261 insertions(+), 102 deletions(-) diff --git a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json index 
981650f..604a960 100644 --- a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json +++ b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json @@ -20,13 +20,13 @@ // Version 1 is the same as version 0. "validVersions": "0-1", "fields": [ -{ "name": "TransactionalId", "type": "string", "versions": "0+", +{ "name": "TransactionalId", "type": "string", "versions": "0+", &
[kafka] branch trunk updated: KAFKA-8332: Refactor ImplicitLinkedHashSet to avoid losing ordering when converting to Scala
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a97e55b KAFKA-8332: Refactor ImplicitLinkedHashSet to avoid losing ordering when converting to Scala a97e55b is described below commit a97e55b83868ff786e740db55e73116f85456dcb Author: Bob Barrett AuthorDate: Thu May 9 11:08:22 2019 -0700 KAFKA-8332: Refactor ImplicitLinkedHashSet to avoid losing ordering when converting to Scala Because of how conversions between Java collections and Scala collections work, ImplicitLinkedHashMultiSet objects were being treated as unordered in some contexts where they shouldn't be. This broke JOIN_GROUP handling. This patch renames ImplicitLinkedHashMultiSet to ImplicitLinkedHashMultiCollection. The order of Collection objects will be preserved when converting to Scala. Adding Set and List "views" to the Collection gives us a more elegant way of accessing that functionality when needed. Reviewers: Colin P. McCabe --- .../kafka/clients/admin/KafkaAdminClient.java | 8 +- .../consumer/internals/AbstractCoordinator.java| 2 +- .../consumer/internals/ConsumerCoordinator.java| 4 +- .../requests/ControlledShutdownResponse.java | 3 +- ...hSet.java => ImplicitLinkedHashCollection.java} | 218 - ...java => ImplicitLinkedHashMultiCollection.java} | 12 +- .../internals/AbstractCoordinatorTest.java | 4 +- .../apache/kafka/common/message/MessageTest.java | 7 +- .../kafka/common/requests/RequestResponseTest.java | 9 +- .../utils/ImplicitLinkedHashCollectionTest.java| 536 + ... 
=> ImplicitLinkedHashMultiCollectionTest.java} | 16 +- .../common/utils/ImplicitLinkedHashSetTest.java| 245 -- .../runtime/distributed/WorkerCoordinator.java | 4 +- .../runtime/distributed/WorkerCoordinatorTest.java | 2 +- .../src/main/scala/kafka/server/FetchSession.scala | 10 +- core/src/main/scala/kafka/server/KafkaApis.scala | 14 +- .../kafka/api/AuthorizerIntegrationTest.scala | 11 +- .../server/AbstractCreateTopicsRequestTest.scala | 6 +- .../scala/unit/kafka/server/KafkaApisTest.scala| 37 ++ .../scala/unit/kafka/server/RequestQuotaTest.scala | 8 +- .../apache/kafka/message/MessageDataGenerator.java | 34 +- .../org/apache/kafka/message/MessageGenerator.java | 4 +- 22 files changed, 853 insertions(+), 341 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index a0958e9..8870519 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -61,7 +61,7 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.message.CreateTopicsRequestData; -import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet; +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; import org.apache.kafka.common.message.DeleteTopicsRequestData; import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; @@ -71,7 +71,7 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup import org.apache.kafka.common.message.FindCoordinatorRequestData; import 
org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; -import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; import org.apache.kafka.common.metrics.JmxReporter; @@ -1280,7 +1280,7 @@ public class KafkaAdminClient extends AdminClient { public CreateTopicsResult createTopics(final Collection newTopics, final CreateTopicsOptions options) { final Map> topicFutures = new HashMap<>(newTopics.size()); -final CreatableTopicSet topics = new CreatableTopicSet(); +final CreatableTopicCollection topics = new CreatableTopicCollection(); for (NewTopic newTopic : newTopics) { if (topicNameIsUnrepresentable(newTopic.name())) {
[kafka] branch trunk updated (050fdd6 -> 0494cd3)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 050fdd6 KAFKA-8336; Enable dynamic reconfiguration of broker's client-side certs (#6721) add 0494cd3 MINOR: Refactor SslFactory (#6674) No new revisions were added by this update. Summary of changes: .../apache/kafka/common/config/SslClientAuth.java | 36 +- .../org/apache/kafka/common/config/SslConfigs.java | 11 + .../kafka/common/network/SslTransportLayer.java| 3 +- .../common/security/ssl/SslEngineBuilder.java | 316 + .../kafka/common/security/ssl/SslFactory.java | 488 +++-- .../apache/kafka/common/network/EchoServer.java| 2 +- .../kafka/common/security/ssl/SslFactoryTest.java | 175 .../src/main/scala/kafka/admin/ConfigCommand.scala | 1 + core/src/main/scala/kafka/server/KafkaConfig.scala | 14 +- .../scala/kafka/tools/StateChangeLogMerger.scala | 2 +- 10 files changed, 598 insertions(+), 450 deletions(-) copy tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java => clients/src/main/java/org/apache/kafka/common/config/SslClientAuth.java (55%) create mode 100644 clients/src/main/java/org/apache/kafka/common/security/ssl/SslEngineBuilder.java
[kafka] branch 2.3 created (now 614ea55)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git. at 614ea55 KAFKA-8381; Disable hostname validation when verifying inter-broker SSL (#6757) No new revisions were added by this update.
[kafka] branch trunk updated: MINOR: Bump version to 2.4.0-SNAPSHOT (#6774)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 87ff83a MINOR: Bump version to 2.4.0-SNAPSHOT (#6774) 87ff83a is described below commit 87ff83a82efd64849b855b5397fb26ae17327010 Author: Colin Patrick McCabe AuthorDate: Mon May 20 12:47:21 2019 -0700 MINOR: Bump version to 2.4.0-SNAPSHOT (#6774) Reviewers: Jason Gustafson --- docs/js/templateData.js | 6 +++--- gradle.properties | 2 +- kafka-merge-pr.py | 2 +- streams/quickstart/java/pom.xml | 2 +- .../quickstart/java/src/main/resources/archetype-resources/pom.xml | 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py | 2 +- tests/kafkatest/version.py | 2 +- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/js/templateData.js b/docs/js/templateData.js index e5d5f0b..071fa94 100644 --- a/docs/js/templateData.js +++ b/docs/js/templateData.js @@ -17,8 +17,8 @@ limitations under the License. 
// Define variables for doc templates var context={ -"version": "23", -"dotVersion": "2.3", -"fullDotVersion": "2.3.0", +"version": "24", +"dotVersion": "2.4", +"fullDotVersion": "2.4.0", "scalaVersion": "2.12" }; diff --git a/gradle.properties b/gradle.properties index a7b31d9..c4b58b9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -20,7 +20,7 @@ group=org.apache.kafka # - tests/kafkatest/__init__.py # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py -version=2.3.0-SNAPSHOT +version=2.4.0-SNAPSHOT scalaVersion=2.12.8 task=build org.gradle.jvmargs=-Xmx1024m -Xss2m diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py index 1723fe3..939fe9c 100755 --- a/kafka-merge-pr.py +++ b/kafka-merge-pr.py @@ -70,7 +70,7 @@ TEMP_BRANCH_PREFIX = "PR_TOOL" DEV_BRANCH_NAME = "trunk" -DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "2.3.0") +DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "2.4.0") def get_json(url): try: diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index bd0a8e1..13425c0 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart -2.3.0-SNAPSHOT +2.4.0-SNAPSHOT .. 
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index 0f18688..300f72f 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 -2.3.0-SNAPSHOT +2.4.0-SNAPSHOT 1.7.7 1.2.17 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index edad6ba..79deb93 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom -2.3.0-SNAPSHOT +2.4.0-SNAPSHOT Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index 2e86a3d..7083c2f 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -22,4 +22,4 @@ # Instead, in development branches, the version should have a suffix of the form ".devN" # # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '2.3.0.dev0' +__version__ = '2.4.0.dev0' diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index c4a958e..87fedea 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -60,7 +60,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("2.3.0-SNAPSHOT") +DEV_VERSION = KafkaVersion("2.4.0-SNAPSHOT") # 0.8.2.x versions V_0_8_2_1 = KafkaVersion("0.8.2.1")
[kafka] branch trunk updated: KAFKA-8417: Remove redundant network definition --net=host when starting testing docker containers (#6797)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new c6d083d KAFKA-8417: Remove redundant network definition --net=host when starting testing docker containers (#6797) c6d083d is described below commit c6d083d7fc0a3c1e08157ffd0e6665be40a5680f Author: Konstantine Karantasis AuthorDate: Thu May 23 11:46:10 2019 -0700 KAFKA-8417: Remove redundant network definition --net=host when starting testing docker containers (#6797) Reviewers: Colin P. McCabe --- tests/docker/ducker-ak | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/docker/ducker-ak b/tests/docker/ducker-ak index 04581fb..2a4e5f6 100755 --- a/tests/docker/ducker-ak +++ b/tests/docker/ducker-ak @@ -240,7 +240,7 @@ docker_run() { # and mount FUSE filesystems inside the container. We also need it to # run iptables inside the container. must_do -v docker run --privileged \ --d -t --net=host -h "${node}" --network ducknet \ +-d -t -h "${node}" --network ducknet \ --memory=${docker_run_memory_limit} --memory-swappiness=1 \ -v "${kafka_dir}:/opt/kafka-dev" --name "${node}" -- "${image_name}" }
[kafka] branch 2.3 updated: KAFKA-8417: Remove redundant network definition --net=host when starting testing docker containers (#6797)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new deddd50 KAFKA-8417: Remove redundant network definition --net=host when starting testing docker containers (#6797) deddd50 is described below commit deddd50a45c37eb7ab179323435f11d2b67088ef Author: Konstantine Karantasis AuthorDate: Thu May 23 11:46:10 2019 -0700 KAFKA-8417: Remove redundant network definition --net=host when starting testing docker containers (#6797) Reviewers: Colin P. McCabe --- tests/docker/ducker-ak | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/docker/ducker-ak b/tests/docker/ducker-ak index 04581fb..2a4e5f6 100755 --- a/tests/docker/ducker-ak +++ b/tests/docker/ducker-ak @@ -240,7 +240,7 @@ docker_run() { # and mount FUSE filesystems inside the container. We also need it to # run iptables inside the container. must_do -v docker run --privileged \ --d -t --net=host -h "${node}" --network ducknet \ +-d -t -h "${node}" --network ducknet \ --memory=${docker_run_memory_limit} --memory-swappiness=1 \ -v "${kafka_dir}:/opt/kafka-dev" --name "${node}" -- "${image_name}" }
[kafka] branch trunk updated: KAFKA-8341. Retry Consumer group operation for NOT_COORDINATOR error (#6723)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 46a02f3 KAFKA-8341. Retry Consumer group operation for NOT_COORDINATOR error (#6723) 46a02f3 is described below commit 46a02f3231cd6d340c622636159b9f59b4b3cb6e Author: soondenana <50422828+soonden...@users.noreply.github.com> AuthorDate: Fri May 24 17:20:22 2019 -0700 KAFKA-8341. Retry Consumer group operation for NOT_COORDINATOR error (#6723) An API call for consumer groups must send a FindCoordinatorRequest to find the consumer group coordinator, and then send a follow-up request to that node. But the coordinator might move after the FindCoordinatorRequest but before the follow-up request is sent. In that case we currently fail. This change fixes that by detecting this error and then retrying. This fixes listConsumerGroupOffsets, deleteConsumerGroups, and describeConsumerGroups. Reviewers: Colin P. 
McCabe , Boyang Chen --- .../kafka/clients/admin/KafkaAdminClient.java | 478 - .../kafka/clients/admin/KafkaAdminClientTest.java | 124 +- 2 files changed, 383 insertions(+), 219 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index f0e6635..e612593 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -166,6 +166,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.kafka.common.requests.MetadataRequest.convertToMetadataRequestTopic; @@ -2510,21 +2511,93 @@ public class KafkaAdminClient extends AdminClient { return new DescribeDelegationTokenResult(tokensFuture); } +/** + * Context class to encapsulate parameters of a call to find and use a consumer group coordinator. + * Some of the parameters are provided at construction and are immutable whereas others are provided + * as "Call" are completed and values are available, like node id of the coordinator. + * + * @param The type of return value of the KafkaFuture + * @param The type of configuration option. Different for different consumer group commands. 
+ */ +private final static class ConsumerGroupOperationContext> { +final private String groupId; +final private O options; +final private long deadline; +final private KafkaFutureImpl future; +private Optional node; + +public ConsumerGroupOperationContext(String groupId, + O options, + long deadline, + KafkaFutureImpl future) { +this.groupId = groupId; +this.options = options; +this.deadline = deadline; +this.future = future; +this.node = Optional.empty(); +} + +public String getGroupId() { +return groupId; +} + +public O getOptions() { +return options; +} + +public long getDeadline() { +return deadline; +} + +public KafkaFutureImpl getFuture() { +return future; +} + +public Optional getNode() { +return node; +} + +public void setNode(Node node) { +this.node = Optional.ofNullable(node); +} + +public boolean hasCoordinatorMoved(AbstractResponse response) { +return response.errorCounts().keySet() +.stream() +.anyMatch(error -> error == Errors.NOT_COORDINATOR); +} +} + +private void rescheduleTask(ConsumerGroupOperationContext context, Supplier nextCall) { +log.info("Node {} is no longer the Coordinator. Retrying with new coordinator.", +context.getNode().orElse(null)); +// Requeue the task so that we can try with new coordinator +context.setNode(null); +Call findCoordinatorCall = getFindCoordinatorCall(context, nextCall); +runnable.call(findCoordinatorCall, time.milliseconds()); +} + +private static Map> createFutures(Collection groupIds) { +return new HashSet<>(groupIds).stream().collect( +Collectors.toMap(groupId -> groupId, +groupId -> { +if (groupIdIsUnrepresentable(groupId)) { +
[kafka] branch trunk updated: MINOR: Auth operations must be null when talking to a pre-KIP-430 broker (#6812)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 24f664a MINOR: Auth operations must be null when talking to a pre-KIP-430 broker (#6812) 24f664a is described below commit 24f664aa1621dc70794fd6576ac99547e41d2113 Author: Colin Patrick McCabe AuthorDate: Tue May 28 11:22:09 2019 -0700 MINOR: Auth operations must be null when talking to a pre-KIP-430 broker (#6812) Authorized operations must be null when talking to a pre-KIP-430 broker. If we present this as the empty set instead, it is impossible for clients to know if they have no permissions, or are talking to an old broker. Reviewers: Manikumar Reddy --- .../clients/admin/ConsumerGroupDescription.java| 2 +- .../clients/admin/DescribeClusterOptions.java | 4 + .../kafka/clients/admin/DescribeClusterResult.java | 3 +- .../kafka/clients/admin/KafkaAdminClient.java | 5 +- .../kafka/clients/admin/TopicDescription.java | 4 +- .../org/apache/kafka/common/acl/AclOperation.java | 3 + .../common/requests/DescribeGroupsResponse.java| 19 + .../kafka/common/requests/MetadataResponse.java| 5 +- .../common/message/DescribeGroupsResponse.json | 2 +- .../resources/common/message/MetadataResponse.json | 4 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 96 +- 11 files changed, 137 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java index 52f23ed..4590c74 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java @@ -128,7 +128,7 @@ public class ConsumerGroupDescription { } /** - * authorizedOperations for this group + * authorizedOperations for this group, 
or null if that information is not known. */ public Set authorizedOperations() { return authorizedOperations; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java index abde154..7fb7bd1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java @@ -45,6 +45,10 @@ public class DescribeClusterOptions extends AbstractOptions> authorizedOperations() { return authorizedOperations; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e612593..9b37f5a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1580,7 +1580,7 @@ public class KafkaAdminClient extends AdminClient { controllerFuture.complete(controller(response)); clusterIdFuture.complete(response.clusterId()); authorizedOperationsFuture.complete( - validAclOperations(response.data().clusterAuthorizedOperations())); + validAclOperations(response.data().clusterAuthorizedOperations())); } private Node controller(MetadataResponse response) { @@ -2741,6 +2741,9 @@ public class KafkaAdminClient extends AdminClient { } private Set validAclOperations(final int authorizedOperations) { +if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) { +return null; +} return Utils.from32BitField(authorizedOperations) .stream() .map(AclOperation::fromCode) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java index c6d44e8..ea9bf05 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java @@ -70,7 +70,7 @@ public class TopicDescription { * @param internal Whether the topic is internal to Kafka * @param partitions A list of partitions where the index represents the partition id and the element contains * leadership and replica information for that partition. - * @param authorizedOperations authorized operations for this topic + * @param authorizedOperations authorized operations for this topic, or null if this is not known. */ TopicDescription(String name, bool
[kafka] branch 2.3 updated: MINOR: Auth operations must be null when talking to a pre-KIP-430 broker (#6812)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 321af2d MINOR: Auth operations must be null when talking to a pre-KIP-430 broker (#6812) 321af2d is described below commit 321af2d41b0fc9a568c6f7b72e3708386dae6603 Author: Colin Patrick McCabe AuthorDate: Tue May 28 11:22:09 2019 -0700 MINOR: Auth operations must be null when talking to a pre-KIP-430 broker (#6812) Authorized operations must be null when talking to a pre-KIP-430 broker. If we present this as the empty set instead, it is impossible for clients to know if they have no permissions, or are talking to an old broker. Reviewers: Manikumar Reddy --- .../clients/admin/ConsumerGroupDescription.java| 2 +- .../clients/admin/DescribeClusterOptions.java | 4 + .../kafka/clients/admin/DescribeClusterResult.java | 3 +- .../kafka/clients/admin/KafkaAdminClient.java | 5 +- .../kafka/clients/admin/TopicDescription.java | 4 +- .../org/apache/kafka/common/acl/AclOperation.java | 3 + .../common/requests/DescribeGroupsResponse.java| 19 + .../kafka/common/requests/MetadataResponse.java| 5 +- .../common/message/DescribeGroupsResponse.json | 2 +- .../resources/common/message/MetadataResponse.json | 4 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 96 +- 11 files changed, 137 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java index 52f23ed..4590c74 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java @@ -128,7 +128,7 @@ public class ConsumerGroupDescription { } /** - * authorizedOperations for this group + * authorizedOperations for this group, or 
null if that information is not known. */ public Set authorizedOperations() { return authorizedOperations; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java index abde154..7fb7bd1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterOptions.java @@ -45,6 +45,10 @@ public class DescribeClusterOptions extends AbstractOptions> authorizedOperations() { return authorizedOperations; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index f0e6635..e394458 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1579,7 +1579,7 @@ public class KafkaAdminClient extends AdminClient { controllerFuture.complete(controller(response)); clusterIdFuture.complete(response.clusterId()); authorizedOperationsFuture.complete( - validAclOperations(response.data().clusterAuthorizedOperations())); + validAclOperations(response.data().clusterAuthorizedOperations())); } private Node controller(MetadataResponse response) { @@ -2631,6 +2631,9 @@ public class KafkaAdminClient extends AdminClient { } private Set validAclOperations(final int authorizedOperations) { +if (authorizedOperations == MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED) { +return null; +} return Utils.from32BitField(authorizedOperations) .stream() .map(AclOperation::fromCode) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java index c6d44e8..ea9bf05 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java @@ -70,7 +70,7 @@ public class TopicDescription { * @param internal Whether the topic is internal to Kafka * @param partitions A list of partitions where the index represents the partition id and the element contains * leadership and replica information for that partition. - * @param authorizedOperations authorized operations for this topic + * @param authorizedOperations authorized operations for this topic, or null if this is not known. */ TopicDescription(String name, bool
[kafka] branch 2.3 updated: KAFKA-8475: Temporarily restore SslFactory.sslContext() helper
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new 5cb3bec KAFKA-8475: Temporarily restore SslFactory.sslContext() helper 5cb3bec is described below commit 5cb3bec626f7db8dd7e1db7f3eae8a13298ddff5 Author: Randall Hauch AuthorDate: Mon Jun 3 12:34:55 2019 -0500 KAFKA-8475: Temporarily restore SslFactory.sslContext() helper Temporarily restore the SslFactory.sslContext() function, which some connectors use. This function is not a public API and it will be removed eventually. For now, we will mark it as deprecated. --- .../main/java/org/apache/kafka/common/security/ssl/SslFactory.java | 6 ++ 1 file changed, 6 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index 882b63d..4d94230 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLException; @@ -168,6 +169,11 @@ public class SslFactory implements Reconfigurable { return sslEngineBuilder.createSslEngine(mode, peerHost, peerPort, endpointIdentification); } +@Deprecated +public SSLContext sslContext() { +return sslEngineBuilder.sslContext(); +} + public SslEngineBuilder sslEngineBuilder() { return sslEngineBuilder; }
[kafka] branch trunk updated (3c7c988 -> ce008e7)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 3c7c988 KAFKA-8449: Restart tasks on reconfiguration under incremental cooperative rebalancing (#6850) add ce008e7 KAFKA-8475: Temporarily restore SslFactory.sslContext() helper No new revisions were added by this update. Summary of changes: .../main/java/org/apache/kafka/common/security/ssl/SslFactory.java | 6 ++ 1 file changed, 6 insertions(+)
[kafka] branch trunk updated: MINOR: Update docs to say 2.3 (#6881)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b6d9e15 MINOR: Update docs to say 2.3 (#6881) b6d9e15 is described below commit b6d9e152234b14031ff1851b659b7bdf209d4710 Author: Colin Patrick McCabe AuthorDate: Tue Jun 4 15:38:18 2019 -0700 MINOR: Update docs to say 2.3 (#6881) Reviewers: Jason Gustafson --- docs/documentation.html | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/documentation.html b/docs/documentation.html index 83c029b..8d790c1 100644 --- a/docs/documentation.html +++ b/docs/documentation.html @@ -26,8 +26,8 @@ Documentation -Kafka 2.2 Documentation -Prior releases: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X, +Kafka 2.3 Documentation +Prior releases: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X,
[kafka] branch 2.3 updated: MINOR: Update docs to say 2.3 (#6881)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/2.3 by this push: new f2cff5f MINOR: Update docs to say 2.3 (#6881) f2cff5f is described below commit f2cff5f15bf4c9eb6e9c328b5f6b75aa4144b8fe Author: Colin Patrick McCabe AuthorDate: Tue Jun 4 15:38:18 2019 -0700 MINOR: Update docs to say 2.3 (#6881) Reviewers: Jason Gustafson --- docs/documentation.html | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/documentation.html b/docs/documentation.html index 83c029b..8d790c1 100644 --- a/docs/documentation.html +++ b/docs/documentation.html @@ -26,8 +26,8 @@ Documentation -Kafka 2.2 Documentation -Prior releases: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X, +Kafka 2.3 Documentation +Prior releases: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X,
[kafka] 01/01: Bump version to 2.3.0
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to annotated tag 2.3.0-rc1 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 47066f6a9f314d9b885ebd481bb800d993895533 Author: Colin P. Mccabe AuthorDate: Tue Jun 4 16:04:20 2019 -0700 Bump version to 2.3.0 --- gradle.properties | 2 +- streams/quickstart/java/pom.xml| 2 +- .../java/src/main/resources/archetype-resources/pom.xml| 2 +- streams/quickstart/pom.xml | 2 +- tests/kafkatest/__init__.py| 10 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/gradle.properties b/gradle.properties index a7b31d9..8d62beb 100644 --- a/gradle.properties +++ b/gradle.properties @@ -20,7 +20,7 @@ group=org.apache.kafka # - tests/kafkatest/__init__.py # - tests/kafkatest/version.py (variable DEV_VERSION) # - kafka-merge-pr.py -version=2.3.0-SNAPSHOT +version=2.3.0 scalaVersion=2.12.8 task=build org.gradle.jvmargs=-Xmx1024m -Xss2m diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index bd0a8e1..e8626fa 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart -2.3.0-SNAPSHOT +2.3.0 .. 
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index 0f18688..7813a0d 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 -2.3.0-SNAPSHOT +2.3.0 1.7.7 1.2.17 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index edad6ba..bb5132f 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom -2.3.0-SNAPSHOT +2.3.0 Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index 2e86a3d..e964859 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -15,11 +15,11 @@ # This determines the version of kafkatest that can be published to PyPi and installed with pip # -# Note that in development, this version name can't follow Kafka's convention of having a trailing "-SNAPSHOT" +# Note that in # due to python version naming restrictions, which are enforced by python packaging tools -# (see https://www.python.org/dev/peps/pep-0440/) +# (see https://www.python.org # -# Instead, in development branches, the version should have a suffix of the form ".devN" +# Instead, in # -# For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '2.3.0.dev0' +# For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0 +__version__ = '2.3.0'
[kafka] annotated tag 2.3.0-rc1 created (now ffeaa02)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to annotated tag 2.3.0-rc1 in repository https://gitbox.apache.org/repos/asf/kafka.git. at ffeaa02 (tag) tagging 47066f6a9f314d9b885ebd481bb800d993895533 (commit) replaces 0.8.0-beta1 by Colin P. Mccabe on Tue Jun 4 16:04:20 2019 -0700 - Log - 2.3.0-rc1 --- This annotated tag includes the following new commits: new 47066f6 Bump version to 2.3.0 The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference.
[kafka] branch trunk updated: MINOR: Improve Trogdor external command worker docs (#6438)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 58aa04f MINOR: Improve Trogdor external command worker docs (#6438) 58aa04f is described below commit 58aa04f91e9cd8203cd58f972d042d068727d256 Author: Stanislav Kozlovski AuthorDate: Thu Jun 6 20:04:05 2019 +0300 MINOR: Improve Trogdor external command worker docs (#6438) Reviewers: Colin McCabe , Xi Yang --- TROGDOR.md | 23 +- tests/bin/external_trogdor_command_example.py | 13 +++- .../trogdor/workload/ExternalCommandWorker.java| 2 +- .../workload/ExternalCommandWorkerTest.java| 10 ++ 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/TROGDOR.md b/TROGDOR.md index 49c6942..ad8d8af 100644 --- a/TROGDOR.md +++ b/TROGDOR.md @@ -16,7 +16,7 @@ Running Kafka: > ./bin/kafka-server-start.sh ./config/server.properties &> /tmp/kafka.log & -Then, we want to run a Trogdor Agent, plus a Trogdor broker. +Then, we want to run a Trogdor Agent, plus a Trogdor Coordinator. To run the Trogdor Agent: @@ -125,6 +125,27 @@ ProcessStopFault stops a process by sending it a SIGSTOP signal. When the fault ### NetworkPartitionFault NetworkPartitionFault sets up an artificial network partition between one or more sets of nodes. Currently, this is implemented using iptables. The iptables rules are set up on the outbound traffic from the affected nodes. Therefore, the affected nodes should still be reachable from outside the cluster. +External Processes + +Trogdor supports running arbitrary commands in external processes. This is a generic way to run any configurable command in the Trogdor framework - be it a Python program, bash script, docker image, etc. + +### ExternalCommandWorker +ExternalCommandWorker starts an external command defined by the ExternalCommandSpec. It essentially allows you to run any command on any Trogdor agent node. 
+The worker communicates with the external process via its stdin, stdout and stderr in a JSON protocol. It uses stdout for any actionable communication and only logs what it sees in stderr. +On startup the worker will first send a message describing the workload to the external process in this format: +``` +{"id":, "workload":} +``` +and will then listen for messages from the external process, again in a JSON format. +Said JSON can contain the following fields: +- status: If the object contains this field, the status of the worker will be set to the given value. +- error: If the object contains this field, the error of the worker will be set to the given value. Once an error occurs, the external process will be terminated. +- log: If the object contains this field, a log message will be issued with this text. +An example: +```json +{"log": "Finished successfully.", "status": {"p99ProduceLatency": "100ms", "messagesSent": 1}} +``` + Exec Mode Sometimes, you just want to run a test quickly on a single node. In this case, you can use "exec mode." This mode allows you to run a single Trogdor Agent without a Coordinator. diff --git a/tests/bin/external_trogdor_command_example.py b/tests/bin/external_trogdor_command_example.py index 0e53557..1254b82 100755 --- a/tests/bin/external_trogdor_command_example.py +++ b/tests/bin/external_trogdor_command_example.py @@ -20,7 +20,7 @@ import time # # This is an example of an external script which can be run through Trogdor's -# ExternalCommandWorker. +# ExternalCommandWorker. 
It sleeps for the given amount of time expressed by the delayMs field in the ExternalCommandSpec # if __name__ == '__main__': @@ -28,11 +28,14 @@ if __name__ == '__main__': line = sys.stdin.readline() start_message = json.loads(line) workload = start_message["workload"] -print("Starting external_trogdor_command_example with task id %s, workload %s" \ -% (start_message["id"], workload)) +print("Starting external_trogdor_command_example with task id %s, workload %s" + % (start_message["id"], workload)) sys.stdout.flush() - `print(json.dumps({"status": "running"}))` + +# pretend to start some workload +print(json.dumps({"status": "running"})) sys.stdout.flush() time.sleep(0.001 * workload["delayMs"]) - `print(json.dumps({"status": "exiting after %s delayMs" % workload["delayMs"]}))` + +print(json.dumps({"status": "exiting after %s delayMs
[kafka] branch trunk updated (73ec730 -> bf6dffe)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 73ec730 KAFKA-9748: Extend Streams integration tests for EOS beta (#8441) add bf6dffe KAFKA-9309: Add the ability to translate Message classes to and from JSON (#7844) No new revisions were added by this update. Summary of changes: checkstyle/import-control.xml | 5 + .../org/apache/kafka/common/protocol/Message.java | 48 ++- .../apache/kafka/common/protocol/MessageUtil.java | 103 ++ .../apache/kafka/common/message/MessageTest.java | 11 + .../java/org/apache/kafka/message/FieldSpec.java | 5 + .../java/org/apache/kafka/message/FieldType.java | 12 + .../apache/kafka/message/IsNullConditional.java| 47 ++- .../apache/kafka/message/MessageDataGenerator.java | 398 +++-- .../org/apache/kafka/message/MessageGenerator.java | 24 ++ .../main/java/org/apache/kafka/message/Target.java | 93 + .../kafka/message/IsNullConditionalTest.java | 8 +- 11 files changed, 699 insertions(+), 55 deletions(-) create mode 100644 generator/src/main/java/org/apache/kafka/message/Target.java
[kafka] branch trunk updated (20e4a74 -> a276c54)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 20e4a74 KAFKA-9832: Extend Streams system tests for EOS-beta (#8443) add a276c54 MINOR: Allow a single struct to be a field in the protocol spec (#8413) No new revisions were added by this update. Summary of changes: .../apache/kafka/common/message/MessageTest.java | 17 .../common/message/SimpleExampleMessageTest.java | 47 +- .../common/message/SimpleExampleMessage.json | 17 +++- .../java/org/apache/kafka/message/FieldSpec.java | 5 ++- .../apache/kafka/message/MessageDataGenerator.java | 33 --- .../org/apache/kafka/message/StructRegistry.java | 35 +++- .../apache/kafka/message/StructRegistryTest.java | 26 7 files changed, 159 insertions(+), 21 deletions(-)
[kafka] branch trunk updated: KAFKA-9863: replace the deprecated --zookeeper options in the documentation (#8482)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a91b067 KAFKA-9863: replace the deprecated --zookeeper options in the documentation (#8482) a91b067 is described below commit a91b06708bcba09ac77ad5d5f57e938bb35a0fc7 Author: showuon <43372967+show...@users.noreply.github.com> AuthorDate: Thu Apr 23 04:42:35 2020 +0800 KAFKA-9863: replace the deprecated --zookeeper options in the documentation (#8482) Reviewers: Ron Dagostino , Colin P. McCabe --- docs/configuration.html | 8 docs/ops.html | 48 docs/security.html | 8 3 files changed, 32 insertions(+), 32 deletions(-) diff --git a/docs/configuration.html b/docs/configuration.html index dc17333..9e913a2 100644 --- a/docs/configuration.html +++ b/docs/configuration.html @@ -100,7 +100,7 @@ on broker 0: - > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type brokers --entity-name 0 --alter --add-config + > bin/kafka-configs.sh --zookeeper localhost:2182 --zk-tls-config-file zk_tls_config.properties --entity-type brokers --entity-name 0 --alter --add-config 'listener.name.internal.ssl.key.password=key-password,password.encoder.secret=secret,password.encoder.iterations=8192' @@ -240,18 +240,18 @@ Overrides can also be changed or set later using the alter configs command. 
This example updates the max message size for my-topic: - > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic + > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --add-config max.message.bytes=128000 To check overrides set on the topic you can do - > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe + > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe To remove an override you can do - > bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic + > bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes diff --git a/docs/ops.html b/docs/ops.html index a098673..bfa42a9 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -94,7 +94,7 @@ You can also set this to false, but you will then need to manually restore leadership to the restored replicas by running the command: - > bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot + > bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port Balancing Replicas Across Racks @@ -308,7 +308,7 @@ Once the json file is ready, use the partition reassignment tool to generate a candidate assignment: - > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate + > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate Current partition replica assignment {"version":1, @@ -334,7 +334,7 @@ The tool generates a candidate assignment that will move all partitions from topics foo1,foo2 to brokers 5,6. 
Note, however, that at this point, the partition movement has not started, it merely tells you the current assignment and the proposed new assignment. The current assignment should be saved in case you want to rollback to it. The new assignment should be saved in a json file (e.g. expand-cluster-reassignment.json) to be input to the tool with the --execute option as follows: - > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute + > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute Current partition replica assignment {"version":1, @@ -360,7 +360,7 @@ Finally, the --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option: - > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify + > bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify Status of partition reassignmen
[kafka] branch trunk updated (5c548e5 -> a91b067)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 5c548e5 KAFKA-6145: KIP-441: Build state constrained assignment from balanced one (#8497) add a91b067 KAFKA-9863: replace the deprecated --zookeeper options in the documentation (#8482) No new revisions were added by this update. Summary of changes: docs/configuration.html | 8 docs/ops.html | 48 docs/security.html | 8 3 files changed, 32 insertions(+), 32 deletions(-)
[kafka] branch trunk updated (85e81d4 -> 1d55127)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 85e81d4 KAFKA-9844; Fix race condition which allows more than maximum number of members(#8454) add 1d55127 MINOR: equals() should compare all fields for generated classes (#8539) No new revisions were added by this update. Summary of changes: .../common/utils/ImplicitLinkedHashCollection.java | 20 -- .../utils/ImplicitLinkedHashMultiCollection.java | 16 ++--- .../utils/ImplicitLinkedHashCollectionTest.java| 82 ++ .../apache/kafka/message/MessageDataGenerator.java | 14 ++-- 4 files changed, 87 insertions(+), 45 deletions(-)
[kafka] branch trunk updated (85e81d4 -> 1d55127)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git. from 85e81d4 KAFKA-9844; Fix race condition which allows more than maximum number of members(#8454) add 1d55127 MINOR: equals() should compare all fields for generated classes (#8539) No new revisions were added by this update. Summary of changes: .../common/utils/ImplicitLinkedHashCollection.java | 20 -- .../utils/ImplicitLinkedHashMultiCollection.java | 16 ++--- .../utils/ImplicitLinkedHashCollectionTest.java| 82 ++ .../apache/kafka/message/MessageDataGenerator.java | 14 ++-- 4 files changed, 87 insertions(+), 45 deletions(-)
[kafka] branch trunk updated: MINOR: Clean up some test dependencies on ConfigCommand and TopicCommand (#8527)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7cb1600 MINOR: Clean up some test dependencies on ConfigCommand and TopicCommand (#8527) 7cb1600 is described below commit 7cb1600d6ae7245bfa717fa93502d28f17096ea8 Author: THREE LEVEL HELMET <31675100+d8tlt...@users.noreply.github.com> AuthorDate: Mon May 4 14:34:23 2020 -0700 MINOR: Clean up some test dependencies on ConfigCommand and TopicCommand (#8527) Avoid calling into ConfigCommand and TopicCommand from tests that are not related to these commands. It's better to just invoke the admin APIs. Change a few cases where we were testing the deprecated --zookeeper flag to testing the --bootstrap-server flag instead. Unless we're explicitly testing the deprecated code path, we should be using the non-deprecated flags. Move testCreateWithUnspecifiedReplicationFactorAndPartitionsWithZkClient from TopicCommandWithAdminClientTest.scala into TopicCommandWithZKClientTest.scala, since it makes more sense in the latter. Reviewers: Colin P. 
McCabe --- .../scala/integration/kafka/api/SaslSetup.scala| 33 +++--- .../server/DynamicBrokerReconfigurationTest.scala | 18 .../scala/unit/kafka/admin/DeleteTopicTest.scala | 16 +++ .../admin/ReassignPartitionsCommandArgsTest.scala | 6 ++-- .../admin/TopicCommandWithAdminClientTest.scala| 7 - .../kafka/admin/TopicCommandWithZKClientTest.scala | 24 +++- 6 files changed, 73 insertions(+), 31 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 7b06eb4..542f7e1 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -22,17 +22,18 @@ import java.util.Properties import javax.security.auth.login.Configuration import scala.collection.Seq - -import kafka.admin.ConfigCommand import kafka.security.minikdc.MiniKdc -import kafka.server.KafkaConfig +import kafka.server.{ConfigType, KafkaConfig} import kafka.utils.JaasTestUtils.{JaasSection, Krb5LoginModule, ZkDigestModule} import kafka.utils.{JaasTestUtils, TestUtils} +import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.authenticator.LoginManager -import org.apache.kafka.common.security.scram.internals.ScramMechanism +import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism} +import org.apache.kafka.common.utils.Time +import org.apache.zookeeper.client.ZKClientConfig /* * Implements an enumeration for the modes enabled here: @@ -148,12 +149,24 @@ trait SaslSetup { } def createScramCredentials(zkConnect: String, userName: String, password: String): Unit = { -val credentials = ScramMechanism.values.map(m => s"${m.mechanismName}=[iterations=4096,password=$password]") -val args = Array("--zookeeper", 
zkConnect, - "--alter", "--add-config", credentials.mkString(","), - "--entity-type", "users", - "--entity-name", userName) -ConfigCommand.main(args) +val zkClientConfig = new ZKClientConfig() +val zkClient = KafkaZkClient( + zkConnect, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 3, 3, + Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig)) +val adminZkClient = new AdminZkClient(zkClient) + +val entityType = ConfigType.User +val entityName = userName +val configs = adminZkClient.fetchEntityConfig(entityType, entityName) + +ScramMechanism.values().foreach(mechanism => { + val credential = new ScramFormatter(mechanism).generateCredential(password, 4096) + val credentialString = ScramCredentialUtils.credentialToString(credential) + configs.setProperty(mechanism.mechanismName, credentialString) +}) + +adminZkClient.changeConfigs(entityType, entityName, configs) +zkClient.close() } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index b610db2..7a809be 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -1412,13 +1412,21 @@ class DynamicBrokerReconfigurationTest e
[kafka] 01/02: KAFKA-9718; Don't log passwords for AlterConfigs in request logs (#8294)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 177abc77ee29721164b8633a62d7b259eca4c7b3 Author: Rajini Sivaram AuthorDate: Fri Mar 13 18:24:03 2020 +0000 KAFKA-9718; Don't log passwords for AlterConfigs in request logs (#8294) Reviewers: Manikumar Reddy (cherry picked from commit f165cdc32533541db381c4bdfd30da089b3b) --- core/src/main/scala/kafka/log/LogConfig.scala | 4 + .../main/scala/kafka/network/RequestChannel.scala | 54 +- .../src/main/scala/kafka/server/AdminManager.scala | 21 +-- .../scala/kafka/server/DynamicBrokerConfig.scala | 6 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 14 ++ .../unit/kafka/network/RequestChannelTest.scala| 195 + 6 files changed, 277 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 4f26716..59fe81c 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -299,6 +299,10 @@ object LogConfig { def serverConfigName(configName: String): Option[String] = configDef.serverConfigName(configName) + def configType(configName: String): Option[ConfigDef.Type] = { +Option(configDef.configKeys.get(configName)).map(_.`type`) + } + /** * Create a log config instance using the given properties and defaults */ diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 531cac1..c9a7d03 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -23,9 +23,15 @@ import java.util.concurrent._ import com.typesafe.scalalogging.Logger import com.yammer.metrics.core.Meter +import kafka.log.LogConfig import kafka.metrics.KafkaMetricsGroup +import kafka.server.KafkaConfig import kafka.utils.{Logging, NotNothing, Pool} +import 
org.apache.kafka.common.config.types.Password +import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.memory.MemoryPool +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData._ import org.apache.kafka.common.network.Send import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests._ @@ -98,7 +104,7 @@ object RequestChannel extends Logging { releaseBuffer() } -def requestDesc(details: Boolean): String = s"$header -- ${body[AbstractRequest].toString(details)}" +def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}" def body[T <: AbstractRequest](implicit classTag: ClassTag[T], nn: NotNothing[T]): T = { bodyAndSize.request match { @@ -108,6 +114,52 @@ object RequestChannel extends Logging { } } +def loggableRequest: AbstractRequest = { + + def loggableValue(resourceType: ConfigResource.Type, name: String, value: String): String = { +val maybeSensitive = resourceType match { + case ConfigResource.Type.BROKER => KafkaConfig.maybeSensitive(KafkaConfig.configType(name)) + case ConfigResource.Type.TOPIC => KafkaConfig.maybeSensitive(LogConfig.configType(name)) + case ConfigResource.Type.BROKER_LOGGER => false + case _ => true +} +if (maybeSensitive) Password.HIDDEN else value + } + + bodyAndSize.request match { +case alterConfigs: AlterConfigsRequest => + val loggableConfigs = alterConfigs.configs().asScala.map { case (resource, config) => +val loggableEntries = new AlterConfigsRequest.Config(config.entries.asScala.map { entry => +new AlterConfigsRequest.ConfigEntry(entry.name, loggableValue(resource.`type`, entry.name, entry.value)) +}.asJavaCollection) +(resource, loggableEntries) + }.asJava + new AlterConfigsRequest.Builder(loggableConfigs, alterConfigs.validateOnly).build(alterConfigs.version()) + +case alterConfigs: IncrementalAlterConfigsRequest => + val resources = new 
AlterConfigsResourceCollection(alterConfigs.data.resources.size) + alterConfigs.data().resources().asScala.foreach { resource => +val newResource = new AlterConfigsResource() + .setResourceName(resource.resourceName) + .setResourceType(resource.resourceType) +resource.configs.asScala.foreach { config => + newResource.configs.add(new AlterableConfig() +.setName(config.name) + .setValue(loggableValue(ConfigResource.Type.forId(resource.resourceType), config.name, config.value)) +
[kafka] 02/02: KAFKA-9625: Fix altering and describing dynamic broker configurations (#8260)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git commit e5b5cbea70f5e02767c6df84f4cc7fbfb9288db2 Author: Sanjana Kaundinya AuthorDate: Tue Mar 17 23:02:33 2020 -0700 KAFKA-9625: Fix altering and describing dynamic broker configurations (#8260) * Broker throttles were incorrectly marked as sensitive configurations. Fix this, so that their values can be returned via DescribeConfigs as expected. * Previously, changes to broker configs that consisted only of deletions were ignored by the brokers because of faulty delta calculation logic that didn't consider deletions as changes, only alterations as changes. Fix this and add a regression test. Reviewers: Colin P. McCabe (cherry picked from commit 5fc3cd61fcb73da8b52f34b72fe6bb7457f46ce2) --- .../scala/kafka/server/DynamicBrokerConfig.scala | 33 ++--- .../main/scala/kafka/server/DynamicConfig.scala| 2 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 22 +- .../kafka/api/PlaintextAdminIntegrationTest.scala | 79 +- 4 files changed, 121 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 92aa048..2f47c21 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -468,10 +468,19 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging reconfigurable.reconfigure(newConfig) } - private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[_, _]): mutable.Map[String, _] = { -newProps.asScala.filter { + /** + * Returns the change in configurations between the new props and current props by returning a + * map of the changed configs, as well as the set of deleted keys + */ + private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: 
java.util.Map[String, _]): + (mutable.Map[String, _], Set[String]) = { +val changeMap = newProps.asScala.filter { case (k, v) => v != currentProps.get(k) } +val deletedKeySet = currentProps.asScala.filter { + case (k, _) => !newProps.containsKey(k) +}.keySet +(changeMap, deletedKeySet) } /** @@ -510,8 +519,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): (KafkaConfig, List[BrokerReconfigurable]) = { val newConfig = new KafkaConfig(newProps.asJava, !validateOnly, None) -val updatedMap = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals) -if (updatedMap.nonEmpty) { +val (changeMap, deletedKeySet) = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals) +if (changeMap.nonEmpty || deletedKeySet.nonEmpty) { try { val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs newConfig.valuesFromThisConfig.keySet.asScala.foreach(customConfigs.remove) @@ -519,14 +528,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging case listenerReconfigurable: ListenerReconfigurable => processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false) case reconfigurable => -if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet)) - processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly) +if (needsReconfiguration(reconfigurable.reconfigurableConfigs, changeMap.keySet, deletedKeySet)) + processReconfigurable(reconfigurable, changeMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly) } // BrokerReconfigurable updates are processed after config is updated. Only do the validation here. 
val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]() brokerReconfigurables.foreach { reconfigurable => - if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet)) { + if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) { reconfigurable.validateReconfiguration(newConfig) if (!validateOnly) brokerReconfigurablesToUpdate += reconfigurable @@ -544,8 +553,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging (currentConfig, List.empty) } - private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: S
[kafka] branch 2.5 updated (89317c6 -> e5b5cbe)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a change to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git. from 89317c6 KAFKA-9633: Ensure ConfigProviders are closed (#8204) new 177abc7 KAFKA-9718; Don't log passwords for AlterConfigs in request logs (#8294) new e5b5cbe KAFKA-9625: Fix altering and describing dynamic broker configurations (#8260) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: core/src/main/scala/kafka/log/LogConfig.scala | 4 + .../main/scala/kafka/network/RequestChannel.scala | 54 +- .../src/main/scala/kafka/server/AdminManager.scala | 21 +-- .../scala/kafka/server/DynamicBrokerConfig.scala | 39 +++-- .../main/scala/kafka/server/DynamicConfig.scala| 2 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 32 .../kafka/api/PlaintextAdminIntegrationTest.scala | 79 - .../unit/kafka/network/RequestChannelTest.scala| 195 + 8 files changed, 396 insertions(+), 30 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/network/RequestChannelTest.scala