[GitHub] [kafka] guozhangwang commented on pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions

2020-06-30 Thread GitBox


guozhangwang commented on pull request #8934:
URL: https://github.com/apache/kafka/pull/8934#issuecomment-652202875


   @hachikuji Could you take a look at this one since it is a 2.6 blocker?
   
   Another alternative, if we are less comfortable with the current 
exponential-timeout fix-forward, is to just revert the behavior and try to 
complete the metadata update (including the join-group) within the poll 
timeout.
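   
   (For context: the "poll timeout" here is the `Duration` the application 
passes to `KafkaConsumer.poll`, which bounds how long the consumer may block 
while it completes coordinator work. A minimal sketch of that client-side 
contract; the broker address, group id, and topic are illustrative:)
   
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   
   public class PollTimeoutSketch {
       public static void main(final String[] args) {
           final Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("group.id", "example-group");
           props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
           props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
   
           try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               consumer.subscribe(Collections.singletonList("example-topic"));
               // poll(timeout) may block up to the timeout while the consumer finds
               // the coordinator, joins the group, and refreshes metadata; it can
               // return empty if that work has not completed within the timeout
               final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
               System.out.println("polled " + records.count() + " records");
           }
       }
   }
   ```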



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-30 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17149122#comment-17149122
 ] 

Guozhang Wang commented on KAFKA-10134:
---

I've updated the PR.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance, as our tasks are long-running.
> But after the upgrade, when we kill an instance to let a rebalance happen 
> under some load (some tasks are long-running, >30s), the CPU goes sky-high. 
> It reads ~700% in our metrics, so several threads must be spinning in a tight 
> loop. We have several consumer threads consuming from different partitions 
> during the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager rebalance protocol, the high CPU usage 
> drops after the rebalance is done. But with the cooperative one, the consumer 
> threads seem to be stuck on something and can't finish the rebalance, so the 
> high CPU usage won't drop until we stop our load. A small load without 
> long-running tasks also doesn't cause continuous high CPU usage, as the 
> rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code, we found that the clients look like they are in 
> a loop finding the coordinator.
> I also tried the old rebalance protocol with the new version; the issue 
> still exists, but the CPU goes back to normal when the rebalance is done.
> I also tried the same on 2.4.1, which doesn't seem to have this issue. So it 
> seems related to something that changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon edited a comment on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-06-30 Thread GitBox


showuon edited a comment on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-652177809


   Thanks for the comments, @abbccdda. I've addressed your comments in this 
commit: 
https://github.com/apache/kafka/pull/8712/commits/4b57a606a3834323302b6d3d33ab95e5b88d183b
   
   What I did in this commit:
   1. simplify the code
   2. use the existing topic name in tests, instead of creating a new topic 
name: `leaderNotAvailableTopic`
   3. use `numRetries` to indicate we are trying to go beyond the retry limit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-06-30 Thread GitBox


showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-652177809


   Thanks for the comments, @abbccdda. I've addressed your comments in this 
commit: 
https://github.com/apache/kafka/pull/8712/commits/4b57a606a3834323302b6d3d33ab95e5b88d183b
   
   What I did in this commit:
   1. simplify the code
   2. use the existing topic name in tests, instead of creating a new topic 
name: `leaderNotAvailableTopic`
   3. use numRetries to indicate we are trying to go beyond the retry limit



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-06-30 Thread GitBox


showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448103331



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+            new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+                .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                    Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                    Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                    Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, broker1,
+            Collections.singletonList(broker1), Collections.singletonList(broker1));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
+        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new KafkaFutureImpl<>();
+        topicDescriptionSuccessFuture.complete(
+            new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet())
+        );
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionFailFuture)))
+

[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-06-30 Thread GitBox


showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448100930



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+            new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+                .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                    Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                    Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                    Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, broker1,
+            Collections.singletonList(broker1), Collections.singletonList(broker1));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
+        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new KafkaFutureImpl<>();
+        topicDescriptionSuccessFuture.complete(
+            new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet())
+        );
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionFailFuture)))
+

[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-06-30 Thread GitBox


showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448100994



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+            new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+                .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                    Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                    Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                    Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);

Review comment:
   good. Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

2020-06-30 Thread GitBox


vvcephei commented on pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#issuecomment-652173502


   I had a chat with @ableegoldman earlier today.
   
   As a general strategy, maybe what we can do is aim to repair the 
non-downgradability where it exists. For example, when we release 2.5.1, then 
it will become possible to downgrade (with suppression) from 2.6.0 to 2.5.1 
even though it's not possible to downgrade from 2.6.0 to 2.5.0.
   
   There's a bit of a chicken-and-egg problem here, since we can't add the 
downgrade tests _yet_, but what I can do is create tickets both to fix the 
known downgrade issues and to add the relevant downgrade paths to the test 
matrix. This is similar to how I couldn't add an upgrade test from 2.1 to 
trunk, but filed a ticket with a note that once it is fixed, we should be able 
to add the missing test parameters.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-06-30 Thread GitBox


showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448099598



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+            new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+                .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                    Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                    Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                    Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, broker1,
+            Collections.singletonList(broker1), Collections.singletonList(broker1));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
+        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new KafkaFutureImpl<>();
+        topicDescriptionSuccessFuture.complete(
+            new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet())
+        );
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionFailFuture)))
+

[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-06-30 Thread GitBox


showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448099358



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+            new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+                .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                    Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                    Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                    Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {

Review comment:
   Yes, they are different scenarios. You can check the diagram below for 
reference (red and green mark the different cases).
   
   
![image](https://user-images.githubusercontent.com/43372967/86200914-905e3800-bb90-11ea-8d2c-c5e9c7b71cda.png)
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8938: KAFKA-10173: Use SmokeTest for upgrade system tests

2020-06-30 Thread GitBox


vvcephei commented on pull request #8938:
URL: https://github.com/apache/kafka/pull/8938#issuecomment-652171893


   Oof, looks like I missed "something": 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-06-30--001.1593562449--vvcephei--kafka-10173-upgrde-smoke-system-test--1123f6f2c/report.html



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8961: MINOR: add a retry system test curl commands

2020-06-30 Thread GitBox


vvcephei commented on pull request #8961:
URL: https://github.com/apache/kafka/pull/8961#issuecomment-652171595


   new run: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4002/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8961: MINOR: add a retry system test curl commands

2020-06-30 Thread GitBox


vvcephei commented on pull request #8961:
URL: https://github.com/apache/kafka/pull/8961#issuecomment-652168931


   Ah, thanks @kkonstantine,
   
   I see that now:
   ```
   17:42:20 worker5: + curl --retry-all-errors --retry 5 -s -L https://s3-us-west-2.amazonaws.com/kafka-packages/jdk-8u202-linux-x64.tar.gz -o /tmp/jdk-8u202-linux-x64.tar.gz
   17:42:20 worker5: curl: option --retry-all-errors: is unknown
   17:42:20 worker5: curl: try 'curl --help' or 'curl --manual' for more information
   ```
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-06-30 Thread GitBox


showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448095124



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -100,13 +100,19 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         final Set<String> newlyCreatedTopics = new HashSet<>();
 
         while (!topicsNotReady.isEmpty() && remainingRetries >= 0) {
-            topicsNotReady = validateTopics(topicsNotReady, topics);
+            final Set<String> tempUnknownTopics = new HashSet<>();

Review comment:
   Well, this variable naming was actually suggested by @ableegoldman, and 
I also think `tempUnknownTopics` is more descriptive. Is that OK with you?
   
   https://github.com/apache/kafka/pull/8712#discussion_r444555724
   
   
![image](https://user-images.githubusercontent.com/43372967/86199833-b6cea400-bb8d-11ea-81b5-7ba5b6064709.png)
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-06-30 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r448093846



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot after completing the state store registration;
+        // if it is null, the registration is not done and hence we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not, then we should always checkpoint;
+        // note if the task is stateless, or stateful but with no stores logged, the snapshot would also be empty,
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+            return true;
+
+        // we can checkpoint if the difference between the current and the previous snapshot is large enough
+        long totalOffsetDelta = 0L;
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
+            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+        }
+
+        // when enforcing a checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
+        // otherwise, we only overwrite the checkpoint if it is largely different from the old one
+        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;

Review comment:
   I'm a bit torn about this optimization to avoid double checkpointing, 
because on the other hand, even if we have not made any progress since loading 
the checkpoint, we'd still need to write a checkpoint upon closing if we have 
never written one before -- and I use emptyMap as an indicator for that.
   
   Given that upon suspending we are now less likely to checkpoint, the chance 
that we would checkpoint twice (when transiting to suspended, and when 
transiting to closed) is smaller, and hence I'm thinking maybe I'd just remove 
this optimization to make the logic a bit simpler. LMK WDYT.
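   
   (To make the threshold concrete: a small, self-contained sketch of the 
delta computation quoted above. Plain strings stand in for `TopicPartition`, 
and the offsets are illustrative:)
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   public class CheckpointDeltaSketch {
       static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
   
       public static void main(final String[] args) {
           final Map<String, Long> oldSnapshot = new HashMap<>();
           final Map<String, Long> newSnapshot = new HashMap<>();
           oldSnapshot.put("changelog-0", 100L);
           newSnapshot.put("changelog-0", 15_000L); // 14,900 offsets of progress
   
           // sum of absolute per-partition offset differences, as in checkpointNeeded
           long totalOffsetDelta = 0L;
           for (final Map.Entry<String, Long> entry : newSnapshot.entrySet()) {
               totalOffsetDelta += Math.abs(oldSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
           }
   
           // without enforcement, we checkpoint only once enough progress has been made
           System.out.println(totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT); // true
       }
   }
   ```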

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -542,13 +530,22 @@ public void closeCleanAndRecycleState() {
 log.info("Closed clean and recycled state");
 }
 
-private void writeCheckpoint() {
+/**
+ * The following exceptions may be thrown from the state manager flushing call
+ *
+ * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed
+ * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed
+ *                          or flushing the state store got IO errors; such an error should cause the thread to die
+ */
+@Override
+protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {

Review comment:
   I decided to extract the update of the changelog offsets out of actually 
writing the offsets, since even if we do not want to write the file, we still 
need to update the offsets.
   
   The reason I did not yet remove the parameter from `checkpoint` is that 
global-task is still using it. I plan to remove it when consolidating the 
global task.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang opened a new pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-06-30 Thread GitBox


guozhangwang opened a new pull request #8964:
URL: https://github.com/apache/kafka/pull/8964


   1. Do not always flush state stores when committing (in post-commit); move 
`stateMgr.flush` into post-commit to live with checkpointing. Then in 
post-commit, we checkpoint when:
   
   1.a. the state store's snapshot has progressed much further compared to the 
previous checkpoint;
   1.b. the task is being closed, in which case we enforce a checkpoint (but as 
a minor optimization, we avoid checkpointing if it is exactly the same as 
before).
   
   2. However, for the cache / suppression buffer, we still need to flush them 
in pre-commit to make sure all records are sent via the producers; I had to 
introduce a slightly hacky workaround into the CachingStateStore in stateMgr.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory

2020-06-30 Thread GitBox


abbccdda commented on pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#issuecomment-652159635


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8963: KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest

2020-06-30 Thread GitBox


abbccdda commented on pull request #8963:
URL: https://github.com/apache/kafka/pull/8963#issuecomment-652159442


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory

2020-06-30 Thread GitBox


abbccdda commented on pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#issuecomment-652159560


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #8963: KAFKA-10017: fix flaky EosBetaUpgradeIntegrationTest

2020-06-30 Thread GitBox


ableegoldman opened a new pull request #8963:
URL: https://github.com/apache/kafka/pull/8963


   The current failures we're seeing with this test are due to faulty 
assumptions that it makes and not any real bug in eos-beta (at least, from what 
I've seen so far).
   
   The test relies on tightly controlling the commits, which it does by setting 
the commit interval to MAX_VALUE and manually requesting commits on the 
context. In two phases, the test assumes that any pending data will be 
committed after a rebalance. But we actually take care to avoid unnecessary 
commits -- with eos-alpha, we only commit tasks that are revoked while in 
eos-beta we must commit all tasks if any are revoked, but only if the revoked 
tasks themselves need a commit.
   
   The failure we see occurs when we try to verify the committed data after a 
second client is started and the group rebalances. The already-running client 
has to give up two tasks to the newly started client, but those tasks may not 
need to be committed in which case none of the tasks would be. So we still have 
an open transaction on the partitions where we try to read committed data.
   
   We can use a punctuator to force a commit on the running client, as 
sketched below.
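   
   (A minimal sketch of that punctuator idea using the public Processor API; 
this is illustrative, not the test's actual code:)
   
   ```java
   import java.time.Duration;
   import org.apache.kafka.streams.processor.AbstractProcessor;
   import org.apache.kafka.streams.processor.ProcessorContext;
   import org.apache.kafka.streams.processor.PunctuationType;
   
   // Forwards records unchanged, but periodically requests a commit so the test
   // does not have to rely on a rebalance to close its open transactions.
   public class ForcedCommitProcessor extends AbstractProcessor<String, String> {
       @Override
       public void init(final ProcessorContext context) {
           super.init(context);
           context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME,
               timestamp -> context.commit());
       }
   
       @Override
       public void process(final String key, final String value) {
           context().forward(key, value);
       }
   }
   ```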



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mmolimar commented on a change in pull request #7496: KAFKA-9018: Throw clearer exceptions on serialisation errors

2020-06-30 Thread GitBox


mmolimar commented on a change in pull request #7496:
URL: https://github.com/apache/kafka/pull/7496#discussion_r448045788



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -524,6 +524,26 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]> msg) {
+private SchemaAndValue convertKey(ConsumerRecord<byte[], byte[]> msg) {
+    try {
+        return keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key());
+    } catch (Exception e) {
+        log.error("{} Error converting message key in topic '{}' partition {} at offset {} and timestamp {}",
+                this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e);
+        throw e;
+    }
+}
+
+private SchemaAndValue convertValue(ConsumerRecord<byte[], byte[]> msg) {
+    try {
+        return valueConverter.toConnectData(msg.topic(), msg.headers(), msg.value());
+    } catch (Exception e) {
+        log.error("{} Error converting message value in topic '{}' partition {} at offset {} and timestamp {}",
+                this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e);

Review comment:
   Done!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory

2020-06-30 Thread GitBox


ableegoldman commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r448044329



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -136,7 +143,59 @@ private boolean taskDirEmpty(final File taskDir) {
             !pathname.getName().equals(CHECKPOINT_FILE_NAME));
 
         // if the task is stateless, storeDirs would be null
-        return storeDirs == null || storeDirs.length == 0;
+        if (storeDirs == null || storeDirs.length == 0) {
+            return true;
+        }
+
+        final List<File> baseSubDirectories = new LinkedList<>();
+        for (final File file : storeDirs) {
+            if (file.isDirectory()) {
+                baseSubDirectories.add(file);
+            } else {
+                return false;
+            }
+        }
+
+        for (final File dir : baseSubDirectories) {
+            final boolean isEmpty;
+            if (dir.getName().equals(ROCKSDB_DIRECTORY_NAME)) {
+                isEmpty = taskSubDirectoriesEmpty(dir, true);
+            } else {
+                isEmpty = taskSubDirectoriesEmpty(dir, false);
+            }
+            if (!isEmpty) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // BFS through the task directory to look for any files that are not more subdirectories
+    private boolean taskSubDirectoriesEmpty(final File baseDir, final boolean sstOnly) {
+        final Queue<File> subDirectories = new LinkedList<>();
+        subDirectories.offer(baseDir);
+
+        final Set<File> visited = new HashSet<>();
+        while (!subDirectories.isEmpty()) {
+            final File dir = subDirectories.poll();
+            if (!visited.contains(dir)) {
+                final File[] files = dir.listFiles();
+                if (files == null) {
+                    continue;
+                }
+                for (final File file : files) {
+                    if (file.isDirectory()) {
+                        subDirectories.offer(file);
+                    } else if (sstOnly && file.getName().endsWith(ROCKSDB_SST_SUFFIX)) {

Review comment:
   This is admittedly quite hacky, and of course it does not solve the problem 
for custom state stores that might write some non-data files upon open. A 
"better" fix would probably be to write some sentinel value in the checkpoint, 
a la `OFFSET_UNKNOWN`, so we do have an entry in there if the store was opened 
but does not yet have any data.
   
   But, I wanted to keep things simple (a very relative term here, I know) and 
low-risk before the 2.6 release. We can discuss better solutions once we're not 
at the doorstep of the release (and blocking the door, I might add)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #7496: KAFKA-9018: Throw clearer exceptions on serialisation errors

2020-06-30 Thread GitBox


rhauch commented on a change in pull request #7496:
URL: https://github.com/apache/kafka/pull/7496#discussion_r448029128



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -524,6 +524,26 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]> msg) {
+private SchemaAndValue convertKey(ConsumerRecord<byte[], byte[]> msg) {
+    try {
+        return keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key());
+    } catch (Exception e) {
+        log.error("{} Error converting message key in topic '{}' partition {} at offset {} and timestamp {}",
+                this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e);

Review comment:
   It actually would be helpful to include the exception's error message in 
this line, since the message alone might be bubbled up via the REST API.
   ```suggestion
   log.error("{} Error converting message key in topic '{}' 
partition {} at offset {} and timestamp {}: {}",
   this, msg.topic(), msg.partition(), msg.offset(), 
msg.timestamp(), e.getMessage(), e);
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -524,6 +524,26 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]> msg) {
+private SchemaAndValue convertKey(ConsumerRecord<byte[], byte[]> msg) {
+    try {
+        return keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key());
+    } catch (Exception e) {
+        log.error("{} Error converting message key in topic '{}' partition {} at offset {} and timestamp {}",
+                this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e);
+        throw e;
+    }
+}
+
+private SchemaAndValue convertValue(ConsumerRecord<byte[], byte[]> msg) {
+    try {
+        return valueConverter.toConnectData(msg.topic(), msg.headers(), msg.value());
+    } catch (Exception e) {
+        log.error("{} Error converting message value in topic '{}' partition {} at offset {} and timestamp {}",
+                this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e);

Review comment:
   It actually would be helpful to include the exception's error message in 
this line, since the message alone might be bubbled up via the REST API.
   ```suggestion
   log.error("{} Error converting message value in topic '{}' 
partition {} at offset {} and timestamp {}: {}",
   this, msg.topic(), msg.partition(), msg.offset(), 
msg.timestamp(), e.getMessage(), e);
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #8858: KAFKA-10153: Error Reporting in Connect Documentation

2020-06-30 Thread GitBox


rhauch commented on a change in pull request #8858:
URL: https://github.com/apache/kafka/pull/8858#discussion_r448023368



##
File path: docs/connect.html
##
@@ -258,6 +258,48 @@ REST API
 GET / - return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that it is connected to.
 
 
+Error Reporting in Connect
+
+Kafka Connect provides error reporting to handle errors encountered 
along various stages of processing. By default, any error encountered during 
conversion or within transformations will cause the connector to fail. Each 
connector configuration can also enable tolerating such errors by skipping 
them, optionally writing each error and the details of the failed operation and 
problematic record (with various levels of detail) to the Connect application 
log. These mechanisms also capture errors when a sink connector is processing 
the messages consumed from its Kafka topics, and all of the errors can be 
written to a configurable "dead letter queue" (DLQ) Kafka topic.
+
+To report errors within a connector's converter, transforms, or in 
specifically the sink connector itself to the log, set 
errors.log.enable=true in the connector configuration to log 
details of each error and problem record's topic, partition, and offset. For 
additional debugging purposes, set 
errors.log.include.messages=true to also log the problem record 
key, value, and headers to the log (note this may log sensitive information).
+
+To report errors within a connector's converter, transforms, or in 
specifically the sink connector itself to a dead letter queue topic, set 
errors.deadletterqueue.topic.name, and optionally  
errors.deadletterqueue.context.headers.enable=true.

Review comment:
   ```suggestion
   To report errors within a connector's converter, transforms, or 
within the sink connector itself to a dead letter queue topic, set 
errors.deadletterqueue.topic.name, and optionally  
errors.deadletterqueue.context.headers.enable=true.
   ```

##
File path: docs/connect.html
##
@@ -258,6 +258,48 @@ REST API
 GET / - return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that it is connected to.
 
 
+Error Reporting in Connect
+
+Kafka Connect provides error reporting to handle errors encountered 
along various stages of processing. By default, any error encountered during 
conversion or within transformations will cause the connector to fail. Each 
connector configuration can also enable tolerating such errors by skipping 
them, optionally writing each error and the details of the failed operation and 
problematic record (with various levels of detail) to the Connect application 
log. These mechanisms also capture errors when a sink connector is processing 
the messages consumed from its Kafka topics, and all of the errors can be 
written to a configurable "dead letter queue" (DLQ) Kafka topic.
+
+To report errors within a connector's converter, transforms, or in 
specifically the sink connector itself to the log, set 
errors.log.enable=true in the connector configuration to log 
details of each error and problem record's topic, partition, and offset. For 
additional debugging purposes, set 
errors.log.include.messages=true to also log the problem record 
key, value, and headers to the log (note this may log sensitive information).
+
+To report errors within a connector's converter, transforms, or in 
specifically the sink connector itself to a dead letter queue topic, set 
errors.deadletterqueue.topic.name, and optionally  
errors.deadletterqueue.context.headers.enable=true.
+
+For example, the following shows a configuration that will cause a connector 
to fail immediately upon an error or exception. Although it is not necessary 
to add extra configuration properties for this behavior, adding the following 
properties to a sink connector configuration would achieve this "fail fast" 
behavior:
+
+
+# disable retries on failure
+errors.retry.timeout=0
+
+# do not log the error and their contexts
+errors.log.enable=false
+
+# do not record errors in a dead letter queue topic
+errors.deadletterqueue.topic.name=

Review comment:
   ```suggestion
   # errors.deadletterqueue.topic.name=
   ```
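
   For contrast with the "fail fast" example above, a sketch of the opposite 
policy: tolerate errors, log them, and route failed records to a dead letter 
queue. Note that `errors.tolerance` and the DLQ topic name below are 
assumptions beyond the quoted excerpt:

   ```properties
   # tolerate all errors and keep retrying each failed operation for up to 10 minutes
   errors.tolerance=all
   errors.retry.timeout=600000

   # log each error together with its context
   errors.log.enable=true
   errors.log.include.messages=true

   # route failed records to a dead letter queue topic (topic name is illustrative)
   errors.deadletterqueue.topic.name=my-connector-errors
   errors.deadletterqueue.context.headers.enable=true
   ```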

##
File path: docs/connect.html
##
@@ -258,6 +258,48 @@ REST API
 GET / - return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that it is connected to.
 
 
+Error Reporting in Connect
+
+Kafka Connect provides 

[GitHub] [kafka] vvcephei commented on pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory

2020-06-30 Thread GitBox


vvcephei commented on pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#issuecomment-652090962


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10217) Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to StreamsMetrics

2020-06-30 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17148994#comment-17148994
 ] 

Sophie Blee-Goldman commented on KAFKA-10217:
-

[~mhmdchebbi] try rebasing – the PR that added the ProcessorContextUtils class 
was only just merged earlier today.

> Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to 
> StreamsMetrics
> ---
>
> Key: KAFKA-10217
> URL: https://issues.apache.org/jira/browse/KAFKA-10217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: kip-required, newbie
>
> StreamsMetricsImpl contains several convenience methods for safely 
> registering sensors at different levels of granularity. We added them as 
> internal methods because we weren't sure of their stability and utility. Now, 
> they've been in use for quite a while and seem to be stable.
> We should move them up into the public API so that custom stores and 
> processor implementations can also benefit from their safety.
> Implementing this feature should also allow us to drop the adaptor function: 
> `org.apache.kafka.streams.processor.internals.ProcessorContextUtils#getMetricsImpl`
> Note: this feature requires a KIP, but since the API is already 
> pre-determined, it should be uncontroversial. This improvement would be a 
> good opportunity for someone who wants to get an initial KIP under their belt.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine removed a comment on pull request #8961: MINOR: add a retry system test curl commands

2020-06-30 Thread GitBox


kkonstantine removed a comment on pull request #8961:
URL: https://github.com/apache/kafka/pull/8961#issuecomment-652085184


   @vvcephei `--retry-all-errors` is not an option that the current curl 
version recognizes. The system tests fail early on that error.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8961: MINOR: add a retry system test curl commands

2020-06-30 Thread GitBox


kkonstantine commented on pull request #8961:
URL: https://github.com/apache/kafka/pull/8961#issuecomment-652085184


   @vvcephei `--retry-all-errors` is not an option that the current curl 
version recognizes. The system tests fail early on that error.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8961: MINOR: add a retry system test curl commands

2020-06-30 Thread GitBox


vvcephei commented on pull request #8961:
URL: https://github.com/apache/kafka/pull/8961#issuecomment-652082007


   System test meta run: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4001/console



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8961: MINOR: add a retry system test curl commands

2020-06-30 Thread GitBox


vvcephei commented on pull request #8961:
URL: https://github.com/apache/kafka/pull/8961#issuecomment-652081453


   Recently, I have seen a few remote (aws-over-vagrant) system tests fail 
during setup due to this error:
   
   ```
   worker15: + curl -O https://s3-us-west-2.amazonaws.com/kafka-packages/kafka_2.11-1.1.1.tgz
   worker15:   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
   worker15:                                  Dload  Upload   Total   Spent    Left  Speed
   worker15:   0     0    0     0    0     0      0      0 --:--:--  0:00:13 --:--:--     0
   worker15: curl: (35) Unknown SSL protocol error
   worker15: in connection to s3-us-west-2.amazonaws.com:443
   ```
   
   The problem is transient, so it seems to be an issue with the SSL 
negotiation on the server side. Some reading has suggested it could also be due 
to networking issues when running inside of containers, or over VPNs, etc., etc.
   
   I didn't find anything obvious we could do about it, so it seems worthwhile 
to at least attempt to avoid failing the whole run over a transient non-problem.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory

2020-06-30 Thread GitBox


ableegoldman opened a new pull request #8962:
URL: https://github.com/apache/kafka/pull/8962


   Two more edge cases I found that produce extra TaskCorruptedExceptions while 
playing around with the failing eos-beta upgrade test (sadly these are 
unrelated problems, as the test still fails with these fixes in place).
   
   1. Need to write the checkpoint when recycling a standby: although we do 
preserve the changelog offsets when recycling a task, and should therefore 
write the offsets when the new task is itself closed, we do NOT write the 
checkpoint for uninitialized tasks. So if the new task is ultimately closed 
before it gets out of the CREATED state, the offsets will not be written and we 
can get a TaskCorruptedException
   2. With the change in task locking to address some Windows-related nonsense 
(am I remembering that correctly?), we don't delete entire task directories but 
just clear the inner state. With EOS, during initialization we check if the 
state directory is non-empty and the checkpoint is missing, and throw a 
TaskCorrupted if so. But just opening a rocksdb store creates a `rocksdb` base 
dir in the task directory, so the `taskDirIsEmpty` check always fails and we 
always throw TaskCorrupted even if there's nothing there. 
   
   We can fix 2. for rocksdb specifically, but this might still cause a 
headache for users of custom stores. Note that it's not a correctness issue, 
just an annoyance, so my take is that we should avoid large last-minute changes 
and just fix for rocksdb in 2.6. Then we can consider a more holistic fix going 
forward
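   
   To illustrate the check in point 2, here is a rough sketch of an emptiness 
test that ignores an empty `rocksdb` base directory (a hypothetical helper, not 
the actual Streams code):
   
   ```java
import java.io.File;

final class TaskDirUtil {
    // Hypothetical helper: returns true if the task directory holds no real state,
    // treating an empty "rocksdb" base directory as if it were not there.
    static boolean taskDirIsEmpty(final File taskDir) {
        final File[] contents = taskDir.listFiles();
        if (contents == null || contents.length == 0) {
            return true;
        }
        for (final File file : contents) {
            if (file.isDirectory() && file.getName().equals("rocksdb")) {
                final File[] inner = file.listFiles();
                if (inner != null && inner.length > 0) {
                    return false; // actual RocksDB state exists
                }
            } else {
                return false; // any other file or directory counts as state
            }
        }
        return true;
    }
}
   ```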



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #8961: MINOR: add a retry system test curl commands

2020-06-30 Thread GitBox


vvcephei opened a new pull request #8961:
URL: https://github.com/apache/kafka/pull/8961


   Adds a retry to curl, using the built-in exponential backoff mechanism.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #8859: MINOR: Upgrade jetty to 9.4.27.v20200227 and jersey to 2.31

2020-06-30 Thread GitBox


kkonstantine commented on pull request #8859:
URL: https://github.com/apache/kafka/pull/8859#issuecomment-652079556


   Same here. Thanks for checking!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc closed pull request #8544: [WIP]KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-30 Thread GitBox


d8tltanc closed pull request #8544:
URL: https://github.com/apache/kafka/pull/8544


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10217) Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to StreamsMetrics

2020-06-30 Thread mohamed chebbi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148976#comment-17148976
 ] 

mohamed chebbi commented on KAFKA-10217:


Hi [~vvcephei]

I can't find 
`org.apache.kafka.streams.processor.internals.ProcessorContextUtils` in the 
Kafka source code.

By the way, did you mean 
`org.apache.kafka.streams.processor.internals.ProcessorContextImpl`?

> Move nodeLevelSensor and storeLevelSensor methods from StreamsMetricsImpl to 
> StreamsMetrics
> ---
>
> Key: KAFKA-10217
> URL: https://issues.apache.org/jira/browse/KAFKA-10217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>  Labels: kip-required, newbie
>
> StreamsMetricsImpl contains several convenience methods for safely 
> registering sensors at different levels of granularity. We added them as 
> internal methods because we weren't sure of their stability and utility. Now, 
> they've been in use for quite a while and seem to be stable.
> We should move them up into the public API so that custom stores and 
> processor implementations can also benefit from their safety.
> Implementing this feature should also allow us to drop the adaptor function: 
> `org.apache.kafka.streams.processor.internals.ProcessorContextUtils#getMetricsImpl`
> Note: this feature requires a KIP, but since the API is already 
> pre-determined, it should be uncontroversial. This improvement would be a 
> good opportunity for someone who wants to get an initial KIP under their belt.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10166) Excessive TaskCorruptedException seen in testing

2020-06-30 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-10166:
-

Assignee: Sophie Blee-Goldman

> Excessive TaskCorruptedException seen in testing
> 
>
> Key: KAFKA-10166
> URL: https://issues.apache.org/jira/browse/KAFKA-10166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> As the title indicates, long-running test applications with injected network 
> "outages" seem to hit TaskCorruptedException more than expected.
> Seen occasionally on the ALOS application (~20 times in two days in one case, 
> for example), and very frequently with EOS (many times per day)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-10166) Excessive TaskCorruptedException seen in testing

2020-06-30 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reopened KAFKA-10166:
-
  Assignee: (was: Bruno Cadonna)

Found two edge cases we missed earlier so I'm reopening this blocker; the fixes 
are minor so I'll have a PR ready shortly cc [~rhauch]

> Excessive TaskCorruptedException seen in testing
> 
>
> Key: KAFKA-10166
> URL: https://issues.apache.org/jira/browse/KAFKA-10166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.6.0
>
>
> As the title indicates, long-running test applications with injected network 
> "outages" seem to hit TaskCorruptedException more than expected.
> Seen occasionally on the ALOS application (~20 times in two days in one case, 
> for example), and very frequently with EOS (many times per day)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on pull request #8544: [WIP]KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-30 Thread GitBox


dajac commented on pull request #8544:
URL: https://github.com/apache/kafka/pull/8544#issuecomment-652059065


   @d8tltanc We can close this one.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10219) KStream API support for multiple cluster broker

2020-06-30 Thread Sachin Kurle (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148954#comment-17148954
 ] 

Sachin Kurle commented on KAFKA-10219:
--

I have a simple use case where I read from a source topic on cluster A and 
write to a target topic on cluster B.

I am not sure how a transaction spanning multiple clusters affects exactly-once. 
Even in the single-cluster case, my KStream app deployed on a host server has 
to make network calls to the Kafka cluster's changelog topic to get transaction 
details and update the offsets topic. Maybe I am not fully aware of the 
internal workings of Kafka's stateful transactions in the exactly-once case.

> KStream API support for multiple cluster broker
> ---
>
> Key: KAFKA-10219
> URL: https://issues.apache.org/jira/browse/KAFKA-10219
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sachin Kurle
>Priority: Major
>
> We are trying to consume from a broker in cluster A via the KStreams API and 
> produce to a broker in cluster B. We have bootstrap servers configured in 
> both the consumer and producer configuration, but the KStreams API picks the 
> bootstrap servers of cluster A or B at random.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji opened a new pull request #8960: KAFKA-9144; Track timestamp from txn markers to prevent early producer expiration

2020-06-30 Thread GitBox


hachikuji opened a new pull request #8960:
URL: https://github.com/apache/kafka/pull/8960


   Note: this is a backport of #7687 to 2.3.
   
   Existing producer state expiration uses timestamps from data records only 
and not from transaction markers. This can cause premature producer expiration 
when the coordinator times out a transaction because we drop the state from 
existing batches. This in turn can allow the coordinator epoch to revert to a 
previous value, which can lead to validation failures during log recovery. This 
patch fixes the problem by also leveraging the timestamp from transaction 
markers.
   
   We also change the validation logic so that coordinator epoch is verified 
only for new marker appends. When replicating from the leader and when 
recovering the log, we only log a warning if we notice that the coordinator 
epoch has gone backwards. This allows recovery from previous occurrences of 
this bug.
   
   Finally, this patch fixes one minor issue when loading producer state from 
the snapshot file. When the only record for a given producer is a control 
record, the "last offset" field will be set to -1 in the snapshot. We should 
check for this case when loading to be sure we recover the state consistently.
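   
   In spirit, the expiration change amounts to tracking the most recent 
activity of any kind, marker or data. A hedged illustration (the names below 
are made up for clarity and are not the broker code):
   
   ```java
final class ProducerExpirationSketch {
    // Illustrative only: producer state should expire based on the most recent
    // timestamp seen from either data batches or transaction markers.
    static long lastActivityMs(final long lastDataBatchTimestamp,
                               final long lastTxnMarkerTimestamp) {
        return Math.max(lastDataBatchTimestamp, lastTxnMarkerTimestamp);
    }

    static boolean isExpired(final long nowMs,
                             final long lastActivityMs,
                             final long producerIdExpirationMs) {
        return nowMs - lastActivityMs >= producerIdExpirationMs;
    }
}
   ```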
   
   Reviewers: Ismael Juma , Guozhang Wang 

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8431: MINOR: Rename description of flatMapValues transformation

2020-06-30 Thread GitBox


abbccdda commented on pull request #8431:
URL: https://github.com/apache/kafka/pull/8431#issuecomment-652033261


   @maseiler Could you try to attach a screenshot of the fix?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #6792: KAFKA 8311 : Augment the error message to let user change `default.api.timeout.ms`

2020-06-30 Thread GitBox


abbccdda commented on pull request #6792:
URL: https://github.com/apache/kafka/pull/6792#issuecomment-652030858


   Closing the PR due to lack of interest and duplicate.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda closed pull request #6792: KAFKA 8311 : Augment the error message to let user change `default.api.timeout.ms`

2020-06-30 Thread GitBox


abbccdda closed pull request #6792:
URL: https://github.com/apache/kafka/pull/6792


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-4996) Fix findbugs multithreaded correctness warnings for streams

2020-06-30 Thread Leah Thomas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leah Thomas resolved KAFKA-4996.

Resolution: Fixed

> Fix findbugs multithreaded correctness warnings for streams
> ---
>
> Key: KAFKA-4996
> URL: https://issues.apache.org/jira/browse/KAFKA-4996
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Colin McCabe
>Assignee: Leah Thomas
>Priority: Major
>  Labels: newbie
>
> Fix findbugs multithreaded correctness warnings for streams
> {code}
> Multithreaded correctness Warnings
>
> Code Warning
> AT   Sequence of calls to java.util.concurrent.ConcurrentHashMap may not be atomic in org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(long, ProcessorContext)
> IS   Inconsistent synchronization of org.apache.kafka.streams.KafkaStreams.stateListener; locked 66% of time
> IS   Inconsistent synchronization of org.apache.kafka.streams.processor.internals.StreamThread.stateListener; locked 66% of time
> IS   Inconsistent synchronization of org.apache.kafka.streams.processor.TopologyBuilder.applicationId; locked 50% of time
> IS   Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingKeyValueStore.context; locked 66% of time
> IS   Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.cache; locked 60% of time
> IS   Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.context; locked 66% of time
> IS   Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.name; locked 60% of time
> IS   Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.serdes; locked 70% of time
> IS   Inconsistent synchronization of org.apache.kafka.streams.state.internals.RocksDBStore.db; locked 63% of time
> IS   Inconsistent synchronization of org.apache.kafka.streams.state.internals.RocksDBStore.serdes; locked 76% of time
> {code}
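
For context, the "IS" entries above flag fields that are locked on some code 
paths but accessed without the lock on others. A minimal self-contained 
illustration of the pattern and the usual fix (not Kafka code):

{code:java}
// Minimal sketch, not Kafka code: "stateListener" is written under a lock but
// read without one, which findbugs reports as inconsistent synchronization (IS).
final class ListenerHolder {
    interface StateListener { void onChange(String newState); }

    private final Object lock = new Object();
    // Declaring the field volatile (or synchronizing the read as well) fixes the warning.
    private volatile StateListener stateListener;

    void setStateListener(final StateListener listener) {
        synchronized (lock) {
            stateListener = listener;
        }
    }

    StateListener stateListener() {
        return stateListener; // unsynchronized read: safe here only because of volatile
    }
}
{code}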



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-06-30 Thread GitBox


d8tltanc commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-652013968


   @skaundinya15 @abbccdda @ijuma 
   The utility class GeometricProgression was renamed in #8683 and merged into 
trunk. Now this patch is rebased and ready for review.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-06-30 Thread GitBox


d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r447942236



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##
@@ -67,9 +67,14 @@
  * retry.backoff.ms
  */
 public static final String RETRY_BACKOFF_MS_CONFIG = 
CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
-private static final String RETRY_BACKOFF_MS_DOC = "The amount of time to 
wait before attempting to " +
-"retry a failed request. This avoids repeatedly sending 
requests in a tight loop under " +
-"some failure scenarios.";
+private static final String RETRY_BACKOFF_MS_DOC = 
CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
+
+/**
+ * retry.backoff.max.ms
+ */
+// TODO: Add validator rules and force backoff_max_ms > backoff_ms if 
possible (I guess it's impossible)

Review comment:
   Since the utility class now falls back to a static backoff when 
`backoff_max_ms` <= `backoff_ms`, we don't need the checker anymore.
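   
   For reference, a self-contained sketch of the capped exponential backoff 
with jitter that KIP-580 describes (the class name and fields below are 
assumptions, not the actual client utility):
   
   ```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch of KIP-580-style backoff; not the actual Kafka utility class.
final class BackoffSketch {
    private final long initialMs;   // e.g. retry.backoff.ms
    private final long maxMs;       // e.g. retry.backoff.max.ms
    private final double multiplier;
    private final double jitter;    // fraction, e.g. 0.2

    BackoffSketch(long initialMs, long maxMs, double multiplier, double jitter) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.multiplier = multiplier;
        this.jitter = jitter;
    }

    long backoffMs(int attempts) {
        if (maxMs <= initialMs) {
            return initialMs; // degenerate config falls back to a static backoff
        }
        double exp = initialMs * Math.pow(multiplier, attempts);
        double rand = 1.0 + jitter * (2.0 * ThreadLocalRandom.current().nextDouble() - 1.0);
        return (long) Math.min(maxMs, exp * rand);
    }
}
   ```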





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-06-30 Thread GitBox


junrao commented on a change in pull request #8479:
URL: https://github.com/apache/kafka/pull/8479#discussion_r447927722



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -499,7 +500,15 @@ class Partition(val topicPartition: TopicPartition,
 addingReplicas = addingReplicas,
 removingReplicas = removingReplicas
   )
-  createLogIfNotExists(partitionState.isNew, isFutureReplica = false, 
highWatermarkCheckpoints)
+  try {
+this.createLogIfNotExists(partitionState.isNew, isFutureReplica = 
false, highWatermarkCheckpoints)

Review comment:
   Do we need this? Ditto below.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-06-30 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148854#comment-17148854
 ] 

Guozhang Wang commented on KAFKA-10134:
---

[~neowu0] Thanks for your comments. I will incorporate them in my PR and ping 
you and [~seanguo] to test it out before merging.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance as our tasks are long running task.
> But after the upgrade when we try to kill an instance to let rebalance happen 
> when there is some load(some are long running tasks >30S) there, the CPU will 
> go sky-high. It reads ~700% in our metrics so there should be several threads 
> are in a tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible in both the 
> new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The 
> difference is that with old eager rebalance rebalance protocol used the high 
> CPU usage will dropped after the rebalance done. But when using cooperative 
> one, it seems the consumers threads are stuck on something and couldn't 
> finish the rebalance so the high CPU usage won't drop until we stopped our 
> load. Also a small load without long running task also won't cause continuous 
> high CPU usage as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found it looks like the clients are  in a loop 
> on finding the coordinator.
> I also tried the old rebalance protocol for the new version the issue still 
> exists but the CPU will be back to normal when the rebalance is done.
> Also tried the same on the 2.4.1 which seems don't have this issue. So it 
> seems related something changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-06-30 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10017:
--
Priority: Blocker  (was: Critical)

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test, unit-test
> Fix For: 2.6.0
>
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-06-30 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148846#comment-17148846
 ] 

Guozhang Wang commented on KAFKA-10017:
---

[~rhauch] I'm making it back as a blocker for 2.6 since we have seen it failed 
again after re-enabling it.

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test, unit-test
> Fix For: 2.6.0
>
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10219) KStream API support for multiple cluster broker

2020-06-30 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148818#comment-17148818
 ] 

Boyang Chen commented on KAFKA-10219:
-

Thanks for the proposal; the use case you describe is reasonable. However, we 
need to better clarify the feature we would be introducing and the challenges 
we face, such as:

1. What does "multiple clusters" mean here? Do we support all input topics in 
cluster A and all output topics in cluster B, or a mix of topics across 
arbitrary clusters A, B, C that Streams must detect automatically?
2. How do we allocate internal topics? Which cluster should the 
changelog/repartition topics go to, the input topics' cluster or the output one?
3. How do we support exactly-once? Right now the entire framework assumes a 
single-cluster context. With multiple clusters we could no longer guarantee 
exactly-once, because a transaction may span multiple clusters and we don't 
have a centralized coordinator to track its progress.
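
Until those questions are settled, the usual workaround for this ticket's 
scenario is a plain consumer on cluster A feeding a plain producer on cluster 
B; this gives at-least-once, not exactly-once, delivery across the clusters. 
A minimal sketch (broker addresses and topic names are placeholders):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CrossClusterCopy {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "clusterA:9092"); // source cluster
        consumerProps.put("group.id", "cross-cluster-copy");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "clusterB:9092"); // target cluster
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("source-topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // At-least-once hand-off: a crash between send and commit may duplicate records.
                    producer.send(new ProducerRecord<>("target-topic", record.key(), record.value()));
                }
                consumer.commitSync();
            }
        }
    }
}
{code}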

> KStream API support for multiple cluster broker
> ---
>
> Key: KAFKA-10219
> URL: https://issues.apache.org/jira/browse/KAFKA-10219
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sachin Kurle
>Priority: Major
>
> We are trying to consume from a broker in cluster A via the KStreams API and 
> produce to a broker in cluster B. We have bootstrap servers configured in 
> both the consumer and producer configuration, but the KStreams API picks the 
> bootstrap servers of cluster A or B at random.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-06-30 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148801#comment-17148801
 ] 

Di Campo edited comment on KAFKA-4273 at 6/30/20, 4:12 PM:
---

Any news or improved workaround? I am also interested in this. 
 In my case, I think my preferred workaround would be the one explained 
[here|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage]


was (Author: xmar):
Any news or improved workaround? I am also interested on this. 
In my case, I think my preferred workaround should be the one explained 
[[here|[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage]]|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage]

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retnetion period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> wan't to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have schedulled a job to 
> periodically stop Kafka Streams instances - one at the time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think would be reasonable to add support in the DSL api to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could also this be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question, maybe some intermediate topics / 
> state stores need different TTL, so a it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212
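
The purge-by-age pattern referenced in the comment above boils down to a 
punctuator that periodically scans a state store and deletes entries older 
than the TTL. A hedged sketch on the 2.x Processor API (the store name, scan 
interval, and 48-hour TTL are illustrative):

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class TtlPurgeProcessor implements Processor<String, Long> {
    private static final long TTL_MS = Duration.ofHours(48).toMillis();
    private KeyValueStore<String, Long> lastUpdated; // key -> last-update timestamp

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        lastUpdated = (KeyValueStore<String, Long>) context.getStateStore("last-updated-store");
        // Periodically scan the store and delete anything older than the TTL;
        // the deletes also write tombstones to the (non-compacted) changelog.
        context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, now -> {
            try (KeyValueIterator<String, Long> it = lastUpdated.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, Long> entry = it.next();
                    if (now - entry.value > TTL_MS) {
                        lastUpdated.delete(entry.key);
                    }
                }
            }
        });
    }

    @Override
    public void process(final String key, final Long timestampMs) {
        lastUpdated.put(key, timestampMs);
    }

    @Override
    public void close() { }
}
{code}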



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-06-30 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148801#comment-17148801
 ] 

Di Campo commented on KAFKA-4273:
-

Any news or improved workaround? I am also interested in this. 
In my case, I think my preferred workaround would be the one explained 
[here|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-HowtopurgedatafromKTablesbasedonage]

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retnetion period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> wan't to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have schedulled a job to 
> periodically stop Kafka Streams instances - one at the time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think would be reasonable to add support in the DSL api to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could also this be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question, maybe some intermediate topics / 
> state stores need different TTL, so a it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148774#comment-17148774
 ] 

huxihx commented on KAFKA-10220:


Well, from the broker perspective, you are right. Only trunk is affected. What 
I mean is that we will also hit the NPE when 2.6 clients talk to a trunk broker.

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Assignee: Luke Chen
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
> {{          
> resource.configurationKeys.asScala.forall(_.contains(configName))}}
>  
> (configurationKeys is null)
> {{[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error 
> processing describe configs request for resource 
> DescribeConfigsResource(resourceType=2, resourceName='topic1', 
> configurationKeys=null) 
> (kafka.server.AdminManager)}}{{java.lang.NullPointerException}}{{at 
> kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)}}{{at
>  
> kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)}}{{at
>  
> scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)}}{{at
>  scala.collection.Iterator.foreach(Iterator.scala:929)}}{{at 
> scala.collection.Iterator.foreach$(Iterator.scala:929)}}{{at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1417)}}{{at 
> scala.collection.IterableLike.foreach(IterableLike.scala:71)}}{{at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:70)}}{{at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}{{at 
> scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)}}{{at 
> scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)}}{{at 
> scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)}}{{at 
> scala.collection.TraversableLike.filter(TraversableLike.scala:259)}}{{at 
> scala.collection.TraversableLike.filter$(TraversableLike.scala:259)}}{{at 
> scala.collection.AbstractTraversable.filter(Traversable.scala:104)}}{{at 
> kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)}}{{at
>  
> kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)}}{{at
>  scala.collection.immutable.List.map(List.scala:283)}}{{at 
> kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)}}{{at 
> kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)}}{{at
>  kafka.server.KafkaApis.handle(KafkaApis.scala:165)}}{{at 
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)}}{{at 
> java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148756#comment-17148756
 ] 

Ismael Juma commented on KAFKA-10220:
-

From the link you referenced, it seems like this only affects trunk, not 2.6.

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Assignee: Luke Chen
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
> {{          
> resource.configurationKeys.asScala.forall(_.contains(configName))}}
>  
> (configurationKeys is null)
> {{[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error 
> processing describe configs request for resource 
> DescribeConfigsResource(resourceType=2, resourceName='topic1', 
> configurationKeys=null) 
> (kafka.server.AdminManager)}}{{java.lang.NullPointerException}}{{at 
> kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)}}{{at
>  
> kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)}}{{at
>  
> scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)}}{{at
>  scala.collection.Iterator.foreach(Iterator.scala:929)}}{{at 
> scala.collection.Iterator.foreach$(Iterator.scala:929)}}{{at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1417)}}{{at 
> scala.collection.IterableLike.foreach(IterableLike.scala:71)}}{{at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:70)}}{{at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}{{at 
> scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)}}{{at 
> scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)}}{{at 
> scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)}}{{at 
> scala.collection.TraversableLike.filter(TraversableLike.scala:259)}}{{at 
> scala.collection.TraversableLike.filter$(TraversableLike.scala:259)}}{{at 
> scala.collection.AbstractTraversable.filter(Traversable.scala:104)}}{{at 
> kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)}}{{at
>  
> kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)}}{{at
>  scala.collection.immutable.List.map(List.scala:283)}}{{at 
> kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)}}{{at 
> kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)}}{{at
>  kafka.server.KafkaApis.handle(KafkaApis.scala:165)}}{{at 
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)}}{{at 
> java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148738#comment-17148738
 ] 

huxihx commented on KAFKA-10220:


[~ijuma] This should affect 2.6 as well, since `configurationKeys` only starts 
to be initialized in 2.7 due to the refinement introduced by 
[KAFKA-9432|https://issues.apache.org/jira/browse/KAFKA-9432]. Anyway, since 
`configurationKeys` is nullable, a null check should be added when 
processing the resources in AdminManager.
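
For what it's worth, the failing request comes from ordinary client usage; an 
older client simply leaves {{configurationKeys}} unset (null) in the 
DescribeConfigs request. A minimal reproduction sketch (broker address and 
topic name are placeholders):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeTopicConfigs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "topic1");
            // A 2.6 client sends DescribeConfigs without configurationKeys (i.e. null),
            // which the unpatched trunk broker dereferences and NPEs on.
            Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
            System.out.println(config);
        }
    }
}
{code}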

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Assignee: Luke Chen
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
> {{          
> resource.configurationKeys.asScala.forall(_.contains(configName))}}
>  
> (configurationKeys is null)
> {{[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error 
> processing describe configs request for resource 
> DescribeConfigsResource(resourceType=2, resourceName='topic1', 
> configurationKeys=null) 
> (kafka.server.AdminManager)}}{{java.lang.NullPointerException}}{{at 
> kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)}}{{at
>  
> kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)}}{{at
>  
> scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)}}{{at
>  scala.collection.Iterator.foreach(Iterator.scala:929)}}{{at 
> scala.collection.Iterator.foreach$(Iterator.scala:929)}}{{at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1417)}}{{at 
> scala.collection.IterableLike.foreach(IterableLike.scala:71)}}{{at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:70)}}{{at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}{{at 
> scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)}}{{at 
> scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)}}{{at 
> scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)}}{{at 
> scala.collection.TraversableLike.filter(TraversableLike.scala:259)}}{{at 
> scala.collection.TraversableLike.filter$(TraversableLike.scala:259)}}{{at 
> scala.collection.AbstractTraversable.filter(Traversable.scala:104)}}{{at 
> kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)}}{{at
>  
> kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)}}{{at
>  scala.collection.immutable.List.map(List.scala:283)}}{{at 
> kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)}}{{at 
> kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)}}{{at
>  kafka.server.KafkaApis.handle(KafkaApis.scala:165)}}{{at 
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)}}{{at 
> java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-10220:
-

Assignee: Luke Chen

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Assignee: Luke Chen
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
> {{          
> resource.configurationKeys.asScala.forall(_.contains(configName))}}
>  
> (configurationKeys is null)
> {{[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error 
> processing describe configs request for resource 
> DescribeConfigsResource(resourceType=2, resourceName='topic1', 
> configurationKeys=null) 
> (kafka.server.AdminManager)}}{{java.lang.NullPointerException}}{{at 
> kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)}}{{at
>  
> kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)}}{{at
>  
> scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)}}{{at
>  scala.collection.Iterator.foreach(Iterator.scala:929)}}{{at 
> scala.collection.Iterator.foreach$(Iterator.scala:929)}}{{at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1417)}}{{at 
> scala.collection.IterableLike.foreach(IterableLike.scala:71)}}{{at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:70)}}{{at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}{{at 
> scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)}}{{at 
> scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)}}{{at 
> scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)}}{{at 
> scala.collection.TraversableLike.filter(TraversableLike.scala:259)}}{{at 
> scala.collection.TraversableLike.filter$(TraversableLike.scala:259)}}{{at 
> scala.collection.AbstractTraversable.filter(Traversable.scala:104)}}{{at 
> kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)}}{{at
>  
> kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)}}{{at
>  scala.collection.immutable.List.map(List.scala:283)}}{{at 
> kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)}}{{at 
> kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)}}{{at
>  kafka.server.KafkaApis.handle(KafkaApis.scala:165)}}{{at 
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)}}{{at 
> java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] omkreddy commented on pull request #8957: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override

2020-06-30 Thread GitBox


omkreddy commented on pull request #8957:
URL: https://github.com/apache/kafka/pull/8957#issuecomment-651811192


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy merged pull request #8958: MINOR: Update AlterConfigsOptions' Javadoc

2020-06-30 Thread GitBox


omkreddy merged pull request #8958:
URL: https://github.com/apache/kafka/pull/8958


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148680#comment-17148680
 ] 

Ismael Juma commented on KAFKA-10220:
-

Does this affect 2.6 as well or just trunk?

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
> {{          
> resource.configurationKeys.asScala.forall(_.contains(configName))}}
>  
> (configurationKeys is null)
> {{[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error 
> processing describe configs request for resource 
> DescribeConfigsResource(resourceType=2, resourceName='topic1', 
> configurationKeys=null) 
> (kafka.server.AdminManager)}}{{java.lang.NullPointerException}}{{at 
> kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)}}{{at
>  
> kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)}}{{at
>  
> scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)}}{{at
>  scala.collection.Iterator.foreach(Iterator.scala:929)}}{{at 
> scala.collection.Iterator.foreach$(Iterator.scala:929)}}{{at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1417)}}{{at 
> scala.collection.IterableLike.foreach(IterableLike.scala:71)}}{{at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:70)}}{{at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)}}{{at 
> scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)}}{{at 
> scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)}}{{at 
> scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)}}{{at 
> scala.collection.TraversableLike.filter(TraversableLike.scala:259)}}{{at 
> scala.collection.TraversableLike.filter$(TraversableLike.scala:259)}}{{at 
> scala.collection.AbstractTraversable.filter(Traversable.scala:104)}}{{at 
> kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)}}{{at
>  
> kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)}}{{at
>  scala.collection.immutable.List.map(List.scala:283)}}{{at 
> kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)}}{{at 
> kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)}}{{at
>  kafka.server.KafkaApis.handle(KafkaApis.scala:165)}}{{at 
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)}}{{at 
> java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram opened a new pull request #8959: MINOR: Fix log entry in FetchSessionHandler to specify throttle correctly

2020-06-30 Thread GitBox


rajinisivaram opened a new pull request #8959:
URL: https://github.com/apache/kafka/pull/8959


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #8958: MINOR: Update AlterConfigsOptions' Javadoc

2020-06-30 Thread GitBox


dajac opened a new pull request #8958:
URL: https://github.com/apache/kafka/pull/8958


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] urbandan opened a new pull request #8957: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override

2020-06-30 Thread GitBox


urbandan opened a new pull request #8957:
URL: https://github.com/apache/kafka/pull/8957


   Implements KIP-308
   
   Changes:
   - Added kafka-get-offsets.sh script
   - Removed deprecated max-wait-ms and offsets arguments
   - Updated tool to query all topic-partitions by default
   - Updated topic argument to support patterns
   - Added topic-partitions argument to support a list of topic-partition 
patterns
   - Added exclude-internal-topics to support filtering internal topics
   
   Testing done: added new ducktape tests for the tool.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on a change in pull request #8935: KAFKA-10189: reset event queue time histogram when queue is empty

2020-06-30 Thread GitBox


omkreddy commented on a change in pull request #8935:
URL: https://github.com/apache/kafka/pull/8935#discussion_r447662323



##
File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala
##
@@ -69,14 +69,16 @@ class QueuedEvent(val event: ControllerEvent,
 class ControllerEventManager(controllerId: Int,
  processor: ControllerEventProcessor,
  time: Time,
- rateAndTimeMetrics: Map[ControllerState, 
KafkaTimer]) extends KafkaMetricsGroup {
+ rateAndTimeMetrics: Map[ControllerState, 
KafkaTimer],
+ eventQueueTimeTimeoutMs: Long = 6) extends 
KafkaMetricsGroup {
   import ControllerEventManager._
 
   @volatile private var _state: ControllerState = ControllerState.Idle
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[QueuedEvent]
   // Visible for test
   private[controller] val thread = new 
ControllerEventThread(ControllerEventThreadName)
+  val eventQueueTimeMetricTimeoutMs = eventQueueTimeTimeoutMs

Review comment:
   We don't need the new `eventQueueTimeMetricTimeoutMs` field; we can use the 
constructor parameter `eventQueueTimeTimeoutMs` directly, can't we? (See the 
sketch below.)
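
   A minimal Scala sketch of the point (hypothetical class and names, not the 
actual `ControllerEventManager`):
   
   // A plain constructor parameter is already visible throughout the class
   // body, so copying it into a second field adds nothing.
   class EventManager(timeoutMs: Long = 60000L) {
     def describeTimeout(): String = s"histogram timeout = $timeoutMs ms"
   }
   
   object EventManagerExample extends App {
     println(new EventManager().describeTimeout()) // histogram timeout = 60000 ms
   }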





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy merged pull request #8956: MINOR[docs]: fix typo in ssl.client.auth requested to required.

2020-06-30 Thread GitBox


omkreddy merged pull request #8956:
URL: https://github.com/apache/kafka/pull/8956


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148576#comment-17148576
 ] 

Edoardo Comar edited comment on KAFKA-10220 at 6/30/20, 11:51 AM:
--

also note that if the describe comes from the current trunk, it succeeds:

(trunk)$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic: topic1 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: topic25 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
 Topic: topic25 Partition: 0 Leader: 0 Replicas: 0 Isr: 0


was (Author: ecomar):
{{also note that if the describe comes from the current trunk, it succeeds:}}

{{(trunk)$ bin/kafka-topics.sh --bootstrap-server localhost:9092 
--describe(trunk)$ bin/kafka-topics.sh --bootstrap-server Topic: topic1 
PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: 
topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0Topic: topic25 PartitionCount: 
1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: topic25 
Partition: 0 Leader: 0 Replicas: 0 Isr: 0}}

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
>     resource.configurationKeys.asScala.forall(_.contains(configName))
>
> (configurationKeys is null)
>
> [2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing
> describe configs request for resource DescribeConfigsResource(resourceType=2,
> resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
> java.lang.NullPointerException
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
> 	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
> 	at scala.collection.Iterator.foreach(Iterator.scala:929)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:929)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
> 	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
> 	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
> 	at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
> 	at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
> 	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> 	at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
> 	at scala.collection.immutable.List.map(List.scala:283)
> 	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
> 	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
> 	at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148576#comment-17148576
 ] 

Edoardo Comar edited comment on KAFKA-10220 at 6/30/20, 11:51 AM:
--

also note that if the describe comes from the current trunk, it succeeds:

(trunk)$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic: topic1 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: topic25 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
 Topic: topic25 Partition: 0 Leader: 0 Replicas: 0 Isr: 0


was (Author: ecomar):
{{also note that if the describe comes from the current trunk, it succeeds:}}

{{(trunk)$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe}}
{{Topic: topic1 PartitionCount: 1 ReplicationFactor: 1 Configs: 
segment.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 
0Topic: topic25 PartitionCount: 1 ReplicationFactor: 1 Configs: 
segment.bytes=1073741824 Topic: topic25 Partition: 0 Leader: 0 Replicas: 0 Isr: 
0}}

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
>     resource.configurationKeys.asScala.forall(_.contains(configName))
>
> (configurationKeys is null)
>
> [2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing
> describe configs request for resource DescribeConfigsResource(resourceType=2,
> resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
> java.lang.NullPointerException
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
> 	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
> 	at scala.collection.Iterator.foreach(Iterator.scala:929)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:929)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
> 	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
> 	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
> 	at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
> 	at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
> 	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> 	at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
> 	at scala.collection.immutable.List.map(List.scala:283)
> 	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
> 	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
> 	at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148576#comment-17148576
 ] 

Edoardo Comar edited comment on KAFKA-10220 at 6/30/20, 11:50 AM:
--

also note that if the describe comes from the current trunk, it succeeds:

(trunk)$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic: topic1 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: topic25 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
 Topic: topic25 Partition: 0 Leader: 0 Replicas: 0 Isr: 0


was (Author: ecomar):
{{also note that if the describe comes from the current trunk, it succeeds:}}

{{(trunk)$ bin/kafka-topics.sh --bootstrap-server localhost:9092 
--describe(trunk)$ bin/kafka-topics.sh --bootstrap-server Topic: topic1 
PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: 
topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0Topic: topic25 PartitionCount: 
1 ReplicationFactor: 1 Configs: segment.bytes=1073741824 Topic: topic25 
Partition: 0 Leader: 0 Replicas: 0 Isr: 0ecomar@edoibmmac2 
~/devel/ws-github/apache/kafka (trunk)$ }}

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
>     resource.configurationKeys.asScala.forall(_.contains(configName))
>
> (configurationKeys is null)
>
> [2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing
> describe configs request for resource DescribeConfigsResource(resourceType=2,
> resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
> java.lang.NullPointerException
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
> 	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
> 	at scala.collection.Iterator.foreach(Iterator.scala:929)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:929)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
> 	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
> 	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
> 	at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
> 	at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
> 	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> 	at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
> 	at scala.collection.immutable.List.map(List.scala:283)
> 	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
> 	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
> 	at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148576#comment-17148576
 ] 

Edoardo Comar commented on KAFKA-10220:
---

also note that if the describe comes from the current trunk, it succeeds:

(trunk)$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic: topic1 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: topic25 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
 Topic: topic25 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
ecomar@edoibmmac2 ~/devel/ws-github/apache/kafka (trunk)$

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
>     resource.configurationKeys.asScala.forall(_.contains(configName))
>
> (configurationKeys is null)
>
> [2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing
> describe configs request for resource DescribeConfigsResource(resourceType=2,
> resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
> java.lang.NullPointerException
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
> 	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
> 	at scala.collection.Iterator.foreach(Iterator.scala:929)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:929)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
> 	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
> 	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
> 	at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
> 	at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
> 	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> 	at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
> 	at scala.collection.immutable.List.map(List.scala:283)
> 	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
> 	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
> 	at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148575#comment-17148575
 ] 

Edoardo Comar commented on KAFKA-10220:
---

OK, a few more steps to reproduce. It looks like the issue is linked to having 
created and described a topic with the 2.5 CLI:

#on current trunk
$ git lg
55b5b248c - (HEAD -> trunk, origin/trunk, origin/HEAD)

#build
$ ./gradlew clean
$ ./gradlew assemble -PscalaVersion=2.12

#run zookeeper and one broker
$ export SCALA_VERSION=2.12
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

#create topic with CLI
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic1 --partitions 1 --replication-factor 1
# works fine

#describe topic with CLI
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic: topic1 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
# works fine

in another terminal, use the kafka_2.13-2.5.0 binary distribution:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic25 --partitions 1 --replication-factor 1
Created topic topic25.

#now describe it (still with 2.5) ... boom
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Error while executing topic command : org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
[2020-06-30 12:39:48,581] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)

#in the broker terminal, the exception is

[2020-06-30 12:40:27,543] ERROR [Admin Manager on Broker 0]: Error processing describe configs request for resource DescribeConfigsResource(resourceType=2, resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
java.lang.NullPointerException
	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:359)
	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:357)
	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:291)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:290)
	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:284)
	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
	at scala.collection.TraversableLike.filter(TraversableLike.scala:382)
	at scala.collection.TraversableLike.filter$(TraversableLike.scala:382)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:357)
	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:350)
	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2594)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
	at java.base/java.lang.Thread.run(Thread.java:834)

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
>     resource.configurationKeys.asScala.forall(_.contains(configName))
>
> (configurationKeys is null)
>
> [2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error
> processing describe configs request for resource
> DescribeConfigsResource(resourceType=2, resourceName='topic1',

[GitHub] [kafka] dajac commented on pull request #8954: MINOR; Move quota integration tests to using the new quota API.

2020-06-30 Thread GitBox


dajac commented on pull request #8954:
URL: https://github.com/apache/kafka/pull/8954#issuecomment-651742771


   @ijuma @rajinisivaram Could you review this one when you have time?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] itantiger commented on pull request #8753: KAFKA-10043:Some parameters will be overwritten which was configured …

2020-06-30 Thread GitBox


itantiger commented on pull request #8753:
URL: https://github.com/apache/kafka/pull/8753#issuecomment-651738671


   @rhauch  Can you take a look at this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8940: KAFKA-10181: AlterConfig/IncrementalAlterConfig should route to the controller for non validation calls

2020-06-30 Thread GitBox


dajac commented on a change in pull request #8940:
URL: https://github.com/apache/kafka/pull/8940#discussion_r447557312



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -354,13 +401,62 @@ class KafkaApisTest {
   .setValue("bar"))
 requestData.resources.add(alterResource)
 
-val request = buildRequest(new 
IncrementalAlterConfigsRequest.Builder(requestData)
-  .build(requestHeader.apiVersion))
+val incrementalAlterConfigsRequest = new 
IncrementalAlterConfigsRequest.Builder(requestData)
+  .build(requestHeader.apiVersion)
+val request = buildRequest(incrementalAlterConfigsRequest)
 createKafkaApis(authorizer = 
Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
 
+val response = readResponse(ApiKeys.INCREMENTAL_ALTER_CONFIGS, 
incrementalAlterConfigsRequest, capturedResponse)
+  .asInstanceOf[IncrementalAlterConfigsResponse]
+
+val responseMap = response.data.responses().asScala.map { resourceResponse 
=>
+  resourceResponse.resourceName() -> 
Errors.forCode(resourceResponse.errorCode)
+}.toMap
+assertEquals(Map(resourceName -> Errors.NONE), responseMap)
+
 verify(authorizer, adminManager)
   }
 
+  @Test
+  def testIncrementalAlterConfigsWithNonController(): Unit = {
+val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+val resourceName = "topic-1"
+val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, 
ApiKeys.ALTER_CONFIGS.latestVersion,

Review comment:
   `INCREMENTAL_ALTER_CONFIGS` should be used here.
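
   A sketch of the corrected construction (`clientId` and the correlation id 
stand in for whatever values the test actually passes):
   
   val requestHeader = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS,
     ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion, clientId, 0)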

##
File path: clients/src/main/resources/common/message/AlterConfigsRequest.json
##
@@ -18,7 +18,9 @@
   "type": "request",
   "name": "AlterConfigsRequest",
   // Version 1 is the same as version 0.
-  "validVersions": "0-1",
+  //
+  // Version 2 will always route to the controller for topic resources change.

Review comment:
   * This is not entirely true, is it? A topic resource change goes to the 
controller if `shouldValidateOnly` is false, and to the least-loaded node 
otherwise. We should also explain how the broker resource is handled starting 
from v2.
   * I suggest adding the KIP number here and in all other schemas; e.g. 
something along the lines of the sketch below.
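
   A possible wording (only a suggestion; the KIP number still needs to be 
filled in):
   
   // Version 2 is routed to the controller for topic resource changes when
   // validateOnly=false; otherwise the request goes to the least-loaded node.
   // Broker resource changes are handled as before (see KIP-XXX).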

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -2103,31 +2097,36 @@ void handleFailure(Throwable throwable) {
 
 @Override
 public AlterConfigsResult incrementalAlterConfigs(Map> configs,
- final 
AlterConfigsOptions options) {
+  final 
AlterConfigsOptions options) {

Review comment:
   The code in `incrementalAlterConfigs` is almost identical to the code in 
`alterConfigs`. I wonder if we could share more of the logic between the two 
(e.g. something shaped like the sketch below). Have you already tried?
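
   A rough Scala sketch of one possible shape (hypothetical helper; the real 
client is Java, and all names here are illustrative only):
   
   // Both entry points build one future per ConfigResource and assemble an
   // AlterConfigsResult, so the shared part can take the per-resource send
   // step as a function argument (assumes org.apache.kafka.common.KafkaFuture
   // and org.apache.kafka.common.config.ConfigResource are imported).
   private def alterConfigsCommon(
       resources: Set[ConfigResource],
       sendForResource: ConfigResource => KafkaFuture[Void]
   ): Map[ConfigResource, KafkaFuture[Void]] =
     resources.map(r => r -> sendForResource(r)).toMap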

##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -292,30 +290,77 @@ class KafkaApisTest {
 1, true, true)
 )
 
+EasyMock.expect(controller.isActive).andReturn(true)
+
 // Verify that authorize is only called once
 EasyMock.expect(authorizer.authorize(anyObject[RequestContext], 
EasyMock.eq(expectedActions.asJava)))
   .andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
   .once()
 
-expectNoThrottling()
+val capturedResponse = expectNoThrottling()
 
 val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
resourceName)
 EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false)))
   .andReturn(Map(configResource -> ApiError.NONE))

Review comment:
   This is not related to the PR but while we are here, could we replace 
`anyObject()` with the correct expected value?
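
   (A sketch; `expectedConfigs` stands in for the map the test actually 
builds.)
   
   EasyMock.expect(adminManager.alterConfigs(EasyMock.eq(expectedConfigs), EasyMock.eq(false)))
     .andReturn(Map(configResource -> ApiError.NONE))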

##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -2561,23 +2580,41 @@ class KafkaApis(val requestChannel: RequestChannel,
   }.toBuffer
 }.toMap
 
-val (authorizedResources, unauthorizedResources) = configs.partition { 
case (resource, _) =>
-  resource.`type` match {
-case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
-  authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)
-case ConfigResource.Type.TOPIC =>
-  authorize(request.context, ALTER_CONFIGS, TOPIC, resource.name)
-case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt")
+def sendResponseCallback(results: Map[ConfigResource, ApiError]): Unit = {
+  def responseCallback(requestThrottleMs: Int): 
IncrementalAlterConfigsResponse = {
+new 
IncrementalAlterConfigsResponse(IncrementalAlterConfigsResponse.toResponseData(requestThrottleMs,
 results.asJava))
   }
+  sendResponseMaybeThrottle(request, responseCallback)
 }
 
-val authorizedResult = 
adminManager.incrementalAlterConfigs(authorizedResources, 

[jira] [Commented] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148564#comment-17148564
 ] 

Edoardo Comar commented on KAFKA-10220:
---

[~huxi_2b] I initially hit this when using the 2.5.0 CLI (from the binary 
distribution) against trunk (running in Eclipse).

I was later able to reproduce it using the CLI built from trunk (./gradlew 
assemble -PscalaVersion=2.12).

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
>     resource.configurationKeys.asScala.forall(_.contains(configName))
>
> (configurationKeys is null)
>
> [2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing
> describe configs request for resource DescribeConfigsResource(resourceType=2,
> resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
> java.lang.NullPointerException
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
> 	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
> 	at scala.collection.Iterator.foreach(Iterator.scala:929)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:929)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
> 	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
> 	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
> 	at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
> 	at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
> 	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> 	at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
> 	at scala.collection.immutable.List.map(List.scala:283)
> 	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
> 	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
> 	at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] omkreddy commented on pull request #8935: KAFKA-10189: reset event queue time histogram when queue is empty

2020-06-30 Thread GitBox


omkreddy commented on pull request #8935:
URL: https://github.com/apache/kafka/pull/8935#issuecomment-651735737


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-10220:
--
Description: 
In current trunk code 
 Describing a topic from the CLI can fail with an NPE in the broker

on the line 

    resource.configurationKeys.asScala.forall(_.contains(configName))

(configurationKeys is null)

[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing describe configs request for resource DescribeConfigsResource(resourceType=2, resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
java.lang.NullPointerException
	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
	at scala.collection.Iterator.foreach(Iterator.scala:929)
	at scala.collection.Iterator.foreach$(Iterator.scala:929)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
	at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
	at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
	at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
	at scala.collection.immutable.List.map(List.scala:283)
	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
	at java.lang.Thread.run(Thread.java:748)

  was:
In current trunk code 
 Describing a topic from the CLI can fail with an NPE in the broker

on the line 

    resource.configurationKeys.asScala.forall(_.contains(configName))

(configurationKeys is null?)

[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing describe configs request for resource DescribeConfigsResource(resourceType=2, resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
java.lang.NullPointerException
	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
	at scala.collection.Iterator.foreach(Iterator.scala:929)
	at scala.collection.Iterator.foreach$(Iterator.scala:929)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
	at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
	at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
	at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
	at scala.collection.immutable.List.map(List.scala:283)
	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
	at java.lang.Thread.run(Thread.java:748)


> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the 

[jira] [Commented] (KAFKA-10191) fix flaky StreamsOptimizedTest

2020-06-30 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148556#comment-17148556
 ] 

Chia-Ping Tsai commented on KAFKA-10191:


{quote}
about adding KafkaStreams#cleanUp() in the code,
{quote}

yep, that is the initial fix for this issue (see the sketch below).

{quote}
the other thing to do are done manually out of the code.
{quote}

Yep, resetting the stream application has to be executed manually. However, as 
[~ableegoldman] suggested, we can do more in this fix, so 
streams_optimized_test.py now always resets the stream application before 
running optimization.
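
For reference, a minimal Scala sketch of that initial fix (hypothetical setup 
code; `topology` and `props` stand in for the application's actual 
configuration):

// wipe the local state directory so each run starts from a clean slate;
// cleanUp() may only be called while the instance is not running
val streams = new KafkaStreams(topology, props)
streams.cleanUp()
streams.start()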

> fix flaky StreamsOptimizedTest
> --
>
> Key: KAFKA-10191
> URL: https://issues.apache.org/jira/browse/KAFKA-10191
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> Exception in thread "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" java.lang.IllegalStateException: Tried to lookup lag for unknown task 2_0
> 	at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
> 	at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
> 	at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216)
> 	at java.util.TreeMap.put(TreeMap.java:552)
> 	at java.util.TreeSet.add(TreeSet.java:255)
> 	at java.util.AbstractCollection.addAll(AbstractCollection.java:344)
> 	at java.util.TreeSet.addAll(TreeSet.java:312)
> 	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1250)
> 	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1164)
> 	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:920)
> 	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:391)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:583)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1263)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1229)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1204)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:762)

[jira] [Commented] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148550#comment-17148550
 ] 

huxihx commented on KAFKA-10220:


[~ecomar]  Thanks for reporting. I could not reproduce this issue with trunk. 
Are you using the latest code for clients?

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
>     resource.configurationKeys.asScala.forall(_.contains(configName))
>
> (configurationKeys is null?)
>
> [2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing
> describe configs request for resource DescribeConfigsResource(resourceType=2,
> resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
> java.lang.NullPointerException
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
> 	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
> 	at scala.collection.Iterator.foreach(Iterator.scala:929)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:929)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
> 	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
> 	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
> 	at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
> 	at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
> 	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> 	at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
> 	at scala.collection.immutable.List.map(List.scala:283)
> 	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
> 	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
> 	at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram merged pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-30 Thread GitBox


rajinisivaram merged pull request #8683:
URL: https://github.com/apache/kafka/pull/8683


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-06-30 Thread GitBox


rajinisivaram commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-651726976


   Streams test failures not related, merging to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #8885: KAFKA-8264: decrease the record size for flaky test

2020-06-30 Thread GitBox


omkreddy commented on pull request #8885:
URL: https://github.com/apache/kafka/pull/8885#issuecomment-651724728


   retest this please
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Issue Comment Deleted] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-10220:
--
Comment: was deleted

(was: deserialized request data has a null configurationKeys

DescribeConfigsRequestData(resources=[DescribeConfigsResource(resourceType=2, resourceName='topic1', configurationKeys=null),...)

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
>     resource.configurationKeys.asScala.forall(_.contains(configName))
>
> (configurationKeys is null?)
>
> [2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing
> describe configs request for resource DescribeConfigsResource(resourceType=2,
> resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
> java.lang.NullPointerException
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
> 	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
> 	at scala.collection.Iterator.foreach(Iterator.scala:929)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:929)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
> 	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
> 	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
> 	at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
> 	at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
> 	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> 	at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
> 	at scala.collection.immutable.List.map(List.scala:283)
> 	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
> 	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
> 	at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148535#comment-17148535
 ] 

Edoardo Comar commented on KAFKA-10220:
---

deserialized request data has a null configurationKeys

DescribeConfigsRequestData(resources=[DescribeConfigsResource(resourceType=2, resourceName='topic1', configurationKeys=null),...

> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Priority: Major
>
> In current trunk code 
> Describing a topic can fail with an NPE in the broker
> on the line 
>     resource.configurationKeys.asScala.forall(_.contains(configName))
>
> (configurationKeys is null?)
>
> [2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing
> describe configs request for resource DescribeConfigsResource(resourceType=2,
> resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
> java.lang.NullPointerException
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
> 	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
> 	at scala.collection.Iterator.foreach(Iterator.scala:929)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:929)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
> 	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
> 	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
> 	at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
> 	at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
> 	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> 	at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
> 	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
> 	at scala.collection.immutable.List.map(List.scala:283)
> 	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
> 	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
> 	at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Edoardo Comar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-10220:
--
Description: 
In current trunk code 
 Describing a topic from the CLI can fail with an NPE in the broker

on the line 

    resource.configurationKeys.asScala.forall(_.contains(configName))

(configurationKeys is null?)

[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing describe configs request for resource DescribeConfigsResource(resourceType=2, resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
java.lang.NullPointerException
	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
	at scala.collection.Iterator.foreach(Iterator.scala:929)
	at scala.collection.Iterator.foreach$(Iterator.scala:929)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
	at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
	at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
	at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
	at scala.collection.immutable.List.map(List.scala:283)
	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
	at java.lang.Thread.run(Thread.java:748)

  was:
In current trunk code 
Describing a topic can fail with an NPE in the broker



on the line 

    resource.configurationKeys.asScala.forall(_.contains(configName))

(configurationKeys is null?)

[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing describe configs request for resource DescribeConfigsResource(resourceType=2, resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
java.lang.NullPointerException
	at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
	at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
	at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
	at scala.collection.Iterator.foreach(Iterator.scala:929)
	at scala.collection.Iterator.foreach$(Iterator.scala:929)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
	at scala.collection.IterableLike.foreach(IterableLike.scala:71)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
	at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
	at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
	at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
	at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
	at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
	at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
	at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
	at scala.collection.immutable.List.map(List.scala:283)
	at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
	at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
	at java.lang.Thread.run(Thread.java:748)


> NPE when describing resources
> -
>
> Key: KAFKA-10220
> URL: https://issues.apache.org/jira/browse/KAFKA-10220
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Edoardo Comar
>Priority: Major
>
> In current trunk code 
>  Describing a topic from the CLI can fail with an NPE in the broker
> on the line 
>     resource.configurationKeys.asScala.forall(_.contains(configName))

[jira] [Updated] (KAFKA-10221) Backport fix for KAFKA-9603 to 2.5

2020-06-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10221:
--
Fix Version/s: 2.5.1

> Backport fix for KAFKA-9603 to 2.5 
> ---
>
> Key: KAFKA-10221
> URL: https://issues.apache.org/jira/browse/KAFKA-10221
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.5.1
>
>
> The fix for [KAFKA-9603|https://issues.apache.org/jira/browse/KAFKA-9603] 
> shall be backported to 2.5. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10221) Backport fix for KAFKA-9603 to 2.5

2020-06-30 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10221:
-

 Summary: Backport fix for KAFKA-9603 to 2.5 
 Key: KAFKA-10221
 URL: https://issues.apache.org/jira/browse/KAFKA-10221
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.5.0
Reporter: Bruno Cadonna


The fix for [KAFKA-9603|https://issues.apache.org/jira/browse/KAFKA-9603] shall 
be backported to 2.5. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10220) NPE when describing resources

2020-06-30 Thread Edoardo Comar (Jira)
Edoardo Comar created KAFKA-10220:
-

 Summary: NPE when describing resources
 Key: KAFKA-10220
 URL: https://issues.apache.org/jira/browse/KAFKA-10220
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Edoardo Comar


In the current trunk code, describing a topic can fail with an NPE in the broker.



on the line 

    resource.configurationKeys.asScala.forall(_.contains(configName))

 

(configurationKeys is null, as the error log below shows)

[2020-06-30 11:10:39,464] ERROR [Admin Manager on Broker 0]: Error processing
describe configs request for resource DescribeConfigsResource(resourceType=2,
resourceName='topic1', configurationKeys=null) (kafka.server.AdminManager)
java.lang.NullPointerException
    at kafka.server.AdminManager.$anonfun$describeConfigs$3(AdminManager.scala:395)
    at kafka.server.AdminManager.$anonfun$describeConfigs$3$adapted(AdminManager.scala:393)
    at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:248)
    at scala.collection.Iterator.foreach(Iterator.scala:929)
    at scala.collection.Iterator.foreach$(Iterator.scala:929)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
    at scala.collection.IterableLike.foreach(IterableLike.scala:71)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:245)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:104)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:259)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:259)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at kafka.server.AdminManager.createResponseConfig$1(AdminManager.scala:393)
    at kafka.server.AdminManager.$anonfun$describeConfigs$1(AdminManager.scala:412)
    at scala.collection.immutable.List.map(List.scala:283)
    at kafka.server.AdminManager.describeConfigs(AdminManager.scala:386)
    at kafka.server.KafkaApis.handleDescribeConfigsRequest(KafkaApis.scala:2595)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:165)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
    at java.lang.Thread.run(Thread.java:748)
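
A null-safe guard on configurationKeys would avoid the NPE. Below is a minimal
sketch in Scala (an illustration with a hypothetical helper name, not
necessarily the committed fix): a null or empty key list is treated as
"describe all configs", which is the conventional meaning of a null key array
for this API.

    // Hypothetical null-safe variant of the filter used in
    // AdminManager.createResponseConfig (sketch only).
    import scala.jdk.CollectionConverters._
    import org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource

    def includeConfig(resource: DescribeConfigsResource, configName: String): Boolean =
      resource.configurationKeys == null ||
        resource.configurationKeys.isEmpty ||
        resource.configurationKeys.asScala.contains(configName)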



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeqo opened a new pull request #8956: MINOR[docs]: fix typo in ssl.client.auth requested to required.

2020-06-30 Thread GitBox


jeqo opened a new pull request #8956:
URL: https://github.com/apache/kafka/pull/8956


   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #8914: MINOR: Do not swallow exception when collecting PIDs

2020-06-30 Thread GitBox


cadonna commented on pull request #8914:
URL: https://github.com/apache/kafka/pull/8914#issuecomment-651699622


   The failed system test does not seem to be related to this PR but rather to 
https://issues.apache.org/jira/browse/KAFKA-10191. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-06-30 Thread GitBox


cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r447567460



##
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
##
@@ -124,18 +141,19 @@ public void before() {
 Serdes.String()
 );
 metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
-expect(context.metrics())
-.andReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion)).anyTimes();
-expect(context.taskId()).andReturn(taskId).anyTimes();
-expect(inner.name()).andReturn("metered").anyTimes();
+expect(context.applicationId()).andStubReturn(APPLICATION_ID);
+expect(context.metrics()).andStubReturn(new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion));
+expect(context.taskId()).andStubReturn(taskId);
+expect(context.changelogFor(STORE_NAME)).andStubReturn(CHANGELOG_TOPIC);

Review comment:
   I agree that EasyMock is not able to magically implement a complex
interface contract. However, as I said earlier, swapping to
`InternalMockProcessorContext` or one of its siblings right now would
significantly increase the size of this PR.
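
   For readers unfamiliar with the distinction in the diff above:
andStubReturn(v) behaves like andReturn(v).anyTimes(), but marks the
expectation as a stub, so it is exempt from call-count verification entirely.
A minimal self-contained sketch (the Context trait here is hypothetical,
standing in for the mocked processor context interface):

    import org.easymock.EasyMock.{expect, mock, replay, verify}

    // Hypothetical stand-in for the mocked context interface.
    trait Context {
      def changelogFor(storeName: String): String
    }

    object StubReturnExample extends App {
      val context = mock(classOf[Context])
      // Equivalent to andReturn(...).anyTimes(), but never checked by verify():
      expect(context.changelogFor("store")).andStubReturn("app-store-changelog")
      replay(context)

      assert(context.changelogFor("store") == "app-store-changelog")
      verify(context) // passes whether the stub was called zero times or many
    }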





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] LMnet commented on pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-06-30 Thread GitBox


LMnet commented on pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#issuecomment-651694260


   @vvcephei @mjsax



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] LMnet commented on pull request #8049: MINOR: Added missing default serdes to the streams.scala.Serdes

2020-06-30 Thread GitBox


LMnet commented on pull request #8049:
URL: https://github.com/apache/kafka/pull/8049#issuecomment-651690176


   Closed in favor of https://github.com/apache/kafka/pull/8955



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] LMnet commented on a change in pull request #8049: MINOR: Added missing default serdes to the streams.scala.Serdes

2020-06-30 Thread GitBox


LMnet commented on a change in pull request #8049:
URL: https://github.com/apache/kafka/pull/8049#discussion_r447559838



##
File path: streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
##
@@ -30,12 +32,15 @@ object Serdes {
   implicit def JavaLong: Serde[java.lang.Long] = JSerdes.Long()
   implicit def ByteArray: Serde[Array[Byte]] = JSerdes.ByteArray()
   implicit def Bytes: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
+  implicit def byteBufferSerde: Serde[ByteBuffer] = JSerdes.ByteBuffer()
+  implicit def shortSerde: Serde[Short] = JSerdes.Short().asInstanceOf[Serde[Short]]
   implicit def Float: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
   implicit def JavaFloat: Serde[java.lang.Float] = JSerdes.Float()
   implicit def Double: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
   implicit def JavaDouble: Serde[java.lang.Double] = JSerdes.Double()
   implicit def Integer: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
   implicit def JavaInteger: Serde[java.lang.Integer] = JSerdes.Integer()
+  implicit def uuidSerde: Serde[UUID] = JSerdes.UUID()

Review comment:
   I decided to create a [new pull 
request](https://github.com/apache/kafka/pull/8955).
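
   For context, the asInstanceOf in the diff above works because the Java
helper returns Serde[java.lang.Short] while Scala call sites want Serde[Short];
after erasure both are plain Serde, and values are boxed at the (de)serializer
boundary anyway, so the unchecked cast is safe in practice. A quick sketch
(the topic name is illustrative):

    import org.apache.kafka.common.serialization.{Serde, Serdes => JSerdes}

    // JSerdes.Short() is typed Serde[java.lang.Short]; the cast merely retags
    // it as the Scala primitive. Both erase to Serde at runtime.
    val shortSerde: Serde[Short] = JSerdes.Short().asInstanceOf[Serde[Short]]

    val bytes = shortSerde.serializer().serialize("some-topic", 42.toShort)
    val back: Short = shortSerde.deserializer().deserialize("some-topic", bytes)
    assert(back == 42)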





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] LMnet closed pull request #8049: MINOR: Added missing default serdes to the streams.scala.Serdes

2020-06-30 Thread GitBox


LMnet closed pull request #8049:
URL: https://github.com/apache/kafka/pull/8049


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] LMnet opened a new pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash

2020-06-30 Thread GitBox


LMnet opened a new pull request #8955:
URL: https://github.com/apache/kafka/pull/8955


   Implementation of a solution for KIP-616.
   
   A wildcard import of the old `org.apache.kafka.streams.scala.Serdes` leads
   to name clashes, because some of its implicits have the same names as types
   from Scala's standard library. The new
   `org.apache.kafka.streams.scala.serialization.Serdes` is the same as the old
   `Serdes` but without the name clashes; the old object has been deprecated.
   
   The new `Serdes` also adds the previously missing serdes for the `UUID`,
   `ByteBuffer`, and `Short` types.
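   
   To make the clash concrete (a sketch; the implicit names below follow the
   naming scheme described in this PR and KIP-616, and the failing line is
   representative rather than taken from a bug report):
   
       // With the old object, the implicit *term* `Long` shadows the
       // scala.Long companion object under a wildcard import:
       //
       //   import org.apache.kafka.streams.scala.Serdes._
       //   val max = Long.MaxValue   // fails to compile: `Long` is now a Serde[Long]
       //
       // The replacement keeps the same serdes under non-clashing names:
       import org.apache.kafka.streams.scala.serialization.Serdes._
   
       val max = Long.MaxValue   // fine: scala.Long is no longer shadowed
       val serde = longSerde     // the Serde[Long] implicit, under a safe name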
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] akatona84 commented on pull request #8859: MINOR: Upgrade jetty to 9.4.27.v20200227 and jersey to 2.31

2020-06-30 Thread GitBox


akatona84 commented on pull request #8859:
URL: https://github.com/apache/kafka/pull/8859#issuecomment-651684469


   I did not find anything for that version, thanks again.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10191) fix flaky StreamsOptimizedTest

2020-06-30 Thread mohamed chebbi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17148475#comment-17148475
 ] 

mohamed chebbi commented on KAFKA-10191:


[~chia7712] Sorry for the late reply. I read the reset tool documentation, and
if I understand correctly, the in-code part is adding KafkaStreams#cleanUp();
the other steps are performed manually, outside of the code.
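
For reference, the in-code half of a full application reset looks like the
sketch below (app id, topics, and bootstrap address are illustrative).
KafkaStreams#cleanUp() wipes the instance's local state directory and may only
be called before start() or after close(); the broker-side cleanup (offsets,
internal topics) is what the reset tool itself performs:

    import java.util.Properties
    import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
    import org.apache.kafka.streams.scala.StreamsBuilder
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import org.apache.kafka.streams.scala.Serdes._

    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

    val builder = new StreamsBuilder()
    builder.stream[String, String]("input-topic").to("output-topic") // trivial pass-through

    val streams = new KafkaStreams(builder.build(), props)
    streams.cleanUp() // local state wipe; legal only while the instance is not running
    streams.start()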

> fix flaky StreamsOptimizedTest
> --
>
> Key: KAFKA-10191
> URL: https://issues.apache.org/jira/browse/KAFKA-10191
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {quote}Exception in thread 
> "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" 
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 
> 2_0Exception in thread 
> "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" 
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 2_0 at 
> org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
>  at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511) 
> at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216) 
> at java.util.TreeMap.put(TreeMap.java:552) at 
> java.util.TreeSet.add(TreeSet.java:255) at 
> java.util.AbstractCollection.addAll(AbstractCollection.java:344) at 
> java.util.TreeSet.addAll(TreeSet.java:312) at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1250)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1164)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:920)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:391)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:583)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1263)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1229) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1204) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:762)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:622)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
>  at 
> 

[GitHub] [kafka] dajac opened a new pull request #8954: MINOR; Move quota integration tests to using the new quota API.

2020-06-30 Thread GitBox


dajac opened a new pull request #8954:
URL: https://github.com/apache/kafka/pull/8954


   This PR refactors the various quota integration tests in the `kafka.api` 
package and migrates them to the new quota API instead of writing quotas to 
ZK directly.
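   
   For illustration, "the new quota API" here means going through the Admin
client (KIP-546) rather than writing quota znodes directly; a minimal sketch
(entity, quota key, and values are illustrative):
   
       import java.util.Collections
       import scala.jdk.CollectionConverters._
       import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
       import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
   
       val admin = Admin.create(Collections.singletonMap[String, AnyRef](
         AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))
   
       // Set a producer byte-rate quota for a user principal via the Admin API,
       // instead of writing the corresponding config znode.
       val entity     = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "test-user").asJava)
       val op         = new ClientQuotaAlteration.Op("producer_byte_rate", Double.box(1024.0))
       val alteration = new ClientQuotaAlteration(entity, List(op).asJava)
   
       admin.alterClientQuotas(List(alteration).asJava).all().get()
       admin.close()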
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration

2020-06-30 Thread GitBox


mimaison commented on pull request #8921:
URL: https://github.com/apache/kafka/pull/8921#issuecomment-651658489


   @satishbellapu I'll try to take a look Thursday or Friday



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-06-30 Thread GitBox


mimaison commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-651657082


   The conflicts are pretty nasty. I started looking at them last week but did 
not have the time (nor the courage!) to finish. I plan to take another look 
this week.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



