Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]
chia7712 commented on PR #15663: URL: https://github.com/apache/kafka/pull/15663#issuecomment-2039097741 > I think the increase in class file size is indeed a direct drawback of adding the `-parameters` option, because all the parameter names are then included in the .class files. I'd like to know if there's any other way to fix this? Could we use ARGUMENTS instead of ARGUMENTS_WITH_NAMES? Or we can add the new arg to compileTestJava only, to avoid impacting the production binary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
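The trade-off in this comment comes from the javac `-parameters` flag, which records formal parameter names in class files so that reflection can read them; as far as I understand, JUnit's `{argumentsWithNames}` placeholder relies on the same information. A minimal sketch of how to observe the difference with plain reflection follows. The class `ParameterNamesSketch` and the method `createTopic` are hypothetical names chosen for illustration and are not part of Kafka.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of Kafka.
final class ParameterNamesSketch {

    // Hypothetical method whose parameter names we want to inspect.
    static void createTopic(String topicName, int partitionCount) {
    }

    // Returns one entry per parameter: the name reflection sees and whether
    // that name was actually recorded in the class file.
    static List<String> describeCreateTopic() {
        try {
            Method method = ParameterNamesSketch.class
                    .getDeclaredMethod("createTopic", String.class, int.class);
            List<String> described = new ArrayList<>();
            for (Parameter parameter : method.getParameters()) {
                described.add(parameter.getName() + " namePresent=" + parameter.isNamePresent());
            }
            return described;
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Compiled with `javac -parameters`: topicName / partitionCount, namePresent=true.
        // Compiled without it: synthesized names such as arg0 / arg1, namePresent=false.
        describeCreateTopic().forEach(System.out::println);
    }
}
```

Restricting the flag to `compileTestJava`, as the comment suggests, would leave production class files unchanged while still giving test reports readable parameter names.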
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
nizhikov commented on code in PR #15645: URL: https://github.com/apache/kafka/pull/15645#discussion_r1553060780 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.admin; + +import kafka.cluster.Broker; +import kafka.cluster.EndPoint; +import kafka.server.KafkaConfig; +import kafka.server.QuorumTestHarness; +import kafka.utils.TestInfoUtils; +import kafka.zk.AdminZkClient; +import kafka.zk.BrokerInfo; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.security.PasswordEncoder; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.ZooKeeperInternals; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters +public class ConfigCommandIntegrationTest extends QuorumTestHarness { +/** @see TestInfoUtils#TestWithParameterizedQuorumName() */ +public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME = "{displayName}.{argumentsWithNames}"; Review Comment: Hello, @chia7712 https://github.com/apache/kafka/pull/15666 created to add `junit-platform.properties` to tools. -- This is an automated message from the Apache Git Service. 
[PR] MINOR: Default test name added to tools [kafka]
nizhikov opened a new pull request, #15666: URL: https://github.com/apache/kafka/pull/15666 This PR adds `junit-platform.properties` to `tools` and removes the outdated constant from the Java code. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
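For context, JUnit 5 reads `junit-platform.properties` from the test classpath, and the `junit.jupiter.params.displayname.default` key sets the default name pattern for every `@ParameterizedTest`. A sketch of what such a file could contain, assuming it mirrors the `{displayName}.{argumentsWithNames}` pattern quoted in the review of #15645 (the file location and the actual contents in the PR are assumptions and may differ):

```properties
# Hypothetical contents of src/test/resources/junit-platform.properties.
# Applies the pattern to all parameterized tests, so no per-test constant is needed.
junit.jupiter.params.displayname.default = {displayName}.{argumentsWithNames}
```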
[PR] MINOR: Default test name added to core [kafka]
nizhikov opened a new pull request, #15667: URL: https://github.com/apache/kafka/pull/15667 This PR adds `junit-platform.properties` to `core` and removes the outdated constant from the Scala code. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
nizhikov commented on code in PR #15645: URL: https://github.com/apache/kafka/pull/15645#discussion_r1553114426 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## +public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME = "{displayName}.{argumentsWithNames}"; Review Comment: https://github.com/apache/kafka/pull/15667 created to add `junit-platform.properties` to `core`.
Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]
nizhikov commented on code in PR #15645: URL: https://github.com/apache/kafka/pull/15645#discussion_r1553114805 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## +public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME = "{displayName}.{argumentsWithNames}"; Review Comment: @chia7712 please, take a look.
Re: [PR] KAFKA-16039: RecordHeaders supports the addAll method [kafka]
vamossagar12 commented on PR #15034: URL: https://github.com/apache/kafka/pull/15034#issuecomment-2039191732 The instructions for signing up are [here](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-GettingStarted).
Re: [PR] Test PR without the circular dependency code [kafka]
vamossagar12 closed pull request #15653: Test PR without the circular dependency code URL: https://github.com/apache/kafka/pull/15653
Re: [PR] Test PR without the circular dependency code [kafka]
vamossagar12 commented on PR #15653: URL: https://github.com/apache/kafka/pull/15653#issuecomment-2039212910 The purpose of this PR was to validate a certain behaviour with circular dependency (explained [here](https://github.com/apache/kafka/pull/15642#discussion_r1551420344)), and as confirmed in [this](https://github.com/apache/kafka/pull/15642#discussion_r1552049826) comment, we can close this PR.
[jira] [Commented] (KAFKA-16471) SslTransportLayer may leak SSLEngine resources
[ https://issues.apache.org/jira/browse/KAFKA-16471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834200#comment-17834200 ]

Gaurav Narula commented on KAFKA-16471:
---

[~chia7712] I'm not well versed in the backporting policy. Given that one also needs https://issues.apache.org/jira/browse/KAFKA-16305 to use the Netty-based OpenSSL engine, perhaps we backport KAFKA-16305 to 3.7 and leave out 3.6 for now?

> SslTransportLayer may leak SSLEngine resources
> --
>
> Key: KAFKA-16471
> URL: https://issues.apache.org/jira/browse/KAFKA-16471
> Project: Kafka
> Issue Type: Bug
> Components: network
> Affects Versions: 3.7.0, 3.6.1
> Reporter: Gaurav Narula
> Assignee: Gaurav Narula
> Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> {{SslTransportLayer}} does not invoke {{SSLEngine::closeInbound}} in
> {{close()}} after flushing the {{close_notify}} TLS alert.
> While this isn't a problem for the default JDK SSLEngine, it results in
> a resource leak in the Netty/OpenSSL-based SSLEngine, which frees native
> resources only when {{closeInbound}} is invoked.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
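The close sequence described in the issue can be sketched with the JDK's own `SSLEngine`. This is an illustration only, not the actual Kafka patch: `SslEngineCloseSketch`, `newDefaultEngine`, and `closeEngine` are hypothetical names, and the step that wraps and flushes the `close_notify` alert to the socket is omitted.

```java
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;

// Hypothetical sketch; not the Kafka SslTransportLayer implementation.
final class SslEngineCloseSketch {

    // Creates an engine from the JDK default context, wrapping the checked exception.
    static SSLEngine newDefaultEngine() {
        try {
            return SSLContext.getDefault().createSSLEngine();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Closes both directions of the engine and reports whether inbound is done.
    static boolean closeEngine(SSLEngine engine) {
        // Signals that no more application data will be sent; a real transport
        // would now wrap() and flush the resulting close_notify alert.
        engine.closeOutbound();
        try {
            // Signals that no more inbound data will arrive. Per the issue,
            // Netty/OpenSSL-based engines free native resources only here.
            engine.closeInbound();
        } catch (SSLException e) {
            // Can be thrown when the peer's close_notify was never received;
            // this sketch assumes that is safe to ignore during shutdown.
        }
        return engine.isInboundDone();
    }

    public static void main(String[] args) {
        System.out.println("inbound done: " + closeEngine(newDefaultEngine()));
    }
}
```

Catching `SSLException` around `closeInbound()` is a design choice of this sketch: the method is documented to throw when the peer has not sent its close notification, which is common when a connection is torn down locally.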
Re: [PR] MINOR: Default test name added to tools [kafka]
chia7712 commented on PR #15666: URL: https://github.com/apache/kafka/pull/15666#issuecomment-2039262579 @nizhikov thanks for this patch. Let's wait for #15664.
Re: [PR] MINOR: Default test name added to tools [kafka]
nizhikov commented on PR #15666: URL: https://github.com/apache/kafka/pull/15666#issuecomment-2039274380 @chia7712 Can you please double-check the PR number? It seems #15664 is not related to my changes.
Re: [PR] MINOR: Default test name added to tools [kafka]
chia7712 commented on PR #15666: URL: https://github.com/apache/kafka/pull/15666#issuecomment-2039276618 Sorry :( The correct PR is #15663.
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1553223975 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -309,49 +300,49 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce @Test public void shouldNotAttemptToLockIfNoStores() { -stateDirectory = EasyMock.createNiceMock(StateDirectory.class); -EasyMock.replay(stateDirectory); +stateDirectory = mock(StateDirectory.class); task = createStatelessTask(createConfig("100")); task.initializeIfNeeded(); // should fail if lock is called -EasyMock.verify(stateDirectory); +verify(stateDirectory, never()).lock(any()); } @Test public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException { -final IMocksControl ctrl = EasyMock.createStrictControl(); -final ProcessorStateManager stateManager = ctrl.createMock(ProcessorStateManager.class); - EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE); -stateDirectory = ctrl.createMock(StateDirectory.class); +final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); +when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +stateDirectory = mock(StateDirectory.class); -stateManager.registerGlobalStateStores(emptyList()); -EasyMock.expectLastCall(); +doNothing().when(stateManager).registerGlobalStateStores(emptyList()); Review Comment: No, it is already verified later on, removed ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -309,49 +300,49 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce @Test public void shouldNotAttemptToLockIfNoStores() { -stateDirectory = EasyMock.createNiceMock(StateDirectory.class); -EasyMock.replay(stateDirectory); +stateDirectory = mock(StateDirectory.class); task = createStatelessTask(createConfig("100")); task.initializeIfNeeded(); // should fail if lock is called 
-EasyMock.verify(stateDirectory); +verify(stateDirectory, never()).lock(any()); } @Test public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException { -final IMocksControl ctrl = EasyMock.createStrictControl(); -final ProcessorStateManager stateManager = ctrl.createMock(ProcessorStateManager.class); - EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE); -stateDirectory = ctrl.createMock(StateDirectory.class); +final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); +when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +stateDirectory = mock(StateDirectory.class); -stateManager.registerGlobalStateStores(emptyList()); -EasyMock.expectLastCall(); +doNothing().when(stateManager).registerGlobalStateStores(emptyList()); -EasyMock.expect(stateManager.taskId()).andReturn(taskId); +when(stateManager.taskId()).thenReturn(taskId); Review Comment: Nope, I do not, it is already in the setup! ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -309,49 +300,49 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce @Test public void shouldNotAttemptToLockIfNoStores() { -stateDirectory = EasyMock.createNiceMock(StateDirectory.class); -EasyMock.replay(stateDirectory); +stateDirectory = mock(StateDirectory.class); task = createStatelessTask(createConfig("100")); task.initializeIfNeeded(); // should fail if lock is called -EasyMock.verify(stateDirectory); +verify(stateDirectory, never()).lock(any()); } @Test public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException { -final IMocksControl ctrl = EasyMock.createStrictControl(); -final ProcessorStateManager stateManager = ctrl.createMock(ProcessorStateManager.class); - EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE); -stateDirectory = ctrl.createMock(StateDirectory.class); +final ProcessorStateManager stateManager = 
mock(ProcessorStateManager.class); +when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +stateDirectory = mock(StateDirectory.class); -stateManager.registerGlobalStateStores(emptyList()); -EasyMock.expectLastCall(); +doNothing().when(stateManager).registerGlobalStateStores(emptyList()); -EasyMock.expect(stateManager.taskId()).andReturn(taskId); +
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1553225208 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -309,49 +300,49 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce @Test public void shouldNotAttemptToLockIfNoStores() { -stateDirectory = EasyMock.createNiceMock(StateDirectory.class); -EasyMock.replay(stateDirectory); +stateDirectory = mock(StateDirectory.class); task = createStatelessTask(createConfig("100")); task.initializeIfNeeded(); // should fail if lock is called -EasyMock.verify(stateDirectory); +verify(stateDirectory, never()).lock(any()); } @Test public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException { -final IMocksControl ctrl = EasyMock.createStrictControl(); -final ProcessorStateManager stateManager = ctrl.createMock(ProcessorStateManager.class); - EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE); -stateDirectory = ctrl.createMock(StateDirectory.class); +final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); +when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); Review Comment: Nope, removed in a subsequent commit
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1553224333 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -309,49 +300,49 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce @Test public void shouldNotAttemptToLockIfNoStores() { -stateDirectory = EasyMock.createNiceMock(StateDirectory.class); -EasyMock.replay(stateDirectory); +stateDirectory = mock(StateDirectory.class); task = createStatelessTask(createConfig("100")); task.initializeIfNeeded(); // should fail if lock is called -EasyMock.verify(stateDirectory); +verify(stateDirectory, never()).lock(any()); } @Test public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException { -final IMocksControl ctrl = EasyMock.createStrictControl(); -final ProcessorStateManager stateManager = ctrl.createMock(ProcessorStateManager.class); - EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE); -stateDirectory = ctrl.createMock(StateDirectory.class); +final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); +when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +stateDirectory = mock(StateDirectory.class); -stateManager.registerGlobalStateStores(emptyList()); -EasyMock.expectLastCall(); +doNothing().when(stateManager).registerGlobalStateStores(emptyList()); -EasyMock.expect(stateManager.taskId()).andReturn(taskId); +when(stateManager.taskId()).thenReturn(taskId); -EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true); +when(stateDirectory.lock(taskId)).thenReturn(true); -stateManager.close(); -EasyMock.expectLastCall(); +doNothing().when(stateManager).close(); // The `baseDir` will be accessed when attempting to delete the state store. 
- EasyMock.expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); -stateDirectory.unlock(taskId); -EasyMock.expectLastCall(); +doNothing().when(stateDirectory).unlock(taskId); Review Comment: No, it is already verified later on, removed ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -309,49 +300,49 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce @Test public void shouldNotAttemptToLockIfNoStores() { -stateDirectory = EasyMock.createNiceMock(StateDirectory.class); -EasyMock.replay(stateDirectory); +stateDirectory = mock(StateDirectory.class); task = createStatelessTask(createConfig("100")); task.initializeIfNeeded(); // should fail if lock is called -EasyMock.verify(stateDirectory); +verify(stateDirectory, never()).lock(any()); } @Test public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException { -final IMocksControl ctrl = EasyMock.createStrictControl(); -final ProcessorStateManager stateManager = ctrl.createMock(ProcessorStateManager.class); - EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE); -stateDirectory = ctrl.createMock(StateDirectory.class); +final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); +when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +stateDirectory = mock(StateDirectory.class); -stateManager.registerGlobalStateStores(emptyList()); -EasyMock.expectLastCall(); +doNothing().when(stateManager).registerGlobalStateStores(emptyList()); -EasyMock.expect(stateManager.taskId()).andReturn(taskId); +when(stateManager.taskId()).thenReturn(taskId); -EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true); +when(stateDirectory.lock(taskId)).thenReturn(true); -stateManager.close(); -EasyMock.expectLastCall(); +doNothing().when(stateManager).close(); // The `baseDir` will 
be accessed when attempting to delete the state store. - EasyMock.expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); -stateDirectory.unlock(taskId); -EasyMock.expectLastCall(); +doNothing().when(stateDirectory).unlock(taskId); -ctrl.checkOrder(true); -ctrl.replay(); +final InOrder inOrder = inOrder(stateManager, stateDirectory); task = createStatefulTask(createConfig(StreamsConf
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1553235267 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -1568,10 +1548,8 @@ public void shouldNotShareHeadersBetweenPunctuateIterations() { @Test public void shouldWrapKafkaExceptionWithStreamsExceptionWhenProcess() { - EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); - EasyMock.expect(stateManager.changelogOffsets()).andReturn(emptyMap()).anyTimes(); - EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes(); -EasyMock.replay(stateManager, recordCollector); +when(stateManager.changelogOffsets()).thenReturn(emptyMap()); +when(recordCollector.offsets()).thenReturn(emptyMap()); Review Comment: This is also absolutely correct; these are not needed. I have removed all stub-related references to emptyMap.
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1553245804 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -365,11 +356,10 @@ public void shouldResetOffsetsToLastCommittedForSpecifiedPartitions() { consumer.seek(partition1, 10L); consumer.seek(partition2, 15L); +@SuppressWarnings("unchecked") final java.util.function.Consumer<Set<TopicPartition>> resetter = -EasyMock.mock(java.util.function.Consumer.class); -resetter.accept(Collections.emptySet()); -EasyMock.expectLastCall(); -EasyMock.replay(resetter); +mock(java.util.function.Consumer.class); +doNothing().when(resetter).accept(Collections.emptySet()); Review Comment: This is fair. I have added a verification instead, since the call was expected in EasyMock. If you don't think the verification is necessary, I can remove it completely.
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1553284545 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -514,8 +496,6 @@ public void shouldTransitToRestoringThenRunningAfterCreation() throws IOExceptio assertEquals(RUNNING, task.state()); assertTrue(source1.initialized); assertTrue(source2.initialized); - -EasyMock.verify(stateDirectory); Review Comment: Yeah, this is where I should have been more explicit:
```
EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition));
```
is reported as an unnecessary stub, and
```
stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null);
```
is reported as not called if verified.
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1553287165 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -1798,8 +1746,6 @@ public void shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() { assertThrows(RuntimeException.class, () -> task.suspend()); task.closeDirty(); - -EasyMock.verify(stateManager); Review Comment: In subsequent commits I have replaced
```
doNothing().when(stateManager).close();
```
with
```
verify(stateManager).close();
```
Everything else is related to empty maps or sets, so I have removed those stubbings.
Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]
vamossagar12 commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-2039360963 Hey Chris, sorry for the long delay on this. I finally got a chance to verify the code that you provided, and it makes sense. I agree that so far I was only thinking about either having two separate futures, with one waiting for the other, or chaining futures in the style of `CompletableFuture`. The version you have provided is straightforward, and all the new tests passed out of the box for me. Regarding > I think the only question left is whether out-of-order writes are possible because of how things are chained I am assuming that, for non-exactly-once source tasks, you are referring to scenarios where offset flushes are triggered and the flush operations finish out of order. I reviewed the code and I can see that this is checked in `handleFinishWrite`, which does not complete the flush if the flush that just finished isn't the current one. For any other erroneous cases, `cancelFlush` is invoked (as you mentioned).
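The out-of-order guard described in this comment can be sketched roughly as follows. This is a simplified, self-contained illustration and not the Connect code: the `FlushTracker` class and its fields are hypothetical, and only the method names `handleFinishWrite` and `cancelFlush` are borrowed from the comment. The sketch assumes every flush gets a monotonically increasing id, so a completion callback from an older flush can be recognized and ignored.

```java
/**
 * Hypothetical sketch: only the flush that is currently in progress may
 * complete. Completions that arrive for an older (stale) flush are ignored.
 */
final class FlushTracker {
    private long currentFlushId = 0L;  // id of the most recently started or cancelled flush
    private boolean flushing = false;  // true while a flush is in progress
    private int completedFlushes = 0;  // number of flushes that completed normally

    /** Starts a new flush and returns the id its completion callback must present. */
    synchronized long beginFlush() {
        if (flushing) {
            throw new IllegalStateException("a flush is already in progress");
        }
        flushing = true;
        return ++currentFlushId;
    }

    /** Abandons the in-progress flush; bumping the id makes any late callback stale. */
    synchronized void cancelFlush() {
        if (flushing) {
            flushing = false;
            currentFlushId++;
        }
    }

    /** Returns true only if the finished write belongs to the current flush. */
    synchronized boolean handleFinishWrite(long flushId) {
        if (!flushing || flushId != currentFlushId) {
            return false; // stale or unexpected completion: do not complete the flush
        }
        flushing = false;
        completedFlushes++;
        return true;
    }

    synchronized int completedFlushes() {
        return completedFlushes;
    }

    public static void main(String[] args) {
        FlushTracker tracker = new FlushTracker();
        long first = tracker.beginFlush();
        tracker.cancelFlush();              // e.g. the first flush timed out
        long second = tracker.beginFlush();
        System.out.println("stale completion accepted: " + tracker.handleFinishWrite(first));
        System.out.println("current completion accepted: " + tracker.handleFinishWrite(second));
    }
}
```

The design choice worth noting is that cancellation also advances the id, so a write that finishes after its flush was cancelled cannot be mistaken for the next flush.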
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1553289186 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -1866,9 +1810,6 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() { @Test public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() { - EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(partition1)); Review Comment: Reported as unnecessary by Mockito
[jira] [Commented] (KAFKA-16471) SslTransportLayer may leak SSLEngine resources
[ https://issues.apache.org/jira/browse/KAFKA-16471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834229#comment-17834229 ] Chia-Ping Tsai commented on KAFKA-16471: {quote} perhaps we backport KAFKA-16305 to 3.7 and leave out 3.6 for now? {quote} yep, please backport KAFKA-16305 to 3.7. And you are right, we can leave out 3.6 and let 3.6.3 (if users require it) RM (if there is a volunteer) to cherry-pick in the future. > SslTransportLayer may leak SSLEngine resources > -- > > Key: KAFKA-16471 > URL: https://issues.apache.org/jira/browse/KAFKA-16471 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 3.7.0, 3.6.1 >Reporter: Gaurav Narula >Assignee: Gaurav Narula >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > {{SslTransportLayer}} does not invoke {{SSLEngine::closeInbound}} in > {{close()}} after flushing the {{close_notify}} TLS alert. > While this isn't a problem for the default JDK SSLEngine, it results in > resource leak in Netty/OpenSSL based SSLEngine which frees native resources > only when {{closeInbound}} is invoked. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-16305) Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake
[ https://issues.apache.org/jira/browse/KAFKA-16305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reopened KAFKA-16305: reopen for backport to 3.7 > Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake > -- > > Key: KAFKA-16305 > URL: https://issues.apache.org/jira/browse/KAFKA-16305 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1 >Reporter: Gaurav Narula >Assignee: Gaurav Narula >Priority: Major > Fix For: 3.8.0 > > > Kafka allows users to configure custom {{SSLEngine}} via the > {{SslEngineFactory}} interface. There have been attempts to use an OpenSSL > based {{SSLEngine}} using {{netty-handler}} over the JDK based implementation > for performance reasons. > While trying to use a Netty/Openssl based SSLEngine, we observe that the > server hangs while performing the TLS handshake. We observe the following > logs > {code} > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId > 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [INFO ] > Selector - [SocketServer listenerType=ZK_BROKER, nodeId=101] > XX-Gaurav: calling prepare channelId 127.0.0.1:60045-127.0.0.1:60046-0 > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] 
XX-Gaurav ready isReady false channelId > 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId > 127.0.0.1:60045-127.0.0.1:60046-0 state HANDSHAKE > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake NEED_UNWRAP channelId > 127.0.0.1:60045-127.0.0.1:60046-0, appReadBuffer pos 0, netReadBuffer pos 0, > netWriteBuffer pos 0 > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake handshakeUnwrap channelId > 127.0.0.1:60045-127.0.0.1:60046-0 doRead true > 2024-02-26 01:40:00,118 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake post handshakeUnwrap: channelId > 127.0.0.1:60045-127.0.0.1:60046-0 
handshakeStatus NEED_UNWRAP status > BUFFER_UNDERFLOW read 0 > 2024-02-26 01:40:00,118 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake post unwrap NEED_UNWRAP channelId > 127.0.0.1:60045-127.0.0.1:60046-0, handshakeResult Status = BUFFER_UNDERFLOW > HandshakeStatus = NEED_UNWRAP bytesConsumed = 0 bytesProduced = 0, > appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 0 >
[jira] [Resolved] (KAFKA-16305) Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake
[ https://issues.apache.org/jira/browse/KAFKA-16305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-16305. Fix Version/s: 3.7.1 Resolution: Fixed push https://github.com/apache/kafka/commit/633d2f139c403cbbe2912d04f823d74c561dab76 to 3.7 branch > Optimisation in SslTransportLayer:handshakeUnwrap stalls TLS handshake > -- > > Key: KAFKA-16305 > URL: https://issues.apache.org/jira/browse/KAFKA-16305 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0, 3.6.1 >Reporter: Gaurav Narula >Assignee: Gaurav Narula >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > Kafka allows users to configure custom {{SSLEngine}} via the > {{SslEngineFactory}} interface. There have been attempts to use an OpenSSL > based {{SSLEngine}} using {{netty-handler}} over the JDK based implementation > for performance reasons. > While trying to use a Netty/Openssl based SSLEngine, we observe that the > server hangs while performing the TLS handshake. We observe the following > logs > {code} > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId > 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [INFO ] > Selector - [SocketServer listenerType=ZK_BROKER, nodeId=101] > XX-Gaurav: calling prepare channelId 127.0.0.1:60045-127.0.0.1:60046-0 > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected 
local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId > 127.0.0.1:60045-127.0.0.1:60046-0 state NOT_INITIALIZED > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] XX-Gaurav ready isReady false channelId > 127.0.0.1:60045-127.0.0.1:60046-0 state HANDSHAKE > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake NEED_UNWRAP channelId > 127.0.0.1:60045-127.0.0.1:60046-0, appReadBuffer pos 0, netReadBuffer pos 0, > netWriteBuffer pos 0 > 2024-02-26 01:40:00,117 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake handshakeUnwrap channelId > 127.0.0.1:60045-127.0.0.1:60046-0 doRead true > 2024-02-26 01:40:00,118 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], 
selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake post handshakeUnwrap: channelId > 127.0.0.1:60045-127.0.0.1:60046-0 handshakeStatus NEED_UNWRAP status > BUFFER_UNDERFLOW read 0 > 2024-02-26 01:40:00,118 > data-plane-kafka-network-thread-101-ListenerName(TEST)-SASL_SSL-0 [TRACE] > SslTransportLayer- [SslTransportLayer > channelId=127.0.0.1:60045-127.0.0.1:60046-0 > key=channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:60045 > remote=/127.0.0.1:60046], selector=sun.nio.ch.KQueueSelectorImpl@18d201be, > interestOps=1, readyOps=0] SSLHandshake post unwrap NEED_UNWRAP channelId > 127.0.0.1:60045-127.0.0.1:60046-0, handshakeResult Status = BUFFER_UNDERFLOW > Hands
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1553347186 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -312,55 +302,40 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce @Test public void shouldNotAttemptToLockIfNoStores() { -stateDirectory = EasyMock.createNiceMock(StateDirectory.class); -EasyMock.replay(stateDirectory); +stateDirectory = mock(StateDirectory.class); task = createStatelessTask(createConfig("100")); task.initializeIfNeeded(); // should fail if lock is called -EasyMock.verify(stateDirectory); +verify(stateDirectory, never()).lock(any()); } @Test -public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() throws IOException { -final IMocksControl ctrl = EasyMock.createNiceControl(); -final ProcessorStateManager stateManager = ctrl.createMock(ProcessorStateManager.class); - EasyMock.expect(stateManager.taskType()).andStubReturn(TaskType.ACTIVE); -stateDirectory = ctrl.createMock(StateDirectory.class); +public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() { +when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); +stateDirectory = mock(StateDirectory.class); -stateManager.registerGlobalStateStores(emptyList()); -EasyMock.expectLastCall(); - -EasyMock.expect(stateManager.taskId()).andReturn(taskId); - -EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true); - -stateManager.close(); -EasyMock.expectLastCall(); - -stateManager.transitionTaskState(SUSPENDED); Review Comment: When I moved these to verifications Mockito claimed they were never called -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1553349919 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ## @@ -1652,81 +1598,64 @@ public void shouldReInitializeTopologyWhenResuming() throws IOException { assertTrue(source1.initialized); assertTrue(source2.initialized); -EasyMock.verify(stateManager, recordCollector); - -EasyMock.reset(recordCollector); -EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()); -EasyMock.replay(recordCollector); assertThat("Map did not contain the partition", task.highWaterMark().containsKey(partition1)); + +verify(recordCollector).offsets(); } @Test public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() { final Long offset = 543L; - EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes(); -stateManager.checkpoint(); -EasyMock.expectLastCall().once(); -EasyMock.expect(stateManager.changelogOffsets()) -.andReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint -.andReturn(singletonMap(changelogPartition, 10L)) -.andReturn(singletonMap(changelogPartition, 20L)); -EasyMock.expectLastCall(); -EasyMock.replay(stateManager, recordCollector); + when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, offset)); +when(stateManager.changelogOffsets()) +.thenReturn(singletonMap(changelogPartition, 10L)) // restoration checkpoint +.thenReturn(singletonMap(changelogPartition, 10L)) +.thenReturn(singletonMap(changelogPartition, 20L)); task = createStatefulTask(createConfig("100"), true); task.initializeIfNeeded(); -task.completeRestoration(noOpResetter -> { }); +task.completeRestoration(noOpResetter -> { }); // should checkpoint task.prepareCommit(); -task.postCommit(true); // should checkpoint +task.postCommit(true); // should checkpoint task.prepareCommit(); -task.postCommit(false); // should not checkpoint +task.postCommit(false); 
// should not checkpoint -EasyMock.verify(stateManager, recordCollector); assertThat("Map was empty", task.highWaterMark().size() == 2); + +verify(stateManager, times(2)).checkpoint(); } @Test public void shouldCheckpointOffsetsOnCommitIfSnapshotMuchChanged() { final Long offset = 543L; - EasyMock.expect(recordCollector.offsets()).andReturn(singletonMap(changelogPartition, offset)).anyTimes(); -stateManager.checkpoint(); -EasyMock.expectLastCall().times(2); - EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition)); -EasyMock.expect(stateManager.changelogOffsets()) -.andReturn(singletonMap(changelogPartition, 0L)) -.andReturn(singletonMap(changelogPartition, 10L)) -.andReturn(singletonMap(changelogPartition, 12000L)); -stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null); -EasyMock.expectLastCall(); -EasyMock.replay(stateManager, recordCollector); + when(recordCollector.offsets()).thenReturn(singletonMap(changelogPartition, offset)); +when(stateManager.changelogOffsets()) +.thenReturn(singletonMap(changelogPartition, 0L)) +.thenReturn(singletonMap(changelogPartition, 10L)) +.thenReturn(singletonMap(changelogPartition, 12000L)); task = createStatefulTask(createConfig("100"), true); task.initializeIfNeeded(); -task.completeRestoration(noOpResetter -> { }); +task.completeRestoration(noOpResetter -> { }); // should checkpoint task.prepareCommit(); -task.postCommit(true); +task.postCommit(true); // should checkpoint task.prepareCommit(); -task.postCommit(false); +task.postCommit(false); // should checkpoint since the offset delta is greater than the threshold -EasyMock.verify(recordCollector); assertThat("Map was empty", task.highWaterMark().size() == 2); + +verify(stateManager, times(3)).checkpoint(); } @Test public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() { - EasyMock.expect(stateManager.changelogPartitions()).andReturn(singleton(changelogPartition)); -stateManager.registerStore(stateStore, 
stateStore.stateRestoreCallback, null); Review Comment: According to Mockito neither of these were called -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To uns
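The checkpointing behavior these two tests exercise (checkpoint when enforced, or when the changelog offsets have advanced by more than a threshold since the last snapshot) can be sketched as below. This is a minimal illustration under stated assumptions, not the Streams implementation: the class name, the method name and the threshold value of 10,000 are all assumptions, chosen so that the offsets used in the tests (10 to 20 versus 10 to 12000) fall on either side of it.

```java
import java.util.Map;

/** Hypothetical sketch of a "checkpoint only if enough has changed" decision. */
final class CheckpointDecision {
    // Assumed value for illustration only; the real threshold is not stated in this thread.
    static final long OFFSET_DELTA_THRESHOLD = 10_000L;

    /**
     * @param enforce    true when the caller demands a checkpoint regardless of progress
     * @param oldOffsets offsets recorded at the previous checkpoint, or null if none exists
     * @param newOffsets current offsets per changelog partition
     */
    static boolean checkpointNeeded(boolean enforce,
                                    Map<String, Long> oldOffsets,
                                    Map<String, Long> newOffsets) {
        if (enforce) {
            return true;
        }
        if (oldOffsets == null) {
            return false; // nothing to compare against yet
        }
        long totalDelta = 0L;
        for (Map.Entry<String, Long> entry : newOffsets.entrySet()) {
            long previous = oldOffsets.getOrDefault(entry.getKey(), 0L);
            totalDelta += entry.getValue() - previous;
        }
        return totalDelta > OFFSET_DELTA_THRESHOLD;
    }

    public static void main(String[] args) {
        Map<String, Long> snapshot = java.util.Collections.singletonMap("changelog-0", 10L);
        Map<String, Long> small = java.util.Collections.singletonMap("changelog-0", 20L);
        Map<String, Long> large = java.util.Collections.singletonMap("changelog-0", 12000L);
        System.out.println("small delta: " + checkpointNeeded(false, snapshot, small));
        System.out.println("large delta: " + checkpointNeeded(false, snapshot, large));
    }
}
```

Under these assumptions, a non-enforced commit checkpoints only in the second case, which lines up with the `times(2)` and `times(3)` verifications in the diff.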
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on PR #14716: URL: https://github.com/apache/kafka/pull/14716#issuecomment-2039443091 Heya @cadonna! I have rebased and hopefully addressed all of the first batch of comments. The verifications that are missing were reported by Mockito as unnecessary or never called, but if you think they are necessary I will circle back to check.
Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]
clolov commented on code in PR #15213: URL: https://github.com/apache/kafka/pull/15213#discussion_r1553357069 ## clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java: ## @@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest { */ public static final long EARLIEST_LOCAL_TIMESTAMP = -4L; +public static final long LATEST_TIERED_TIMESTAMP = -5L; Review Comment: Yup, thanks a lot for bringing this up on the mailing list and here. I will open a pull request to fix this omission!
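For readers unfamiliar with the convention in the diff above, the negative values are sentinels that ask for a special offset rather than a lookup by timestamp. The sketch below is illustrative only. `EARLIEST_LOCAL_TIMESTAMP` and `LATEST_TIERED_TIMESTAMP` are taken from the diff; the other three constants are given as I recall them from the same class and should be treated as assumptions, and the `ListOffsetsSentinels` helper itself is hypothetical.

```java
/** Hypothetical helper that classifies a ListOffsets timestamp argument. */
final class ListOffsetsSentinels {
    static final long LATEST_TIMESTAMP = -1L;          // assumption: recalled from ListOffsetsRequest
    static final long EARLIEST_TIMESTAMP = -2L;        // assumption: recalled from ListOffsetsRequest
    static final long MAX_TIMESTAMP = -3L;             // assumption: recalled from ListOffsetsRequest
    static final long EARLIEST_LOCAL_TIMESTAMP = -4L;  // from the diff above
    static final long LATEST_TIERED_TIMESTAMP = -5L;   // from the diff above

    /** True if the value is one of the known sentinels rather than a real timestamp. */
    static boolean isSentinel(long timestamp) {
        return timestamp <= LATEST_TIMESTAMP && timestamp >= LATEST_TIERED_TIMESTAMP;
    }

    /** Returns a short label describing what the value requests. */
    static String describe(long timestamp) {
        if (timestamp == LATEST_TIMESTAMP) return "latest";
        if (timestamp == EARLIEST_TIMESTAMP) return "earliest";
        if (timestamp == MAX_TIMESTAMP) return "max-timestamp";
        if (timestamp == EARLIEST_LOCAL_TIMESTAMP) return "earliest-local";
        if (timestamp == LATEST_TIERED_TIMESTAMP) return "latest-tiered";
        if (timestamp < 0) return "invalid";
        return "lookup-by-timestamp";
    }

    public static void main(String[] args) {
        System.out.println(describe(LATEST_TIERED_TIMESTAMP));
        System.out.println(describe(1712200000000L));
    }
}
```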
[jira] [Created] (KAFKA-16477) Detect thread leaked client-metrics-reaper in tests
Kuan Po Tseng created KAFKA-16477: Summary: Detect thread leaked client-metrics-reaper in tests Key: KAFKA-16477 URL: https://issues.apache.org/jira/browse/KAFKA-16477 Project: Kafka Issue Type: Improvement Reporter: Kuan Po Tseng Assignee: Kuan Po Tseng Profiling the Kafka tests shows that many `client-metrics-reaper` threads are not cleaned up after BrokerServer shutdown. The thread {{client-metrics-reaper}} comes from [ClientMetricsManager#expirationTimer|https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java#L115], and BrokerServer#shutdown doesn't close ClientMetricsManager, which leaves the timer thread running in the background.
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
OmniaGM commented on PR #15335: URL: https://github.com/apache/kafka/pull/15335#issuecomment-2039474182 > @OmniaGM , there is compilation error in jdk8_scala2.12 job. Could you have a look? > > ``` > [2024-04-04T09:19:51.266Z] [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15335/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:4952:125: recursive value leaderTopicsDelta needs type > [2024-04-04T09:19:51.266Z] [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15335/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:4953:17: value makeLeader is not a member of Any > [2024-04-04T09:19:51.266Z] [Error] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15335/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:4957:7: overloaded method value assertTrue with alternatives: > [2024-04-04T09:19:51.266Z] (x$1: java.util.function.BooleanSupplier)Unit > [2024-04-04T09:19:51.266Z] (x$1: Boolean)Unit > [2024-04-04T09:19:51.266Z] cannot be applied to (Any) > ``` > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15335/13/pipeline Fixed it
[PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
brandboat opened a new pull request, #15668: URL: https://github.com/apache/kafka/pull/15668 Related to KAFKA-16477. Profiling the Kafka tests shows that many `client-metrics-reaper` threads are not cleaned up after BrokerServer shutdown. The thread `client-metrics-reaper` comes from [ClientMetricsManager#expirationTimer](https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java#L115), and BrokerServer#shutdown doesn't close ClientMetricsManager, which leaves the thread running in the background. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
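The general shape of the fix implied here, an owner that stops its background timer thread when it is closed, can be sketched as below. This is not the `ClientMetricsManager` code: the `ReaperOwner` class, the thread name `demo-metrics-reaper` and the use of a `ScheduledExecutorService` are all assumptions made for the sake of a self-contained example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical component that owns a background "reaper" thread and stops it on close. */
final class ReaperOwner implements AutoCloseable {
    private final ScheduledExecutorService reaper =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "demo-metrics-reaper");
            thread.setDaemon(true);
            return thread;
        });

    ReaperOwner() {
        // Periodic no-op standing in for the expiration work a real reaper would do.
        reaper.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.SECONDS);
    }

    /** Stops the reaper; without this call the thread would outlive its owner. */
    @Override
    public void close() {
        reaper.shutdownNow();
        try {
            reaper.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    boolean isTerminated() {
        return reaper.isTerminated();
    }

    public static void main(String[] args) {
        ReaperOwner owner = new ReaperOwner();
        owner.close(); // the shutdown path must call this, or the thread leaks
        System.out.println("reaper terminated: " + owner.isTerminated());
    }
}
```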
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
chia7712 commented on code in PR #15335: URL: https://github.com/apache/kafka/pull/15335#discussion_r1552954260 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -104,9 +104,19 @@ class DelayedOperations(topicPartition: TopicPartition, object Partition { private val metricsGroup = new KafkaMetricsGroup(classOf[Partition]) - def apply(topicPartition: TopicPartition, + def apply(topicIdPartition: TopicIdPartition, time: Time, replicaManager: ReplicaManager): Partition = { +Partition( Review Comment: Not sure whether we need this new `apply`. No callers have a `TopicIdPartition`, so they would have to create one just to use this `apply`. ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -289,10 +300,11 @@ class Partition(val topicPartition: TopicPartition, delayedOperations: DelayedOperations, metadataCache: MetadataCache, logManager: LogManager, -alterIsrManager: AlterPartitionManager) extends Logging { +alterIsrManager: AlterPartitionManager, +@volatile private var _topicId: Option[Uuid] = None // TODO: merge topicPartition and _topicId into TopicIdPartition once TopicId persist in most of the code Review Comment: Can we add a Jira link to the comment? Readers could then trace the updates easily.
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
OmniaGM commented on code in PR #15335: URL: https://github.com/apache/kafka/pull/15335#discussion_r1553428780 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -104,9 +104,19 @@ class DelayedOperations(topicPartition: TopicPartition, object Partition { private val metricsGroup = new KafkaMetricsGroup(classOf[Partition]) - def apply(topicPartition: TopicPartition, + def apply(topicIdPartition: TopicIdPartition, time: Time, replicaManager: ReplicaManager): Partition = { +Partition( Review Comment: The plan is to use this `apply` in [KAFKA-16212](https://issues.apache.org/jira/browse/KAFKA-16212), which will be raised soon.
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
OmniaGM commented on code in PR #15335: URL: https://github.com/apache/kafka/pull/15335#discussion_r1553431953 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -289,10 +300,11 @@ class Partition(val topicPartition: TopicPartition, delayedOperations: DelayedOperations, metadataCache: MetadataCache, logManager: LogManager, -alterIsrManager: AlterPartitionManager) extends Logging { +alterIsrManager: AlterPartitionManager, +@volatile private var _topicId: Option[Uuid] = None // TODO: merge topicPartition and _topicId into TopicIdPartition once TopicId persist in most of the code Review Comment: There is already a Jira that will address this, [KAFKA-16212](https://issues.apache.org/jira/browse/KAFKA-16212). I'll update the comment.
Re: [PR] KAFKA-16477: Detect thread leaked client-metrics-reaper in tests [kafka]
chia7712 commented on PR #15668: URL: https://github.com/apache/kafka/pull/15668#issuecomment-2039540752 @brandboat nice find! Should we add the thread prefix to https://github.com/apache/kafka/blob/a2ee0855ee5e73f3a74555d52294bb4acfd28945/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L2487 to avoid similar issues in the future?
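A prefix-based leak check of the kind suggested here can be sketched in plain Java as follows. This is not the `TestUtils` code linked above: the `ThreadLeakCheck` class and its helpers are hypothetical, and the sketch only assumes that leaked threads can be recognized by a name prefix such as `client-metrics-reaper`.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical test helper that finds live threads by name prefix. */
final class ThreadLeakCheck {

    /** Returns the sorted names of all live threads whose name starts with the prefix. */
    static List<String> liveThreadsWithPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(Thread::isAlive)
            .map(Thread::getName)
            .filter(name -> name.startsWith(prefix))
            .sorted()
            .collect(Collectors.toList());
    }

    /** Starts a daemon thread that sleeps until interrupted, simulating a leaked thread. */
    static Thread startParkedThread(String name) {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // interrupted: fall through and let the thread exit
            }
        }, name);
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    /** Interrupts the thread and waits up to five seconds for it to exit. */
    static void stop(Thread thread) {
        thread.interrupt();
        try {
            thread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Thread leaked = startParkedThread("client-metrics-reaper-demo");
        System.out.println("before cleanup: " + liveThreadsWithPrefix("client-metrics-reaper"));
        stop(leaked);
        System.out.println("after cleanup: " + liveThreadsWithPrefix("client-metrics-reaper"));
    }
}
```

A test teardown could call `liveThreadsWithPrefix` for each known prefix and fail if the result is non-empty, which is the effect of adding a prefix to an existing list of checked names.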
[jira] [Assigned] (KAFKA-16475) Create unit test for TopicImageNode
[ https://issues.apache.org/jira/browse/KAFKA-16475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johnny Hsu reassigned KAFKA-16475: -- Assignee: Johnny Hsu > Create unit test for TopicImageNode > --- > > Key: KAFKA-16475 > URL: https://issues.apache.org/jira/browse/KAFKA-16475 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Johnny Hsu >Priority: Major >
[jira] [Commented] (KAFKA-16475) Create unit test for TopicImageNode
[ https://issues.apache.org/jira/browse/KAFKA-16475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834261#comment-17834261 ] Johnny Hsu commented on KAFKA-16475: hi [~cmccabe] I am willing to work on this ticket, thanks! > Create unit test for TopicImageNode > --- > > Key: KAFKA-16475 > URL: https://issues.apache.org/jira/browse/KAFKA-16475 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major >
Re: [PR] MINOR: remove redundant check in appendLegacyRecord [kafka]
chia7712 merged PR #15638: URL: https://github.com/apache/kafka/pull/15638
Re: [PR] KAFKA-8735: Check properties file existence first [kafka]
qinghui-xu commented on PR #7139: URL: https://github.com/apache/kafka/pull/7139#issuecomment-2039564348 In my view this should be merged, if only for the sake of codebase sanity. I can try to rebase it and resolve the conflict to update the PR.
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1553477271 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -248,9 +249,10 @@ object KafkaConfig { val ConsumerGroupMaxSessionTimeoutMsProp = "group.consumer.max.session.timeout.ms" val ConsumerGroupHeartbeatIntervalMsProp = "group.consumer.heartbeat.interval.ms" val ConsumerGroupMinHeartbeatIntervalMsProp = "group.consumer.min.heartbeat.interval.ms" - val ConsumerGroupMaxHeartbeatIntervalMsProp ="group.consumer.max.heartbeat.interval.ms" + val ConsumerGroupMaxHeartbeatIntervalMsProp = "group.consumer.max.heartbeat.interval.ms" val ConsumerGroupMaxSizeProp = "group.consumer.max.size" val ConsumerGroupAssignorsProp = "group.consumer.assignors" + val ConsumerGroupMigrationPolicyProp = "consumer.group.migration.policy" Review Comment: Sorry, I missed this one: `consumer.group` -> `group.consumer`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1553480909 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -677,6 +679,7 @@ object KafkaConfig { val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval for registered consumers." val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single consumer group can accommodate." val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor." + val ConsumerGroupMigrationPolicyDoc = "The config that enables converting the classic group using the consumer embedded protocol to the consumer group using the consumer group protocol and vice versa. " + ConsumerGroupMigrationPolicy.validValuesDescription Review Comment: Thanks. I would rather prefer to keep all the documentation defined here. This is what we usually do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16294: Add group protocol migration enabling config [kafka]
dajac commented on code in PR #15411: URL: https://github.com/apache/kafka/pull/15411#discussion_r1553481594 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java: ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum ConsumerGroupMigrationPolicy { +/** Both upgrade and downgrade are enabled.*/ +BIDIRECTIONAL("bidirectional"), + +/** Only upgrade is enabled.*/ +UPGRADE("upgrade"), + +/** Only downgrade is enabled.*/ +DOWNGRADE("downgrade"), + +/** Neither upgrade nor downgrade is enabled.*/ +DISABLED("disabled"); + +private final String policy; + +ConsumerGroupMigrationPolicy(String config) { +this.policy = config; +} Review Comment: nit: We use different names `config` and `policy`. This is confusing. How about using `name`? 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java: ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.group; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum ConsumerGroupMigrationPolicy { +/** Both upgrade and downgrade are enabled.*/ +BIDIRECTIONAL("bidirectional"), + +/** Only upgrade is enabled.*/ +UPGRADE("upgrade"), + +/** Only downgrade is enabled.*/ +DOWNGRADE("downgrade"), + +/** Neither upgrade nor downgrade is enabled.*/ +DISABLED("disabled"); + +private final String policy; + +ConsumerGroupMigrationPolicy(String config) { +this.policy = config; +} + +@Override +public String toString() { +return policy; +} + +public static String validValuesDescription = +BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled" + ", " + +UPGRADE + ": only upgrade is enabled" + ", " + +DOWNGRADE + ": only downgrade is enabled" + ", " + Review Comment: nit: Should we complement with the from...to... 
like you did for BIDIRECTIONAL? ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -1831,6 +1832,22 @@ class KafkaConfigTest { assertTrue(config.isNewGroupCoordinatorEnabled) } + @Test + def testGroupProtocolMigrationPolicy(): Unit = { +val props = new Properties() +props.putAll(kraftProps()) + +// Invalid GroupProtocolMigrationPolicy value. +props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, "foo") +assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props)) + +ConsumerGroupMigrationPolicy.values().foreach { policy => + props.put(KafkaConfig.ConsumerGroupMigrationPolicyProp, policy.toString) Review Comment: Is it case sensitive? ## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ## @@ -1831,6 +1832,22 @@ class KafkaConfigTest { assertTrue(config.isNewGroupCoordinatorEnabled) } + @Test + def testGroupProtocolMigrationPolicy(): Unit = { +val props = new Proper
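Two of the review points on PR #15411 (use one field name instead of `config` and `policy`, and the question of whether the value is case sensitive) can be illustrated with a small sketch. This is not the code from the PR: the enum name `MigrationPolicySketch` and the `parse` helper are hypothetical, and case-insensitive matching is an assumption made here for illustration, not something the thread confirms.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative sketch only; not the enum merged in PR #15411.
enum MigrationPolicySketch {
    BIDIRECTIONAL("bidirectional"),
    UPGRADE("upgrade"),
    DOWNGRADE("downgrade"),
    DISABLED("disabled");

    // A single name, "name", is used by the constructor and by toString().
    private final String name;

    MigrationPolicySketch(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }

    // Lookup table keyed by the lower-case name of each constant.
    private static final Map<String, MigrationPolicySketch> BY_NAME =
        Arrays.stream(values()).collect(Collectors.toMap(p -> p.name, Function.identity()));

    // Hypothetical helper: case-insensitive parse (an assumption), rejects unknown values.
    static MigrationPolicySketch parse(String value) {
        MigrationPolicySketch policy =
            value == null ? null : BY_NAME.get(value.toLowerCase(Locale.ROOT));
        if (policy == null) {
            throw new IllegalArgumentException("Unknown migration policy: " + value);
        }
        return policy;
    }
}
```

With this shape, `MigrationPolicySketch.parse("UpGrade")` resolves to `UPGRADE`, while an unknown value such as `"foo"` is rejected, which mirrors the invalid-value check in the quoted test.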
Re: [PR] MINOR: Add type check to classic group timeout operations [kafka]
dajac commented on code in PR #15587: URL: https://github.com/apache/kafka/pull/15587#discussion_r1553499653 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2415,6 +2415,20 @@ private CoordinatorResult classicGroupJoinExistingMember( return EMPTY_RESULT; } +/** + * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as + * timeout operation. It additionally looks up the group by the id and checks the group type. + * completeClassicGroupJoin will only be called if the group is CLASSIC. + */ +private CoordinatorResult completeClassicGroupJoin(String groupId) { +if (containsClassicGroup(groupId)) { +return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false)); Review Comment: I am not a fan of this pattern because you effectively have to look up the group twice. One option would be to use a try..catch to catch the exception thrown by getOrMaybeCreateClassicGroup. Another option would be to 1) do the lookup, 2) verify non-null and group type and return if it fails. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2415,6 +2415,20 @@ private CoordinatorResult classicGroupJoinExistingMember( return EMPTY_RESULT; } +/** + * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as + * timeout operation. It additionally looks up the group by the id and checks the group type. + * completeClassicGroupJoin will only be called if the group is CLASSIC. + */ +private CoordinatorResult completeClassicGroupJoin(String groupId) { +if (containsClassicGroup(groupId)) { +return completeClassicGroupJoin(getOrMaybeCreateClassicGroup(groupId, false)); +} else { +log.info("Group {} is null or not a classic group, skipping rebalance stage.", groupId); Review Comment: I wonder if we could use `debug` here. 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2805,31 +2826,36 @@ private CoordinatorResult maybeCompleteJoinElseSchedule( * Try to complete the join phase of the initial rebalance. * Otherwise, extend the rebalance. * - * @param group The group under initial rebalance. + * @param groupId The group under initial rebalance. * * @return The coordinator result that will be appended to the log. */ private CoordinatorResult tryCompleteInitialRebalanceElseSchedule( -ClassicGroup group, +String groupId, int delayMs, int remainingMs ) { -if (group.newMemberAdded() && remainingMs != 0) { -// A new member was added. Extend the delay. -group.setNewMemberAdded(false); -int newDelayMs = Math.min(classicGroupInitialRebalanceDelayMs, remainingMs); -int newRemainingMs = Math.max(remainingMs - delayMs, 0); - -timer.schedule( -classicGroupJoinKey(group.groupId()), -newDelayMs, -TimeUnit.MILLISECONDS, -false, -() -> tryCompleteInitialRebalanceElseSchedule(group, newDelayMs, newRemainingMs) -); +if (containsClassicGroup(groupId)) { Review Comment: ditto. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2533,45 +2547,52 @@ private void schedulePendingSync(ClassicGroup group) { group.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, -() -> expirePendingSync(group, group.generationId())); +() -> expirePendingSync(group.groupId(), group.generationId())); } /** * Invoked when the heartbeat operation is expired from the timer. Possibly remove the member and * try complete the join phase. * - * @param group The group. + * @param groupId The group id. * @param memberId The member id. * * @return The coordinator result that will be appended to the log. 
*/ private CoordinatorResult expireClassicGroupMemberHeartbeat( -ClassicGroup group, +String groupId, String memberId ) { -if (group.isInState(DEAD)) { -log.info("Received notification of heartbeat expiration for member {} after group {} " + -"had already been unloaded or deleted.", -memberId, group.groupId()); -} else if (group.isPendingMember(memberId)) { -log.info("Pending member {} in group {} has been removed after session timeout expiration.", -memberId, group.groupId()); - -return removePendingMemberAndUpdateClassicGroup(group, memberId
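The second option raised in the review of PR #15587 (do the lookup once, then verify non-null and group type and return early otherwise) could look roughly like the sketch below. The classes here are simplified stand-ins invented for illustration; they are not the real `GroupMetadataManager` or `ClassicGroup`, and the string return value exists only so the outcome is observable.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-ins; these are not the real group-coordinator classes.
final class SingleLookupSketch {
    enum GroupType { CLASSIC, CONSUMER }

    static final class Group {
        final String id;
        final GroupType type;

        Group(String id, GroupType type) {
            this.id = id;
            this.type = type;
        }
    }

    private final Map<String, Group> groups = new HashMap<>();

    void add(Group group) {
        groups.put(group.id, group);
    }

    // Timeout-style operation keyed by group id: one map lookup, then checks.
    String completeJoin(String groupId) {
        Group group = groups.get(groupId);               // 1) single lookup
        if (group == null || group.type != GroupType.CLASSIC) {
            return "skipped";                            // 2) missing or wrong type: bail out
        }
        return "completed:" + group.id;                  // 3) proceed with the classic group
    }
}
```

Compared with a `contains` check followed by a second `get`, this touches the map once and keeps the null and type checks next to each other.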
Re: [PR] KAFKA-16383: fix flaky IdentityReplicationIntegrationTest .testReplicateFromLatest [kafka]
johnnychhsu commented on PR #15556: URL: https://github.com/apache/kafka/pull/15556#issuecomment-2039670307 @vamossagar12 thanks for the comment. sure! let's wait and monitor more builds
Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]
dajac commented on code in PR #15662: URL: https://github.com/apache/kafka/pull/15662#discussion_r1553514811 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -761,6 +777,31 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineUpgrade(ClassicGroup classicGroup) { +return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) && +!classicGroup.isInState(DEAD) && + ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) && Review Comment: I think that we have `usesConsumerGroupProtocol()` in the `ClassicGroup` class. Could we use it? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -761,6 +777,31 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineUpgrade(ClassicGroup classicGroup) { Review Comment: Does it have to be public? Should we add some javadoc? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -761,6 +777,31 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineUpgrade(ClassicGroup classicGroup) { +return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) && +!classicGroup.isInState(DEAD) && + ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) && +classicGroup.size() <= consumerGroupMaxSize; +} + +ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List records) { +classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS); +classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS); + +createGroupTombstoneRecords(classicGroup, records); +ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, classicGroup.groupId(), metrics); +classicGroup.convertToConsumerGroup(consumerGroup, records, metadataImage.topics()); Review Comment: I was wondering whether it would make more sense the other way around and have something 
like `ConsumerGroup.fromClassicGroup()`. I guess that it does not really matter in the end. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -761,6 +777,31 @@ public ClassicGroup classicGroup( } } +public boolean validateOnlineUpgrade(ClassicGroup classicGroup) { +return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) && +!classicGroup.isInState(DEAD) && + ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) && +classicGroup.size() <= consumerGroupMaxSize; +} Review Comment: I wonder whether we should log something (with the reason) when the upgrade is disallowed. Have you considered it? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java: ## @@ -1300,6 +1341,68 @@ public Map groupAssignment() { )); } +/** + * Convert the current classic group to a consumer group. + * Add the records for the conversion. + * + * @param consumerGroup The converted consumer group. + * @param records The list to which the new records are added. + * + * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol. 
+ */ +public void convertToConsumerGroup( +ConsumerGroup consumerGroup, +List records, +TopicsImage topicsImage +) throws GroupIdNotFoundException { +consumerGroup.setGroupEpoch(generationId); +consumerGroup.setTargetAssignmentEpoch(generationId); + +records.add(RecordHelpers.newGroupEpochRecord(groupId(), generationId)); +// SubscriptionMetadata will be computed in the following consumerGroupHeartbeat + records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap())); +records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), generationId)); + +members.forEach((memberId, member) -> { +ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment())); +Map> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage); +ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata())); Review Comment: * `deserializeAssignment` and `deserializeSubscription` could throw an `SchemaException` if not mistaken if the bytes are incorrect. We should handle those, I suppose. * We also discussed offline the need to k
[jira] [Created] (KAFKA-16478) Links for Kafka 3.5.2 release are broken
Philipp Trulson created KAFKA-16478: --- Summary: Links for Kafka 3.5.2 release are broken Key: KAFKA-16478 URL: https://issues.apache.org/jira/browse/KAFKA-16478 Project: Kafka Issue Type: Bug Components: website Affects Versions: 3.5.2 Reporter: Philipp Trulson While trying to update our setup, I noticed that the download links for the 3.5.2 release are broken. They all point to a different host and also contain an additional `/kafka` in their URL. Compare: not working: [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html] working: [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html] [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html] This goes for all links in the release - archives, checksums, signatures.
Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]
viktorsomogyi merged PR #15605: URL: https://github.com/apache/kafka/pull/15605
Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]
FrankYang0529 commented on PR #15663: URL: https://github.com/apache/kafka/pull/15663#issuecomment-2039753742 > > I think the class file size increasing is indeed a direct drawback after adding the `-parameters` option, because we'll include all the parameter names in the .class files. I'd like to know if there's any other way to fix this? Could we use ARGUMENTS instead of ARGUMENTS_WITH_NAMES? > > Or we can add the new arg to compileTestJava only to avoid impacting production binary Updated it to `compileTestJava.options.compilerArgs.add "-parameters"`. Thanks for the suggestion @chia7712 @showuon.
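For readers unfamiliar with what the `-parameters` compiler flag changes, the sketch below uses the standard `java.lang.reflect.Parameter` API to show the difference. The class `ParameterNameSketch` and the method `sampleTest` with its `quorum` argument are hypothetical names chosen for illustration; what the program prints depends on whether it was compiled with `-parameters`.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

final class ParameterNameSketch {
    // Hypothetical test-like method; "quorum" is just an example parameter name.
    static void sampleTest(String quorum) { }

    static String firstParameterName() {
        try {
            Method method = ParameterNameSketch.class.getDeclaredMethod("sampleTest", String.class);
            Parameter parameter = method.getParameters()[0];
            // isNamePresent() is true only if the class file carries parameter names,
            // which is what javac's -parameters flag adds. Otherwise getName()
            // returns a synthesized name of the form argN.
            return parameter.isNamePresent()
                ? parameter.getName()
                : "<synthesized: " + parameter.getName() + ">";
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstParameterName());
    }
}
```

Because the names are stored in the class file, enabling the flag only for test compilation keeps production class files unchanged, which is the trade-off discussed in the comment above.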
Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]
viktorsomogyi commented on PR #15605: URL: https://github.com/apache/kafka/pull/15605#issuecomment-2039754554 Also, thank you @akatona84 for the contribution, @soarez and @urbandan for the reviews. Fixing flaky tests is always very welcome, keep it up! 🙂
Re: [PR] MINOR: Add 3.6.2 to system tests [kafka]
omkreddy commented on PR #15665: URL: https://github.com/apache/kafka/pull/15665#issuecomment-2039898497 Thanks for the Review
Re: [PR] MINOR: Add 3.6.2 to system tests [kafka]
omkreddy merged PR #15665: URL: https://github.com/apache/kafka/pull/15665
Re: [PR] MINOR: Increase parallelism for Jenkins [kafka]
divijvaidya closed pull request #15099: MINOR: Increase parallelism for Jenkins URL: https://github.com/apache/kafka/pull/15099
[PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
lianetm opened a new pull request, #15669: URL: https://github.com/apache/kafka/pull/15669 Minor changes to improve the logging and docs related to the auto-commit inflight logic, plus tests to ensure the expected behaviour:
- auto-commit on the interval does not send a request if another one is in flight, and it sends the next one as soon as a response is received (without waiting for the full interval again)
- auto-commit before revocation always sends a request (even if another one from auto-commit on the interval is in flight), to ensure the latest offsets are committed before revoking partitions.
No changes in logic, just adding tests, docs and minor refactoring.
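The two behaviours described in the PR summary can be modelled with a small state machine. The sketch below is a simplified illustration written for this digest, not the consumer's actual implementation: the class `AutoCommitSketch` and all of its method names are hypothetical, and time is passed in explicitly to keep it deterministic.

```java
// Simplified model of the described auto-commit rules; not Kafka's real code.
final class AutoCommitSketch {
    private final long intervalMs;
    private long nextCommitAtMs;
    private boolean intervalCommitInFlight; // an interval commit was sent and not yet answered
    private int requestsSent;

    AutoCommitSketch(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.nextCommitAtMs = nowMs + intervalMs;
    }

    // Interval path: send only if the interval expired and nothing is in flight.
    // When blocked by an in-flight request the deadline is NOT pushed back, so the
    // next call after the response sends immediately.
    boolean maybeCommitOnInterval(long nowMs) {
        if (nowMs < nextCommitAtMs || intervalCommitInFlight) {
            return false;
        }
        requestsSent++;
        intervalCommitInFlight = true;
        nextCommitAtMs = nowMs + intervalMs;
        return true;
    }

    // Response for the interval commit arrived.
    void onIntervalCommitResponse() {
        intervalCommitInFlight = false;
    }

    // Revocation path: always sends, regardless of any in-flight interval commit.
    void commitBeforeRevocation() {
        requestsSent++;
    }

    int requestsSent() {
        return requestsSent;
    }
}
```

The design point worth noticing is that the interval deadline is only reset when a request is actually sent, which is what lets the next commit go out right after a response instead of waiting another full interval.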
Re: [PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
lianetm commented on PR #15669: URL: https://github.com/apache/kafka/pull/15669#issuecomment-2039970916 Hey @lucasbru, could you take a look at this when you have a chance? Thanks!
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1553899936 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1360,6 +1362,9 @@ public void commitSync(Map offsets, Duration Timer requestTimer = time.timer(timeout.toMillis()); SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets, requestTimer); CompletableFuture commitFuture = commit(syncCommitEvent); + +awaitPendingAsyncCommits(requestTimer, false); Review Comment: nit: it may be helpful to reflect in the name that this does execute the callbacks (or leave the name as it is and place line 1406, the one line that does execute the callbacks, here, right after)
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1543454136 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) { } private boolean invokePendingAsyncCommits(Timer timer) { -if (inFlightAsyncCommits.get() == 0) { +if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0) { Review Comment: This makes sense to me, to fill a gap in the case of commit sync with empty offsets, which skips the path of sending an actual request, and that's why it loses the guarantee of executing the callbacks as I see it. This makes the logic consistent with what happens if the commit sync has non-empty offsets. In that case, it does execute the callbacks for previous async commits that were waiting for coord: the sync commit would be blocked on the same findCoord request (there's always just 1), and the moment the coord is found the async is marked as inflight [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1036), so it would be considered for callbacks [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1121). Am I getting the reasoning for the change right?
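The reasoning in this comment turns on two counters: async commits still waiting for the coordinator (`pendingAsyncCommits`) and async commits whose request has been sent (`inFlightAsyncCommits`). The sketch below models only that bookkeeping. The counter names come from the quoted diff; the class `AsyncCommitCountersSketch` and every method on it are hypothetical and greatly simplified.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Bookkeeping-only model of the two counters in the quoted diff; not Kafka's real code.
final class AsyncCommitCountersSketch {
    final AtomicInteger pendingAsyncCommits = new AtomicInteger();   // waiting for the coordinator
    final AtomicInteger inFlightAsyncCommits = new AtomicInteger();  // request sent, no response yet

    // An async commit is issued while the coordinator is still unknown.
    void commitAsyncWithoutCoordinator() {
        pendingAsyncCommits.incrementAndGet();
    }

    // The coordinator is found: every pending commit becomes in flight.
    void coordinatorFound() {
        int moved = pendingAsyncCommits.getAndSet(0);
        inFlightAsyncCommits.addAndGet(moved);
    }

    // A response arrives for one in-flight commit.
    void responseReceived() {
        inFlightAsyncCommits.decrementAndGet();
    }

    // Check before the change: ignores commits still waiting for the coordinator.
    boolean nothingToAwaitOldCheck() {
        return inFlightAsyncCommits.get() == 0;
    }

    // Check after the change: both counters must be zero.
    boolean nothingToAwaitNewCheck() {
        return pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0;
    }
}
```

In this model, an async commit issued before the coordinator is known is invisible to the old check, so a sync commit that sends no request of its own could return without running that commit's callback. The new check closes that gap.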
Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]
junrao commented on code in PR #15213: URL: https://github.com/apache/kafka/pull/15213#discussion_r1553959430 ## clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java: ## @@ -47,6 +47,8 @@ public class ListOffsetsRequest extends AbstractRequest { */ public static final long EARLIEST_LOCAL_TIMESTAMP = -4L; +public static final long LATEST_TIERED_TIMESTAMP = -5L; Review Comment: Thanks for following up, @clolov !
[jira] [Resolved] (KAFKA-15583) High watermark can only advance if ISR size is larger than min ISR
[ https://issues.apache.org/jira/browse/KAFKA-15583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Calvin Liu resolved KAFKA-15583. Resolution: Fixed > High watermark can only advance if ISR size is larger than min ISR > -- > > Key: KAFKA-15583 > URL: https://issues.apache.org/jira/browse/KAFKA-15583 > Project: Kafka > Issue Type: Sub-task >Reporter: Calvin Liu >Assignee: Calvin Liu >Priority: Major > > This is the new high watermark advancement requirement. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16479) Add pagination supported describeTopic interface
Calvin Liu created KAFKA-16479: -- Summary: Add pagination supported describeTopic interface Key: KAFKA-16479 URL: https://issues.apache.org/jira/browse/KAFKA-16479 Project: Kafka Issue Type: Sub-task Reporter: Calvin Liu During the DescribeTopicPartitions API implementation, we found it awkward to place the pagination logic within the current admin client describe topic interface. So, in order to change the interface, we may need a broader discussion, such as creating a KIP, or, going a step further, a discussion of a general client-side pagination framework.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on PR #15470: URL: https://github.com/apache/kafka/pull/15470#issuecomment-2040217099 @mumrah Thanks for the review. Ticket filed. https://issues.apache.org/jira/browse/KAFKA-15579
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
junrao commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2040231361 @chia7712 : Thanks for the updated PR. Regarding the previous failed tests, one possibility is that the data on the server passed the retention time and is garbage collected. The default retention time is 7 days, which should be long enough. However, since we reuse mockTime, if the test runs long, the retention time might still be reached. Perhaps we could set [log.retention.ms](https://kafka.apache.org/documentation/#brokerconfigs_log.retention.ms) to -1? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
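The suggestion above can be sketched as a small helper that copies a broker property set and disables time-based retention. This is a minimal sketch under assumptions: the class and method names are hypothetical, and only the key `log.retention.ms` and the value `-1` (no time limit) come from the comment and the linked broker configuration documentation.

```java
import java.util.Properties;

// Hypothetical test helper, not part of the Kafka test harness.
class RetentionOverride {
    // Returns a copy of the given broker properties with time-based retention disabled.
    static Properties withUnlimitedRetention(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // -1 means no time limit is applied, so an advancing mock clock cannot expire the data.
        props.setProperty("log.retention.ms", "-1");
        return props;
    }
}
```

Copying the properties instead of mutating the argument keeps the override local to the test that needs it.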
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1543492385 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -984,6 +984,8 @@ public void close(final Timer timer) { } } finally { super.close(timer); +// Super-class close may wait for more commit callbacks to complete. +invokeCompletedOffsetCommitCallbacks(); Review Comment: Agree, there could be async requests, with known coord, not getting a response within the above commit sync time, then getting it while the super.close waits, so we should trigger the callbacks at this point. But this makes me notice, aren't we breaking the `close(Duration)` contract here, calling that `super.close(timer)` on the finally clause? Let's say async requests that are not getting a response within the timeout in the above while loop (so we block for time on the while), then `finally`, the super class blocks for that time again [here](https://github.com/apache/kafka/blob/8b274d8c1bfbfa6d4319ded884a11da790d7bf77/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1139). Am I missing something? (I can file a separate Jira if I'm not missing something here) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
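The concern about the `close(Duration)` contract comes down to whether two consecutive blocking phases share one time budget or each get the full timeout. The sketch below shows a shared budget; it is an assumption-laden illustration, the class `CloseBudget` is hypothetical, and it does not claim to describe how Kafka's `Timer` or `AbstractCoordinator.close` actually behave.

```java
import java.time.Duration;

// Hypothetical illustration of a single deadline shared by several blocking phases.
class CloseBudget {
    private final long deadlineNanos;

    // The caller supplies the current time so the sketch stays deterministic.
    CloseBudget(Duration total, long nowNanos) {
        this.deadlineNanos = nowNanos + total.toNanos();
    }

    // Milliseconds left for the next phase; never negative.
    long remainingMs(long nowNanos) {
        return Math.max(0L, (deadlineNanos - nowNanos) / 1_000_000L);
    }
}
```

With a budget of this kind, a first phase that uses the whole timeout leaves zero milliseconds for the second phase, so the total wait stays within the caller's `Duration`. If instead each phase restarted from the full timeout, the total could be roughly twice the requested value, which is the situation the comment asks about.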
Re: [PR] KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… [kafka]
chia7712 commented on PR #15621: URL: https://github.com/apache/kafka/pull/15621#issuecomment-2040270805 > Regarding the previous failed tests, one possibility is that the data on the server passed the retention time and is garbage collected. The default retention time is 7 days, which should be long enough. However, since we reuse mockTime, if the test runs long, the retention time might still be reached. Perhaps we could set [log.retention.ms](https://kafka.apache.org/documentation/#brokerconfigs_log.retention.ms) to -1? ``` org.opentest4j.AssertionFailedError: expected: <0> but was: <3> ``` You really hit the bullseye. I can reproduce the error by doing a little sleep before fetching data. Will set `retention.ms` to -1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554011665 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { Review Comment: We have a new `PlainTextConsumerCommitTest` for all commit-related tests. I would say these two should go there.
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554020736 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -229,7 +229,11 @@ private GroupRebalanceConfig buildRebalanceConfig(Optional groupInstance @AfterEach public void teardown() { this.metrics.close(); -this.coordinator.close(time.timer(0)); +try { +this.coordinator.close(time.timer(0)); Review Comment: I see, I would say it's fine to throw the error at the coordinator level (and live with code like this). And actually, the need for this catch is not introduced by this PR as I see it. The coordinator close before this PR could throw fenced exception for async commits that were waiting for coord and completed [here](https://github.com/apache/kafka/blob/fd9c7d2932dee055289b403e37a0bbb631c080a9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L983) getting fenced. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554024079 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed. +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount); Review Comment: nit: unneeded semi-colon. Java to scala jump tricking us...been there :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554024796 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed. +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount); + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). 
+val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) +assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(1, cb.successCount); + +// Enforce looking up the coordinator +consumer.committed(Set(tp, tp2).asJava) + +// Try with coordinator known +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb) +consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava) +assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset) +assertEquals(2, cb.successCount); Review Comment: nit: semi-colon -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554025678 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed. +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount); + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). 
+val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) +assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(1, cb.successCount); + +// Enforce looking up the coordinator +consumer.committed(Set(tp, tp2).asJava) + +// Try with coordinator known +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava, cb) +consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(2L))).asJava) +assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset) +assertEquals(2, cb.successCount); + +// Try with empty sync commit +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava, cb) +consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) +assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset) +assertEquals(3, cb.successCount); Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test [kafka]
philipnee commented on PR #15661: URL: https://github.com/apache/kafka/pull/15661#issuecomment-2040290159 @cadonna @lucasbru - Is it possible for me to ask for a review on this issue?
Re: [PR] KAFKA-16362: Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide [kafka]
gharris1727 commented on code in PR #15601: URL: https://github.com/apache/kafka/pull/15601#discussion_r1554016667 ## streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java: ## @@ -33,28 +34,36 @@ public class TimestampedKeyAndJoinSide { private final K key; private final long timestamp; -private final boolean leftSide; +private final JoinSide joinSide; -private TimestampedKeyAndJoinSide(final boolean leftSide, final K key, final long timestamp) { +private TimestampedKeyAndJoinSide(final JoinSide joinSide, final K key, final long timestamp) { this.key = Objects.requireNonNull(key, "key cannot be null"); -this.leftSide = leftSide; +this.joinSide = joinSide; this.timestamp = timestamp; } /** - * Create a new {@link TimestampedKeyAndJoinSide} instance if the provide {@code key} is not {@code null}. + * Create a new {@link TimestampedKeyAndJoinSide} instance if the provided {@code key} is not {@code null}. * - * @param leftSide True if the key is part of the left join side; False if it is from the right join side + * @param joinSide Whether the key is part of the {@link JoinSide#LEFT} side; or it is from the {@link JoinSide#RIGHT} side * @param key the key * @param the type of the key - * @return a new {@link TimestampedKeyAndJoinSide} instance if the provide {@code key} is not {@code null} + * @return a new {@link TimestampedKeyAndJoinSide} instance if the provided {@code key} is not {@code null} */ -public static TimestampedKeyAndJoinSide make(final boolean leftSide, final K key, final long timestamp) { -return new TimestampedKeyAndJoinSide<>(leftSide, key, timestamp); +public static TimestampedKeyAndJoinSide make(final JoinSide joinSide, final K key, final long timestamp) { Review Comment: Since this is only used in tests now, I think you can remove this and replace the test call-sites with the new functions. Make sure to copy the javadoc to the new signatures too. 
## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoin.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import java.util.Optional; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ValueJoinerWithKey; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.internals.LeftOrRightValue; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KStreamKStreamLeftJoin extends KStreamKStreamJoin { +private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class); + +KStreamKStreamLeftJoin(final String otherWindowName, +final JoinWindowsInternal windows, +final ValueJoinerWithKey joiner, +final boolean outer, +final 
Optional outerJoinWindowName, +final TimeTrackerSupplier sharedTimeTrackerSupplier) { +super(otherWindowName, windows, windows.beforeMs, windows.afterMs, joiner, outerJoinWindowName, +sharedTimeTrackerSupplier, outer); +} + +@Override +public Processor get() { +return new KStreamKStreamLeftJoinProcessor(); +} + +private class KStreamKStreamLeftJoinProcessor extends KStreamKStreamJoinProcessor { +@Override +public void process(final Record leftRecord) { +final long inputRecordTimestamp = leftRecord.timestamp(); +final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); +final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); +sh
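The review comment on `TimestampedKeyAndJoinSide` above asks that the generic `make` factory give way to side-specific functions once the boolean flag becomes a `JoinSide` enum. A minimal sketch of that shape follows. It is an assumption: the class `KeyAndSide` and the factory names `makeLeft` and `makeRight` are hypothetical stand-ins, since the new signatures are not shown in the quoted diff.

```java
import java.util.Objects;

// The two sides of a stream-stream join, replacing a boolean "leftSide" flag.
enum JoinSide { LEFT, RIGHT }

// Hypothetical stand-in for TimestampedKeyAndJoinSide; names are illustrative only.
final class KeyAndSide<K> {
    private final K key;
    private final long timestamp;
    private final JoinSide joinSide;

    private KeyAndSide(final JoinSide joinSide, final K key, final long timestamp) {
        this.key = Objects.requireNonNull(key, "key cannot be null");
        this.joinSide = Objects.requireNonNull(joinSide, "joinSide cannot be null");
        this.timestamp = timestamp;
    }

    // Side-specific factories make the call site self-describing.
    static <K> KeyAndSide<K> makeLeft(final K key, final long timestamp) {
        return new KeyAndSide<>(JoinSide.LEFT, key, timestamp);
    }

    static <K> KeyAndSide<K> makeRight(final K key, final long timestamp) {
        return new KeyAndSide<>(JoinSide.RIGHT, key, timestamp);
    }

    boolean isLeftSide() {
        return joinSide == JoinSide.LEFT;
    }

    K key() {
        return key;
    }

    long timestamp() {
        return timestamp;
    }
}
```

Compared with a call such as `make(true, key, ts)`, a call to `makeLeft(key, ts)` cannot be confused with its opposite by flipping a literal, which is the kind of type-unsafety the PR title refers to.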
Re: [PR] KAFKA-12848: kafka streams jmh benchmarks [kafka]
vamossagar12 closed pull request #10842: KAFKA-12848: kafka streams jmh benchmarks URL: https://github.com/apache/kafka/pull/10842
[PR] KAFKA-15853: Move transactions configs out of core [kafka]
OmniaGM opened a new pull request, #15670: URL: https://github.com/apache/kafka/pull/15670 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] Kafka 12373:Improve KafkaRaftClient handling of graceful shutdown [kafka]
vamossagar12 closed pull request #10468: Kafka 12373:Improve KafkaRaftClient handling of graceful shutdown URL: https://github.com/apache/kafka/pull/10468
Re: [PR] KAFKA-10526: leader fsync deferral on write [kafka]
vamossagar12 closed pull request #10278: KAFKA-10526: leader fsync deferral on write URL: https://github.com/apache/kafka/pull/10278
Re: [PR] KAFKA-13501: Avoid state restore via rebalance if standbys are enabled [kafka]
vamossagar12 closed pull request #11592: KAFKA-13501: Avoid state restore via rebalance if standbys are enabled URL: https://github.com/apache/kafka/pull/11592
Re: [PR] KAFKA-16472: Fix integration tests in Java with parameter name [kafka]
chia7712 commented on PR #15663: URL: https://github.com/apache/kafka/pull/15663#issuecomment-2040402625 @ijuma Could you please take a look at this PR?
Re: [PR] MINOR: Improvements to release.py [kafka]
chia7712 commented on code in PR #15651: URL: https://github.com/apache/kafka/pull/15651#discussion_r1554150631 ## release.py: ## @@ -348,6 +348,9 @@ def command_release_announcement_email(): +An overview of the release and its notable changes can be found in the +release blog post: Review Comment: Can we generate the link automatically if the link format is like `https://kafka.apache.org/blog#apache_kafka_362_release_announcement`?
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554165141 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { Review Comment: nit: maybe better name testCommitAsyncCompleted**Before**ConsumerCloses (clearer and consistent with the similar one below) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554177925 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed. +val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(1L))).asJava, cb) +consumer.close() +assertEquals(2, cb.successCount); + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commits sent previously with the +// `commitAsync` are guaranteed to have their callbacks invoked prior to completion of +// `commitSync` (given that it does not time out). 
+val producer = createProducer() +sendRecords(producer, numRecords = 3, tp) +sendRecords(producer, numRecords = 3, tp2) + +val consumer = createConsumer() +consumer.assign(List(tp, tp2).asJava) + +// Try without looking up the coordinator first +val cb = new CountConsumerCommitCallback +consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(1L))).asJava, cb) +consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava) +assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(1, cb.successCount); + +// Enforce looking up the coordinator +consumer.committed(Set(tp, tp2).asJava) Review Comment: I would say we don't need this: the successful `assertEquals` above (ln 694) already calls `committed`.
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on code in PR #15613: URL: https://github.com/apache/kafka/pull/15613#discussion_r1554162586 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol: String): Unit = { +// This is testing the contract that asynchronous offset commit are completed before the consumer +// is closed. Review Comment: I would extend this with something like: ...before the consumer is closed, **even when no commit sync is performed as part of the close (because auto-commit is disabled, or simply because there are no consumed offsets).** That's the key as I see it: it is what this PR fixes and what is being tested here. If the call to `consumer.close` performs an actual commit sync (which needs auto-commit enabled and non-empty consumed offsets), then I expect the async callbacks were always called. The contract was not respected when the commit sync did not happen for one of the reasons mentioned above. Agree?
Re: [PR] KAFKA-16068: Use TestPlugins in ConnectorValidationIntegrationTest to silence plugin scanning errors [kafka]
gharris1727 commented on code in PR #15642: URL: https://github.com/apache/kafka/pull/15642#discussion_r1554188423 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java: ## @@ -69,6 +69,9 @@ public static void setup() { Map workerProps = new HashMap<>(); workerProps.put(GROUP_ID_CONFIG, WORKER_GROUP_ID); +// Work around a circular-dependency in TestPlugins. +TestPlugins.pluginPath(); Review Comment: The circular dependency is still a problem that I haven't resolved, sorry. It's fixed in this un-merged PR: #1
Re: [PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lianetm commented on PR #15613: URL: https://github.com/apache/kafka/pull/15613#issuecomment-2040495611 Thanks for the changes @lucasbru, looks good to me overall. This is tidying up the whole async commit callbacks execution story. Left some comments, mostly minor, and to make sure we're on the same page with the reasoning behind the change. Should we update the PR description to refer not only to `consumer.commitSync()` but also to `consumer.close()`, both being fixed here to ensure that previous async commit callbacks are always executed? Thanks!
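The contract discussed in this thread can be modeled without a broker. What follows is a minimal sketch, not the actual consumer code: `CommitCallbackModel` and all of its methods are hypothetical names, and commit completion is simulated instead of being driven by network responses. It only illustrates the intended behavior that both `commitSync()` and `close()` run the callbacks of earlier async commits before they return, even when `close()` itself commits nothing.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model, NOT the real KafkaConsumer: it only tracks callbacks of
// async commits that were sent but whose callbacks have not run yet.
final class CommitCallbackModel {
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();
    private boolean closed = false;

    // Models commitAsync: the callback is queued and runs later.
    void commitAsync(Runnable callback) {
        ensureOpen();
        pendingCallbacks.add(callback);
    }

    // Models commitSync: earlier async callbacks run before it returns.
    void commitSync() {
        ensureOpen();
        drainPendingCallbacks();
    }

    // Models close: callbacks run even though close commits nothing here.
    void close() {
        drainPendingCallbacks();
        closed = true;
    }

    private void ensureOpen() {
        if (closed) {
            throw new IllegalStateException("consumer model is closed");
        }
    }

    private void drainPendingCallbacks() {
        Runnable callback;
        while ((callback = pendingCallbacks.poll()) != null) {
            callback.run();
        }
    }

    public static void main(String[] args) {
        int[] successCount = {0};
        CommitCallbackModel consumer = new CommitCallbackModel();
        consumer.commitAsync(() -> successCount[0]++);
        consumer.commitAsync(() -> successCount[0]++);
        consumer.close();
        System.out.println("callbacks run before close returned: " + successCount[0]);
    }
}
```

The `main` method mirrors the shape of `testCommitAsyncCompletedConsumerCloses`: two async commits, then a close, then a check of the callback count.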
[PR] KAFKA-16207; KRaft's internal log listener to update voter set [kafka]
jsancio opened a new pull request, #15671: URL: https://github.com/apache/kafka/pull/15671 DRAFT ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: fix javadoc warnings [kafka]
gharris1727 commented on PR #15527: URL: https://github.com/apache/kafka/pull/15527#issuecomment-2040586001 Hi @gaurav-narula I see some other javadoc warnings, do you think we should address these? ``` > Task :storage:storage-api:javadoc kafka/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java:31: warning: reference not found: kafka.api.MetricsTest * @see kafka.api.MetricsTest ^ 1 warning > Task :streams:javadoc kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:52: warning: invalid input: '<' * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore ReadOnlyKeyValueStore>}. ^ kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:74: warning: invalid input: '<' * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore ReadOnlyWindowStore>}. ^ kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:52: warning: invalid input: '<' * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore ReadOnlyKeyValueStore>}. ^ kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:74: warning: invalid input: '<' * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore ReadOnlyWindowStore>}. ^ kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:52: warning: invalid input: '<' * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore ReadOnlyKeyValueStore>}. ^ kafka/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java:74: warning: invalid input: '<' * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore ReadOnlyWindowStore>}. ```
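The `invalid input: '<'` warnings in the log above usually mean that a raw `<` appears in Javadoc text, here inside the label of a `{@link}` tag. A minimal sketch of one common fix follows: escaping the angle brackets as HTML entities. The names `StoreSketch` and `StoreTypesSketch` and the type parameters `K, V` are hypothetical and are not taken from `QueryableStoreTypes`; this shows the technique only, not the change made in the PR.

```java
// Hypothetical stand-in for a generic read-only store interface.
interface StoreSketch<K, V> {
    V get(K key);
}

final class StoreTypesSketch {
    private StoreTypesSketch() { }

    // A label written with raw angle brackets is what doclint rejects:
    //   {@link StoreSketch StoreSketch<K, V>}
    // The Javadoc below escapes them as HTML entities instead.

    /**
     * Wraps a single entry as a {@link StoreSketch StoreSketch&lt;K, V&gt;}.
     *
     * @param key   the only key the returned store knows about
     * @param value the value returned for that key
     * @return a store that yields {@code value} for {@code key} and {@code null} otherwise
     */
    static <K, V> StoreSketch<K, V> singleEntry(K key, V value) {
        return candidate -> key.equals(candidate) ? value : null;
    }

    public static void main(String[] args) {
        StoreSketch<String, Integer> store = singleEntry("a", 1);
        System.out.println("lookup of known key: " + store.get("a"));
    }
}
```

An alternative with the same effect on the warning is to drop the generic parameters from the label or to wrap the type in `{@code ...}`, at the cost of losing the hyperlink in the second case.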
Re: [PR] KAFKA-15853: Move transactions configs out of core [kafka]
chia7712 commented on code in PR #15670: URL: https://github.com/apache/kafka/pull/15670#discussion_r1554282432 ## core/src/main/scala/kafka/server/DynamicBrokerConfig.scala: ## @@ -1136,18 +1137,18 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi class DynamicProducerStateManagerConfig(val producerStateManagerConfig: ProducerStateManagerConfig) extends BrokerReconfigurable with Logging { def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { if (producerStateManagerConfig.producerIdExpirationMs != newConfig.producerIdExpirationMs) { - info(s"Reconfigure ${KafkaConfig.ProducerIdExpirationMsProp} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.producerIdExpirationMs}") + info(s"Reconfigure ${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.producerIdExpirationMs}") producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs) } if (producerStateManagerConfig.transactionVerificationEnabled != newConfig.transactionPartitionVerificationEnable) { - info(s"Reconfigure ${KafkaConfig.TransactionPartitionVerificationEnableProp} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionPartitionVerificationEnable}") + info(s"Reconfigure ${TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionPartitionVerificationEnable}") producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionPartitionVerificationEnable) } } def validateReconfiguration(newConfig: KafkaConfig): Unit = { if (newConfig.producerIdExpirationMs < 0) - throw new ConfigException(s"${KafkaConfig.ProducerIdExpirationMsProp} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.producerIdExpirationMs}") + 
throw new ConfigException(s"${TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.producerIdExpirationMs}") Review Comment: maybe `ProducerStateManagerConfig.PRODUCER_ID_EXPIRATION_MS` is more suitable in this case because the code is used for logging `Producer` state. ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1062,21 +1027,21 @@ object KafkaConfig { .define(CompressionTypeProp, STRING, LogConfig.DEFAULT_COMPRESSION_TYPE, in(BrokerCompressionType.names.asScala.toSeq:_*), HIGH, CompressionTypeDoc) /** * Transaction management configuration ***/ - .define(TransactionalIdExpirationMsProp, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionalIdExpirationMsDoc) - .define(TransactionsMaxTimeoutMsProp, INT, Defaults.TRANSACTIONS_MAX_TIMEOUT_MS, atLeast(1), HIGH, TransactionsMaxTimeoutMsDoc) - .define(TransactionsTopicMinISRProp, INT, Defaults.TRANSACTIONS_TOPIC_MIN_ISR, atLeast(1), HIGH, TransactionsTopicMinISRDoc) - .define(TransactionsLoadBufferSizeProp, INT, Defaults.TRANSACTIONS_LOAD_BUFFER_SIZE, atLeast(1), HIGH, TransactionsLoadBufferSizeDoc) - .define(TransactionsTopicReplicationFactorProp, SHORT, Defaults.TRANSACTIONS_TOPIC_REPLICATION_FACTOR, atLeast(1), HIGH, TransactionsTopicReplicationFactorDoc) - .define(TransactionsTopicPartitionsProp, INT, Defaults.TRANSACTIONS_TOPIC_PARTITIONS, atLeast(1), HIGH, TransactionsTopicPartitionsDoc) - .define(TransactionsTopicSegmentBytesProp, INT, Defaults.TRANSACTIONS_TOPIC_SEGMENT_BYTES, atLeast(1), HIGH, TransactionsTopicSegmentBytesDoc) - .define(TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, INT, Defaults.TRANSACTIONS_ABORT_TIMED_OUT_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsAbortTimedOutTransactionsIntervalMsDoc) - .define(TransactionsRemoveExpiredTransactionalIdCleanupIntervalMsProp, INT, 
Defaults.TRANSACTIONS_REMOVE_EXPIRED_CLEANUP_INTERVAL_MS, atLeast(1), LOW, TransactionsRemoveExpiredTransactionsIntervalMsDoc) - - .define(TransactionPartitionVerificationEnableProp, BOOLEAN, Defaults.TRANSACTION_PARTITION_VERIFICATION_ENABLE, LOW, TransactionPartitionVerificationEnableDoc) - - .define(ProducerIdExpirationMsProp, INT, Defaults.PRODUCER_ID_EXPIRATION_MS, atLeast(1), LOW, ProducerIdExpirationMsDoc) + .define(TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, INT, Defaults.TRANSACTIONAL_ID_EXPIRATION_MS, atLeast(1), HIGH, TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_DOC) Review Comment: Should we do a similar refactor for `Defaults`?
Re: [PR] MINOR: fix javadoc warnings [kafka]
gaurav-narula commented on PR #15527: URL: https://github.com/apache/kafka/pull/15527#issuecomment-2040616724 @gharris1727 Thanks for pointing that out. The warning below ``` kafka/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageMetrics.java:31: warning: reference not found: kafka.api.MetricsTest * @see kafka.api.MetricsTest ``` occurs because we're trying to refer to a class in the test compile path from production code. I don't think that's a good idea. Usually IDEs help in figuring out the usages of fields/classes anyway. Should we remove that tag? WDYT?
Re: [PR] MINOR: fix javadoc warnings [kafka]
gharris1727 commented on PR #15527: URL: https://github.com/apache/kafka/pull/15527#issuecomment-2040626161 > I don't think that's a good idea. Usually IDEs help in figuring out the usages of fields/classes anyway. Should we remove that tag? I agree, let's remove that. RemoteStorageMetrics is already referenced by MetricsTest, so this back-link isn't adding anything.
[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken
[ https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834451#comment-17834451 ] Matthias J. Sax commented on KAFKA-16478: - [~der_eismann] – Thanks for reporting this. Do you want to do a PR to fix it? Should be simple? Just need a small update to [https://github.com/apache/kafka-site/blob/asf-site/downloads.html] > Links for Kafka 3.5.2 release are broken > > > Key: KAFKA-16478 > URL: https://issues.apache.org/jira/browse/KAFKA-16478 > Project: Kafka > Issue Type: Bug > Components: website >Affects Versions: 3.5.2 >Reporter: Philipp Trulson >Priority: Major > > While trying to update our setup, I noticed that the download links for the > 3.5.2 links are broken. They all point to a different host and also contain > an additional `/kafka` in their URL. Compare: > not working: > [https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html] > working: > [https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html] > [https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html] > This goes for all links in the release - archives, checksums, signatures. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken
[ https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834452#comment-17834452 ] Matthias J. Sax commented on KAFKA-16478: - cc [~omkreddy] [~stanislavkozlovski] as last RMs – did any of you move artifacts and forget to update the links?
[jira] (KAFKA-16478) Links for Kafka 3.5.2 release are broken
[ https://issues.apache.org/jira/browse/KAFKA-16478 ] Matthias J. Sax deleted comment on KAFKA-16478: - was (Author: mjsax): cc [~omkreddy] [~stanislavkozlovski] as last RMs – did any of you move artifacts and forget to update the links?
[jira] [Commented] (KAFKA-16478) Links for Kafka 3.5.2 release are broken
[ https://issues.apache.org/jira/browse/KAFKA-16478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834456#comment-17834456 ] Matthias J. Sax commented on KAFKA-16478: - cc [~mimaison] – it seems you removed some older releases from the download path (3.4.1, 3.5.0, 3.5.1, and 3.6.0). Let's check that all links for these releases point to "archive".
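A small check of the kind that could catch the broken pattern reported in this issue. This is only a sketch: `DownloadLinkCheck` and `hasRepeatedSegment` are hypothetical names, and the rule (flag a URL whose path repeats a segment back to back, such as `/kafka/kafka/`) is an assumption drawn from the example URLs in the report, not a description of how the kafka-site downloads page is validated.

```java
import java.net.URI;

// Hypothetical helper, not part of kafka-site: flags URLs whose path contains
// the same non-empty segment twice in a row.
final class DownloadLinkCheck {
    private DownloadLinkCheck() { }

    static boolean hasRepeatedSegment(String url) {
        String[] segments = URI.create(url).getPath().split("/");
        for (int i = 1; i < segments.length; i++) {
            boolean nonEmpty = !segments[i].isEmpty();
            if (nonEmpty && segments[i].equals(segments[i - 1])) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // URLs copied from the issue description.
        String[] urls = {
            "https://downloads.apache.org/kafka/kafka/3.5.2/RELEASE_NOTES.html",
            "https://archive.apache.org/dist/kafka/3.5.1/RELEASE_NOTES.html",
            "https://downloads.apache.org/kafka/3.6.2/RELEASE_NOTES.html"
        };
        for (String url : urls) {
            System.out.println(hasRepeatedSegment(url) + "  " + url);
        }
    }
}
```

A check like this only catches the doubled path segment; whether a release should point at the downloads host or the archive host still has to be decided per release.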
[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16474: -- Fix Version/s: 3.8.0 > AsyncKafkaConsumer might send out heartbeat request without waiting for its > response > > > Key: KAFKA-16474 > URL: https://issues.apache.org/jira/browse/KAFKA-16474 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Critical > Labels: kip-848-client-support > Fix For: 3.8.0 > > > KAFKA-16389 > We've discovered that in some uncommon cases, the consumer could send out > successive heartbeats without waiting for the response to come back. This > might cause the consumer to revoke its just-assigned partitions > in some cases. For example: > > The consumer first sends out a heartbeat with epoch=0 and memberId='' > The consumer then rapidly sends out another heartbeat with epoch=0 and > memberId='' because it has not yet received any response and thus has not updated its > local state > > The consumer receives assignments from the first heartbeat and reconciles its > assignment. > > Since the second heartbeat has epoch=0 and memberId='', the server will think > this is a new member joining and therefore send out an empty assignment. > > The consumer receives the response from the second heartbeat and revokes all of > its partitions. > > There are 2 issues associated with this bug: > # inflight logic > # rapid poll: in KAFKA-16389 we observed the consumer polling interval to > be a few ms.
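The "inflight logic" named in the issue can be illustrated with a small guard. This is a sketch under stated assumptions, not the AsyncKafkaConsumer implementation: `HeartbeatGate` and its methods are hypothetical names, and the only behavior modeled is that no second heartbeat is sent until the first one has been answered, so the second request carries the member ID and epoch learned from the first response instead of epoch=0 and memberId=''.

```java
// Hypothetical guard, NOT Kafka code: allows at most one heartbeat in flight.
final class HeartbeatGate {
    private boolean inflight = false;
    private int epoch = 0;
    private String memberId = "";

    // Returns true if a heartbeat may be sent now and marks it as in flight.
    boolean trySend() {
        if (inflight) {
            return false; // still waiting for the previous response
        }
        inflight = true;
        return true;
    }

    // Applies the state carried by the response, then allows the next send.
    void onResponse(String assignedMemberId, int newEpoch) {
        memberId = assignedMemberId;
        epoch = newEpoch;
        inflight = false;
    }

    // A failed request also clears the flag so that a retry is possible.
    void onFailure() {
        inflight = false;
    }

    int epoch() {
        return epoch;
    }

    String memberId() {
        return memberId;
    }

    public static void main(String[] args) {
        HeartbeatGate gate = new HeartbeatGate();
        System.out.println("first send allowed: " + gate.trySend());
        System.out.println("rapid second send allowed: " + gate.trySend());
        gate.onResponse("member-1", 1); // values are made up for the demo
        System.out.println("send after response allowed: " + gate.trySend()
                + " (epoch=" + gate.epoch() + ", memberId=" + gate.memberId() + ")");
    }
}
```

The guard addresses only the first of the two issues listed; the rapid poll interval is a separate concern.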
[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response
[ https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16474: -- Labels: kip-848-client-support (was: )