[GitHub] [kafka] mjsax merged pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads
mjsax merged pull request #9614: URL: https://github.com/apache/kafka/pull/9614 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9667: MINOR: Do not print log4j for memberId required
chia7712 commented on a change in pull request #9667:
URL: https://github.com/apache/kafka/pull/9667#discussion_r535900161

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -465,7 +465,11 @@ boolean joinGroupIfNeeded(final Timer timer) {
 }
 } else {
 final RuntimeException exception = future.exception();
-log.info("Rebalance failed.", exception);
+
+if (!(exception instanceof MemberIdRequiredException)) {

Review comment:
Also, does this need a comment explaining why `MemberIdRequiredException` is excluded?

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ##
@@ -465,7 +465,11 @@ boolean joinGroupIfNeeded(final Timer timer) {
 }
 } else {
 final RuntimeException exception = future.exception();
-log.info("Rebalance failed.", exception);
+
+if (!(exception instanceof MemberIdRequiredException)) {

Review comment:
Would `JoinGroupResponseHandler` be a better place to log the error? For example, the error `UNKNOWN_MEMBER_ID` is logged twice. (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L605)
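The guard in the diff above can be sketched in isolation. This is only an illustration, not the actual `AbstractCoordinator` code: `MemberIdRequiredException` is a local stand-in for Kafka's exception class, and `shouldLogRebalanceFailure` is a hypothetical helper name. The comment inside the helper is one possible answer to the first review question, and is itself an assumption about the PR's intent.

```java
public class RebalanceLogFilter {
    // Local stand-in for Kafka's MemberIdRequiredException (assumption, for a self-contained example).
    static class MemberIdRequiredException extends RuntimeException {
        MemberIdRequiredException(String message) {
            super(message);
        }
    }

    // Hypothetical helper: decide whether a failed join attempt deserves an INFO log line.
    static boolean shouldLogRebalanceFailure(RuntimeException exception) {
        // Assumed rationale: a "member id required" failure is an expected step of the
        // join handshake that the client simply retries, so logging it would only add noise.
        return !(exception instanceof MemberIdRequiredException);
    }

    public static void main(String[] args) {
        RuntimeException expected = new MemberIdRequiredException("member id required");
        RuntimeException unexpected = new IllegalStateException("coordinator unavailable");

        for (RuntimeException e : new RuntimeException[] {expected, unexpected}) {
            if (shouldLogRebalanceFailure(e)) {
                System.out.println("Rebalance failed: " + e.getMessage());
            }
        }
    }
}
```

Only the second exception produces a log line in this sketch.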
[jira] [Assigned] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
[ https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reassigned KAFKA-10017:
---
Assignee: Matthias J. Sax (was: A. Sophie Blee-Goldman)

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.0
> Reporter: A. Sophie Blee-Goldman
> Assignee: Matthias J. Sax
> Priority: Blocker
> Labels: flaky-test, unit-test
> Fix For: 2.8.0
>
> Creating a new ticket for this since the root cause is different than https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic multiPartitionOutputTopic within 6 ms
> Expected: is a value equal to or greater than <20>
> but: <15> was less than <20>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
> at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
> at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
> at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
> at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
> at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
> at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
> at org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243797#comment-17243797 ]

Matthias J. Sax commented on KAFKA-2967:

I don't know the details either. But I think it would be a huge improvement over HTML!

> Move Kafka documentation to ReStructuredText
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
> Issue Type: Bug
> Reporter: Gwen Shapira
> Assignee: Gwen Shapira
> Priority: Major
> Labels: documentation
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.
[GitHub] [kafka] ijuma commented on a change in pull request #7409: MINOR: Skip conversion to `Struct` when serializing generated requests/responses
ijuma commented on a change in pull request #7409:
URL: https://github.com/apache/kafka/pull/7409#discussion_r535876326

## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java ##
@@ -84,148 +95,151 @@ protected void updateErrorCounts(Map errorCounts, Errors error)
 errorCounts.put(error, count + 1);
 }

-protected abstract Struct toStruct(short version);
+protected abstract Message data();

 /**
 * Parse a response from the provided buffer. The buffer is expected to hold both
 * the {@link ResponseHeader} as well as the response payload.
 */
-public static AbstractResponse parseResponse(ByteBuffer byteBuffer, RequestHeader requestHeader) {
+public static AbstractResponse parseResponse(ByteBuffer buffer, RequestHeader requestHeader) {
 ApiKeys apiKey = requestHeader.apiKey();
 short apiVersion = requestHeader.apiVersion();
-ResponseHeader responseHeader = ResponseHeader.parse(byteBuffer, apiKey.responseHeaderVersion(apiVersion));
+ResponseHeader responseHeader = ResponseHeader.parse(buffer, apiKey.responseHeaderVersion(apiVersion));
+AbstractResponse response = AbstractResponse.parseResponse(apiKey, buffer, apiVersion);
+
+// We correlate after parsing the response to avoid spurious correlation errors when receiving malformed
+// responses

Review comment:
TODO: Handle this better.
[GitHub] [kafka] g1geordie commented on pull request #9687: MINOR: Move lock method outside try block
g1geordie commented on pull request #9687: URL: https://github.com/apache/kafka/pull/9687#issuecomment-738601977 @chia7712 can you help me to take a look?
[GitHub] [kafka] g1geordie opened a new pull request #9687: MINOR: Move lock method outside try block
g1geordie opened a new pull request #9687:
URL: https://github.com/apache/kafka/pull/9687

If `Lock.lock` is called inside the try block and throws, the finally block still calls `Lock.unlock` on a lock that was never acquired, which can throw a second exception. I think it is cleaner to move the call outside the try block, although the `ReentrantLock.lock` implementation doesn't throw.

```
class X {
    private final ReentrantLock lock = new ReentrantLock();
    // ...

    public void m() {
        lock.lock();  // block until condition holds
        try {
            // ... method body
        } finally {
            lock.unlock();
        }
    }
}
```

This is the pattern recommended by https://docs.oracle.com/javase/10/docs/api/java/util/concurrent/locks/ReentrantLock.html
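To make the difference between the two placements observable, here is a small sketch. `FailingLock` is a hypothetical subclass whose `lock()` always throws; as the PR notes, the real `ReentrantLock.lock` does not, so this only simulates a `Lock` implementation that can fail.

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockPlacementDemo {
    // Hypothetical lock whose lock() always fails, used only to simulate a failing acquisition.
    static class FailingLock extends ReentrantLock {
        @Override
        public void lock() {
            throw new IllegalStateException("lock() failed");
        }
    }

    // lock() inside try: finally calls unlock() on a lock that was never acquired,
    // and the exception from unlock() replaces the original one.
    static String lockInsideTry(ReentrantLock lock) {
        try {
            try {
                lock.lock();
                return "ok";
            } finally {
                lock.unlock();
            }
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    // lock() before try: a failed acquisition propagates unchanged and unlock() is never reached.
    static String lockOutsideTry(ReentrantLock lock) {
        try {
            lock.lock();
            try {
                return "ok";
            } finally {
                lock.unlock();
            }
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(lockInsideTry(new FailingLock()));   // IllegalMonitorStateException
        System.out.println(lockOutsideTry(new FailingLock()));  // IllegalStateException
        System.out.println(lockOutsideTry(new ReentrantLock())); // ok
    }
}
```

With the call inside the try block, the caller sees `IllegalMonitorStateException` from `unlock()` instead of the real cause; with the call outside, the original exception survives.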
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r535856935 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java ## @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.MirrorClient; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.test.IntegrationTest; +import static org.apache.kafka.test.TestUtils.waitForCondition; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; + +/** + * Tests MM2 replication and failover/failback logic. + * + * MM2 is configured with active/active replication between two Kafka clusters. Tests validate that + * records sent to either cluster arrive at the other cluster. Then, a consumer group is migrated from + * one cluster to the other and back. Tests validate that consumer offsets are translated and replicated + * between clusters during this failover and failback. + */ +@Category(IntegrationTest.class) +public abstract class MirrorConnectorsIntegrationBaseTest { +private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class); + +private static final int NUM_RECORDS_PER_PARTITION = 10; +private static final int NUM_PARTITIONS = 10; +private static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION; +private static final int RECORD_TRANSFER_DURATION_MS = 30_000; +private static final int CHECKPOINT_DURATION_MS = 20_000; +private static final int RECORD_CONSUME_DURATION_MS = 20_000; +private static final int OFFSET_SYNC_DURATION_MS = 30_000; +private static final int NUM_WORKERS = 3; +private static final int CONSUMER_POLL_TIMEOUT_MS = 500; +private static final int BROKER_RESTART_TIMEOUT_MS = 10_000; +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final String PRIMARY_CLUSTER_ALIAS = "primary"; +private static final String BACKUP_CLUSTER_ALIAS = "backup"; +private static final List>
[jira] [Commented] (KAFKA-10672) Restarting Kafka always takes a lot of time
[ https://issues.apache.org/jira/browse/KAFKA-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243730#comment-17243730 ]

Wenbing Shen commented on KAFKA-10672:
---

* We increased the number of batches read per IO operation.
* After many tests, startup speed improved by 50% on average.
* See the attached files for the detailed code.

> Restarting Kafka always takes a lot of time
> ---
>
> Key: KAFKA-10672
> URL: https://issues.apache.org/jira/browse/KAFKA-10672
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 2.0.0
> Environment: A cluster of 21 Kafka nodes;
> Each node has 12 disks;
> Each node has about 1500 partitions;
> There are approximately 700 leader partitions per node;
> Slow-loading partitions have about 1000 log segments;
> Reporter: Wenbing Shen
> Priority: Major
> Attachments: AbstractIterator.java, AbstractIteratorOfRestart.java, AbstractLegacyRecordBatch.java, ByteBufferLogInputStream.java, DefaultRecordBatch.java, FileLogInputStream.java, FileRecords.java, LazyDownConversionRecords.java, Log.scala, LogInputStream.java, LogManager.scala, LogSegment.scala, MemoryRecords.java, RecordBatchIterator.java, RecordBatchIteratorOfRestart.java, Records.java, server.log
>
> When the snapshot file does not exist, or the latest snapshot file is from before the current active period, restoring the producer state traverses the log segments batch by batch. When a broker node hosts many partitions, and therefore many log segments, this causes a large number of IO operations, because each IO loads only one batch and a single log can contain tens of thousands of batches. In the code, I found at least two IO operations per batch. With the default batch size of 16 KB, a 1 GB log segment holds 65,536 batches, so at least 65,536 * 2 = 131,072 IO operations are needed, which makes the Kafka startup process very slow.
> We configured 15 log recovery threads in the production environment, and it still took more than 2 hours to load a partition. Can the community suggest how to handle or improve this situation? For detailed logs, see the entries for the test-perf-18 partitions in the attached logs.
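The arithmetic in the report can be checked with a few lines. This sketch takes the reporter's figures at face value (16 KB batches, a 1 GB segment, at least two IO operations per batch) and treats KB and GB as binary units, which is what yields exactly 65,536; the class and method names are made up for the example.

```java
public class RecoveryIoEstimate {
    // Number of fixed-size batches that fit in one log segment.
    static long batchesPerSegment(long segmentBytes, long batchBytes) {
        return segmentBytes / batchBytes;
    }

    // Lower bound on IO operations if every batch costs at least ioPerBatch reads.
    static long minIoOperations(long batches, long ioPerBatch) {
        return batches * ioPerBatch;
    }

    public static void main(String[] args) {
        long segmentBytes = 1L << 30;   // 1 GB segment, as in the report
        long batchBytes = 16L * 1024;   // 16 KB batch, as in the report
        long batches = batchesPerSegment(segmentBytes, batchBytes);
        long ioOps = minIoOperations(batches, 2);
        System.out.println(batches + " batches, at least " + ioOps + " IO operations");
        // prints "65536 batches, at least 131072 IO operations"
    }
}
```

Reading several batches per IO, as the comment describes, divides the second number by however many batches fit in one read.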
[GitHub] [kafka] guozhangwang commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order
guozhangwang commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-738579702

Hey @tombentley, sorry for the late reply! I also checked the source code of `ScheduledThreadPoolExecutor`, and I think what you've inferred is correct: although it does not keep a single thread alive all the time, but dynamically creates and stops the thread when no tasks have been scheduled for a long time (note we set num.thread == 1), the blocking queue should still guarantee FIFO ordering. So what's possibly happening is that the thread handling a leaderISR request with the old controller epoch grabs the lock and proceeds first.

In that case, maybe we can simplify your proposed solution as follows:

* Note that, as @hachikuji mentioned, `onLeadershipChange` is actually protected by the `replicaStateChangeLock` inside `becomeLeaderOrFollower`, and hence we only need to pass the controller epoch into `groupCoordinator.onElection/onResignation`, just like what we did in `txnCoordinator`.
* Inside `groupCoordinator.onElection/onResignation`, which is lock-protected, we just remember the latest controller epoch at the `GroupCoordinator`, and then do one check against that controller epoch right before `scheduleLoadGroupAndOffsets/removeGroupsForPartition`; if the check does not pass, we can log and skip the loading / unloading function call.

WDYT?
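The proposal above can be sketched as a small epoch guard. This is not Kafka code: `EpochGuardedCoordinator` and `runIfCurrent` are hypothetical names, the `executed` list stands in for the real load and unload calls, and a plain monitor replaces the locking that the comment says already surrounds these callbacks.

```java
import java.util.ArrayList;
import java.util.List;

public class EpochGuardedCoordinator {
    private final Object lock = new Object();
    private int latestControllerEpoch = -1;

    // Records the actions that were actually carried out (stand-in for load/unload calls).
    final List<String> executed = new ArrayList<>();

    void onElection(int partition, int controllerEpoch) {
        runIfCurrent("load-" + partition, controllerEpoch);
    }

    void onResignation(int partition, int controllerEpoch) {
        runIfCurrent("unload-" + partition, controllerEpoch);
    }

    // Remember the newest controller epoch seen so far and skip requests from older epochs.
    private void runIfCurrent(String action, int controllerEpoch) {
        synchronized (lock) {
            if (controllerEpoch < latestControllerEpoch) {
                System.out.println("Skipping " + action + " from stale controller epoch " + controllerEpoch);
                return;
            }
            latestControllerEpoch = controllerEpoch;
            executed.add(action);
        }
    }

    public static void main(String[] args) {
        EpochGuardedCoordinator coordinator = new EpochGuardedCoordinator();
        coordinator.onElection(0, 5);
        coordinator.onResignation(0, 4); // arrives late from an older controller: skipped
        coordinator.onResignation(0, 6);
        System.out.println(coordinator.executed); // [load-0, unload-0]
    }
}
```

The late request from epoch 4 is logged and dropped, so it cannot undo the state established under epoch 5.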
[jira] [Updated] (KAFKA-10672) Restarting Kafka always takes a lot of time
[ https://issues.apache.org/jira/browse/KAFKA-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenbing Shen updated KAFKA-10672:
---
Attachment: Records.java
RecordBatchIteratorOfRestart.java
RecordBatchIterator.java
MemoryRecords.java
LogSegment.scala
LogManager.scala
LogInputStream.java
Log.scala
LazyDownConversionRecords.java
FileRecords.java
FileLogInputStream.java
DefaultRecordBatch.java
ByteBufferLogInputStream.java
AbstractLegacyRecordBatch.java
AbstractIteratorOfRestart.java
AbstractIterator.java

> Restarting Kafka always takes a lot of time
> ---
>
> Key: KAFKA-10672
> URL: https://issues.apache.org/jira/browse/KAFKA-10672
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 2.0.0
> Environment: A cluster of 21 Kafka nodes;
> Each node has 12 disks;
> Each node has about 1500 partitions;
> There are approximately 700 leader partitions per node;
> Slow-loading partitions have about 1000 log segments;
> Reporter: Wenbing Shen
> Priority: Major
> Attachments: AbstractIterator.java, AbstractIteratorOfRestart.java, AbstractLegacyRecordBatch.java, ByteBufferLogInputStream.java, DefaultRecordBatch.java, FileLogInputStream.java, FileRecords.java, LazyDownConversionRecords.java, Log.scala, LogInputStream.java, LogManager.scala, LogSegment.scala, MemoryRecords.java, RecordBatchIterator.java, RecordBatchIteratorOfRestart.java, Records.java, server.log
>
> When the snapshot file does not exist, or the latest snapshot file is from before the current active period, restoring the producer state traverses the log segments batch by batch. When a broker node hosts many partitions, and therefore many log segments, this causes a large number of IO operations, because each IO loads only one batch and a single log can contain tens of thousands of batches. In the code, I found at least two IO operations per batch. With the default batch size of 16 KB, a 1 GB log segment holds 65,536 batches, so at least 65,536 * 2 = 131,072 IO operations are needed, which makes the Kafka startup process very slow.
> We configured 15 log recovery threads in the production environment, and it still took more than 2 hours to load a partition. Can the community suggest how to handle or improve this situation? For detailed logs, see the entries for the test-perf-18 partitions in the attached logs.
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r535846145 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java ## @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.MirrorClient; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.test.IntegrationTest; +import static org.apache.kafka.test.TestUtils.waitForCondition; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; + +/** + * Tests MM2 replication and failover/failback logic. + * + * MM2 is configured with active/active replication between two Kafka clusters. Tests validate that + * records sent to either cluster arrive at the other cluster. Then, a consumer group is migrated from + * one cluster to the other and back. Tests validate that consumer offsets are translated and replicated + * between clusters during this failover and failback. + */ +@Category(IntegrationTest.class) +public abstract class MirrorConnectorsIntegrationBaseTest { +private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class); + +private static final int NUM_RECORDS_PER_PARTITION = 10; +private static final int NUM_PARTITIONS = 10; +private static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION; +private static final int RECORD_TRANSFER_DURATION_MS = 30_000; +private static final int CHECKPOINT_DURATION_MS = 20_000; +private static final int RECORD_CONSUME_DURATION_MS = 20_000; +private static final int OFFSET_SYNC_DURATION_MS = 30_000; +private static final int NUM_WORKERS = 3; +private static final int CONSUMER_POLL_TIMEOUT_MS = 500; +private static final int BROKER_RESTART_TIMEOUT_MS = 10_000; +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final String PRIMARY_CLUSTER_ALIAS = "primary"; +private static final String BACKUP_CLUSTER_ALIAS = "backup"; +private static final List>
[GitHub] [kafka] apovzner commented on a change in pull request #9628: KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas
apovzner commented on a change in pull request #9628:
URL: https://github.com/apache/kafka/pull/9628#discussion_r535830690

## File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ##
@@ -472,6 +512,134 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
 EasyMock.reset(alterResult, describeResult)
 }

+ @Test
+ def shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer(): Unit = {
+// when using --bootstrap-server, it should be illegal to alter anything that is not a connection quota
+// for ip entities
+val node = new Node(1, "localhost", 9092)
+val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node)
+
+def verifyCommand(entityType: String, alterOpts: String*): Unit = {
+ val opts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092",
+"--entity-type", entityType, "--entity-name", "admin",
+"--alter") ++ alterOpts)
+ val e = intercept[IllegalArgumentException] {
+ConfigCommand.alterConfig(mockAdminClient, opts)
+ }
+ assertTrue(s"Unexpected exception: $e", e.getMessage.contains("some_config"))
+}
+
+verifyCommand("ips", "--add-config", "connection_creation_rate=1,some_config=10")

Review comment:
super tiny nit: `verifyCommand` name makes the code a bit confusing since the command is expected to fail. What about `verifyCommandFails` or `verifyFails`?
[GitHub] [kafka] apovzner commented on a change in pull request #9628: KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas
apovzner commented on a change in pull request #9628:
URL: https://github.com/apache/kafka/pull/9628#discussion_r535829042

## File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala ##
@@ -259,13 +264,19 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
 }

 private def updateIpConnectionRate(ip: Option[String], updatedRate: Int): Unit = {
-adminZkClient.changeIpConfig(ip.getOrElse(ConfigEntityName.Default),
- CoreUtils.propsWith(DynamicConfig.Ip.IpConnectionRateOverrideProp, updatedRate.toString))
+val initialConnectionCount = connectionCount
+val adminClient = createAdminClient()

Review comment:
`TestUtils.alterClientQuotas` called before `adminClient.close` can throw an exception. Admin extends AutoCloseable, so you can use try-with-resources.
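The point of the review comment, that a resource should be closed even when the call before `close` throws, can be shown with a plain `AutoCloseable`. This is a Java sketch with a hypothetical `TrackedClient` standing in for the admin client; the test under review is Scala, where the equivalent would be a try/finally block.

```java
public class TryWithResourcesDemo {
    // Hypothetical stand-in for an admin client; it only records whether close() ran.
    static class TrackedClient implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Simulates an operation that fails after the client has been created.
    static boolean closedAfterFailure() {
        TrackedClient tracked = new TrackedClient();
        try (TrackedClient client = tracked) {
            throw new IllegalStateException("alter quotas failed");
        } catch (IllegalStateException e) {
            // The resource has already been closed by the time this handler runs.
        }
        return tracked.closed;
    }

    public static void main(String[] args) {
        System.out.println(closedAfterFailure()); // true
    }
}
```

Without try-with-resources (or an explicit finally), the exception would skip the `close` call and leak the client.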
[GitHub] [kafka] chia7712 merged pull request #9682: KAFKA-10803: Fix improper removal of bad dynamic config
chia7712 merged pull request #9682: URL: https://github.com/apache/kafka/pull/9682
[jira] [Assigned] (KAFKA-10803) Skip improper dynamic configs while initialization and include the rest correct ones
[ https://issues.apache.org/jira/browse/KAFKA-10803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai reassigned KAFKA-10803:
---
Assignee: Prateek Agarwal

> Skip improper dynamic configs while initialization and include the rest correct ones
> ---
>
> Key: KAFKA-10803
> URL: https://issues.apache.org/jira/browse/KAFKA-10803
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.1.1, 2.6.0, 2.5.1, 2.7.0, 2.8.0
> Reporter: Prateek Agarwal
> Assignee: Prateek Agarwal
> Priority: Major
>
> There is [a bug|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L470] in how incorrect dynamic config keys are removed from the original Properties list, resulting in persisting the improper configs in the properties list.
> This eventually results in exception being thrown while parsing the list by [KafkaConfig ctor|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L531], resulting in skipping of the complete dynamic list (including the correct ones).
[GitHub] [kafka] chia7712 commented on pull request #9682: KAFKA-10803: Fix improper removal of bad dynamic config
chia7712 commented on pull request #9682:
URL: https://github.com/apache/kafka/pull/9682#issuecomment-738549721

```
Build / JDK 11 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
```

known flaky. Will merge this to trunk
[GitHub] [kafka] chia7712 merged pull request #9679: MINOR: Make Histogram.clear more readable
chia7712 merged pull request #9679: URL: https://github.com/apache/kafka/pull/9679
[GitHub] [kafka] chia7712 opened a new pull request #9686: KAFKA-10804 add more subsets and exclude performance tests
chia7712 opened a new pull request #9686:
URL: https://github.com/apache/kafka/pull/9686

This PR is a part of https://issues.apache.org/jira/browse/KAFKA-10804

The following tasks are included in this PR:

1. add more subsets
1. do not run all system tests - exclude performance tests

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-10804) Tune travis system tests to avoid timeouts
[ https://issues.apache.org/jira/browse/KAFKA-10804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-10804: -- Assignee: Chia-Ping Tsai > Tune travis system tests to avoid timeouts > -- > > Key: KAFKA-10804 > URL: https://issues.apache.org/jira/browse/KAFKA-10804 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Chia-Ping Tsai >Priority: Major > > Thanks to https://github.com/apache/kafka/pull/9652, we are now running > system tests for PRs. However, it looks like we need some tuning because many > of the subsets are timing out. For example: > https://travis-ci.com/github/apache/kafka/jobs/453241933. This might just be > a matter of adding more subsets or changing the timeout, but we should > probably also consider whether we want to run all system tests or if there is > a more useful subset of them. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dongjinleekr commented on a change in pull request #8404: KAFKA-10787: Introduce an import order in Java sources
dongjinleekr commented on a change in pull request #8404: URL: https://github.com/apache/kafka/pull/8404#discussion_r535791013 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -16,21 +16,21 @@ */ package org.apache.kafka.clients; -import java.util.HashSet; -import java.util.Set; - -import java.util.stream.Collectors; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.utils.ExponentialBackoff; import org.apache.kafka.common.utils.LogContext; + import org.slf4j.Logger; Review comment: @chia7712 Oh, the description I had at first on 2nd April is outdated; [After the discussion](https://lists.apache.org/thread.html/rf6f49c845a3d48efe8a91916c8fbaddb76da17742eef06798fc5b24d%40%3Cdev.kafka.apache.org%3E), we concluded that [the following three-group ordering](https://issues.apache.org/jira/browse/KAFKA-10787) would be better: - `kafka`, `org.apache.kafka` - `com`, `net`, `org` - `java`, `javax` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
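As a rough illustration of the three-group ordering described in the comment above, the following Java sketch classifies imported names into the groups and sorts them. The class `ImportOrder` and its methods are hypothetical, and alphabetical ordering inside each group is an assumption; the comment only specifies the groups themselves.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ImportOrder {
    /** Maps an imported name to its group: 0 = kafka, 1 = other third party, 2 = java/javax. */
    static int group(String name) {
        if (name.startsWith("kafka.") || name.startsWith("org.apache.kafka.")) {
            return 0;
        }
        if (name.startsWith("java.") || name.startsWith("javax.")) {
            return 2;
        }
        return 1; // com, net, org (and, in this sketch, anything else)
    }

    /** Returns a sorted copy: by group first, then alphabetically (an assumption). */
    static List<String> sorted(List<String> imports) {
        List<String> copy = new ArrayList<>(imports);
        copy.sort(Comparator.<String>comparingInt(ImportOrder::group)
                .thenComparing(Comparator.<String>naturalOrder()));
        return copy;
    }
}
```

Under this scheme `org.apache.kafka.common.utils.LogContext` sorts before `org.slf4j.Logger`, which in turn sorts before `java.util.Set`, matching the shape of the diff quoted in the review.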
[jira] [Commented] (KAFKA-10636) Bypass log validation for writes to raft log
[ https://issues.apache.org/jira/browse/KAFKA-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243646#comment-17243646 ] feyman commented on KAFKA-10636: [~hachikuji] Cool, thanks for the help! > Bypass log validation for writes to raft log > > > Key: KAFKA-10636 > URL: https://issues.apache.org/jira/browse/KAFKA-10636 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: feyman >Priority: Major > Labels: raft-log-management > > The raft leader is responsible for creating the records written to the log > (including assigning offsets and the epoch), so we can consider bypassing the > validation done in `LogValidator`. This lets us skip potentially expensive > decompression and the unnecessary recomputation of the CRC. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] showuon commented on pull request #9627: KAFKA-10746: Change to Warn logs when necessary to notify users
showuon commented on pull request #9627: URL: https://github.com/apache/kafka/pull/9627#issuecomment-738505590 @abbccdda @guozhangwang, could you please help review this PR? Thanks.
[jira] [Created] (KAFKA-10807) AlterConfig should be validated by the target broker
Boyang Chen created KAFKA-10807: --- Summary: AlterConfig should be validated by the target broker Key: KAFKA-10807 URL: https://issues.apache.org/jira/browse/KAFKA-10807 Project: Kafka Issue Type: Sub-task Reporter: Boyang Chen After forwarding is enabled, AlterConfigs will no longer be sent to the target broker. This behavior bypasses some important config change validations, such as path existence, static config conflict, or even worse when the target broker is offline, the propagated result does not reflect a true applied result. We should gather those necessary cases, and decide whether to actually handle the AlterConfig request firstly on the target broker, and then forward, in a validate-forward-apply path. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10345) Add file-watch based update for trust/key store paths
[ https://issues.apache.org/jira/browse/KAFKA-10345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10345: Description: With forwarding enabled, per-broker alter-config doesn't go to the target broker anymore, we need to have a mechanism to propagate in-place file update through file watch. (was: With forwarding enabled, per-broker alter-config doesn't go to the target broker anymore, we need to have a mechanism to propagate that update through ZK without affecting other config changes.) > Add file-watch based update for trust/key store paths > - > > Key: KAFKA-10345 > URL: https://issues.apache.org/jira/browse/KAFKA-10345 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.8.0 > > > With forwarding enabled, per-broker alter-config doesn't go to the target > broker anymore, we need to have a mechanism to propagate in-place file update > through file watch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10345) Add file-watch based update for trust/key store paths
[ https://issues.apache.org/jira/browse/KAFKA-10345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10345: Summary: Add file-watch based update for trust/key store paths (was: Add ZK-notification based update for trust/key store paths) > Add file-watch based update for trust/key store paths > - > > Key: KAFKA-10345 > URL: https://issues.apache.org/jira/browse/KAFKA-10345 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.8.0 > > > With forwarding enabled, per-broker alter-config doesn't go to the target > broker anymore, we need to have a mechanism to propagate that update through > ZK without affecting other config changes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] splett2 opened a new pull request #9685: KAFKA-10748: Add IP connection rate throttling metric
splett2 opened a new pull request #9685: URL: https://github.com/apache/kafka/pull/9685 This PR adds the IP throttling metric as described in KIP-612. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete
jolshan commented on a change in pull request #9684: URL: https://github.com/apache/kafka/pull/9684#discussion_r535743912 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1896,6 +1896,11 @@ class KafkaApis(val requestChannel: RequestChannel, .setTopicConfigErrorCode(Errors.NONE.code) } } +val topicIds = zkClient.getTopicIdsForTopics(results.asScala.map(result => result.name()).toSet) Review comment: If going to ZK here is too slow, another option is to provide a callback to adminManager.createTopics which can be called after createTopicWithAssignment. This callback would add the topic ID to the result. The idea is that createTopicWithAssignment (and writeTopicPartitionAssignment) would return the topic ID to avoid an extra call to ZK. I wasn't sure which option was better. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan opened a new pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete
jolshan opened a new pull request #9684: URL: https://github.com/apache/kafka/pull/9684 Updated CreateTopicResponse, DeleteTopicsRequest/Response and added some new AdminClient methods and classes. Now the newly created topic ID will be returned in CreateTopicsResult and found in TopicAndMetadataConfig, and topics can be deleted by supplying topic IDs through deleteTopicsWithIds which will return DeleteTopicsWithIdsResult. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9668: MINOR: add test for repartition/source-topic/changelog optimization
ableegoldman commented on a change in pull request #9668: URL: https://github.com/apache/kafka/pull/9668#discussion_r535724656 ## File path: streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java ## @@ -463,7 +463,36 @@ public void shouldReuseSourceTopicAsChangelogsWithOptimization20() { } @Test -public void shouldNotReuseSourceTopicAsChangelogsByDefault() { +public void shouldNotReuseRepartitionTopicAsChangelogs() { +final String topic = "topic"; +builder.stream(topic).repartition().toTable(Materialized.as("store")); +final Properties props = StreamsTestUtils.getStreamsConfig("appId"); +props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); +final Topology topology = builder.build(props); + +final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology); +internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)); + +assertThat( +internalTopologyBuilder.buildTopology().storeToChangelogTopic(), +equalTo(Collections.singletonMap("store", "appId-store-changelog")) +); +assertThat( +internalTopologyBuilder.stateStores().keySet(), +equalTo(Collections.singleton("store")) +); +assertThat( + internalTopologyBuilder.stateStores().get("store").loggingEnabled(), +equalTo(true) +); +assertThat( + internalTopologyBuilder.topicGroups().get(1).stateChangelogTopics.keySet(), +equalTo(Collections.singleton("appId-store-changelog")) +); +} + +@Test +public void shouldNotReuseRepartitionTopicAsChangelogsByDefault() { Review comment: This should still be called `shouldNotReuseSourceTopicAsChangelogsByDefault`, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure
guozhangwang commented on a change in pull request #9676: URL: https://github.com/apache/kafka/pull/9676#discussion_r535691107 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1219,6 +1219,9 @@ class Log(@volatile private var _dir: File, appendInfo.logAppendTime = duplicate.timestamp appendInfo.logStartOffset = logStartOffset case None => + if (logDirFailureChannel.logDirIsOffline(parentDir)) { +throw new KafkaStorageException(s"The log dir $parentDir has failed."); Review comment: Maybe we can leave a more informative error message here? Sth like "... dir has failed due to a previous IO exception", just indicating it is not failed because of the current calling trace. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #9681: MINOR: Fix flaky test shouldQueryOnlyActivePartitionStoresByDefault
guozhangwang merged pull request #9681: URL: https://github.com/apache/kafka/pull/9681
[jira] [Updated] (KAFKA-10636) Bypass log validation for writes to raft log
[ https://issues.apache.org/jira/browse/KAFKA-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-10636: -- Labels: raft-log-management (was: ) > Bypass log validation for writes to raft log > > > Key: KAFKA-10636 > URL: https://issues.apache.org/jira/browse/KAFKA-10636 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: feyman >Priority: Major > Labels: raft-log-management > > The raft leader is responsible for creating the records written to the log > (including assigning offsets and the epoch), so we can consider bypassing the > validation done in `LogValidator`. This lets us skip potentially expensive > decompression and the unnecessary recomputation of the CRC. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10806) throwable from user callback on completeFutureAndFireCallbacks can lead to unhandled exceptions
xiongqi wu created KAFKA-10806: -- Summary: throwable from user callback on completeFutureAndFireCallbacks can lead to unhandled exceptions Key: KAFKA-10806 URL: https://issues.apache.org/jira/browse/KAFKA-10806 Project: Kafka Issue Type: Bug Reporter: xiongqi wu When kafka producer tries to complete/abort a batch, producer invokes user callback. However, "completeFutureAndFireCallbacks" only captures exceptions from user callback not all throwables. An uncaught throwable can prevent the batch from being freed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
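The difference between catching `Exception` and catching `Throwable` around user callbacks can be sketched as follows. This is not the producer's `completeFutureAndFireCallbacks` code; `CallbackGuard` and `fireAll` are hypothetical names, and whether catching every `Throwable` is the right fix for the producer is left open here.

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackGuard {
    /**
     * Runs every callback and collects anything thrown, including Errors.
     * A catch (Exception e) block would let an Error escape the loop,
     * skipping the remaining callbacks and any cleanup after it.
     */
    static List<Throwable> fireAll(List<Runnable> callbacks) {
        List<Throwable> failures = new ArrayList<>();
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (Throwable t) { // Throwable covers both Exception and Error
                failures.add(t);
            }
        }
        // Cleanup (for a producer, freeing the batch) would go here and always runs.
        return failures;
    }
}
```

With this shape, a callback that throws an `Error` no longer prevents later callbacks or the cleanup step from running.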
[jira] [Commented] (KAFKA-10636) Bypass log validation for writes to raft log
[ https://issues.apache.org/jira/browse/KAFKA-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243525#comment-17243525 ] Jason Gustafson commented on KAFKA-10636: - [~feyman] Sounds good, thanks for picking it up. It might just be a matter of taking advantage of the `AppendOrigin` which we added to `Log.appendAsLeader`. > Bypass log validation for writes to raft log > > > Key: KAFKA-10636 > URL: https://issues.apache.org/jira/browse/KAFKA-10636 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: feyman >Priority: Major > > The raft leader is responsible for creating the records written to the log > (including assigning offsets and the epoch), so we can consider bypassing the > validation done in `LogValidator`. This lets us skip potentially expensive > decompression and the unnecessary recomputation of the CRC. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243518#comment-17243518 ] James Galasyn commented on KAFKA-2967: -- Thank you for the feedback, [~vvcephei]! I only suggested RTD because that's what we do for ksqlDB, but I'm very happy to use the existing infra. I think you're right about leveraging the existing job, but I don't know the details. Maybe [~guozhang] or [~mjsax] can weigh in. So it seems like it's not a huge lift to make this happen. If everybody thinks it's worthwhile for me to investigate, I'd like to dive in for a couple of days and see what I come up with. > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Gwen Shapira >Priority: Major > Labels: documentation > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML, can't generate PDFs > * Reading and editing is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during build using Sphinx plugin for Gradle. > Lots of Apache projects are doing this.
[GitHub] [kafka] mumrah merged pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics
mumrah merged pull request #9677: URL: https://github.com/apache/kafka/pull/9677
[GitHub] [kafka] mumrah commented on pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics
mumrah commented on pull request #9677: URL: https://github.com/apache/kafka/pull/9677#issuecomment-738303699 Failed tests are known flaky https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9677/8/ ![image](https://user-images.githubusercontent.com/55116/101087722-9c578700-3580-11eb-8f88-604ee09eda8c.png)
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243502#comment-17243502 ] James Galasyn commented on KAFKA-2967: -- [~mjsax] Very cool, thank you for the tip! > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Gwen Shapira >Priority: Major > Labels: documentation > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML, can't generate PDFs > * Reading and editing is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during build using Sphinx plugin for Gradle. > Lots of Apache projects are doing this.
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243503#comment-17243503 ] John Roesler commented on KAFKA-2967: - Hi [~JimGalasyn], Thanks for that information! It's very cool that you were able to convert the site to markdown. Thanks for actually proving that it can be done with a reasonable level of effort. I didn't understand the rationale for using ReadTheDocs. The docs are currently hosted as a static site by the ASF for free. If we have markdown, we should be able to use Jekyll or Hugo or something to render the markdown into HTML as part of the job that currently copies the site over to the static host, right? It seems like the ASF has some suggestions for how we could do this: https://infra.apache.org/project-site.html Thanks again, -John > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Gwen Shapira >Priority: Major > Labels: documentation > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML, can't generate PDFs > * Reading and editing is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during build using Sphinx plugin for Gradle. > Lots of Apache projects are doing this.
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243500#comment-17243500 ] Matthias J. Sax commented on KAFKA-2967: Btw: [https://infra.apache.org/project-site.html] > Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Gwen Shapira >Priority: Major > Labels: documentation > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML, can't generate PDFs > * Reading and editing is painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during build using Sphinx plugin for Gradle. > Lots of Apache projects are doing this.
[GitHub] [kafka] bbejeck commented on pull request #9674: KAFKA-10665: close all kafkaStreams before purgeLocalStreamsState
bbejeck commented on pull request #9674: URL: https://github.com/apache/kafka/pull/9674#issuecomment-738292594 Java 11 failed with ``` > Task :streams:streams-scala:integrationTest FAILED 05:26:47 org.gradle.internal.remote.internal.ConnectException: Could not connect to server [704081bf-4ce4-4a82-ae43-69c7e855ca11 port:38063, addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1]. ```
[GitHub] [kafka] ijuma commented on a change in pull request #9512: KAFKA-10394: generate snapshot
ijuma commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r535574184 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.snapshot; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.OffsetAndEpoch; + +public final class FileRawSnapshotWriter implements RawSnapshotWriter { +private final Path tempSnapshotPath; +private final FileChannel channel; +private final OffsetAndEpoch snapshotId; +private boolean frozen = false; + +private FileRawSnapshotWriter( +Path tempSnapshotPath, +FileChannel channel, +OffsetAndEpoch snapshotId +) { +this.tempSnapshotPath = tempSnapshotPath; +this.channel = channel; +this.snapshotId = snapshotId; +} + +@Override +public OffsetAndEpoch snapshotId() { +return snapshotId; +} + +@Override +public long sizeInBytes() throws IOException { +return channel.size(); +} + +@Override +public void append(ByteBuffer buffer) throws IOException { +if (frozen) { +throw new IllegalStateException( +String.format("Append not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) +); +} + +Utils.writeFully(channel, buffer); +} + +@Override +public boolean isFrozen() { +return frozen; +} + +@Override +public void freeze() throws IOException { +channel.close(); +frozen = true; + +// Set readonly and ignore the result +if (!tempSnapshotPath.toFile().setReadOnly()) { +throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath)); +} + +Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId); +Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE); Review comment: Also, I would not rely on reading the code to assume one way or another. You'd want to test it too. This is an automated message from the Apache Git Service. 
[GitHub] [kafka] ijuma commented on a change in pull request #9512: KAFKA-10394: generate snapshot
ijuma commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r535572033 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.snapshot; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.OffsetAndEpoch; + +public final class FileRawSnapshotWriter implements RawSnapshotWriter { +private final Path tempSnapshotPath; +private final FileChannel channel; +private final OffsetAndEpoch snapshotId; +private boolean frozen = false; + +private FileRawSnapshotWriter( +Path tempSnapshotPath, +FileChannel channel, +OffsetAndEpoch snapshotId +) { +this.tempSnapshotPath = tempSnapshotPath; +this.channel = channel; +this.snapshotId = snapshotId; +} + +@Override +public OffsetAndEpoch snapshotId() { +return snapshotId; +} + +@Override +public long sizeInBytes() throws IOException { +return channel.size(); +} + +@Override +public void append(ByteBuffer buffer) throws IOException { +if (frozen) { +throw new IllegalStateException( +String.format("Append not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) +); +} + +Utils.writeFully(channel, buffer); +} + +@Override +public boolean isFrozen() { +return frozen; +} + +@Override +public void freeze() throws IOException { +channel.close(); +frozen = true; + +// Set readonly and ignore the result +if (!tempSnapshotPath.toFile().setReadOnly()) { +throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath)); +} + +Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId); +Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE); Review comment: It's not only Windows, NFS also has some restrictions. Why don't you want to use the utility method? This is an automated message from the Apache Git Service. 
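The concern raised in the two review comments above is that `StandardCopyOption.ATOMIC_MOVE` is not supported on every file system (Windows and NFS are mentioned). A minimal sketch of the usual fallback pattern, using only `java.nio.file`, is shown below. It is not Kafka's utility method; the class `SafeMove` and the method `moveWithFallback` are hypothetical names for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class SafeMove {
    /**
     * Tries an atomic move first; if that fails for any I/O reason
     * (including AtomicMoveNotSupportedException), falls back to a
     * non-atomic move that replaces an existing target.
     */
    static void moveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException atomicFailure) {
            try {
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException fallbackFailure) {
                // Keep the original cause visible to whoever handles the error.
                fallbackFailure.addSuppressed(atomicFailure);
                throw fallbackFailure;
            }
        }
    }
}
```

As the first comment notes, reading the code is not enough to know how a given file system behaves; a pattern like this still needs to be tested on the platforms in question.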
[GitHub] [kafka] mjsax commented on pull request #9683: MINOR: Fix KTable-KTable foreign-key join example
mjsax commented on pull request #9683: URL: https://github.com/apache/kafka/pull/9683#issuecomment-738284048 Merged to `trunk` and cherry-picked to `2.7` and `2.6` branches.
[GitHub] [kafka] mjsax merged pull request #9683: MINOR: Fix KTable-KTable foreign-key join example
mjsax merged pull request #9683: URL: https://github.com/apache/kafka/pull/9683
[GitHub] [kafka] JimGalasyn opened a new pull request #9683: DOCS-6115: Fix KTable-KTable foreign-key join example
JimGalasyn opened a new pull request #9683: URL: https://github.com/apache/kafka/pull/9683
[jira] [Assigned] (KAFKA-10555) Improve client state machine
[ https://issues.apache.org/jira/browse/KAFKA-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson reassigned KAFKA-10555: -- Assignee: Walker Carlson > Improve client state machine > > > Key: KAFKA-10555 > URL: https://issues.apache.org/jira/browse/KAFKA-10555 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Walker Carlson >Priority: Major > Labels: needs-kip > > The KafkaStreams client exposes its state to the user for monitoring purposes > (i.e., RUNNING, REBALANCING, etc.). The state of the client depends on the > state(s) of the internal StreamThreads, which have their own states. > Furthermore, the client state has an impact on what the user can do with the > client. For example, active tasks can only be queried in the RUNNING state. > With KIP-671 and KIP-663 we improved error handling capabilities and allow > stream threads to be added and removed dynamically. We allow adding/removing threads only > in the RUNNING and REBALANCING states. This puts us in a "weird" position, because > if we enter the ERROR state (i.e., if the last thread dies), we cannot add new > threads any longer. However, if we have multiple threads and one dies, we > don't enter the ERROR state and do allow recovering the thread. > Before the KIPs the definition of the ERROR state was clear; however, with both > KIPs it seems that we should revisit its semantics.
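The asymmetry described in the ticket can be made concrete with a toy model. The sketch below is not the KafkaStreams implementation: the enum is a simplified, hypothetical stand-in for the client states, and `tryAddThread` only mirrors the rule stated in the ticket (threads may be added in RUNNING or REBALANCING, and nowhere else).

```java
import java.util.Optional;

// Toy model of the rule quoted in KAFKA-10555; all names are illustrative.
public class ClientStateGate {

    public enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    public static boolean isRunningOrRebalancing(State state) {
        return state == State.RUNNING || state == State.REBALANCING;
    }

    // Returns the name of the thread that would be added, or empty if the
    // current state does not permit adding one.
    public static Optional<String> tryAddThread(State state, int nextIndex) {
        if (!isRunningOrRebalancing(state)) {
            return Optional.empty();
        }
        return Optional.of("StreamThread-" + nextIndex);
    }
}
```

With this rule a client that still has one live thread (RUNNING) can replace a dead one, while a client whose last thread died (ERROR) cannot, which is the inconsistency the ticket asks to revisit.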
[jira] [Commented] (KAFKA-10805) More useful reporting from travis system tests
[ https://issues.apache.org/jira/browse/KAFKA-10805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243461#comment-17243461 ] Jason Gustafson commented on KAFKA-10805: - cc [~chia7712] > More useful reporting from travis system tests > -- > > Key: KAFKA-10805 > URL: https://issues.apache.org/jira/browse/KAFKA-10805 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > Inspecting system test output from travis is a bit painful at the moment > because you have to check the build logs to find the tests that failed. > Additionally, there is no logging available from the workers which is often > essential to debug a failure. We should look into how we can improve the > build so that the output is more convenient and useful. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10805) More useful reporting from travis system tests
Jason Gustafson created KAFKA-10805: --- Summary: More useful reporting from travis system tests Key: KAFKA-10805 URL: https://issues.apache.org/jira/browse/KAFKA-10805 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Inspecting system test output from travis is a bit painful at the moment because you have to check the build logs to find the tests that failed. Additionally, there is no logging available from the workers which is often essential to debug a failure. We should look into how we can improve the build so that the output is more convenient and useful. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10804) Tune travis system tests to avoid timeouts
[ https://issues.apache.org/jira/browse/KAFKA-10804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243458#comment-17243458 ] Jason Gustafson commented on KAFKA-10804: - cc [~chia7712] > Tune travis system tests to avoid timeouts > -- > > Key: KAFKA-10804 > URL: https://issues.apache.org/jira/browse/KAFKA-10804 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Major > > Thanks to https://github.com/apache/kafka/pull/9652, we are now running > system tests for PRs. However, it looks like we need some tuning because many > of the subsets are timing out. For example: > https://travis-ci.com/github/apache/kafka/jobs/453241933. This might just be > a matter of adding more subsets or changing the timeout, but we should > probably also consider whether we want to run all system tests or if there is > a more useful subset of them. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10804) Tune travis system tests to avoid timeouts
Jason Gustafson created KAFKA-10804: --- Summary: Tune travis system tests to avoid timeouts Key: KAFKA-10804 URL: https://issues.apache.org/jira/browse/KAFKA-10804 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Thanks to https://github.com/apache/kafka/pull/9652, we are now running system tests for PRs. However, it looks like we need some tuning because many of the subsets are timing out. For example: https://travis-ci.com/github/apache/kafka/jobs/453241933. This might just be a matter of adding more subsets or changing the timeout, but we should probably also consider whether we want to run all system tests or if there is a more useful subset of them. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang merged pull request #9276: KAFKA-10473: Add docs on partition size-on-disk, and other log-related metrics
guozhangwang merged pull request #9276: URL: https://github.com/apache/kafka/pull/9276
[GitHub] [kafka] guozhangwang commented on a change in pull request #9276: KAFKA-10473: Add docs on partition size-on-disk, and other log-related metrics
guozhangwang commented on a change in pull request #9276: URL: https://github.com/apache/kafka/pull/9276#discussion_r535511409 ## File path: docs/ops.html ## @@ -1129,6 +1128,26 @@ Security Considerations for Remote Mon kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesInPerSec + +Size of a partition on disk (in bytes) + kafka.log:type=Log,name=Size,topic=([-.\w]+),partition=([0-9]+) +The size of a partition on disk, measured in bytes. + + +Number of log segments in a partition + kafka.log:type=Log,name=NumLogSegments,topic=([-.\w]+),partition=([0-9]+) +The number of log segments in a partition. Review comment: I think this is fine :) ## File path: docs/ops.html ## @@ -1129,6 +1128,26 @@ Security Considerations for Remote Mon kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesInPerSec + +Size of a partition on disk (in bytes) + kafka.log:type=Log,name=Size,topic=([-.\w]+),partition=([0-9]+) +The size of a partition on disk, measured in bytes. + + +Number of log segments in a partition + kafka.log:type=Log,name=NumLogSegments,topic=([-.\w]+),partition=([0-9]+) +The number of log segments in a partition. + + +First offset in a partition + kafka.log:type=Log,name=LogStartOffset,topic=([-.\w]+),partition=([0-9]+) +The first offset in a partition. + + +Last offset in a partition + kafka.log:type=Log,name=LogEndOffset,topic=([-.\w]+),partition=([0-9]+) +The last offset in a partition. Review comment: This is the last uncommitted offset: it is incremented right after the local append, without waiting for other replicas to ack.
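The MBean names documented in the diff above follow the standard JMX `domain:key=value,...` form, so they can be built and matched with `javax.management.ObjectName`. The sketch below only shows name construction and pattern matching; the class and method names are hypothetical, and connecting to a broker's MBean server is deliberately left out.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Illustrative helper for the per-partition kafka.log metrics named in the docs diff.
public class LogMetricName {

    // Name of one metric (e.g. "Size", "LogEndOffset") for one topic partition.
    public static ObjectName of(String metric, String topic, int partition) {
        return parse("kafka.log:type=Log,name=" + metric + ",topic=" + topic + ",partition=" + partition);
    }

    // Pattern that matches the given metric for every topic and partition.
    public static ObjectName allPartitions(String metric) {
        return parse("kafka.log:type=Log,name=" + metric + ",topic=*,partition=*");
    }

    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid MBean name: " + name, e);
        }
    }
}
```

A pattern such as `LogMetricName.allPartitions("Size")` could be passed to an MBean server query to list the size metric of every partition; `ObjectName.apply` performs the same match locally.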
[jira] [Comment Edited] (KAFKA-10774) Support Describe topic using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243454#comment-17243454 ] Justine Olshan edited comment on KAFKA-10774 at 12/3/20, 7:16 PM: -- I've updated the KIP to add another method like describeTopics that uses topic IDs instead [see here |https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient] was (Author: jolshan): I've updated the KIP to add another method like describeTopics that uses topic IDs instead [see here | https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient] > Support Describe topic using topic IDs > -- > > Key: KAFKA-10774 > URL: https://issues.apache.org/jira/browse/KAFKA-10774 > Project: Kafka > Issue Type: Sub-task >Reporter: dengziming >Assignee: dengziming >Priority: Major > > Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs > to MetadataReq and can get TopicDesc by topic IDs -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10774) Support Describe topic using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243454#comment-17243454 ] Justine Olshan edited comment on KAFKA-10774 at 12/3/20, 7:15 PM: -- I've updated the KIP to add another method like describeTopics that uses topic IDs instead [see here| https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient] was (Author: jolshan): I've updated the KIP to add another method like describeTopics that uses topic IDs instead [see here| [https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient|http://example.com/]] > Support Describe topic using topic IDs > -- > > Key: KAFKA-10774 > URL: https://issues.apache.org/jira/browse/KAFKA-10774 > Project: Kafka > Issue Type: Sub-task >Reporter: dengziming >Assignee: dengziming >Priority: Major > > Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs > to MetadataReq and can get TopicDesc by topic IDs -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10774) Support Describe topic using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243454#comment-17243454 ] Justine Olshan edited comment on KAFKA-10774 at 12/3/20, 7:15 PM: -- I've updated the KIP to add another method like describeTopics that uses topic IDs instead [see here| https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient] was (Author: jolshan): I've updated the KIP to add another method like describeTopics that uses topic IDs instead [see here| https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient] > Support Describe topic using topic IDs > -- > > Key: KAFKA-10774 > URL: https://issues.apache.org/jira/browse/KAFKA-10774 > Project: Kafka > Issue Type: Sub-task >Reporter: dengziming >Assignee: dengziming >Priority: Major > > Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs > to MetadataReq and can get TopicDesc by topic IDs -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10774) Support Describe topic using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243454#comment-17243454 ] Justine Olshan edited comment on KAFKA-10774 at 12/3/20, 7:15 PM: -- I've updated the KIP to add another method like describeTopics that uses topic IDs instead [see here | https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient] was (Author: jolshan): I've updated the KIP to add another method like describeTopics that uses topic IDs instead [see here| https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient] > Support Describe topic using topic IDs > -- > > Key: KAFKA-10774 > URL: https://issues.apache.org/jira/browse/KAFKA-10774 > Project: Kafka > Issue Type: Sub-task >Reporter: dengziming >Assignee: dengziming >Priority: Major > > Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs > to MetadataReq and can get TopicDesc by topic IDs -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10774) Support Describe topic using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243454#comment-17243454 ] Justine Olshan edited comment on KAFKA-10774 at 12/3/20, 7:14 PM: -- I've updated the KIP to add another method like describeTopics that uses topic IDs instead [see here| [https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient|http://example.com/]] was (Author: jolshan): I've updated the KIP to add another method like describeTopics that uses topic IDs instead [https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient|http://example.com] > Support Describe topic using topic IDs > -- > > Key: KAFKA-10774 > URL: https://issues.apache.org/jira/browse/KAFKA-10774 > Project: Kafka > Issue Type: Sub-task >Reporter: dengziming >Assignee: dengziming >Priority: Major > > Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs > to MetadataReq and can get TopicDesc by topic IDs -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10774) Support Describe topic using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243454#comment-17243454 ] Justine Olshan commented on KAFKA-10774: I've updated the KIP to add another method like describeTopics that uses topic IDs instead [https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient|http://example.com] > Support Describe topic using topic IDs > -- > > Key: KAFKA-10774 > URL: https://issues.apache.org/jira/browse/KAFKA-10774 > Project: Kafka > Issue Type: Sub-task >Reporter: dengziming >Assignee: dengziming >Priority: Major > > Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs > to MetadataReq and can get TopicDesc by topic IDs -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads
mjsax commented on pull request #9614: URL: https://github.com/apache/kafka/pull/9614#issuecomment-738231066 Retest this please
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r535503453 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, cacheSizePerThread, stateDirectory, delegatingStateRestoreListener, -i + 1, +threadIdx, KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} +streamsUncaughtExceptionHandler +); +streamThread.setStateListener(streamStateListener); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} -ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> -Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (newThread) { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +synchronized (stateLock) { Review comment: Expanding on this, the problem is in the shutdown thread. The join only waits for alive threads, and to be alive a thread needs to have been started. So if, between the state check and the thread start, another thread transitions the state to NOT_RUNNING, the shutdown thread will not join the new thread. When the new thread then continues, it will start because it already passed the check, and we will have a thread running after the client is shut down. This would be an extremely tough race condition to find or reproduce, so it is best to just avoid it.
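The race described in this comment is a classic check-then-act problem, and the fix discussed in the review is to perform the state check and the thread start under the same lock that guards state transitions. The class below is a self-contained sketch of that idea with hypothetical names; it is not the KafkaStreams code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Minimal model: state changes and thread starts share one lock, so a
// transition to NOT_RUNNING cannot slip in between the check and the start.
public class GuardedStart {

    public enum State { RUNNING, REBALANCING, NOT_RUNNING }

    private final Object stateLock = new Object();
    private final List<Thread> threads = new ArrayList<>();
    private State state = State.RUNNING;

    public void setState(State newState) {
        synchronized (stateLock) {
            state = newState;
        }
    }

    public Optional<String> addThread(String name) {
        Thread thread = new Thread(() -> { }, name); // no-op body for the demo
        synchronized (stateLock) {
            // Check and start happen atomically with respect to setState().
            if (state == State.RUNNING || state == State.REBALANCING) {
                threads.add(thread);
                thread.start();
                return Optional.of(name);
            }
        }
        return Optional.empty(); // the thread is never started
    }

    public int threadCount() {
        synchronized (stateLock) {
            return threads.size();
        }
    }
}
```

Because a shutdown path would also have to take `stateLock` to change the state, it either sees the new thread already started (and can join it) or prevents it from starting at all.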
[jira] [Comment Edited] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241113#comment-17241113 ] James Galasyn edited comment on KAFKA-2967 at 12/3/20, 6:53 PM: Since I inherited the Streams docs, I've come to appreciate the points made by [~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate the HTML-based docs to markdown and host them on ReadTheDocs. We used this approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], and there's no reason it won't work for the AK docs. Last week, I took a couple of hours and manually converted the entire AK doc set to markdown by using Pandoc: [ak-docs-proto repo|https://github.com/confluentinc/ak-docs-proto]. There's some more work to do, like fixing headings and updating links, but as a proof-of-concept, it works okay. Once the markdown is cleaned up, we would move it to the docs directory in the public AK GitHub repo. There are numerous options for building and hosting; for example, ksqlDB uses a ReadTheDocs project with a Basic account ($50/month). The ksqlDB docs have used this model successfully since November 2019, so the execution risk is low. I looked around at other ASF projects to see how they handle docs, and there's no prescribed solution. Each has its own approach. 
* *Pulsar*: [https://github.com/apache/pulsar/tree/master/site2] Markdown and [https://docusaurus.io/] * *Flink*: [https://github.com/apache/flink/tree/master/docs] Markdown and Jekyll * *Ant*: Raw HTML [https://github.com/apache/ant/tree/master/manual] * *Calcite*: Markdown and Jekyll [https://github.com/apache/calcite/tree/master/site/_docs] * *Cassandra*: reStructuredText and Sphinx [https://github.com/apache/cassandra/tree/trunk/doc] * *Hadoop*: [https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-auth/src/site/markdown/BuildingIt.md] Also, we could use whatever hosting solution we use currently for [https://kafka.apache.org/documentation/]. The big win for the community is getting the docs into markdown, which greatly increases the ease of contribution and flexibility of presentation. was (Author: jimgalasyn): Since I inherited the Streams docs, I've come to appreciate the points made by [~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate the HTML-based docs to markdown and host them on ReadTheDocs. We used this approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], and there's no reason it won't work for the AK docs. Last week, I took a couple of hours and manually converted the entire AK doc set to markdown by using Pandoc: [ak-docs-proto repo|https://github.com/confluentinc/ak-docs-proto]. There's some more work to do, like fixing headings and updating links, but as a proof-of-concept, it works okay. Once the markdown is cleaned up, we would move it to the docs directory in the public AK GitHub repo and set up a ReadTheDocs project with a Basic account ($50/month). RTD will build from our GH branches and host the site for us. The ksqlDB docs have used this model successfully since May 2020, so the execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you think? 
> Move Kafka documentation to ReStructuredText > > > Key: KAFKA-2967 > URL: https://issues.apache.org/jira/browse/KAFKA-2967 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Gwen Shapira >Priority: Major > Labels: documentation > > Storing documentation as HTML is kind of BS :) > * Formatting is a pain, and making it look good is even worse > * It's just HTML, can't generate PDFs > * Reading and editing are painful > * Validating changes is hard because our formatting relies on all kinds of > Apache Server features. > I suggest: > * Move to RST > * Generate HTML and PDF during build using Sphinx plugin for Gradle. > Lots of Apache projects are doing this.
[GitHub] [kafka] lmr3796 commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request
lmr3796 commented on a change in pull request #9435: URL: https://github.com/apache/kafka/pull/9435#discussion_r535489887 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1243,19 +1244,30 @@ class KafkaApis(val requestChannel: RequestChannel, topicResponses } else { val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet) - val responsesForNonExistentTopics = nonExistentTopics.map { topic => + val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic => if (isInternal(topic)) { val topicMetadata = createInternalTopic(topic) - if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code) -metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList()) - else -topicMetadata + Some( +if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code) + metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList()) +else + topicMetadata + ) +} else if (isFetchAllMetadata) { + // KAFKA-10606: If this request is to get metadata for all topics, auto topic creation should not be allowed + // The special handling is necessary on broker side because allowAutoTopicCreation is hard coded to true + // for backward compatibility on client side. + // + // However, in previous versions, UNKNOWN_TOPIC_OR_PARTITION won't happen on fetch all metadata, + // so, for backward-compatibility, we need to skip these not founds during fetch all metadata here. Review comment: Thanks @ijuma. To follow the convention in Kafka, I just updated the comments. Let me know if you have any other thoughts!
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r535482262 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -74,18 +74,19 @@ public void freeze() throws IOException { frozen = true; // Set readonly and ignore the result -if (!path.toFile().setReadOnly()) { -throw new IOException(String.format("Unable to set file %s as read-only", path)); +if (!tempSnapshotPath.toFile().setReadOnly()) { +throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath)); } -Path destination = Snapshots.moveRename(path, snapshotId); -Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE); +Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId); +Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE); } @Override public void close() throws IOException { channel.close(); -Files.deleteIfExists(path); +// This is a noop if freeze was called before calling close +Files.deleteIfExists(tempSnapshotPath); Review comment: Yes! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
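The code comment added in this diff ("This is a noop if freeze was called before calling close") relies on `Files.deleteIfExists` returning `false` instead of throwing when the file is already gone. The sketch below demonstrates both paths with plain temp files; the class and method names are hypothetical and the file names are placeholders, not the snapshot naming scheme used in the pull request.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustrative only: models "freeze then close" versus "close without freeze".
public class TempFileLifecycle {

    // Freeze path: the temp file is renamed first, so the delete in close()
    // finds nothing and returns false.
    public static boolean deleteAfterFreeze() {
        try {
            Path dir = Files.createTempDirectory("lifecycle-demo");
            Path temp = Files.createTempFile(dir, "snapshot", ".tmp");
            Files.move(temp, dir.resolve("snapshot.done"), StandardCopyOption.ATOMIC_MOVE);
            return Files.deleteIfExists(temp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Abort path: close() without freeze removes the temp file and returns true.
    public static boolean deleteWithoutFreeze() {
        try {
            Path dir = Files.createTempDirectory("lifecycle-demo");
            Path temp = Files.createTempFile(dir, "snapshot", ".tmp");
            return Files.deleteIfExists(temp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

This is why a writer can call the same cleanup unconditionally in `close()`: an unfrozen snapshot is discarded, and a frozen one is left untouched at its final path.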
[jira] [Updated] (KAFKA-10777) Add additional configuration to control MirrorMaker 2 internal topics naming convention
[ https://issues.apache.org/jira/browse/KAFKA-10777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim updated KAFKA-10777: -- Summary: Add additional configuration to control MirrorMaker 2 internal topics naming convention (was: Add additional configuration to control MirrorMaker2 internal topics naming convention) > Add additional configuration to control MirrorMaker 2 internal topics naming > convention > --- > > Key: KAFKA-10777 > URL: https://issues.apache.org/jira/browse/KAFKA-10777 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Omnia Ibrahim >Assignee: Omnia Ibrahim >Priority: Minor > Labels: mirror, mirror-maker, mirrormaker > > MM2 internal topic names (heartbeats, checkpoints and offset-syncs) are > hardcoded in the source code which makes it hard to run MM2 with any Kafka > Cluster that has rules around topic’s naming convention and doesn’t allow > auto-creation for topics. > In this case developers will need to create these internal topics up-front > manually and make sure they do follow the cluster rules and guidance for > topic creation, so MM2 should have flexibility to let you override the name > of internal topics to follow their cluster topic naming convention. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10777) Add additional configuration to control MirrorMaker2 internal topics naming convention
[ https://issues.apache.org/jira/browse/KAFKA-10777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim updated KAFKA-10777: -- Summary: Add additional configuration to control MirrorMaker2 internal topics naming convention (was: Add additional configuration to control MM2 internal topics naming convention) > Add additional configuration to control MirrorMaker2 internal topics naming > convention > -- > > Key: KAFKA-10777 > URL: https://issues.apache.org/jira/browse/KAFKA-10777 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Omnia Ibrahim >Assignee: Omnia Ibrahim >Priority: Minor > Labels: mirror, mirror-maker, mirrormaker > > MM2 internal topic names (heartbeats, checkpoints and offset-syncs) are > hardcoded in the source code which makes it hard to run MM2 with any Kafka > Cluster that has rules around topic’s naming convention and doesn’t allow > auto-creation for topics. > In this case developers will need to create these internal topics up-front > manually and make sure they do follow the cluster rules and guidance for > topic creation, so MM2 should have flexibility to let you override the name > of internal topics to follow their cluster topic naming convention. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10777) Add additional configuration to control MM2 internal topics naming convention
[ https://issues.apache.org/jira/browse/KAFKA-10777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim reassigned KAFKA-10777: - Assignee: Omnia Ibrahim > Add additional configuration to control MM2 internal topics naming convention > - > > Key: KAFKA-10777 > URL: https://issues.apache.org/jira/browse/KAFKA-10777 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Omnia Ibrahim >Assignee: Omnia Ibrahim >Priority: Minor > Labels: mirror, mirror-maker, mirrormaker > > MM2 internal topic names (heartbeats, checkpoints and offset-syncs) are > hardcoded in the source code which makes it hard to run MM2 with any Kafka > Cluster that has rules around topic’s naming convention and doesn’t allow > auto-creation for topics. > In this case developers will need to create these internal topics up-front > manually and make sure they do follow the cluster rules and guidance for > topic creation, so MM2 should have flexibility to let you override the name > of internal topics to follow their cluster topic naming convention. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r535477229 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, cacheSizePerThread, stateDirectory, delegatingStateRestoreListener, -i + 1, +threadIdx, KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} +streamsUncaughtExceptionHandler +); +streamThread.setStateListener(streamStateListener); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} -ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> -Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (newThread) { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +synchronized (stateLock) { +if (isRunningOrRebalancing()) { +streamThread.start(); +return Optional.of(streamThread.getName()); +} else { +streamThread.shutdown(); +threads.remove(streamThread); + resizeThreadCache(getCacheSizePerThread(threads.size())); +return Optional.empty(); +} +} +} +} +return Optional.empty(); +} -final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState); -if (hasGlobalTopology) { -globalStreamThread.setStateListener(streamStateListener); +private int getNextThreadIndex() { +final HashSet names = new HashSet<>(); +for (final StreamThread streamThread: threads) { +names.add(streamThread.getName()); } -for (final StreamThread thread : threads) { -thread.setStateListener(streamStateListener); +final String baseName = clientId + "-StreamThread-"; +for (int i = 0; i < threads.size(); i++) { Review comment: Sure, I missed that suggestion This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
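The quoted `getNextThreadIndex` is cut off after the loop header, so its body is not visible in this message. One plausible way to finish the idea (collect the existing thread names, then pick the lowest index whose name is free) is sketched below. The 1-based indexing and the reuse of gaps left by removed threads are assumptions, and the class name is hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical reconstruction of the idea, not the code from the pull request.
public class ThreadIndex {

    // Returns the smallest index i >= 1 such that baseName + i is not in use.
    public static int nextIndex(String baseName, List<String> existingNames) {
        Set<String> names = new HashSet<>(existingNames);
        for (int i = 1; i <= existingNames.size(); i++) {
            if (!names.contains(baseName + i)) {
                return i; // reuse a gap left by a removed thread
            }
        }
        // No gaps among 1..n: the next free index is n + 1.
        return existingNames.size() + 1;
    }
}
```

For example, with existing names `client-StreamThread-1` and `client-StreamThread-3`, the sketch returns 2; with names 1 and 2 in use it returns 3.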
[jira] [Updated] (KAFKA-10777) Add additional configuration to control MM2 internal topics naming convention
[ https://issues.apache.org/jira/browse/KAFKA-10777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim updated KAFKA-10777: -- Labels: mirror mirror-maker mirrormaker (was: ) > Add additional configuration to control MM2 internal topics naming convention > - > > Key: KAFKA-10777 > URL: https://issues.apache.org/jira/browse/KAFKA-10777 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Omnia Ibrahim >Priority: Minor > Labels: mirror, mirror-maker, mirrormaker > > MM2 internal topic names (heartbeats, checkpoints and offset-syncs) are > hardcoded in the source code which makes it hard to run MM2 with any Kafka > Cluster that has rules around topic’s naming convention and doesn’t allow > auto-creation for topics. > In this case developers will need to create these internal topics up-front > manually and make sure they do follow the cluster rules and guidance for > topic creation, so MM2 should have flexibility to let you override the name > of internal topics to follow their cluster topic naming convention. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot
jsancio commented on a change in pull request #9512: URL: https://github.com/apache/kafka/pull/9512#discussion_r535474040 ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.snapshot; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.OffsetAndEpoch; + +public final class FileRawSnapshotWriter implements RawSnapshotWriter { +private final Path tempSnapshotPath; +private final FileChannel channel; +private final OffsetAndEpoch snapshotId; +private boolean frozen = false; + +private FileRawSnapshotWriter( +Path tempSnapshotPath, +FileChannel channel, +OffsetAndEpoch snapshotId +) { +this.tempSnapshotPath = tempSnapshotPath; +this.channel = channel; +this.snapshotId = snapshotId; +} + +@Override +public OffsetAndEpoch snapshotId() { +return snapshotId; +} + +@Override +public long sizeInBytes() throws IOException { +return channel.size(); +} + +@Override +public void append(ByteBuffer buffer) throws IOException { +if (frozen) { +throw new IllegalStateException( +String.format("Append not supported. Snapshot is already frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath) +); +} + +Utils.writeFully(channel, buffer); +} + +@Override +public boolean isFrozen() { +return frozen; +} + +@Override +public void freeze() throws IOException { +channel.close(); +frozen = true; + +// Set readonly and ignore the result +if (!tempSnapshotPath.toFile().setReadOnly()) { +throw new IOException(String.format("Unable to set file (%s) as read-only", tempSnapshotPath)); +} + +Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId); +Files.move(tempSnapshotPath, destination, StandardCopyOption.ATOMIC_MOVE); Review comment: I investigated the issue a bit. 
It looks like this utility was introduced to replace these lines in `OffsetCheckpoint.scala`:
```scala
// swap new offset checkpoint file with previous one
if(!temp.renameTo(file)) {
  // renameTo() fails on Windows if the destination file exists.
  file.delete()
  if(!temp.renameTo(file))
    throw new IOException(...)
}
```
Looking at the JDK implementation for Windows, `ATOMIC_MOVE` should work on Windows even if the target file exists: https://github.com/openjdk/jdk/blob/master/src/java.base/windows/classes/sun/nio/fs/WindowsFileCopy.java#L298-L312
In other words, I think we can keep the code as is in this PR. cc @ijuma
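The point about `ATOMIC_MOVE` and an existing target can be tried out with a small standalone program. This is only a sketch: the class name, file names, and contents are made up for illustration and are not part of the PR. The `Files.move` documentation describes replacement of an existing target under `ATOMIC_MOVE` as implementation specific, so the result may differ by platform and filesystem.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch; file names are made up for illustration.
class AtomicMoveSketch {

    // Writes "old" to the target and "new" to a temp file, then atomically
    // moves the temp file over the target and returns the target's content.
    static String moveOverExisting() {
        try {
            Path dir = Files.createTempDirectory("atomic-move-sketch");
            Path target = dir.resolve("snapshot.checkpoint");
            Path temp = dir.resolve("snapshot.checkpoint.tmp");
            Files.write(target, "old".getBytes(StandardCharsets.UTF_8));
            Files.write(temp, "new".getBytes(StandardCharsets.UTF_8));

            // Whether an existing target is replaced is implementation specific
            // according to the Files.move documentation; an IOException here
            // means this platform or filesystem refused the replacement.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
            return new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(moveOverExisting());
    }
}
```

If the move succeeds, the target holds the content of the temp file and no delete-then-rename fallback is needed on that platform.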
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r535471146 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, cacheSizePerThread, stateDirectory, delegatingStateRestoreListener, -i + 1, +threadIdx, KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} +streamsUncaughtExceptionHandler Review comment: Ah, good catch. The diff makes that hard to see because the code was actually moved to a new method.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r535468574 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, cacheSizePerThread, stateDirectory, delegatingStateRestoreListener, -i + 1, +threadIdx, KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} +streamsUncaughtExceptionHandler +); +streamThread.setStateListener(streamStateListener); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} -ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> -Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (newThread) { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +synchronized (stateLock) { Review comment: Well, `newThread` only synchronizes the `addStreamThread` method. There is still a race condition between the second `isRunningOrRebalancing()` check and starting the thread. It seems like a bad idea to leave that open, as it could cause thread state changes when there shouldn't be any. Starting the thread is relatively cheap, so this shouldn't have much impact performance-wise.
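The pattern discussed in this comment, re-checking the client state while holding `stateLock` so that the check and the start cannot be interleaved with a state change, can be sketched in isolation. The class below is a hypothetical stand-in and not the Kafka Streams implementation: the `State` enum, the counter, and the `prepare` callback (which represents the cache resize and thread creation between the two checks) are assumptions made for the example.

```java
// Hypothetical sketch; names mirror the diff but this is not the Kafka Streams code.
class GuardedStartSketch {
    enum State { RUNNING, REBALANCING, PENDING_SHUTDOWN }

    private final Object stateLock = new Object();
    private State state = State.RUNNING;
    private int startedWorkers = 0;

    void setState(State newState) {
        synchronized (stateLock) {
            state = newState;
        }
    }

    boolean isRunningOrRebalancing() {
        synchronized (stateLock) {
            return state == State.RUNNING || state == State.REBALANCING;
        }
    }

    int startedWorkers() {
        synchronized (stateLock) {
            return startedWorkers;
        }
    }

    // 'prepare' stands in for the work done between the two checks.
    boolean startWorkerIfAllowed(Runnable prepare) {
        if (!isRunningOrRebalancing()) {
            return false; // first check: cheap early exit
        }
        prepare.run(); // the state may change while this runs
        synchronized (stateLock) {
            // Second check and "start" happen under the same lock, so no
            // state transition can slip in between them.
            if (isRunningOrRebalancing()) {
                startedWorkers++;
                return true;
            }
            return false;
        }
    }
}
```

The second check is the one that matters for correctness; the first only avoids doing the preparation work when the client is clearly not in a suitable state.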
[GitHub] [kafka] hachikuji commented on a change in pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics
hachikuji commented on a change in pull request #9677: URL: https://github.com/apache/kafka/pull/9677#discussion_r535452252 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -1403,7 +1426,9 @@ class Partition(val topicPartition: TopicPartition, } result match { -case Left(error: Errors) => error match { +case Left(error: Errors) => + isrChangeListener.markFailed() + error match { Review comment: nit: now the block below is misaligned
[jira] [Resolved] (KAFKA-10739) Replace EpochEndOffset with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-10739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-10739. - Resolution: Done > Replace EpochEndOffset with automated protocol > -- > > Key: KAFKA-10739 > URL: https://issues.apache.org/jira/browse/KAFKA-10739 > Project: Kafka > Issue Type: Improvement >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > > Follow up of KAFKA-9630. We can avoid extra conversion by using the > auto-generated data structure instead of using `EpochEndOffset` internally. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac merged pull request #9630: KAFKA-10739; Replace EpochEndOffset with automated protocol
dajac merged pull request #9630: URL: https://github.com/apache/kafka/pull/9630
[GitHub] [kafka] dajac commented on pull request #9630: KAFKA-10739; Replace EpochEndOffset with automated protocol
dajac commented on pull request #9630: URL: https://github.com/apache/kafka/pull/9630#issuecomment-738174240 Test failure is unrelated: * Build / JDK 8 / kafka.api.TransactionsTest.testBumpTransactionalEpoch
[GitHub] [kafka] prat0318 commented on a change in pull request #9682: KAFKA-10803: Fix improper removal of bad dynamic config
prat0318 commented on a change in pull request #9682: URL: https://github.com/apache/kafka/pull/9682#discussion_r535442782 ## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ## @@ -467,7 +467,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging case _: Exception => true } } -invalidProps.foreach(props.remove) +invalidProps.foreach { Review comment: done. ## File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala ## @@ -400,6 +400,33 @@ class DynamicBrokerConfigTest { newprops.put(KafkaConfig.BackgroundThreadsProp, "100") dynamicBrokerConfig.updateBrokerConfig(0, newprops) } + + @Test + def testImproperConfigsAreRemoved(): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +val configs = KafkaConfig(props) + +assertEquals(Int.MaxValue, configs.maxConnections) +assertEquals(1048588, configs.messageMaxBytes) + +var newProps = new Properties() +newProps.put(KafkaConfig.MaxConnectionsProp, "") +newProps.put(KafkaConfig.MessageMaxBytesProp, "") + +configs.dynamicConfig.updateDefaultConfig(newProps) +assertEquals(, configs.maxConnections) +assertEquals(, configs.messageMaxBytes) + +newProps = new Properties() +newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT") +newProps.put(KafkaConfig.MessageMaxBytesProp, "") + +configs.dynamicConfig.updateDefaultConfig(newProps) +// Invalid value should be skipped and reassigned as Originals +assertEquals(Int.MaxValue, configs.maxConnections) Review comment: done. 
## File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala ## @@ -400,6 +400,33 @@ class DynamicBrokerConfigTest { newprops.put(KafkaConfig.BackgroundThreadsProp, "100") dynamicBrokerConfig.updateBrokerConfig(0, newprops) } + + @Test + def testImproperConfigsAreRemoved(): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +val configs = KafkaConfig(props) + +assertEquals(Int.MaxValue, configs.maxConnections) +assertEquals(1048588, configs.messageMaxBytes) + +var newProps = new Properties() +newProps.put(KafkaConfig.MaxConnectionsProp, "") +newProps.put(KafkaConfig.MessageMaxBytesProp, "") + +configs.dynamicConfig.updateDefaultConfig(newProps) +assertEquals(, configs.maxConnections) +assertEquals(, configs.messageMaxBytes) + +newProps = new Properties() +newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT") +newProps.put(KafkaConfig.MessageMaxBytesProp, "") + +configs.dynamicConfig.updateDefaultConfig(newProps) +// Invalid value should be skipped and reassigned as Originals Review comment: done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9682: KAFKA-10803: Fix improper removal of bad dynamic config
chia7712 commented on a change in pull request #9682: URL: https://github.com/apache/kafka/pull/9682#discussion_r535428840 ## File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala ## @@ -400,6 +400,33 @@ class DynamicBrokerConfigTest { newprops.put(KafkaConfig.BackgroundThreadsProp, "100") dynamicBrokerConfig.updateBrokerConfig(0, newprops) } + + @Test + def testImproperConfigsAreRemoved(): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +val configs = KafkaConfig(props) + +assertEquals(Int.MaxValue, configs.maxConnections) +assertEquals(1048588, configs.messageMaxBytes) + +var newProps = new Properties() +newProps.put(KafkaConfig.MaxConnectionsProp, "") +newProps.put(KafkaConfig.MessageMaxBytesProp, "") + +configs.dynamicConfig.updateDefaultConfig(newProps) +assertEquals(, configs.maxConnections) +assertEquals(, configs.messageMaxBytes) + +newProps = new Properties() +newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT") +newProps.put(KafkaConfig.MessageMaxBytesProp, "") + +configs.dynamicConfig.updateDefaultConfig(newProps) +// Invalid value should be skipped and reassigned as Originals +assertEquals(Int.MaxValue, configs.maxConnections) Review comment: Could you replace ```Int.MaxValue``` by ```Defaults.MaxConnections```? ## File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala ## @@ -467,7 +467,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging case _: Exception => true } } -invalidProps.foreach(props.remove) +invalidProps.foreach { Review comment: How about ```invalidProps.keys.foreach(props.remove)```? it is still one line. 
## File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala ## @@ -400,6 +400,33 @@ class DynamicBrokerConfigTest { newprops.put(KafkaConfig.BackgroundThreadsProp, "100") dynamicBrokerConfig.updateBrokerConfig(0, newprops) } + + @Test + def testImproperConfigsAreRemoved(): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +val configs = KafkaConfig(props) + +assertEquals(Int.MaxValue, configs.maxConnections) +assertEquals(1048588, configs.messageMaxBytes) + +var newProps = new Properties() +newProps.put(KafkaConfig.MaxConnectionsProp, "") +newProps.put(KafkaConfig.MessageMaxBytesProp, "") + +configs.dynamicConfig.updateDefaultConfig(newProps) +assertEquals(, configs.maxConnections) +assertEquals(, configs.messageMaxBytes) + +newProps = new Properties() +newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT") +newProps.put(KafkaConfig.MessageMaxBytesProp, "") + +configs.dynamicConfig.updateDefaultConfig(newProps) +// Invalid value should be skipped and reassigned as Originals Review comment: "Originals" -> "default value" This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9675: KAFKA-10794 Replica leader election is too slow in the case of too many partitions
chia7712 commented on pull request #9675: URL: https://github.com/apache/kafka/pull/9675#issuecomment-738140790
@lqjack good question!
> I find the only differences is that controllerContext.allPartitions can be invoked once or the number of partition times .
```controllerContext.allPartitions``` does not return a constant value. It creates a new collection on every call, and the overhead can be high if there are a lot of partitions. This PR calls ```controllerContext.allPartitions``` only once to reduce the cost of getting "all partitions".
> does the patch can resolve the issue ?
@Montyleo It seems to me that the optimization in this PR is good enough. However, it would be better to show the improvement this patch makes in your environment.
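The cost described here, an accessor that builds a new collection on every call being invoked once per partition, can be illustrated with a small sketch. It is written in Java rather than Scala, and `ContextSketch`, its `copiesMade` counter, and the partition names are all hypothetical; the real `controllerContext.allPartitions` is only assumed to behave similarly, as the comment describes.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; ContextSketch is a stand-in, not Kafka's ControllerContext.
class HoistAllPartitionsSketch {

    static class ContextSketch {
        private final Set<String> partitions;
        int copiesMade = 0;

        ContextSketch(List<String> partitions) {
            this.partitions = new HashSet<>(partitions);
        }

        // Builds a fresh collection on every call, like the accessor in the comment.
        Set<String> allPartitions() {
            copiesMade++;
            return new HashSet<>(partitions);
        }
    }

    // Calls the accessor once per candidate: one full copy per iteration.
    static int countKnownPerCall(ContextSketch ctx, List<String> candidates) {
        int known = 0;
        for (String candidate : candidates) {
            if (ctx.allPartitions().contains(candidate)) {
                known++;
            }
        }
        return known;
    }

    // Calls the accessor once and reuses the result: a single copy in total.
    static int countKnownHoisted(ContextSketch ctx, List<String> candidates) {
        Set<String> all = ctx.allPartitions();
        int known = 0;
        for (String candidate : candidates) {
            if (all.contains(candidate)) {
                known++;
            }
        }
        return known;
    }
}
```

Both methods return the same count; only the number of collection copies differs, which is why the saving grows with the number of partitions.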
[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option
cadonna commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r535383909 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, cacheSizePerThread, stateDirectory, delegatingStateRestoreListener, -i + 1, +threadIdx, KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} +streamsUncaughtExceptionHandler Review comment: nit: If it happens that you need to push another commit, could you fix the indentation here? Sorry that I haven't noticed this before. ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, cacheSizePerThread, stateDirectory, delegatingStateRestoreListener, -i + 1, +threadIdx, KafkaStreams.this::closeToError, -this::defaultStreamsUncaughtExceptionHandler -); -threads.add(streamThread); -threadState.put(streamThread.getId(), streamThread.state()); -storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); -} +streamsUncaughtExceptionHandler +); +streamThread.setStateListener(streamStateListener); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +return streamThread; +} -ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> -Math.toIntExact(threads.stream().filter(thread -> thread.state().isAlive()).count())); +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. 
+ * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (newThread) { +if (isRunningOrRebalancing()) { +final int threadIdx = getNextThreadIndex(); +final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); +resizeThreadCache(cacheSizePerThread); +final StreamThread streamThread = createStreamThread(cacheSizePerThread, threadIdx); +synchronized (stateLock) { +if (isRunningOrRebalancing()) { +streamThread.start(); +return Optional.of(streamThread.getName()); +} else { +streamThread.shutdown(); +threads.remove(streamThread); + resizeThreadCache(getCacheSizePerThread(threads.size())); +return Optional.empty(); +} +} +} +} +return Optional.empty(); +} -final StreamStateListener streamStateListener = new StreamStateListener(threadState, globalThreadState); -if (hasGlobalTopology) { -globalStreamThread.setStateListener(streamStateListener); +private int getNextThreadIndex() { +final HashSet names = new HashSet<>(); +for (final StreamThread streamThread: threads) { +names.add(streamThread.getName()); } -for (final StreamThread thread : threads) { -thread.setStateListener(streamStateListener); +final String baseName = clientId + "-StreamThread-"; +for (int i = 0; i < threads.size(); i++) { Review comment: Shouldn't that be `int i = 1; i <= threads.size(); i++`? Otherwise, we would look up `*-StreamThread-0"` and we would not look up `"*-StreamThread-" + threads.size()`. 
Could you add some tests that check the correct naming as @mjsax suggested? ##
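The off-by-one raised in this comment can be made concrete with a standalone sketch. It assumes, as the comment implies, that thread names are numbered from 1 and that the method should return the lowest unused index; the class name, the method signature, and the fallback of `size + 1` are hypothetical and not taken from the PR.

```java
import java.util.Set;

// Hypothetical sketch of the index lookup; not the code from the PR.
class NextThreadIndexSketch {

    // Returns the lowest index i >= 1 for which "<clientId>-StreamThread-<i>"
    // is not among the existing thread names.
    static int nextThreadIndex(String clientId, Set<String> existingNames) {
        final String baseName = clientId + "-StreamThread-";
        // Start at 1 and include existingNames.size(), as suggested in the review.
        for (int i = 1; i <= existingNames.size(); i++) {
            if (!existingNames.contains(baseName + i)) {
                return i;
            }
        }
        // Every index in 1..size is taken, so the next free one is size + 1.
        return existingNames.size() + 1;
    }
}
```

With a loop of `i = 0; i < size`, the lookup would test the suffix `0`, which is never used under this numbering, and would never test the suffix equal to the current thread count.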
[GitHub] [kafka] splett2 commented on a change in pull request #9628: KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas
splett2 commented on a change in pull request #9628: URL: https://github.com/apache/kafka/pull/9628#discussion_r535397962 ## File path: core/src/main/scala/kafka/admin/ConfigCommand.scala ## @@ -56,11 +56,12 @@ import scala.collection._ * --entity-type users --entity-name --entity-type clients --entity-name * broker: --broker OR --entity-type brokers --entity-name * broker-logger: --broker-logger OR --entity-type broker-loggers --entity-name + * ip: --IP OR --entity-type IPs --entity-name * - * --entity-type --entity-default may be specified in place of --entity-type --entity-name - * when describing or altering default configuration for users, clients, or brokers, respectively. - * Alternatively, --user-defaults, --client-defaults, or --broker-defaults may be specified in place of - * --entity-type --entity-default, respectively. + * --entity-type --entity-default may be specified in place of --entity-type --entity-name + * when describing or altering default configuration for users, clients, brokers, or IPs, respectively. + * Alternatively, --user-defaults, --client-defaults, --broker-defaults, or --IP-defaults may be specified in place of Review comment: @dajac It was originally lower-case in #9386. After reading the KIP, I realized it was described as upper-case, e.g. ``` Default quota for will be stored in Zookeeper at /config/IPs/ Quota for a specific IP address for which quota override is defined will be stored in Zookeeper at /config/IPs/ ``` and I changed it to upper-case in this PR with the change in ``` core/src/main/scala/kafka/server/DynamicConfigManager.scala ```
[jira] [Commented] (KAFKA-10803) Skip improper dynamic configs while initialization and include the rest correct ones
[ https://issues.apache.org/jira/browse/KAFKA-10803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243320#comment-17243320 ] Prateek Agarwal commented on KAFKA-10803: - PR: https://github.com/apache/kafka/pull/9682 > Skip improper dynamic configs while initialization and include the rest > correct ones > > > Key: KAFKA-10803 > URL: https://issues.apache.org/jira/browse/KAFKA-10803 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.1, 2.6.0, 2.5.1, 2.7.0, 2.8.0 >Reporter: Prateek Agarwal >Priority: Major > > There is [a > bug|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L470] > in how incorrect dynamic config keys are removed from the original > Properties list, resulting in persisting the improper configs in the > properties list. > This eventually results in exception being thrown while parsing the list by > [KafkaConfig > ctor|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L531], > resulting in skipping of the complete dynamic list (including the correct > ones). -- This message was sent by Atlassian Jira (v8.3.4#803005)
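The kind of bug described here can be sketched as a Java analogy. The assumption, suggested by the review proposal `invalidProps.keys.foreach(props.remove)`, is that the invalid entries are held in a map and the whole entry rather than its key was passed to `remove`. The class, method names, and property values below are hypothetical; `Properties.remove(Object)` is the only library call the example relies on.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical Java analogy of the Scala issue; not the DynamicBrokerConfig code.
class RemoveInvalidPropsSketch {

    // Buggy variant: passes the whole map entry to remove(). No key in the
    // Properties equals a Map.Entry, so nothing is removed.
    static Properties removeByEntry(Properties props, Map<String, String> invalid) {
        Properties copy = new Properties();
        copy.putAll(props);
        for (Map.Entry<String, String> entry : invalid.entrySet()) {
            copy.remove(entry);
        }
        return copy;
    }

    // Fixed variant: removes by key, so only the invalid configs disappear
    // and the valid ones are kept.
    static Properties removeByKey(Properties props, Map<String, String> invalid) {
        Properties copy = new Properties();
        copy.putAll(props);
        for (Map.Entry<String, String> entry : invalid.entrySet()) {
            copy.remove(entry.getKey());
        }
        return copy;
    }
}
```

Because `remove` accepts any object, the buggy variant compiles and fails silently, which matches the symptom in the issue: the improper configs stay in the list and later cause the whole dynamic config update to be skipped.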
[GitHub] [kafka] dajac commented on a change in pull request #9628: KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas
dajac commented on a change in pull request #9628: URL: https://github.com/apache/kafka/pull/9628#discussion_r535388027 ## File path: core/src/main/scala/kafka/server/AdminManager.scala ## @@ -920,32 +954,49 @@ class AdminManager(val config: KafkaConfig, !name.isDefined || !strict } -def fromProps(props: Map[String, String]): Map[String, Double] = { - props.map { case (key, value) => -val doubleValue = try value.toDouble catch { - case _: NumberFormatException => -throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value") -} -key -> doubleValue - } -} - -(userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) => +(userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p) => val quotaProps = p.asScala.filter { case (key, _) => QuotaConfigs.isQuotaConfig(key) } if (quotaProps.nonEmpty && matches(userComponent, u) && matches(clientIdComponent, c)) Some(userClientIdToEntity(u, c) -> fromProps(quotaProps)) else None -}.flatten.toMap +}.toMap Review comment: Ah. I missed the `flatMap`. That makes sense, thanks.
[GitHub] [kafka] dajac commented on a change in pull request #9628: KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas
dajac commented on a change in pull request #9628: URL: https://github.com/apache/kafka/pull/9628#discussion_r535387504 ## File path: core/src/main/scala/kafka/admin/ConfigCommand.scala ## @@ -56,11 +56,12 @@ import scala.collection._ * --entity-type users --entity-name --entity-type clients --entity-name * broker: --broker OR --entity-type brokers --entity-name * broker-logger: --broker-logger OR --entity-type broker-loggers --entity-name + * ip: --IP OR --entity-type IPs --entity-name * - * --entity-type --entity-default may be specified in place of --entity-type --entity-name - * when describing or altering default configuration for users, clients, or brokers, respectively. - * Alternatively, --user-defaults, --client-defaults, or --broker-defaults may be specified in place of - * --entity-type --entity-default, respectively. + * --entity-type --entity-default may be specified in place of --entity-type --entity-name + * when describing or altering default configuration for users, clients, brokers, or IPs, respectively. + * Alternatively, --user-defaults, --client-defaults, --broker-defaults, or --IP-defaults may be specified in place of Review comment: @splett2 Isn't the ZK path already lower-case?
[GitHub] [kafka] prat0318 commented on a change in pull request #9682: KAFKA-10803: Fix improper removal of bad dynamic config
prat0318 commented on a change in pull request #9682: URL: https://github.com/apache/kafka/pull/9682#discussion_r535387403 ## File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala ## @@ -400,6 +400,33 @@ class DynamicBrokerConfigTest { newprops.put(KafkaConfig.BackgroundThreadsProp, "100") dynamicBrokerConfig.updateBrokerConfig(0, newprops) } + + @Test + def testImproperConfigsAreRemoved(): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) +val configs = KafkaConfig(props) + +assertEquals(Int.MaxValue, configs.maxConnections) +assertEquals(1048588, configs.messageMaxBytes) + +var newProps = new Properties() +newProps.put(KafkaConfig.MaxConnectionsProp, "") +newProps.put(KafkaConfig.MessageMaxBytesProp, "") + +configs.dynamicConfig.updateDefaultConfig(newProps) +assertEquals(, configs.maxConnections) +assertEquals(, configs.messageMaxBytes) + +newProps = new Properties() +newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT") +newProps.put(KafkaConfig.MessageMaxBytesProp, "") + +configs.dynamicConfig.updateDefaultConfig(newProps) +// Invalid value should be skipped and reassigned as Originals +assertEquals(Int.MaxValue, configs.maxConnections) +// Even if One property is invalid, the below should get correctly updated. +assertEquals(, configs.messageMaxBytes) Review comment: Current trunk code returns the old value `` for this instead of the newly configured ``. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r535386444 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java ## @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.MirrorClient; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.test.IntegrationTest; +import static org.apache.kafka.test.TestUtils.waitForCondition; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.apache.kafka.connect.mirror.TestUtils.generateRecords; + +/** + * Tests MM2 replication and failover/failback logic. + * + * MM2 is configured with active/active replication between two Kafka clusters. Tests validate that + * records sent to either cluster arrive at the other cluster. Then, a consumer group is migrated from + * one cluster to the other and back. Tests validate that consumer offsets are translated and replicated + * between clusters during this failover and failback. + */ +@Category(IntegrationTest.class) +public abstract class MirrorConnectorsIntegrationBaseTest { +private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class); + +private static final int NUM_RECORDS_PER_PARTITION = 10; +private static final int NUM_PARTITIONS = 10; +private static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION; +private static final int RECORD_TRANSFER_DURATION_MS = 30_000; +private static final int CHECKPOINT_DURATION_MS = 20_000; +private static final int RECORD_CONSUME_DURATION_MS = 20_000; +private static final int OFFSET_SYNC_DURATION_MS = 30_000; +private static final int NUM_WORKERS = 3; +private static final int CONSUMER_POLL_TIMEOUT_MS = 500; +private static final int BROKER_RESTART_TIMEOUT_MS = 10_000; +private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); +private static final String PRIMARY_CLUSTER_ALIAS = "primary"; +private static final String BACKUP_CLUSTER_ALIAS = "backup"; +private static final List>
[GitHub] [kafka] prat0318 opened a new pull request #9682: KAFKA-10803: Fix improper removal of bad dynamic config
prat0318 opened a new pull request #9682:
URL: https://github.com/apache/kafka/pull/9682

There is [a bug](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L470) in how invalid dynamic config keys are removed from the original Properties list: the invalid configs remain in the list. As a result, the [KafkaConfig ctor](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L531) eventually throws an exception while parsing the list, and the complete dynamic config list (including the valid entries) is skipped.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] junrao commented on pull request #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal
junrao commented on pull request #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-738119273

@wushujames : This PR is mostly ready. It's just waiting for another committer more familiar with the transactional logic to take another look.

@ConcurrencyPractitioner : Would you be able to rebase this PR? Thanks.
[jira] [Created] (KAFKA-10803) Skip improper dynamic configs while initialization and include the rest correct ones
Prateek Agarwal created KAFKA-10803:
------------------------------------

Summary: Skip improper dynamic configs while initialization and include the rest correct ones
Key: KAFKA-10803
URL: https://issues.apache.org/jira/browse/KAFKA-10803
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 2.5.1, 2.6.0, 1.1.1, 2.7.0, 2.8.0
Reporter: Prateek Agarwal

There is [a bug|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L470] in how invalid dynamic config keys are removed from the original Properties list: the invalid configs remain in the list. As a result, the [KafkaConfig ctor|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L531] eventually throws an exception while parsing the list, and the complete dynamic config list (including the valid entries) is skipped.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
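The behaviour the issue asks for (drop only the entries that fail validation, keep the rest) can be sketched in a few lines of plain Java. This is a minimal illustration, not the broker's code: the class `DynamicConfigFilterSketch`, the method `dropInvalidInts`, and the `intKeys` set are hypothetical names, and real validation goes through the broker's config definitions rather than `Integer.parseInt`. The keys `max.connections` and `message.max.bytes` are used only as sample int-valued configs.

```java
import java.util.Properties;
import java.util.Set;

final class DynamicConfigFilterSketch {

    // Returns a copy of `candidate` without the int-typed entries that fail to parse.
    // `intKeys` is a hypothetical stand-in for the broker's real config definitions.
    static Properties dropInvalidInts(Properties candidate, Set<String> intKeys) {
        Properties valid = new Properties();
        for (String name : candidate.stringPropertyNames()) {
            String value = candidate.getProperty(name);
            if (intKeys.contains(name) && !isInt(value)) {
                continue; // skip only the bad entry; the remaining entries survive
            }
            valid.setProperty(name, value);
        }
        return valid;
    }

    private static boolean isInt(String value) {
        try {
            Integer.parseInt(value.trim());
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Properties dynamic = new Properties();
        dynamic.setProperty("max.connections", "INVALID_INT");
        dynamic.setProperty("message.max.bytes", "2000000");
        Properties cleaned = dropInvalidInts(dynamic, Set.of("max.connections", "message.max.bytes"));
        System.out.println(cleaned.stringPropertyNames());
    }
}
```

Filtering by key before the combined list is parsed means one bad value cannot cause the whole dynamic list to be discarded, which is the failure mode described above.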
[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-738115781

https://github.com/apache/kafka/pull/9224#discussion_r535179359

I think the `try catch` block is needed, because the compiler reports several unhandled checked exceptions:

```
/kafka/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java:41: error: unreported exception IOException; must be caught or declared to be thrown
sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
```

```
/kafka/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java:42: error: unreported exception GeneralSecurityException; must be caught or declared to be thrown
sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
```
[jira] [Commented] (KAFKA-10802) Spurious log message when starting consumers
[ https://issues.apache.org/jira/browse/KAFKA-10802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243273#comment-17243273 ]

Gary Russell commented on KAFKA-10802:
--------------------------------------

Also

> I am not sure I understand why it's logged at INFO at all, since it's a normal
> state during initial handshaking.

Consider reducing it to DEBUG (when the exception is MemberIdRequiredException)?

> Spurious log message when starting consumers
> --------------------------------------------
>
> Key: KAFKA-10802
> URL: https://issues.apache.org/jira/browse/KAFKA-10802
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 2.6.0
> Reporter: Mickael Maison
> Priority: Major
>
> Reported by Gary Russell in the [2.6.1 RC3 vote
> thread|https://lists.apache.org/thread.html/r13d2c687b2fafbe9907fceb3d4f3cc6d5b34f9f36a7fcc985c38b506%40%3Cdev.kafka.apache.org%3E]
> I am seeing this on every consumer start:
> 2020-11-25 13:54:34.858 INFO 42250 --- [ntainer#0-0-C-1]
> o.a.k.c.c.internals.AbstractCoordinator : [Consumer
> clientId=consumer-ktest26int-1, groupId=ktest26int] Rebalance failed.
> org.apache.kafka.common.errors.MemberIdRequiredException: The group member
> needs to have a valid member id before actually entering a consumer group.
> Due to this change
> https://github.com/apache/kafka/commit/16ec1793d53700623c9cb43e711f585aafd44dd4#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R468
> I understand what a MemberIdRequiredException is, but the previous (2.6.0)
> log (with exception.getMessage()) didn't stand out like the new one does
> because it was all on one line.
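The suggestion in this comment (log the expected `MemberIdRequiredException` at DEBUG and everything else at INFO) could look roughly like the sketch below. It is self-contained and therefore does not use Kafka or SLF4J: the nested `MemberIdRequiredException` is a local stand-in for `org.apache.kafka.common.errors.MemberIdRequiredException`, and `Level`, `levelFor`, and `oneLine` are hypothetical helpers, not anything in `AbstractCoordinator`.

```java
final class RebalanceLogSketch {

    // Local stand-in for org.apache.kafka.common.errors.MemberIdRequiredException.
    static class MemberIdRequiredException extends RuntimeException {
        MemberIdRequiredException(String message) {
            super(message);
        }
    }

    enum Level { DEBUG, INFO }

    // An expected handshake step is demoted to DEBUG; any other failure stays at INFO.
    static Level levelFor(RuntimeException exception) {
        return (exception instanceof MemberIdRequiredException) ? Level.DEBUG : Level.INFO;
    }

    // Single-line rendering that uses only the exception message, without a stack trace.
    static String oneLine(RuntimeException exception) {
        return "Rebalance failed: " + exception.getMessage();
    }

    public static void main(String[] args) {
        RuntimeException expected = new MemberIdRequiredException("member id required");
        RuntimeException other = new IllegalStateException("coordinator unavailable");
        System.out.println(levelFor(expected) + " " + oneLine(expected));
        System.out.println(levelFor(other) + " " + oneLine(other));
    }
}
```

Keeping the level decision in one small function makes it easy to extend later if other exceptions turn out to be part of the normal join handshake.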
[jira] [Commented] (KAFKA-7588) Rationalize configurations passed to pluggable APIs
[ https://issues.apache.org/jira/browse/KAFKA-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243261#comment-17243261 ]

Ismael Juma commented on KAFKA-7588:
------------------------------------

Seems like the PR was merged, so this Jira should be closed.

> Rationalize configurations passed to pluggable APIs
> ---------------------------------------------------
>
> Key: KAFKA-7588
> URL: https://issues.apache.org/jira/browse/KAFKA-7588
> Project: Kafka
> Issue Type: Improvement
> Reporter: Mickael Maison
> Assignee: Mickael Maison
> Priority: Major
>
> There are a lot of extension points on both the client and server sides.
> Most of these pluggable APIs are configurable, but the configurations they
> receive are not the same.
> For example, Serializers, Deserializers, Partitioners, Assignors, and
> QuotaCallbacks are passed config.originals(). On the other hand, LoginModules,
> PrincipalBuilders and AuthenticationCallbackHandlers are passed
> config.values().
> In practice, having access to originals() is nice, as it allows using custom
> configurations by simply adding them to the client/server configuration.
[GitHub] [kafka] ijuma commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request
ijuma commented on a change in pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#discussion_r535321971

## File path: core/src/main/scala/kafka/server/KafkaApis.scala ##
@@ -1243,19 +1244,30 @@ class KafkaApis(val requestChannel: RequestChannel,
         topicResponses
       } else {
         val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
-        val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
+        val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
           if (isInternal(topic)) {
             val topicMetadata = createInternalTopic(topic)
-            if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
-            else
-              topicMetadata
+            Some(
+              if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
+                metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList())
+              else
+                topicMetadata
+            )
+          } else if (isFetchAllMetadata) {
+            // KAFKA-10606: If this request is to get metadata for all topics, auto topic creation should not be allowed
+            // The special handling is necessary on broker side because allowAutoTopicCreation is hard coded to true
+            // for backward compatibility on client side.
+            //
+            // However, in previous versions, UNKNOWN_TOPIC_OR_PARTITION won't happen on fetch all metadata,
+            // so, for backward-compatibility, we need to skip these not founds during fetch all metadata here.

Review comment:
We don't usually include JIRA references in the code unless it's a very complex issue. I think the comment here could be something like:

"A metadata request for all topics should never result in topic auto creation. A topic may be deleted between the creation of `topics` and `topicResponses`, so we make sure to always return `None` for this case."
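The switch from `map` to `flatMap` in the diff above lets a branch produce no response at all for a topic. The same pattern in Java uses `Optional` with `Stream.flatMap`. This is a simplified sketch under stated assumptions: `MissingTopicResponseSketch`, `responseFor`, and `responses` are hypothetical names, responses are plain strings, and the non-fetch-all branch returns an error string where the real broker code may instead auto-create the topic.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class MissingTopicResponseSketch {

    // For a fetch-all request a missing topic yields no response (the `None` case);
    // otherwise an error entry is returned (a simplification of the broker logic).
    static Optional<String> responseFor(String topic, boolean isFetchAllMetadata) {
        if (isFetchAllMetadata) {
            return Optional.empty();
        }
        return Optional.of(topic + ": UNKNOWN_TOPIC_OR_PARTITION");
    }

    // flatMap over Optional.stream() silently drops the empty results.
    static List<String> responses(List<String> missingTopics, boolean isFetchAllMetadata) {
        return missingTopics.stream()
                .flatMap(topic -> responseFor(topic, isFetchAllMetadata).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> missing = List.of("deleted-topic");
        System.out.println(responses(missing, true));
        System.out.println(responses(missing, false));
    }
}
```

`Optional.stream()` requires Java 9 or later.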
[GitHub] [kafka] cadonna commented on a change in pull request #9681: MINOR: Fix flaky test shouldQueryOnlyActivePartitionStoresByDefault
cadonna commented on a change in pull request #9681:
URL: https://github.com/apache/kafka/pull/9681#discussion_r535310939

## File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java ##
@@ -136,13 +136,13 @@ public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {

         final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;

-        // Assert that only active is able to query for a key by default
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));

Review comment:
Last time we forgot to move this line into the `try-catch-clause`.

## File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java ##
@@ -136,13 +136,13 @@ public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {

         final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;

-        // Assert that only active is able to query for a key by default
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
         try {
             if (kafkaStreams1IsActive) {
+                assertThat(store1.get(key), is(notNullValue()));
                 assertThat(store2.get(key), is(nullValue()));
             } else {
                 assertThat(store1.get(key), is(nullValue()));
+                assertThat(store2.get(key), is(notNullValue()));
             }

Review comment:
I thought this would be easier to read.
[GitHub] [kafka] cadonna commented on pull request #9681: MINOR: Fix flaky test shouldQueryOnlyActivePartitionStoresByDefault
cadonna commented on pull request #9681:
URL: https://github.com/apache/kafka/pull/9681#issuecomment-738054912

Call for review: @vvcephei @ableegoldman @guozhangwang
[GitHub] [kafka] cadonna opened a new pull request #9681: MINOR: Fix flaky test shouldQueryOnlyActivePartitionStoresByDefault
cadonna opened a new pull request #9681:
URL: https://github.com/apache/kafka/pull/9681

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-10460) ReplicaListValidator format checking is incomplete
[ https://issues.apache.org/jira/browse/KAFKA-10460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mickael Maison resolved KAFKA-10460.
-----------------------------------
Fix Version/s: 2.8.0
Resolution: Fixed

> ReplicaListValidator format checking is incomplete
> --------------------------------------------------
>
> Key: KAFKA-10460
> URL: https://issues.apache.org/jira/browse/KAFKA-10460
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.4.1
> Reporter: Robin Palotai
> Assignee: Ankit Kumar
> Priority: Minor
> Fix For: 2.8.0
>
> See
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220]
> . The logic is supposed to accept only two cases:
> * list of k:v pairs
> * a single '*'
> But in practice, since the disjunction's second part only checks that the
> head is '*', the case where a k:v list is headed by a star is also accepted
> (and then later broker dies at startup, refusing the value).
> This practically happened due to a CruiseControl bug (see
> [https://github.com/linkedin/cruise-control/issues/1322])
> Observed on 2.4, but seems to be present in HEAD's source as well.
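The two accepted cases described in the issue (a list of k:v pairs, or a single `*`) can be checked as in the sketch below. It is written in Java for illustration and is not the Scala code in `ConfigHandler.scala`: the class `ReplicaListCheckSketch` and the method `isValid` are hypothetical, and treating each pair as `digits:digits` is an assumption about the expected format. Note that an empty list passes the pair check vacuously in this sketch.

```java
import java.util.List;
import java.util.regex.Pattern;

final class ReplicaListCheckSketch {

    // Assumed pair shape: numeric key, colon, numeric value (for example "0:1").
    private static final Pattern PAIR = Pattern.compile("[0-9]+:[0-9]+");

    static boolean isValid(List<String> entries) {
        // Case 1: every entry is a k:v pair (vacuously true for an empty list).
        boolean allPairs = entries.stream()
                .allMatch(entry -> PAIR.matcher(entry.trim()).matches());
        // Case 2: the whole list is exactly one '*', not merely headed by one.
        boolean wildcardOnly = entries.size() == 1 && entries.get(0).trim().equals("*");
        return allPairs || wildcardOnly;
    }

    public static void main(String[] args) {
        System.out.println(isValid(List.of("0:1", "1:2")));
        System.out.println(isValid(List.of("*")));
        System.out.println(isValid(List.of("*", "0:1")));
    }
}
```

Checking the size of the list in the wildcard branch is what rules out the star-headed pair list that the issue reports as wrongly accepted.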
[GitHub] [kafka] mimaison merged pull request #9326: KAFKA-10460: ReplicaListValidator format checking is incomplete
mimaison merged pull request #9326:
URL: https://github.com/apache/kafka/pull/9326