[GitHub] [kafka] guozhangwang commented on pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions
guozhangwang commented on pull request #8934: URL: https://github.com/apache/kafka/pull/8934#issuecomment-652202875 @hachikuji Could you take a look at this one since it is a 2.6 blocker? Another alternative, if we are less comfortable with the current exponential-timeout fix-forward, is to just revert the behavior and try to complete the metadata update (including join-group) up to the poll timeout. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17149122#comment-17149122 ] Guozhang Wang commented on KAFKA-10134:
---
I've updated the PR.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
>
>                 Key: KAFKA-10134
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10134
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.5.0
>            Reporter: Sean Guo
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 2.6.0, 2.5.1
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world effect during the rebalance, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance while there is some load (some tasks run longer than 30s), the CPU goes sky-high. It reads ~700% in our metrics, so there must be several threads in a tight loop. We have several consumer threads consuming from different partitions during the rebalance. This is reproducible with both the new CooperativeStickyAssignor and the old eager rebalance protocol. The difference is that with the old eager rebalance protocol the high CPU usage drops after the rebalance is done, but with the cooperative one the consumer threads seem stuck on something and cannot finish the rebalance, so the high CPU usage won't drop until we stop our load. A small load without long-running tasks also doesn't cause continuous high CPU usage, since the rebalance can finish in that case.
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable [0x7fe119aab000]
>    java.lang.Thread.State: RUNNABLE
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>
> By debugging into the code, we found that the clients appear to be in a loop trying to find the coordinator.
> I also tried the old rebalance protocol with the new version; the issue still exists, but the CPU goes back to normal once the rebalance is done.
> I also tried the same on 2.4.1, which doesn't seem to have this issue, so it seems related to something that changed between 2.4.1 and 2.5.0.
--
This message was sent by Atlassian Jira (v8.3.4#803005)
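The remedy discussed on the PR thread above, replacing the tight retry loop with an exponentially growing timeout, can be sketched roughly as follows. This is an illustrative stand-alone sketch, not the actual ConsumerCoordinator code; the method name and constants are made up:

```java
// Illustrative sketch of exponential backoff between retries, so a failing
// find-coordinator loop cannot spin at ~100% CPU per thread. Not the real
// Kafka client code; names and constants are assumptions for this example.
public class BackoffSketch {
    static long backoffMs(final int attempt, final long initialMs, final long maxMs) {
        // initialMs * 2^attempt, with the shift bounded to avoid overflow
        // and the result capped at maxMs
        final long exp = initialMs << Math.min(attempt, 30);
        return Math.min(exp, maxMs);
    }
}
```

Sleeping for `backoffMs(attempt, ...)` between coordinator lookups bounds the retry rate instead of retrying immediately.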
[GitHub] [kafka] showuon edited a comment on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon edited a comment on pull request #8712: URL: https://github.com/apache/kafka/pull/8712#issuecomment-652177809 Thanks for the comments, @abbccdda. I've addressed your comments in this commit: https://github.com/apache/kafka/pull/8712/commits/4b57a606a3834323302b6d3d33ab95e5b88d183b What I did in this commit: 1. simplify the code 2. use the existing topic name in tests, instead of creating a new topic name: `leaderNotAvailableTopic` 3. use `numRetries` to indicate we are trying to go beyond the retry limit
[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r448103331

File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java

@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
         assertThat(
             appender.getMessages(),
-            hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+            hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
         );
     }
 }

+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+            new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+                .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                    Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                    Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                    Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, broker1,
+            Collections.singletonList(broker1), Collections.singletonList(broker1));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
+        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new KafkaFutureImpl<>();
+        topicDescriptionSuccessFuture.complete(
+            new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet())
+        );
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionFailFuture)))
[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r448100930 (quotes the same InternalTopicManagerTest.java hunk as the comment above)
[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r448100994 (quotes the same InternalTopicManagerTest.java hunk as the comment above) Review comment: good. Thanks.
[GitHub] [kafka] vvcephei commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests
vvcephei commented on pull request #8938: URL: https://github.com/apache/kafka/pull/8938#issuecomment-652173502 I had a chat with @ableegoldman earlier today. As a general strategy, maybe what we can do is aim to repair the non-downgradability where it exists. For example, when we release 2.5.1, then it will become possible to downgrade (with suppression) from 2.6.0 to 2.5.1 even though it's not possible to downgrade from 2.6.0 to 2.5.0. There's a bit of a chicken-and-egg problem here, since we can't add the downgrade tests _yet_, but what I can do is to create the tickets to both fix the known downgrade issues and then add the relevant downgrade path to the test matrix. Similar to how I couldn't add an upgrade test from 2.1 to trunk, but I filed a ticket with a note that once the ticket is fixed, we should be able to add the missing test parameters.
[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r448099598 (quotes the same InternalTopicManagerTest.java hunk as the comment above)
[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r448099358 (quotes the same InternalTopicManagerTest.java hunk as the comment above, at shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess) Review comment: Yes, they are different scenarios. You can check the diagram below for reference (red and green for different cases). ![image](https://user-images.githubusercontent.com/43372967/86200914-905e3800-bb90-11ea-8d2c-c5e9c7b71cda.png)
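The distinction these tests exercise, that LeaderNotAvailableException is transient (re-validate the topic, do not create it) while UnknownTopicOrPartitionException means the topic is genuinely missing (schedule creation), can be sketched as follows. This is an illustrative stand-alone sketch with made-up class and method names, not the actual InternalTopicManager code:

```java
import java.util.Set;

// Illustrative sketch: classify a describe-topics failure into "create the
// topic" vs "retry validation later". Class, method, and exception stand-ins
// are assumptions for this example, not Kafka's real types.
public class TopicValidationSketch {
    static class LeaderNotAvailableException extends RuntimeException {}
    static class UnknownTopicOrPartitionException extends RuntimeException {}

    static void classify(final String topic, final RuntimeException describeError,
                         final Set<String> toCreate, final Set<String> toRetry) {
        if (describeError instanceof UnknownTopicOrPartitionException) {
            toCreate.add(topic);   // genuinely missing: safe to create
        } else if (describeError instanceof LeaderNotAvailableException) {
            toRetry.add(topic);    // transient: do NOT create, re-validate later
        } else {
            throw describeError;   // anything else is treated as fatal here
        }
    }
}
```

The original bug (KAFKA-10006) was precisely that both failures were lumped together, so a transient leader election could trigger a spurious topic creation.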
[GitHub] [kafka] vvcephei commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests
vvcephei commented on pull request #8938: URL: https://github.com/apache/kafka/pull/8938#issuecomment-652171893 Oof, looks like I missed "something": http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-30--001.1593562449--vvcephei--kafka-10173-upgrde-smoke-system-test--1123f6f2c/report.html
[GitHub] [kafka] vvcephei commented on pull request #8961: MINOR: add a retry system test curl commands
vvcephei commented on pull request #8961: URL: https://github.com/apache/kafka/pull/8961#issuecomment-652171595 new run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4002/
[GitHub] [kafka] vvcephei commented on pull request #8961: MINOR: add a retry system test curl commands
vvcephei commented on pull request #8961: URL: https://github.com/apache/kafka/pull/8961#issuecomment-652168931 Ah, thanks @kkonstantine, I see that now:
```
17:42:20 worker5: + curl --retry-all-errors --retry 5 -s -L https://s3-us-west-2.amazonaws.com/kafka-packages/jdk-8u202-linux-x64.tar.gz -o /tmp/jdk-8u202-linux-x64.tar.gz
17:42:20 worker5: curl: option --retry-all-errors: is unknown
17:42:20 worker5: curl: try 'curl --help' or 'curl --manual' for more information
```
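One hedged way to stay compatible with the older curl on the test workers is to probe for the flag before using it; `--retry-all-errors` first appeared in curl 7.71.0. This is a sketch of the idea, not the harness's actual script (the probe and variable names are assumptions):

```shell
#!/bin/sh
# Illustrative only: add --retry-all-errors (curl >= 7.71.0) when the local
# curl advertises it, otherwise fall back to plain --retry. URL and paths
# mirror the log above.
url="https://s3-us-west-2.amazonaws.com/kafka-packages/jdk-8u202-linux-x64.tar.gz"
retry_opts="--retry 5"
if command -v curl >/dev/null 2>&1 && curl --help all 2>/dev/null | grep -q -- '--retry-all-errors'; then
    retry_opts="$retry_opts --retry-all-errors"
fi
cmd="curl $retry_opts -s -L $url -o /tmp/jdk-8u202-linux-x64.tar.gz"
echo "$cmd"
```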
[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r448095124 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -100,13 +100,19 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams final Set newlyCreatedTopics = new HashSet<>(); while (!topicsNotReady.isEmpty() && remainingRetries >= 0) { -topicsNotReady = validateTopics(topicsNotReady, topics); +final Set tempUnknownTopics = new HashSet<>(); Review comment: Well, this variable naming is actually suggested by @ableegoldman, and I also think `tempUnknownTopics` is more descriptive. Is that OK for you? https://github.com/apache/kafka/pull/8712#discussion_r444555724 ![image](https://user-images.githubusercontent.com/43372967/86199833-b6cea400-bb8d-11ea-81b5-7ba5b6064709.png)
[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r448093846

File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java

@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

     private StateManagerUtil() {}

     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }

+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register state stores;
+        // if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not then we should always checkpoint;
+        // note if the task is stateless, or stateful but has no logged stores, the snapshot would also be empty
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+            return true;
+
+        // we can checkpoint if the difference between the current and the previous snapshot is large enough
+        long totalOffsetDelta = 0L;
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
+            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+        }
+
+        // when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
+        // otherwise, we only overwrite the checkpoint if it is largely different from the old one
+        return enforceCheckpoint ?
totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT; Review comment: I'm a bit torn about this optimization to avoid double checkpointing, because on the other hand, even if we have not made any progress since loaded the checkpoint, we'd still need to make a checkpoint upon closing if we have never made one before -- and I use emptyMap as an indicator for that. Given that upon suspending we are now less likely to checkpoint, the chance that we would double checkpointing (when transiting to suspended, and when transiting to closed) is smaller, and hence I'm thinking maybe I'd just remove this optimization to make the logic a bit simpler. LMK WDYT. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -542,13 +530,22 @@ public void closeCleanAndRecycleState() { log.info("Closed clean and recycled state"); } -private void writeCheckpoint() { +/** + * The following exceptions maybe thrown from the state manager flushing call + * + * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed + * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed + * or flushing state store get IO errors; such error should cause the thread to die + */ +@Override +protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { Review comment: I decided to extract out the update of the changelog offsets from actually writing the offsets since even if we do not want to write the file, we still need to update the offsets. The reason I did not yet remove the parameter from `checkpoint` is that global-task is still using it. I plan to remove it when consolidating the global task. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
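The delta-threshold rule discussed in the review above can be restated as a self-contained sketch. This is an illustration, not the actual Kafka code: plain String keys stand in for TopicPartition, and only the constant name mirrors the diff.

```java
import java.util.Map;

public class CheckpointDecision {
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    // Mirrors the decision rule from the diff above, simplified for illustration.
    static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                    final Map<String, Long> oldSnapshot,
                                    final Map<String, Long> newSnapshot) {
        // null snapshot: state stores not yet registered, do not overwrite the checkpoint
        if (oldSnapshot == null) {
            return false;
        }
        // first offsets after an empty snapshot: always checkpoint
        if (oldSnapshot.isEmpty() && !newSnapshot.isEmpty()) {
            return true;
        }
        // sum of per-partition offset movement since the last checkpoint
        long totalOffsetDelta = 0L;
        for (final Map.Entry<String, Long> entry : newSnapshot.entrySet()) {
            totalOffsetDelta += Math.abs(oldSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
        }
        // enforced: write on any change; otherwise only when progress is large enough
        return enforceCheckpoint ? totalOffsetDelta > 0
                                 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }

    public static void main(String[] args) {
        // a small delta is not worth a checkpoint unless enforced
        System.out.println(checkpointNeeded(false, Map.of("p0", 0L), Map.of("p0", 4L))); // prints false
        System.out.println(checkpointNeeded(true, Map.of("p0", 0L), Map.of("p0", 4L)));  // prints true
    }
}
```

This makes the trade-off in the review concrete: with `enforceCheckpoint` false (the commit path), small amounts of progress are deliberately ignored; with it true (the close path), any progress at all triggers a write.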
[GitHub] [kafka] guozhangwang opened a new pull request #8964: KAFKA-9450: Decouple flushing state from committing
guozhangwang opened a new pull request #8964: URL: https://github.com/apache/kafka/pull/8964 1. Do not always flush state stores when committing (in post-commit); move `stateMgr.flush` into post-commit to live with checkpointing. Then in post-commit, we checkpoint when: 1.a. the state store's snapshot has progressed much further compared to the previous checkpoint; 1.b. the task is being closed, in which case we enforce a snapshot (but as a minor optimization, we avoid checkpointing if it is exactly the same). 2. However, for the cache / suppression buffer, we still need to flush them in pre-commit to make sure all records are sent via producers; I had to introduce a somewhat hacky workaround into the CachingStateStore in stateMgr. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] abbccdda commented on pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory
abbccdda commented on pull request #8962: URL: https://github.com/apache/kafka/pull/8962#issuecomment-652159635 test this please
[GitHub] [kafka] abbccdda commented on pull request #8963: KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest
abbccdda commented on pull request #8963: URL: https://github.com/apache/kafka/pull/8963#issuecomment-652159442 retest this please
[GitHub] [kafka] abbccdda commented on pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory
abbccdda commented on pull request #8962: URL: https://github.com/apache/kafka/pull/8962#issuecomment-652159560 Test this please
[GitHub] [kafka] ableegoldman opened a new pull request #8963: KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest
ableegoldman opened a new pull request #8963: URL: https://github.com/apache/kafka/pull/8963 The current failures we're seeing with this test are due to faulty assumptions that it makes, and not any real bug in eos-beta (at least, from what I've seen so far). The test relies on tightly controlling the commits, which it does by setting the commit interval to MAX_VALUE and manually requesting commits on the context. In two phases, the test assumes that any pending data will be committed after a rebalance. But we actually take care to avoid unnecessary commits -- with eos-alpha, we only commit tasks that are revoked, while in eos-beta we must commit all tasks if any are revoked, but only if the revoked tasks themselves need a commit. The failure we see occurs when we try to verify the committed data after a second client is started and the group rebalances. The already-running client has to give up two tasks to the newly started client, but those tasks may not need to be committed, in which case none of the tasks would be. So we still have an open transaction on the partitions where we try to read committed data. We can use a punctuator to force a commit on the running client.
[GitHub] [kafka] mmolimar commented on a change in pull request #7496: KAFKA-9018: Throw clearer exceptions on serialisation errors
mmolimar commented on a change in pull request #7496: URL: https://github.com/apache/kafka/pull/7496#discussion_r448045788 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ## @@ -524,6 +524,26 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]> msg) { +try { +return keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key()); +} catch (Exception e) { +log.error("{} Error converting message key in topic '{}' partition {} at offset {} and timestamp {}", +this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e); +throw e; +} +} + +private SchemaAndValue convertValue(ConsumerRecord<byte[], byte[]> msg) { +try { +return valueConverter.toConnectData(msg.topic(), msg.headers(), msg.value()); +} catch (Exception e) { +log.error("{} Error converting message value in topic '{}' partition {} at offset {} and timestamp {}", +this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e); Review comment: Done!
[GitHub] [kafka] ableegoldman commented on a change in pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory
ableegoldman commented on a change in pull request #8962: URL: https://github.com/apache/kafka/pull/8962#discussion_r448044329 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -136,7 +143,59 @@ private boolean taskDirEmpty(final File taskDir) { !pathname.getName().equals(CHECKPOINT_FILE_NAME)); // if the task is stateless, storeDirs would be null -return storeDirs == null || storeDirs.length == 0; +if (storeDirs == null || storeDirs.length == 0) { +return true; +} + +final List<File> baseSubDirectories = new LinkedList<>(); +for (final File file : storeDirs) { +if (file.isDirectory()) { +baseSubDirectories.add(file); +} else { +return false; +} +} + +for (final File dir : baseSubDirectories) { +final boolean isEmpty; +if (dir.getName().equals(ROCKSDB_DIRECTORY_NAME)) { +isEmpty = taskSubDirectoriesEmpty(dir, true); +} else { +isEmpty = taskSubDirectoriesEmpty(dir, false); +} +if (!isEmpty) { +return false; +} +} +return true; +} + +// BFS through the task directory to look for any files that are not themselves subdirectories +private boolean taskSubDirectoriesEmpty(final File baseDir, final boolean sstOnly) { +final Queue<File> subDirectories = new LinkedList<>(); +subDirectories.offer(baseDir); + +final Set<File> visited = new HashSet<>(); +while (!subDirectories.isEmpty()) { +final File dir = subDirectories.poll(); +if (!visited.contains(dir)) { +final File[] files = dir.listFiles(); +if (files == null) { +continue; +} +for (final File file : files) { +if (file.isDirectory()) { +subDirectories.offer(file); +} else if (sstOnly && file.getName().endsWith(ROCKSDB_SST_SUFFIX)) { Review comment: This is admittedly quite hacky, and of course does not solve the problem for custom state stores that might write some non-data files upon open. A "better" fix would probably be to write some sentinel value in the checkpoint ala `OFFSET_UNKNOWN`, so we do have an entry in there if the store was opened but does not yet have any data.
But, I wanted to keep things simple (a very relative term here, I know) and low-risk before the 2.6 release. We can discuss better solutions once we're not at the doorstep of the release (and blocking the door, I might add)
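The directory-emptiness question in the review above can be sketched in isolation with NIO. The class and method names here are hypothetical, and the real StateDirectory code walks with an explicit BFS queue, but `Files.walk` expresses the same "does this tree contain any real data file?" check, including the rocksdb-specific `.sst`-only mode.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class TaskDirCheck {
    // A sub-tree counts as empty when it contains no regular files at all,
    // or -- when sstOnly is set, as for the rocksdb sub-tree -- no *.sst data
    // files, so bookkeeping files written on store open do not count as state.
    static boolean subTreeEmpty(final Path baseDir, final boolean sstOnly) throws IOException {
        try (Stream<Path> paths = Files.walk(baseDir)) {
            return paths.noneMatch(p -> Files.isRegularFile(p)
                && (!sstOnly || p.getFileName().toString().endsWith(".sst")));
        }
    }
}
```

With `sstOnly` true, files like RocksDB's `LOCK` or `OPTIONS` are ignored, which is exactly why the check in the PR no longer throws TaskCorrupted for a store that was merely opened.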
[GitHub] [kafka] rhauch commented on a change in pull request #7496: KAFKA-9018: Throw clearer exceptions on serialisation errors
rhauch commented on a change in pull request #7496: URL: https://github.com/apache/kafka/pull/7496#discussion_r448029128 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ## @@ -524,6 +524,26 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]> msg) { +try { +return keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key()); +} catch (Exception e) { +log.error("{} Error converting message key in topic '{}' partition {} at offset {} and timestamp {}", +this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e); Review comment: It actually would be helpful to include the exception's error message in this line, since the message alone might be bubbled up via the REST API. ```suggestion log.error("{} Error converting message key in topic '{}' partition {} at offset {} and timestamp {}: {}", this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e.getMessage(), e); ``` ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ## @@ -524,6 +524,26 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]> msg) { +try { +return keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key()); +} catch (Exception e) { +log.error("{} Error converting message key in topic '{}' partition {} at offset {} and timestamp {}", +this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e); +throw e; +} +} + +private SchemaAndValue convertValue(ConsumerRecord<byte[], byte[]> msg) { +try { +return valueConverter.toConnectData(msg.topic(), msg.headers(), msg.value()); +} catch (Exception e) { +log.error("{} Error converting message value in topic '{}' partition {} at offset {} and timestamp {}", +this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e); Review comment: It actually would be helpful to include the exception's error message in this line, since the message alone might be bubbled up via the REST API. ```suggestion log.error("{} Error converting message value in topic '{}' partition {} at offset {} and timestamp {}: {}", this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e.getMessage(), e); ```
[GitHub] [kafka] rhauch commented on a change in pull request #8858: KAFKA-10153: Error Reporting in Connect Documentation
rhauch commented on a change in pull request #8858: URL: https://github.com/apache/kafka/pull/8858#discussion_r448023368 ## File path: docs/connect.html ## @@ -258,6 +258,48 @@ REST API GET /- return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that is connected to. +Error Reporting in Connect + +Kafka Connect provides error reporting to handle errors encountered along various stages of processing. By default, any error encountered during conversion or within transformations will cause the connector to fail. Each connector configuration can also enable tolerating such errors by skipping them, optionally writing each error and the details of the failed operation and problematic record (with various levels of detail) to the Connect application log. These mechanisms also capture errors when a sink connector is processing the messages consumed from its Kafka topics, and all of the errors can be written to a configurable "dead letter queue" (DLQ) Kafka topic. + +To report errors within a connector's converter, transforms, or in specifically the sink connector itself to the log, set errors.log.enable=true in the connector configuration to log details of each error and problem record's topic, partition, and offset. For additional debugging purposes, set errors.log.include.messages=true to also log the problem record key, value, and headers to the log (note this may log sensitive information). + +To report errors within a connector's converter, transforms, or in specifically the sink connector itself to a dead letter queue topic, set errors.deadletterqueue.topic.name, and optionally errors.deadletterqueue.context.headers.enable=true. 
Review comment: ```suggestion To report errors within a connector's converter, transforms, or within the sink connector itself to a dead letter queue topic, set errors.deadletterqueue.topic.name, and optionally errors.deadletterqueue.context.headers.enable=true. ``` ## File path: docs/connect.html ## @@ -258,6 +258,48 @@ REST API GET /- return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that is connected to. +Error Reporting in Connect + +Kafka Connect provides error reporting to handle errors encountered along various stages of processing. By default, any error encountered during conversion or within transformations will cause the connector to fail. Each connector configuration can also enable tolerating such errors by skipping them, optionally writing each error and the details of the failed operation and problematic record (with various levels of detail) to the Connect application log. These mechanisms also capture errors when a sink connector is processing the messages consumed from its Kafka topics, and all of the errors can be written to a configurable "dead letter queue" (DLQ) Kafka topic. + +To report errors within a connector's converter, transforms, or in specifically the sink connector itself to the log, set errors.log.enable=true in the connector configuration to log details of each error and problem record's topic, partition, and offset. For additional debugging purposes, set errors.log.include.messages=true to also log the problem record key, value, and headers to the log (note this may log sensitive information). + +To report errors within a connector's converter, transforms, or in specifically the sink connector itself to a dead letter queue topic, set errors.deadletterqueue.topic.name, and optionally errors.deadletterqueue.context.headers.enable=true. 
+ +For example, below shows a configuration that will cause a connector to fail immediately upon an error or exception. Although it is not necessary to add extra configuration properties for this behavior, adding the following properties to a sink connector configuration would achieve this "fail fast" behavior: + + +# disable retries on failure +errors.retry.timeout=0 + +# do not log errors and their contexts +errors.log.enable=false + +# do not record errors in a dead letter queue topic +errors.deadletterqueue.topic.name= Review comment: ```suggestion # errors.deadletterqueue.topic.name= ``` ## File path: docs/connect.html ## @@ -258,6 +258,48 @@ REST API GET /- return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that is connected to. +Error Reporting in Connect + +Kafka Connect provides
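The inverse of the "fail fast" example quoted above is also worth sketching: a sink connector configuration that tolerates bad records instead. The topic name below is a placeholder; the property names are the ones this docs section describes, plus `errors.tolerance`, which controls the skipping behavior.

```properties
# skip problem records rather than failing the task
errors.tolerance=all

# log each error with the failed record's topic, partition, and offset
errors.log.enable=true
# also log the record key, value, and headers (may expose sensitive data)
errors.log.include.messages=true

# route failed records to a dead letter queue topic, with error context headers
errors.deadletterqueue.topic.name=my-connector-errors
errors.deadletterqueue.context.headers.enable=true
```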
[GitHub] [kafka] vvcephei commented on pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory
vvcephei commented on pull request #8962: URL: https://github.com/apache/kafka/pull/8962#issuecomment-652090962 Test this please
[jira] [Commented] (KAFKA-10217) Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to StreamsMetrics
[ https://issues.apache.org/jira/browse/KAFKA-10217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148994#comment-17148994 ] Sophie Blee-Goldman commented on KAFKA-10217: - [~mhmdchebbi] try rebasing – the PR that added the ProcessorContextUtils class was only just merged earlier today. > Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to > StreamsMetrics > --- > > Key: KAFKA-10217 > URL: https://issues.apache.org/jira/browse/KAFKA-10217 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Minor > Labels: kip-required, newbie > > StreamsMetricsImpl contains several convenience methods for safely > registering sensors at different levels of granularity. We added them as > internal methods because we weren't sure of their stability and utility. Now, > they've been in use for quite a while and seem to be stable. > We should move them up into the public API so that custom stores and > processor implementations can also benefit from their safety. > Implementing this feature should also allow us to drop the adaptor function: > `org.apache.kafka.streams.processor.internals.ProcessorContextUtils#getMetricsImpl` > Note: this feature requires a KIP, but since the API is already > pre-determined, it should be uncontroversial. This improvement would be a > good opportunity for someone who wants to get an initial KIP under their belt. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kkonstantine removed a comment on pull request #8961: MINOR: add a retry system test curl commands
kkonstantine removed a comment on pull request #8961: URL: https://github.com/apache/kafka/pull/8961#issuecomment-652085184 @vvcephei `--retry-all-error` is not an option that the current curl version recognizes. The system tests fail early on that error.
[GitHub] [kafka] kkonstantine commented on pull request #8961: MINOR: add a retry system test curl commands
kkonstantine commented on pull request #8961: URL: https://github.com/apache/kafka/pull/8961#issuecomment-652085184 @vvcephei `--retry-all-error` is not an option that the current curl version recognizes. The system tests fail early on that error.
[GitHub] [kafka] vvcephei commented on pull request #8961: MINOR: add a retry system test curl commands
vvcephei commented on pull request #8961: URL: https://github.com/apache/kafka/pull/8961#issuecomment-652082007 System test meta run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4001/console
[GitHub] [kafka] vvcephei commented on pull request #8961: MINOR: add a retry system test curl commands
vvcephei commented on pull request #8961: URL: https://github.com/apache/kafka/pull/8961#issuecomment-652081453 Recently, I have seen a few remote (aws-over-vagrant) system tests fail during setup due to this error: ``` worker15: + curl -O https://s3-us-west-2.amazonaws.com/kafka-packages/kafka_2.11-1.1.1.tgz [curl progress meter elided: 0 bytes received for ~13 seconds] worker15: curl: (35) Unknown SSL protocol error in connection to s3-us-west-2.amazonaws.com:443 ``` The problem is transient, so it seems to be an issue with the SSL negotiation on the server side. Some reading has suggested it could also be due to networking issues when running inside of containers, or over VPNs, etc. I didn't find anything obvious we could do about it, so it seems worthwhile to at least attempt to avoid failing the whole run over a transient non-problem.
[GitHub] [kafka] ableegoldman opened a new pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory
ableegoldman opened a new pull request #8962: URL: https://github.com/apache/kafka/pull/8962 Two more edge cases I found producing extra TaskCorruptedException while playing around with the failing eos-beta upgrade test (sadly these are unrelated problems, as the test still fails with these fixes in place). 1. Need to write the checkpoint when recycling a standby: although we do preserve the changelog offsets when recycling a task, and should therefore write the offsets when the new task is itself closed, we do NOT write the checkpoint for uninitialized tasks. So if the new task is ultimately closed before it gets out of the CREATED state, the offsets will not be written and we can get a TaskCorruptedException. 2. With the change in task locking to address some Windows-related nonsense (am I remembering that correctly?), we don't delete entire task directories but just clear the inner state. With EOS, during initialization we check if the state directory is non-empty and the checkpoint is missing, and throw a TaskCorrupted if so. But just opening a rocksdb store creates a `rocksdb` base dir in the task directory, so the `taskDirIsEmpty` check always fails and we always throw TaskCorrupted even if there's nothing there. We can fix 2. for rocksdb specifically, but this might still cause a headache for users of custom stores. Note that it's not a correctness issue, just an annoyance, so my take is that we should avoid large last-minute changes and just fix for rocksdb in 2.6. Then we can consider a more holistic fix going forward
[GitHub] [kafka] vvcephei opened a new pull request #8961: MINOR: add a retry system test curl commands
vvcephei opened a new pull request #8961: URL: https://github.com/apache/kafka/pull/8961 Adds a retry to curl, using the built-in exponential backoff mechanism. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
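The PR diff itself is not shown in this digest, so this is only a plausible shape of the change, built on curl's built-in `--retry` flag (which backs off exponentially: 1s, 2s, 4s, ...). The wrapper name and flag values below are illustrative, not the PR's actual ones.

```shell
# Hypothetical wrapper, not the PR's actual change.
download_with_retry() {
  # --retry 5: retry transient failures (timeouts, transient HTTP errors)
  #            up to 5 times with exponential backoff;
  # -f: make HTTP errors produce a failing exit code so they are retryable.
  curl -fsS --retry 5 --retry-max-time 120 -O "$1"
}
# Example (not executed here):
# download_with_retry https://s3-us-west-2.amazonaws.com/kafka-packages/kafka_2.11-1.1.1.tgz
```

Note that plain `--retry` does not cover every failure class (the SSL error above may still slip through); newer curl releases add `--retry-all-errors` for that, but per the comment earlier in this thread, that option is not recognized by the curl version on the test workers.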
[GitHub] [kafka] kkonstantine commented on pull request #8859: MINOR: Upgrade jetty to 9.4.27.v20200227 and jersey to 2.31
kkonstantine commented on pull request #8859: URL: https://github.com/apache/kafka/pull/8859#issuecomment-652079556 Same here. Thanks for checking!
[GitHub] [kafka] d8tltanc closed pull request #8544: [WIP]KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
d8tltanc closed pull request #8544: URL: https://github.com/apache/kafka/pull/8544
[jira] [Commented] (KAFKA-10217) Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to StreamsMetrics
[ https://issues.apache.org/jira/browse/KAFKA-10217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148976#comment-17148976 ] mohamed chebbi commented on KAFKA-10217: Hi [~vvcephei] I can't find `org.apache.kafka.streams.processor.internals.ProcessorContextUtils` in the kafka source code. By the way, did you mean `org.apache.kafka.streams.processor.internals.ProcessorContextImpl`?
[jira] [Assigned] (KAFKA-10166) Excessive TaskCorruptedException seen in testing
[ https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch reassigned KAFKA-10166: - Assignee: Sophie Blee-Goldman > Excessive TaskCorruptedException seen in testing > > > Key: KAFKA-10166 > URL: https://issues.apache.org/jira/browse/KAFKA-10166 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.6.0 > > > As the title indicates, long-running test applications with injected network > "outages" seem to hit TaskCorruptedException more than expected. > Seen occasionally on the ALOS application (~20 times in two days in one case, > for example), and very frequently with EOS (many times per day)
[jira] [Reopened] (KAFKA-10166) Excessive TaskCorruptedException seen in testing
[ https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reopened KAFKA-10166: - Assignee: (was: Bruno Cadonna) Found two edge cases we missed earlier, so I'm reopening this blocker; the fixes are minor so I'll have a PR ready shortly. cc [~rhauch]
[GitHub] [kafka] dajac commented on pull request #8544: [WIP]KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
dajac commented on pull request #8544: URL: https://github.com/apache/kafka/pull/8544#issuecomment-652059065 @d8tltanc We can close this one.
[jira] [Commented] (KAFKA-10219) KStream API support for multiple cluster broker
[ https://issues.apache.org/jira/browse/KAFKA-10219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148954#comment-17148954 ] Sachin Kurle commented on KAFKA-10219: -- I have a simple use-case where I read off a source topic from cluster A and write to a target topic on cluster B. I am not sure how a transaction going across multiple clusters affects exactly-once. Even in the case of a single cluster, my kstream app deployed on a host server has to make a network call to the kafka cluster's changelog topic to get transaction details and update the offset topic. Maybe I am not fully aware of the internal workings of kafka stateful transactions in the case of exactly-once. > KStream API support for multiple cluster broker > --- > > Key: KAFKA-10219 > URL: https://issues.apache.org/jira/browse/KAFKA-10219 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sachin Kurle >Priority: Major > > we are trying to consume from cluster A broker from KStream api and produce > to cluster B broker.. we have configuration as boot strap server in consumer > and producer configuration but kstream api is picking randomly bootstrap > server cluster A or B
[GitHub] [kafka] hachikuji opened a new pull request #8960: KAFKA-9144; Track timestamp from txn markers to prevent early producer expiration
hachikuji opened a new pull request #8960: URL: https://github.com/apache/kafka/pull/8960 Note this is a backport of (#7687) for 2.3. Existing producer state expiration uses timestamps from data records only and not from transaction markers. This can cause premature producer expiration when the coordinator times out a transaction because we drop the state from existing batches. This in turn can allow the coordinator epoch to revert to a previous value, which can lead to validation failures during log recovery. This patch fixes the problem by also leveraging the timestamp from transaction markers. We also change the validation logic so that coordinator epoch is verified only for new marker appends. When replicating from the leader and when recovering the log, we only log a warning if we notice that the coordinator epoch has gone backwards. This allows recovery from previous occurrences of this bug. Finally, this patch fixes one minor issue when loading producer state from the snapshot file. When the only record for a given producer is a control record, the "last offset" field will be set to -1 in the snapshot. We should check for this case when loading to be sure we recover the state consistently. Reviewers: Ismael Juma , Guozhang Wang ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
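The core idea of the fix described above can be illustrated with a toy sketch. The class and method names here are invented for illustration and are not the actual Kafka internals; the point is only that the expiration timestamp must advance on transaction markers (control records) as well as data batches.

```java
// Hypothetical sketch: a producer's "last timestamp" advances on both
// data batches and transaction markers, so a producer whose most recent
// append was a coordinator-written marker is not expired prematurely.
public class ProducerStateEntry {
    private long lastTimestamp = -1L;

    public void updateFromDataBatch(long batchMaxTimestamp) {
        lastTimestamp = Math.max(lastTimestamp, batchMaxTimestamp);
    }

    // Before the patch, markers did not refresh this timestamp; afterwards
    // they also count toward the expiration check.
    public void updateFromTxnMarker(long markerTimestamp) {
        lastTimestamp = Math.max(lastTimestamp, markerTimestamp);
    }

    public boolean isExpired(long nowMs, long producerIdExpirationMs) {
        return nowMs - lastTimestamp >= producerIdExpirationMs;
    }

    public static void main(String[] args) {
        ProducerStateEntry entry = new ProducerStateEntry();
        entry.updateFromDataBatch(1_000L);
        entry.updateFromTxnMarker(5_000L); // marker arrives after the data
        // Expiration is measured from the most recent append of either kind.
        System.out.println(entry.isExpired(6_000L, 2_000L)); // false
        System.out.println(entry.isExpired(8_000L, 2_000L)); // true
    }
}
```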
[GitHub] [kafka] abbccdda commented on pull request #8431: MINOR: Rename description of flatMapValues transformation
abbccdda commented on pull request #8431: URL: https://github.com/apache/kafka/pull/8431#issuecomment-652033261 @maseiler Could you try to attach a screenshot of the fix?
[GitHub] [kafka] abbccdda commented on pull request #6792: KAFKA 8311 : Augment the error message to let user change `default.api.timeout.ms`
abbccdda commented on pull request #6792: URL: https://github.com/apache/kafka/pull/6792#issuecomment-652030858 Closing the PR since it duplicates other work and has seen no recent interest.
[GitHub] [kafka] abbccdda closed pull request #6792: KAFKA 8311 : Augment the error message to let user change `default.api.timeout.ms`
abbccdda closed pull request #6792: URL: https://github.com/apache/kafka/pull/6792
[jira] [Resolved] (KAFKA-4996) Fix findbugs multithreaded correctness warnings for streams
[ https://issues.apache.org/jira/browse/KAFKA-4996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leah Thomas resolved KAFKA-4996. Resolution: Fixed > Fix findbugs multithreaded correctness warnings for streams > --- > > Key: KAFKA-4996 > URL: https://issues.apache.org/jira/browse/KAFKA-4996 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Colin McCabe >Assignee: Leah Thomas >Priority: Major > Labels: newbie > > Fix findbugs multithreaded correctness warnings for streams > {code}
> Multithreaded correctness Warnings
> AT Sequence of calls to java.util.concurrent.ConcurrentHashMap may not be atomic in org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(long, ProcessorContext)
> IS Inconsistent synchronization of org.apache.kafka.streams.KafkaStreams.stateListener; locked 66% of time
> IS Inconsistent synchronization of org.apache.kafka.streams.processor.internals.StreamThread.stateListener; locked 66% of time
> IS Inconsistent synchronization of org.apache.kafka.streams.processor.TopologyBuilder.applicationId; locked 50% of time
> IS Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingKeyValueStore.context; locked 66% of time
> IS Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.cache; locked 60% of time
> IS Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.context; locked 66% of time
> IS Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.name; locked 60% of time
> IS Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.serdes; locked 70% of time
> IS Inconsistent synchronization of org.apache.kafka.streams.state.internals.RocksDBStore.db; locked 63% of time
> IS Inconsistent synchronization of org.apache.kafka.streams.state.internals.RocksDBStore.serdes; locked 76% of time
> {code}
[GitHub] [kafka] d8tltanc commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation
d8tltanc commented on pull request #8846: URL: https://github.com/apache/kafka/pull/8846#issuecomment-652013968 @skaundinya15 @abbccdda @ijuma The utility class GeometricProgression was renamed in #8683 and merged into trunk. This patch is now rebased and ready for review.
[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation
d8tltanc commented on a change in pull request #8846: URL: https://github.com/apache/kafka/pull/8846#discussion_r447942236 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java ## @@ -67,9 +67,14 @@ * retry.backoff.ms */ public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG; -private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to " + -"retry a failed request. This avoids repeatedly sending requests in a tight loop under " + -"some failure scenarios."; +private static final String RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC; + +/** + * retry.backoff.max.ms + */ +// TODO: Add validator rules and force backoff_max_ms > backoff_ms if possible (I guess it's impossible) Review comment: Since the utility class now provides a static backoff whenever `backoff_max_ms` <= `backoff_ms`, we no longer need the validator.
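As a rough illustration of the backoff behavior discussed in this thread: the real utility (renamed from GeometricProgression in #8683) also applies random jitter, which is omitted here; the class below is a simplified stand-in, not the Kafka clients implementation.

```java
// Simplified sketch of exponential retry backoff capped at a maximum,
// degrading to a constant backoff when the configured max is not above
// the base value (which is why no config validator is required).
public class ExponentialBackoff {
    private final long initialMs; // retry.backoff.ms
    private final long maxMs;     // retry.backoff.max.ms

    public ExponentialBackoff(long initialMs, long maxMs) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    public long backoff(int attempts) {
        if (maxMs <= initialMs) {
            // Misconfigured max: fall back to a static backoff instead
            // of rejecting the configuration.
            return initialMs;
        }
        double exp = initialMs * Math.pow(2.0, attempts);
        return (long) Math.min((double) maxMs, exp);
    }

    public static void main(String[] args) {
        ExponentialBackoff b = new ExponentialBackoff(100, 1000);
        System.out.println(b.backoff(0)); // 100
        System.out.println(b.backoff(3)); // 800
        System.out.println(b.backoff(5)); // 1000: capped at the max
        ExponentialBackoff misconfigured = new ExponentialBackoff(500, 100);
        System.out.println(misconfigured.backoff(9)); // 500: static fallback
    }
}
```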
[GitHub] [kafka] junrao commented on a change in pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
junrao commented on a change in pull request #8479: URL: https://github.com/apache/kafka/pull/8479#discussion_r447927722 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -499,7 +500,15 @@ class Partition(val topicPartition: TopicPartition, addingReplicas = addingReplicas, removingReplicas = removingReplicas ) - createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) + try { +this.createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints) Review comment: Do we need this? Ditto below.
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148854#comment-17148854 ] Guozhang Wang commented on KAFKA-10134: --- [~neowu0] Thanks for your comments. I will incorporate them in my PR and ping you and [~seanguo] to test out before merging. > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. 
> > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol on the new version; the issue still > exists but the CPU goes back to normal when the rebalance is done. > Also tried the same on 2.4.1, which does not seem to have this issue. So it > seems related to something that changed between 2.4.1 and 2.5.0. >
[jira] [Updated] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
[ https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-10017: -- Priority: Blocker (was: Critical) > Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta > --- > > Key: KAFKA-10017 > URL: https://issues.apache.org/jira/browse/KAFKA-10017 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Assignee: Matthias J. Sax >Priority: Blocker > Labels: flaky-test, unit-test > Fix For: 2.6.0 > > > Creating a new ticket for this since the root cause is different than > https://issues.apache.org/jira/browse/KAFKA-9966 > With injectError = true: > h3. Stacktrace > java.lang.AssertionError: Did not receive all 20 records from topic > multiPartitionOutputTopic within 6 ms Expected: is a value equal to or > greater than <20> but: <15> was less than <20> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961) > at > org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)
[jira] [Commented] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
[ https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148846#comment-17148846 ] Guozhang Wang commented on KAFKA-10017: --- [~rhauch] I'm marking this as a blocker for 2.6 again since we have seen it fail after re-enabling it. > Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta > --- > > Key: KAFKA-10017 > URL: https://issues.apache.org/jira/browse/KAFKA-10017 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Assignee: Matthias J. Sax >Priority: Blocker > Labels: flaky-test, unit-test > Fix For: 2.6.0
[jira] [Commented] (KAFKA-10219) KStream API support for multiple cluster broker
[ https://issues.apache.org/jira/browse/KAFKA-10219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148818#comment-17148818 ] Boyang Chen commented on KAFKA-10219: - Thanks for the proposal; the use case is reasonable. However, we need to better clarify the feature we are going to introduce and the challenges we are facing, such as: 1. What does "multiple clusters" suggest? Do we support all input topics in cluster A and all output topics in cluster B, or a mix of topics across arbitrary clusters A, B, C that needs to be automatically detected by Streams? 2. How do we allocate internal topics? Which cluster should the changelog/repartition topics go to: the input topic cluster, or the output one? 3. How do we support exactly-once? Right now the entire framework assumes a single-cluster context. When switching to multiple clusters, we could no longer guarantee exactly-once because we may span our transaction across multiple clusters, and we don't have a centralized coordinator to track the progress. > KStream API support for multiple cluster broker > --- > > Key: KAFKA-10219 > URL: https://issues.apache.org/jira/browse/KAFKA-10219 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sachin Kurle >Priority: Major
[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148801#comment-17148801 ] Di Campo edited comment on KAFKA-4273 at 6/30/20, 4:12 PM: --- Any news or improved workaround? I am also interested on this. In my case, I think my preferred workaround should be the one explained [here|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage] was (Author: xmar): Any news or improved workaround? I am also interested on this. In my case, I think my preferred workaround should be the one explained [[here|[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage]]|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage] > Streams DSL - Add TTL / retention period support for intermediate topics and > state stores > - > > Key: KAFKA-4273 > URL: https://issues.apache.org/jira/browse/KAFKA-4273 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Davor Poldrugo >Priority: Major > > Hi! > I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state > as far as I know - it's not configurable. > In my use case my data has TTL / retnetion period. It's 48 hours. After that > - data can be discarded. > I join two topics: "messages" and "prices" using windowed inner join. > The two intermediate Kafka topics for this join are named: > * messages-prices-join-this-changelog > * messages-prices-join-other-changelog > Since these topics are created as compacted by Kafka Streams, and I don't > wan't to keep data forever, I have altered them to not use compaction. Right > now my RocksDB state stores grow indefinitely, and I don't have any options > to define TTL, or somehow periodically clean the older data. 
> A "hack" that I use to keep my disk usage low - I have schedulled a job to > periodically stop Kafka Streams instances - one at the time. This triggers a > rebalance, and partitions migrate to other instances. When the instance is > started again, there's another rebalance, and sometimes this instance starts > processing partitions that wasn't processing before the stop - which leads to > deletion of the RocksDB state store for those partitions > (state.cleanup.delay.ms). In the next rebalance the local store is recreated > with a restore consumer - which reads data from - as previously mentioned - a > non compacted topic. And this effectively leads to a "hacked TTL support" in > Kafka Streams DSL. > Questions: > * Do you think would be reasonable to add support in the DSL api to define > TTL for local store? > * Which opens another question - there are use cases which don't need the > intermediate topics to be created as "compact". Could also this be added to > the DSL api? Maybe only this could be added, and this flag should also be > used for the RocksDB TTL. Of course in this case another config would be > mandatory - the retention period or TTL for the intermediate topics and the > state stores. I saw there is a new cleanup.policy - compact_and_delete - > added with KAFKA-4015. > * Which also leads to another question, maybe some intermediate topics / > state stores need different TTL, so a it's not as simple as that. But after > KAFKA-3870, it will be easier. > RocksDB supports TTL: > * > https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166 > * https://github.com/facebook/rocksdb/wiki/Time-to-Live > * > https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java > A somehow similar issue: KAFKA-4212
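The "purge data from KTables based on age" pattern referenced in the comment above can be sketched outside of Kafka Streams, with plain maps standing in for the state store. All names here are illustrative; in Streams this eviction pass would run in a punctuator over the actual store.

```java
// Sketch of TTL-by-age purging via a periodic sweep (hypothetical names).
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class TtlStore {
    private final Map<String, String> store = new HashMap<>();
    private final Map<String, Long> insertedAt = new HashMap<>();
    private final long ttlMs;

    public TtlStore(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    public void put(String key, String value, long nowMs) {
        store.put(key, value);
        insertedAt.put(key, nowMs);
    }

    public boolean contains(String key) {
        return store.containsKey(key);
    }

    // Periodic eviction pass: delete every entry older than the TTL.
    public void sweep(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = insertedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() >= ttlMs) {
                store.remove(e.getKey());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        long ttl = 48L * 60 * 60 * 1000; // the 48h retention from the ticket
        TtlStore s = new TtlStore(ttl);
        s.put("old-key", "v1", 0L);
        s.put("new-key", "v2", ttl - 1);
        s.sweep(ttl); // 48h after the first insert
        System.out.println(s.contains("old-key")); // false: expired
        System.out.println(s.contains("new-key")); // true: still within TTL
    }
}
```

RocksDB's own TtlDB (linked in the quote) achieves a similar effect at the storage layer by dropping expired entries during compaction, but as the ticket notes, the Streams DSL does not expose that knob.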
[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores
[ https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148801#comment-17148801 ] Di Campo commented on KAFKA-4273: - Any news or improved workaround? I am also interested on this. In my case, I think my preferred workaround should be the one explained [[here|[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage]]|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage] > Streams DSL - Add TTL / retention period support for intermediate topics and > state stores > - > > Key: KAFKA-4273 > URL: https://issues.apache.org/jira/browse/KAFKA-4273 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Davor Poldrugo >Priority: Major
[jira] [Commented] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148774#comment-17148774 ] huxihx commented on KAFKA-10220: Well, from the broker perspective, you are right. Only trunk is affected. What I mean is we'll also hit NPE when using 2.6 clients talking to trunk broker. > NPE when describing resources > - > > Key: KAFKA-10220 > URL: https://issues.apache.org/jira/browse/KAFKA-10220 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Edoardo Comar >Assignee: Luke Chen >Priority: Major > > In current trunk code > Describing a topic from the CLI can fail with an NPE in the broker > on the line > {{ > resource.configurationKeys.asScala.forall(_.contains(configName))}} > > (configurationKeys is null) > {{[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error > processing describe configs request for resource > DescribeConfigsResource(resourceType=2, resourceName='topic1', > configurationKeys=null) > (kafka.server.AdminManager)}}{{java.lang.NullPointerException}}{{at > kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)}}{{at > > kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)}}{{at > > scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)}}{{at > scala.collection.Iterator.foreach(Iterator.scala:929)}}{{at > scala.collection.Iterator.foreach$(Iterator.scala:929)}}{{at > scala.collection.AbstractIterator.foreach(Iterator.scala:1417)}}{{at > scala.collection.IterableLike.foreach(IterableLike.scala:71)}}{{at > scala.collection.IterableLike.foreach$(IterableLike.scala:70)}}{{at > scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}{{at > scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)}}{{at > scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)}}{{at > scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)}}{{at > 
scala.collection.TraversableLike.filter(TraversableLike.scala:259)}}{{at > scala.collection.TraversableLike.filter$(TraversableLike.scala:259)}}{{at > scala.collection.AbstractTraversable.filter(Traversable.scala:104)}}{{at > kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)}}{{at > > kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)}}{{at > scala.collection.immutable.List.map(List.scala:283)}}{{at > kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)}}{{at > kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)}}{{at > kafka.server.KafkaApis.handle(KafkaApis.scala:165)}}{{at > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)}}{{at > java.lang.Thread.run(Thread.java:748)}}
[jira] [Commented] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148756#comment-17148756 ] Ismael Juma commented on KAFKA-10220: - From the link you referenced, it seems like this only affects trunk, not 2.6. > NPE when describing resources > - > > Key: KAFKA-10220 > URL: https://issues.apache.org/jira/browse/KAFKA-10220 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Edoardo Comar >Assignee: Luke Chen >Priority: Major
[jira] [Commented] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148738#comment-17148738 ] huxihx commented on KAFKA-10220: [~ijuma] This should affect 2.6 as well, since `configurationKeys` starts to be initialized in 2.7 due to the refinement introduced by [KAFKA-9432|https://issues.apache.org/jira/browse/KAFKA-9432]. In any case, since `configurationKeys` is nullable, a null check should be added when processing the resources in AdminManager.
> NPE when describing resources
> -----------------------------
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Edoardo Comar
> Assignee: Luke Chen
> Priority: Major
>
> In the current trunk code, describing a topic from the CLI can fail with an NPE in the broker on the line
> {{resource.configurationKeys.asScala.forall(_.contains(configName))}}
> (configurationKeys is null)
>
> {{[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing describe configs request for resource DescribeConfigsResource(resourceType=2, resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)}}
> {{java.lang.NullPointerException}}
> {{at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)}}
> {{at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)}}
> {{at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)}}
> {{at scala.collection.Iterator.foreach(Iterator.scala:929)}}
> {{at scala.collection.Iterator.foreach$(Iterator.scala:929)}}
> {{at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)}}
> {{at scala.collection.IterableLike.foreach(IterableLike.scala:71)}}
> {{at scala.collection.IterableLike.foreach$(IterableLike.scala:70)}}
> {{at scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}
> {{at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)}}
> {{at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)}}
> {{at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)}}
> {{at scala.collection.TraversableLike.filter(TraversableLike.scala:259)}}
> {{at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)}}
> {{at scala.collection.AbstractTraversable.filter(Traversable.scala:104)}}
> {{at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)}}
> {{at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)}}
> {{at scala.collection.immutable.List.map(List.scala:283)}}
> {{at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)}}
> {{at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)}}
> {{at kafka.server.KafkaApis.handle(KafkaApis.scala:165)}}
> {{at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)}}
> {{at java.lang.Thread.run(Thread.java:748)}}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
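huxihx's suggestion above — adding a null check when processing the resources in AdminManager — can be sketched as follows. This is a hypothetical illustration in Java (the real code is Scala in AdminManager.scala), and the class and method names are invented, not the actual Kafka patch:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the null-safe describe-configs filtering suggested
// above: configurationKeys may arrive as null (e.g. from a 2.5 client that
// never sets the field), so treat null or empty as "return every config"
// instead of dereferencing it, which is what causes the NPE.
class DescribeConfigsFilter {
    static List<String> selectConfigs(List<String> configurationKeys,
                                      List<String> allConfigNames) {
        // null or empty filter list means the client requested all configs
        if (configurationKeys == null || configurationKeys.isEmpty()) {
            return new ArrayList<>(allConfigNames);
        }
        List<String> selected = new ArrayList<>();
        for (String name : allConfigNames) {
            if (configurationKeys.contains(name)) {
                selected.add(name);
            }
        }
        return selected;
    }
}
```

The key design choice is that a missing filter is interpreted as "no restriction", matching how older clients that omit `configurationKeys` expect the broker to behave.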
[jira] [Assigned] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-10220: Assignee: Luke Chen
[GitHub] [kafka] omkreddy commented on pull request #8957: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override
omkreddy commented on pull request #8957: URL: https://github.com/apache/kafka/pull/8957#issuecomment-651811192 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy merged pull request #8958: MINOR: Update AlterConfigsOptions' Javadoc
omkreddy merged pull request #8958: URL: https://github.com/apache/kafka/pull/8958
[jira] [Commented] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148680#comment-17148680 ] Ismael Juma commented on KAFKA-10220: Does this affect 2.6 as well or just trunk?
[GitHub] [kafka] rajinisivaram opened a new pull request #8959: MINOR: Fix log entry in FetchSessionHandler to specify throttle correctly
rajinisivaram opened a new pull request #8959: URL: https://github.com/apache/kafka/pull/8959
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac opened a new pull request #8958: MINOR: Update AlterConfigsOptions' Javadoc
dajac opened a new pull request #8958: URL: https://github.com/apache/kafka/pull/8958
[GitHub] [kafka] urbandan opened a new pull request #8957: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override
urbandan opened a new pull request #8957: URL: https://github.com/apache/kafka/pull/8957
Implements KIP-308. Changes:
- Added kafka-get-offsets.sh script
- Removed deprecated max-wait-ms and offsets arguments
- Updated tool to query all topic-partitions by default
- Updated topic argument to support patterns
- Added topic-partitions argument to support a list of topic-partition patterns
- Added exclude-internal-topics to support filtering internal topics
Testing done: added new ducktape tests for the tool.
[GitHub] [kafka] omkreddy commented on a change in pull request #8935: KAFKA-10189: reset event queue time histogram when queue is empty
omkreddy commented on a change in pull request #8935: URL: https://github.com/apache/kafka/pull/8935#discussion_r447662323
## File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala
## @@ -69,14 +69,16 @@ class QueuedEvent(val event: ControllerEvent,
 class ControllerEventManager(controllerId: Int,
                              processor: ControllerEventProcessor,
                              time: Time,
-                             rateAndTimeMetrics: Map[ControllerState, KafkaTimer]) extends KafkaMetricsGroup {
+                             rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
+                             eventQueueTimeTimeoutMs: Long = 6) extends KafkaMetricsGroup {
   import ControllerEventManager._
   @volatile private var _state: ControllerState = ControllerState.Idle
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[QueuedEvent]
   // Visible for test
   private[controller] val thread = new ControllerEventThread(ControllerEventThreadName)
+  val eventQueueTimeMetricTimeoutMs = eventQueueTimeTimeoutMs
Review comment: We don't need a new `eventQueueTimeMetricTimeoutMs` variable; we can use `eventQueueTimeTimeoutMs` directly, can't we?
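The pattern this PR implements, as described by its title ("reset event queue time histogram when queue is empty"), is to poll the event queue with a timeout and clear the metric when the queue stays idle. A rough, self-contained Java sketch of that idea follows; all names are invented for illustration (the real code is the Scala `ControllerEventThread`), and the histogram is stood in for by a no-op placeholder:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of a timed event-queue poll: if no event arrives within the
// configured timeout, the queue-time histogram is reset so stale samples
// stop dominating the metric while the queue is idle.
class EventLoop {
    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final long eventQueueTimeTimeoutMs;

    EventLoop(long eventQueueTimeTimeoutMs) {
        this.eventQueueTimeTimeoutMs = eventQueueTimeTimeoutMs;
    }

    void submit(Runnable event) {
        queue.add(event);
    }

    // Returns true if an event was processed; false if the poll timed out
    // (queue empty past the timeout) and the histogram was reset.
    boolean pollOnce() {
        Runnable event;
        try {
            event = queue.poll(eventQueueTimeTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (event == null) {
            resetHistogram(); // queue idle: clear accumulated samples
            return false;
        }
        event.run();
        return true;
    }

    private void resetHistogram() {
        // placeholder for clearing the KafkaTimer/histogram state
    }
}
```

The blocking `poll(timeout, unit)` keeps the loop responsive without busy-waiting, which is why a timeout parameter (the one under review above) is threaded through the constructor.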
[GitHub] [kafka] omkreddy merged pull request #8956: MINOR[docs]: fix typo in ssl.client.auth requested to required.
omkreddy merged pull request #8956: URL: https://github.com/apache/kafka/pull/8956
[jira] [Comment Edited] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148576#comment-17148576 ] Edoardo Comar edited comment on KAFKA-10220 at 6/30/20, 11:51 AM:
{{also note that if the describe comes from the current trunk, it succeeds:}}
{{(trunk)$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe}}
{{Topic: topic1 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824}}
{{Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0}}
{{Topic: topic25 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824}}
{{Topic: topic25 Partition: 0 Leader: 0 Replicas: 0 Isr: 0}}
[jira] [Commented] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148576#comment-17148576 ] Edoardo Comar commented on KAFKA-10220:
{{also note that if the describe comes from the current trunk, it succeeds:}}
{{(trunk)$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe}}
{{Topic: topic1 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824}}
{{Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0}}
{{Topic: topic25 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824}}
{{Topic: topic25 Partition: 0 Leader: 0 Replicas: 0 Isr: 0}}
[jira] [Commented] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148575#comment-17148575 ] Edoardo Comar commented on KAFKA-10220: ok, a few more steps to reproduce. It looks like the issue is linked to having created and described a topic with 2.5.
{{# on current trunk}}
{{$ git lg}}
{{55b5b248c - (HEAD -> trunk, origin/trunk, origin/HEAD)}}
{{# build}}
{{$ ./gradlew clean}}
{{$ ./gradlew assemble -PscalaVersion=2.12}}
{{# run zookeeper and one broker}}
{{$ export SCALA_VERSION=2.12}}
{{$ bin/zookeeper-server-start.sh config/zookeeper.properties}}
{{$ bin/kafka-server-start.sh config/server.properties}}
{{# create topic with CLI}}
{{$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic1 --partitions 1 --replication-factor 1}}
{{# works fine}}
{{# describe topic with CLI}}
{{$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe}}
{{Topic: topic1 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824}}
{{Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0}}
{{# works fine}}
{{# in another terminal, use the kafka_2.13-2.5.0 binary distribution:}}
{{$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic25 --partitions 1 --replication-factor 1}}
{{Created topic topic25.}}
{{# now describe it (still with 2.5) ... boom}}
{{$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe}}
{{Error while executing topic command : org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.}}
{{[2020-06-30 12:39:48,581] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)}}
{{# in the broker terminal, the exception is}}
{{[2020-06-30 12:40:27,543] ERROR [Admin Manager on Broker 0]: Error processing describe configs request for resource DescribeConfigsResource(resourceType=2, resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)}}
{{java.lang.NullPointerException}}
{{at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:359)}}
{{at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:357)}}
{{at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:291)}}
{{at scala.collection.Iterator.foreach(Iterator.scala:943)}}
{{at scala.collection.Iterator.foreach$(Iterator.scala:943)}}
{{at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)}}
{{at scala.collection.IterableLike.foreach(IterableLike.scala:74)}}
{{at scala.collection.IterableLike.foreach$(IterableLike.scala:73)}}
{{at scala.collection.AbstractIterable.foreach(Iterable.scala:56)}}
{{at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:290)}}
{{at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:284)}}
{{at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)}}
{{at scala.collection.TraversableLike.filter(TraversableLike.scala:382)}}
{{at scala.collection.TraversableLike.filter$(TraversableLike.scala:382)}}
{{at scala.collection.AbstractTraversable.filter(Traversable.scala:108)}}
{{at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:357)}}
{{at kafka.server.AdminManager.describeConfigs(AdminManager.scala:350)}}
{{at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2594)}}
{{at kafka.server.KafkaApis.handle(KafkaApis.scala:165)}}
{{at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)}}
{{at java.base/java.lang.Thread.run(Thread.java:834)}}
[GitHub] [kafka] dajac commented on pull request #8954: MINOR; Move quota integration tests to using the new quota API.
dajac commented on pull request #8954: URL: https://github.com/apache/kafka/pull/8954#issuecomment-651742771 @ijuma @rajinisivaram Could you review this one when you have time?
[GitHub] [kafka] itantiger commented on pull request #8753: KAFKA-10043:Some parameters will be overwritten which was configured …
itantiger commented on pull request #8753: URL: https://github.com/apache/kafka/pull/8753#issuecomment-651738671 @rhauch Can you take a look at this?
[GitHub] [kafka] dajac commented on a change in pull request #8940: KAFKA-10181: AlterConfig/IncrementalAlterConfig should route to the controller for non validation calls
dajac commented on a change in pull request #8940: URL: https://github.com/apache/kafka/pull/8940#discussion_r447557312 ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -354,13 +401,62 @@ class KafkaApisTest { .setValue("bar")) requestData.resources.add(alterResource) -val request = buildRequest(new IncrementalAlterConfigsRequest.Builder(requestData) - .build(requestHeader.apiVersion)) +val incrementalAlterConfigsRequest = new IncrementalAlterConfigsRequest.Builder(requestData) + .build(requestHeader.apiVersion) +val request = buildRequest(incrementalAlterConfigsRequest) createKafkaApis(authorizer = Some(authorizer)).handleIncrementalAlterConfigsRequest(request) +val response = readResponse(ApiKeys.INCREMENTAL_ALTER_CONFIGS, incrementalAlterConfigsRequest, capturedResponse) + .asInstanceOf[IncrementalAlterConfigsResponse] + +val responseMap = response.data.responses().asScala.map { resourceResponse => + resourceResponse.resourceName() -> Errors.forCode(resourceResponse.errorCode) +}.toMap +assertEquals(Map(resourceName -> Errors.NONE), responseMap) + verify(authorizer, adminManager) } + @Test + def testIncrementalAlterConfigsWithNonController(): Unit = { +val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + +val resourceName = "topic-1" +val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion, Review comment: `INCREMENTAL_ALTER_CONFIGS` should be used here. ## File path: clients/src/main/resources/common/message/AlterConfigsRequest.json ## @@ -18,7 +18,9 @@ "type": "request", "name": "AlterConfigsRequest", // Version 1 is the same as version 0. - "validVersions": "0-1", + // + // Version 2 will always route to the controller for topic resources change. Review comment: * This is not entirely true, is it? A topic resource change goes to the controller if `shouldValidateOnly` is false and to the least-loaded node otherwise.
We should also explain how the broker resource is handled starting from v2. * I suggest adding the KIP number here and in all other schemas. ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -2103,31 +2097,36 @@ void handleFailure(Throwable throwable) { @Override public AlterConfigsResult incrementalAlterConfigs(Map> configs, - final AlterConfigsOptions options) { + final AlterConfigsOptions options) { Review comment: The code in `incrementalAlterConfigs` is almost identical to the code in `alterConfigs`. I wonder if we could share more of the logic between the two. Have you already tried that? ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -292,30 +290,77 @@ class KafkaApisTest { 1, true, true) ) +EasyMock.expect(controller.isActive).andReturn(true) + // Verify that authorize is only called once EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(expectedActions.asJava))) .andReturn(Seq(AuthorizationResult.ALLOWED).asJava) .once() -expectNoThrottling() +val capturedResponse = expectNoThrottling() val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName) EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false))) .andReturn(Map(configResource -> ApiError.NONE)) Review comment: This is not related to the PR, but while we are here, could we replace `anyObject()` with the correct expected value?
## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -2561,23 +2580,41 @@ class KafkaApis(val requestChannel: RequestChannel, }.toBuffer }.toMap -val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) => - resource.`type` match { -case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => - authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME) -case ConfigResource.Type.TOPIC => - authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name) -case rt => throw new InvalidRequestException(s"Unexpected resource type $rt") +def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = { + def responseCallback(requestThrottleMs: Int): IncrementalAlterConfigsResponse = { +new IncrementalAlterConfigsResponse(IncrementalAlterConfigsResponse.toResponseData(requestThrottleMs, results.asJava)) } + sendResponseMaybeThrottle(request, responseCallback) } -val authorizedResult = adminManager.incrementalAlterConfigs(authorizedResources,
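dajac's observation that `incrementalAlterConfigs` is almost identical to `alterConfigs` suggests extracting the shared loop. A hypothetical sketch of that refactor (names are illustrative, not the actual KafkaAdminClient internals): both paths reduce to "for each resource, send one request and collect a per-resource outcome", so one helper can own the loop and result bookkeeping while each API supplies only its own request-building step.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class AlterConfigsShared {
    // Hypothetical common helper: the caller supplies the per-resource send
    // logic, and this method owns the iteration and result collection that
    // alterConfigs and incrementalAlterConfigs would otherwise duplicate.
    static Map<String, String> alterAll(Map<String, String> configsByResource,
                                        BiFunction<String, String, String> sendOne) {
        Map<String, String> results = new HashMap<>();
        configsByResource.forEach((resource, config) ->
                results.put(resource, sendOne.apply(resource, config)));
        return results;
    }

    public static void main(String[] args) {
        // Stand-in for the network call: every resource succeeds with error NONE.
        Map<String, String> results = alterAll(
                Map.of("topic-1", "cleanup.policy=compact"),
                (resource, config) -> "NONE");
        System.out.println(results); // {topic-1=NONE}
    }
}
```

Whether this pays off in the real client depends on how much the two request schemas diverge; the sketch only shows the shape of the shared loop.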
[jira] [Commented] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148564#comment-17148564 ] Edoardo Comar commented on KAFKA-10220: --- [~huxi_2b] I initially hit this when using the 2.5.0 cli (from the binary distribution) against trunk (running in eclipse) I was later able to reproduce using the cli built from trunk (./gradlew assemble -PscalaVersion=2.12) > NPE when describing resources > - > > Key: KAFKA-10220 > URL: https://issues.apache.org/jira/browse/KAFKA-10220 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Edoardo Comar >Priority: Major > > In current trunk code > Describing a topic from the CLI can fail with an NPE in the broker > on the line > {{ > resource.configurationKeys.asScala.forall(_.contains(configName))}} > > (configurationKeys is null) > {{[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error > processing describe configs request for resource > DescribeConfigsResource(resourceType=2, resourceName='topic1', > configurationKeys=null) > (kafka.server.AdminManager)}}{{java.lang.NullPointerException}}{{at > kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)}}{{at > > kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)}}{{at > > scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)}}{{at > scala.collection.Iterator.foreach(Iterator.scala:929)}}{{at > scala.collection.Iterator.foreach$(Iterator.scala:929)}}{{at > scala.collection.AbstractIterator.foreach(Iterator.scala:1417)}}{{at > scala.collection.IterableLike.foreach(IterableLike.scala:71)}}{{at > scala.collection.IterableLike.foreach$(IterableLike.scala:70)}}{{at > scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}{{at > scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)}}{{at > scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)}}{{at > 
scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)}}{{at > scala.collection.TraversableLike.filter(TraversableLike.scala:259)}}{{at > scala.collection.TraversableLike.filter$(TraversableLike.scala:259)}}{{at > scala.collection.AbstractTraversable.filter(Traversable.scala:104)}}{{at > kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)}}{{at > > kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)}}{{at > scala.collection.immutable.List.map(List.scala:283)}}{{at > kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)}}{{at > kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)}}{{at > kafka.server.KafkaApis.handle(KafkaApis.scala:165)}}{{at > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)}}{{at > java.lang.Thread.run(Thread.java:748)}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
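The null dereference above admits a one-line guard. A minimal, self-contained sketch (in Java rather than the broker's Scala, and with the illustrative name `shouldInclude`, not the actual AdminManager code), assuming a null configurationKeys list means the client asked for every config:

```java
import java.util.List;

public class DescribeConfigsNullGuard {
    // Illustrative stand-in for the broker-side filter that throws the NPE
    // above when configurationKeys is null. Assumption: a null key list means
    // no filter was requested, so every config name should be included.
    static boolean shouldInclude(List<String> configurationKeys, String configName) {
        return configurationKeys == null || configurationKeys.contains(configName);
    }

    public static void main(String[] args) {
        // null key list: include everything instead of throwing NullPointerException
        System.out.println(shouldInclude(null, "retention.ms"));
        // explicit key list: include only the requested names
        System.out.println(shouldInclude(List.of("cleanup.policy"), "retention.ms"));
    }
}
```

Under that assumption a null list filters nothing out, so the describe-configs path degrades gracefully instead of failing.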
[GitHub] [kafka] omkreddy commented on pull request #8935: KAFKA-10189: reset event queue time histogram when queue is empty
omkreddy commented on pull request #8935: URL: https://github.com/apache/kafka/pull/8935#issuecomment-651735737 ok to test
[jira] [Updated] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-10220: -- Description: In current trunk code Describing a topic from the CLI can fail with an NPE in the broker on the line {{ resource.configurationKeys.asScala.forall(_.contains(configName))}} (configurationKeys is null)
[jira] [Commented] (KAFKA-10191) fix flaky StreamsOptimizedTest
[ https://issues.apache.org/jira/browse/KAFKA-10191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148556#comment-17148556 ] Chia-Ping Tsai commented on KAFKA-10191: {quote} about adding KafkaStreams#cleanUp() in the code, {quote} Yep, this is the initial fix for this issue. {quote} the other things to do are done manually, outside of the code. {quote} Yep, resetting the streams application has to be executed manually. However, as [~ableegoldman] suggested, we can do more in this fix, so streams_optimized_test.py now always resets the streams application before running the optimization. > fix flaky StreamsOptimizedTest > -- > > Key: KAFKA-10191 > URL: https://issues.apache.org/jira/browse/KAFKA-10191 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > {quote}Exception in thread > "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" > java.lang.IllegalStateException: Tried to lookup lag for unknown task > 2_0Exception in thread > "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" > java.lang.IllegalStateException: Tried to lookup lag for unknown task 2_0 at > org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306) > at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511) > at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216) > at java.util.TreeMap.put(TreeMap.java:552) at > java.util.TreeSet.add(TreeSet.java:255) at > java.util.AbstractCollection.addAll(AbstractCollection.java:344) at > java.util.TreeSet.addAll(TreeSet.java:312) at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1250) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1164) > at > 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:920) > at > org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:391) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:583) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1263) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1229) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1204) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:762) > at >
[jira] [Commented] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148550#comment-17148550 ] huxihx commented on KAFKA-10220: [~ecomar] Thanks for reporting. I could not reproduce this issue with trunk. Are you using the latest code for clients?
[GitHub] [kafka] rajinisivaram merged pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
rajinisivaram merged pull request #8683: URL: https://github.com/apache/kafka/pull/8683
[GitHub] [kafka] rajinisivaram commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch
rajinisivaram commented on pull request #8683: URL: https://github.com/apache/kafka/pull/8683#issuecomment-651726976 Streams test failures not related, merging to trunk.
[GitHub] [kafka] omkreddy commented on pull request #8885: KAFKA-8264: decrease the record size for flaky test
omkreddy commented on pull request #8885: URL: https://github.com/apache/kafka/pull/8885#issuecomment-651724728 retest this please
[jira] [Issue Comment Deleted] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-10220: -- Comment: was deleted (was: deserialized request data has a null configurationKeys {{DescribeConfigsRequestData(resources=[DescribeConfigsResource(resourceType=2, resourceName='topic1', configurationKeys=null),...}})
[jira] [Commented] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148535#comment-17148535 ] Edoardo Comar commented on KAFKA-10220: --- deserialized request data has a null configurationKeys {{DescribeConfigsRequestData(resources=[DescribeConfigsResource(resourceType=2, resourceName='topic1', configurationKeys=null),...}}
[jira] [Updated] (KAFKA-10220) NPE when describing resources
[ https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edoardo Comar updated KAFKA-10220: -- Description: In current trunk code Describing a topic from the CLI can fail with an NPE in the broker on the line {{ resource.configurationKeys.asScala.forall(_.contains(configName))}} (configurationKeys is null?)
[jira] [Updated] (KAFKA-10221) Backport fix for KAFKA-9603 to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna updated KAFKA-10221: -- Fix Version/s: 2.5.1 > Backport fix for KAFKA-9603 to 2.5 > --- > > Key: KAFKA-10221 > URL: https://issues.apache.org/jira/browse/KAFKA-10221 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Priority: Blocker > Fix For: 2.5.1 > > > The fix for [KAFKA-9603|https://issues.apache.org/jira/browse/KAFKA-9603] > shall be backported to 2.5. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10221) Backport fix for KAFKA-9603 to 2.5
Bruno Cadonna created KAFKA-10221: - Summary: Backport fix for KAFKA-9603 to 2.5 Key: KAFKA-10221 URL: https://issues.apache.org/jira/browse/KAFKA-10221 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.5.0 Reporter: Bruno Cadonna The fix for [KAFKA-9603|https://issues.apache.org/jira/browse/KAFKA-9603] shall be backported to 2.5.
[jira] [Created] (KAFKA-10220) NPE when describing resources
Edoardo Comar created KAFKA-10220:
-
Summary: NPE when describing resources
Key: KAFKA-10220
URL: https://issues.apache.org/jira/browse/KAFKA-10220
Project: Kafka
Issue Type: Bug
Components: core
Reporter: Edoardo Comar

In current trunk code, describing a topic can fail with an NPE in the broker on the line
{{resource.configurationKeys.asScala.forall(_.contains(configName))}}
(configurationKeys is null?)

{{[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing describe configs request for resource DescribeConfigsResource(resourceType=2, resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)}}
{{java.lang.NullPointerException}}
{{at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)}}
{{at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)}}
{{at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)}}
{{at scala.collection.Iterator.foreach(Iterator.scala:929)}}
{{at scala.collection.Iterator.foreach$(Iterator.scala:929)}}
{{at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)}}
{{at scala.collection.IterableLike.foreach(IterableLike.scala:71)}}
{{at scala.collection.IterableLike.foreach$(IterableLike.scala:70)}}
{{at scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}
{{at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)}}
{{at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)}}
{{at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)}}
{{at scala.collection.TraversableLike.filter(TraversableLike.scala:259)}}
{{at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)}}
{{at scala.collection.AbstractTraversable.filter(Traversable.scala:104)}}
{{at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)}}
{{at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)}}
{{at scala.collection.immutable.List.map(List.scala:283)}}
{{at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)}}
{{at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)}}
{{at kafka.server.KafkaApis.handle(KafkaApis.scala:165)}}
{{at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)}}
{{at java.lang.Thread.run(Thread.java:748)}}
[GitHub] [kafka] jeqo opened a new pull request #8956: MINOR[docs]: fix typo in ssl.client.auth requested to required.
jeqo opened a new pull request #8956: URL: https://github.com/apache/kafka/pull/8956 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
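For reference, `ssl.client.auth` accepts the values `required`, `requested`, and `none`, and only `required` rejects clients that do not present a certificate, which is the distinction the doc fix corrects. A broker configuration sketch:

```
# server.properties -- enforce mutual TLS: clients must present a certificate.
# With `requested`, the broker only asks for one and a client may still
# connect without it.
ssl.client.auth=required
```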
[GitHub] [kafka] cadonna commented on pull request #8914: MINOR: Do not swallow exception when collecting PIDs
cadonna commented on pull request #8914: URL: https://github.com/apache/kafka/pull/8914#issuecomment-651699622 The failed system test does not seem to be related to this PR but rather to https://issues.apache.org/jira/browse/KAFKA-10191.
[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
cadonna commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r447567460
## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
## @@ -124,18 +141,19 @@ public void before() {
 Serdes.String() );
 metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
-expect(context.metrics())
-.andReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion)).anyTimes();
-expect(context.taskId()).andReturn(taskId).anyTimes();
-expect(inner.name()).andReturn("metered").anyTimes();
+expect(context.applicationId()).andStubReturn(APPLICATION_ID);
+expect(context.metrics()).andStubReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion));
+expect(context.taskId()).andStubReturn(taskId);
+ expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);
Review comment: I agree that EasyMock is not able to magically implement a complex interface contract. However, as I said earlier, it would significantly increase the size of this PR to swap to `InternalMockProcessorContext` or one of its siblings right now.
[GitHub] [kafka] LMnet commented on pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)
LMnet commented on pull request #8955: URL: https://github.com/apache/kafka/pull/8955#issuecomment-651694260 @vvcephei @mjsax
[GitHub] [kafka] LMnet commented on pull request #8049: MINOR: Added missing default serdes to the streams.scala.Serdes
LMnet commented on pull request #8049: URL: https://github.com/apache/kafka/pull/8049#issuecomment-651690176 Closed in favor of https://github.com/apache/kafka/pull/8955
[GitHub] [kafka] LMnet commented on a change in pull request #8049: MINOR: Added missing default serdes to the streams.scala.Serdes
LMnet commented on a change in pull request #8049: URL: https://github.com/apache/kafka/pull/8049#discussion_r447559838
## File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
## @@ -30,12 +32,15 @@ object Serdes {
 implicit def JavaLong: Serde[java.lang.Long] = JSerdes.Long()
 implicit def ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
 implicit def Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
+ implicit def byteBufferSerde: Serde[ByteBuffer] = JSerdes.ByteBuffer()
+ implicit def shortSerde: Serde[Short] = JSerdes.Short().asInstanceOf[Serde[Short]]
 implicit def Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
 implicit def JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
 implicit def Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
 implicit def JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
 implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
 implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
+ implicit def uuidSerde: Serde[UUID] = JSerdes.UUID()
Review comment: I decided to create a [new pull request](https://github.com/apache/kafka/pull/8955).
[GitHub] [kafka] LMnet closed pull request #8049: MINOR: Added missing default serdes to the streams.scala.Serdes
LMnet closed pull request #8049: URL: https://github.com/apache/kafka/pull/8049
[GitHub] [kafka] LMnet opened a new pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash
LMnet opened a new pull request #8955: URL: https://github.com/apache/kafka/pull/8955 Implementation of a solution for KIP-616. A wildcard import of the old `org.apache.kafka.streams.scala.Serdes` leads to a name clash because some of the implicits have the same names as types from Scala's standard library. The new `org.apache.kafka.streams.scala.serialization.Serdes` is the same as the old `Serdes`, but without the name clashes. The old one was marked as deprecated. Also, the previously missing serdes for the `UUID`, `ByteBuffer`, and `Short` types are now present in the new `Serdes`.
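As a generic illustration of the clash the PR description refers to (this sketch is hypothetical, not the actual Serdes code): a wildcard import of an object whose term members reuse standard-library names shadows those names in term position.

```scala
// Stand-in for the old Serdes object, whose implicit members were named
// `String`, `Long`, `Double`, etc., after the types they serialize.
object SerdesLike {
  def String: java.lang.String = "string-serde"
}

object ClashDemo {
  def main(args: Array[String]): Unit = {
    import SerdesLike._
    // In term position, `String` now resolves to SerdesLike.String and
    // shadows java.lang.String's companion, so code like `String.valueOf(1)`
    // no longer compiles in this scope. The *type* String is unaffected.
    val s: String = String
    println(s)
  }
}
```

The new `Serdes` object sidesteps this kind of shadowing by not reusing the standard names for its members.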
[GitHub] [kafka] akatona84 commented on pull request #8859: MINOR: Upgrade jetty to 9.4.27.v20200227 and jersey to 2.31
akatona84 commented on pull request #8859: URL: https://github.com/apache/kafka/pull/8859#issuecomment-651684469 I did not find anything for that version, thanks again.
[jira] [Commented] (KAFKA-10191) fix flaky StreamsOptimizedTest
[ https://issues.apache.org/jira/browse/KAFKA-10191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148475#comment-17148475 ] mohamed chebbi commented on KAFKA-10191:
[~chia7712] Sorry to be late. I read the reset tool documentation and, if I understand correctly, it is about adding KafkaStreams#cleanUp() in the code; the other things to do are done manually, outside of the code.

> fix flaky StreamsOptimizedTest
> --
>
> Key: KAFKA-10191
> URL: https://issues.apache.org/jira/browse/KAFKA-10191
> Project: Kafka
> Issue Type: Test
> Components: streams, unit tests
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Major
>
> {quote}Exception in thread "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" java.lang.IllegalStateException: Tried to lookup lag for unknown task 2_0
> at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
> at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
> at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216)
> at java.util.TreeMap.put(TreeMap.java:552)
> at java.util.TreeSet.add(TreeSet.java:255)
> at java.util.AbstractCollection.addAll(AbstractCollection.java:344)
> at java.util.TreeSet.addAll(TreeSet.java:312)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1250)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1164)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:920)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:391)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:583)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1263)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1229)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1204)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:762)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:622)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
> at
[GitHub] [kafka] dajac opened a new pull request #8954: MINOR; Move quota integration tests to using the new quota API.
dajac opened a new pull request #8954: URL: https://github.com/apache/kafka/pull/8954
This PR refactors the various quota integration tests in the `kafka.api` package and migrates them to using the new quota API instead of writing the quota to ZK directly.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mimaison commented on pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration
mimaison commented on pull request #8921: URL: https://github.com/apache/kafka/pull/8921#issuecomment-651658489 @satishbellapu I'll try to take a look Thursday or Friday.
[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on pull request #8295: URL: https://github.com/apache/kafka/pull/8295#issuecomment-651657082 The conflicts are pretty nasty. I started looking at it last week but did not have the time (nor the courage!) to finish. I plan to take another look this week.