[jira] [Commented] (KAFKA-9220) TimeoutException when using kafka-preferred-replica-election

2020-09-28 Thread Or Shemesh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17203648#comment-17203648
 ] 

Or Shemesh commented on KAFKA-9220:
---

[~akumar] Go for it and take it :)

Thanks!!!

> TimeoutException when using kafka-preferred-replica-election
> 
>
> Key: KAFKA-9220
> URL: https://issues.apache.org/jira/browse/KAFKA-9220
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.3.0
>Reporter: Or Shemesh
>Priority: Major
>
> When running kafka-preferred-replica-election --bootstrap-server xxx:9092
> I'm getting this error:
> Timeout waiting for election results
> Exception in thread "main" kafka.common.AdminCommandFailedException: Timeout waiting for election results
>  at kafka.admin.PreferredReplicaLeaderElectionCommand$AdminClientCommand.electPreferredLeaders(PreferredReplicaLeaderElectionCommand.scala:246)
>  at kafka.admin.PreferredReplicaLeaderElectionCommand$.run(PreferredReplicaLeaderElectionCommand.scala:78)
>  at kafka.admin.PreferredReplicaLeaderElectionCommand$.main(PreferredReplicaLeaderElectionCommand.scala:42)
>  at kafka.admin.PreferredReplicaLeaderElectionCommand.main(PreferredReplicaLeaderElectionCommand.scala)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
>  
> Because we have a big cluster, getting all the data from ZooKeeper takes more 
> than 30 seconds.
>  
> After searching the code I saw that the 30-second timeout is hard-coded. Could 
> you make the timeout configurable as a parameter?
> [https://github.com/confluentinc/kafka/blob/master/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala]
>  
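For reference, a hedged sketch of a workaround using the AdminClient API directly, where the timeout is configurable (server address and timeout value are illustrative; this code is not from the ticket):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.ElectionType;

public class PreferredElectionWithLongerTimeout {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx:9092");
        // Raise the API timeout to tolerate slow metadata on large clusters
        // (the admin client default is 60000 ms).
        props.put("default.api.timeout.ms", "120000");
        try (Admin admin = Admin.create(props)) {
            // A null partition set means "elect preferred leaders for all partitions".
            admin.electLeaders(ElectionType.PREFERRED, null).partitions().get();
        }
    }
}
{code}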



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 opened a new pull request #9349: MINOR: add proper checks to KafkaConsumer.groupMetadata

2020-09-28 Thread GitBox


chia7712 opened a new pull request #9349:
URL: https://github.com/apache/kafka/pull/9349


   add the following checks to ```KafkaConsumer.groupMetadata``` (see the sketch below)
   
   1. null check of the coordinator (replace the NPE with ```InvalidGroupIdException```, 
which is consistent with the other methods)
   1. concurrency check (```groupMetadata``` is not thread-safe, so a concurrency 
check is necessary)
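   A minimal sketch of what the two guards could look like, assuming the ```acquireAndEnsureOpen()```/```release()``` and ```maybeThrowInvalidGroupIdException()``` helpers used by other ```KafkaConsumer``` methods (this is not the actual patch):
   
   ```java
   public ConsumerGroupMetadata groupMetadata() {
       // Concurrency check: KafkaConsumer is not thread-safe, so reject
       // concurrent access up front.
       acquireAndEnsureOpen();
       try {
           // Null/group-id check: throw InvalidGroupIdException instead of
           // letting a bare NPE escape, matching the other group-based methods.
           maybeThrowInvalidGroupIdException();
           return coordinator.groupMetadata();
       } finally {
           release();
       }
   }
   ```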
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #9348: KAFKA-10527; Voters should not reinitialize as leader in same epoch

2020-09-28 Thread GitBox


hachikuji opened a new pull request #9348:
URL: https://github.com/apache/kafka/pull/9348


   One of the invariants that the raft replication protocol attempts to ensure 
is that each record is uniquely identified by leader epoch and offset. This can 
be violated if a leader remains elected with the same epoch between restarts 
since unflushed data could be lost.
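   A self-contained sketch of the rule (names are made up, not the actual raft code): a voter that was the elected leader in the persisted epoch must bump the epoch on restart rather than resume leadership in it.
   
   ```java
   // Compute the epoch a restarting voter should use. Re-leading in the same
   // epoch could assign new records to (epoch, offset) pairs already handed
   // out by the previous incarnation, whose unflushed data may have been lost.
   static int epochAfterRestart(int persistedEpoch, int persistedLeaderId, int localId) {
       return persistedLeaderId == localId ? persistedEpoch + 1 : persistedEpoch;
   }
   ```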
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10513) Newly added topic or partitions are not assigned to running consumer groups using static membership

2020-09-28 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17203622#comment-17203622
 ] 

Guozhang Wang commented on KAFKA-10513:
---

[~mou@ea] could you try setting `metadata.max.age.ms` to a very small value 
and retry this scenario? [~bchen225242] and I looked at both the client and 
broker side code in 2.6 and we think it should still trigger a rebalance.
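A hedged sketch of that experiment (bootstrap server, group, and topic names are placeholders): lowering {{metadata.max.age.ms}} makes the consumer refresh its metadata, and thus notice the new partitions, much sooner than the 5-minute default.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "repro-group");
props.setProperty("group.instance.id", "instance-1");  // static membership
props.setProperty("metadata.max.age.ms", "1000");       // default is 300000 (5 minutes)
props.setProperty("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("test-topic"));
    while (true) {
        // With fresh metadata, the group should rebalance once partitions are added.
        consumer.poll(Duration.ofMillis(500));
    }
}
{code}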

> Newly added topic or partitions are not assigned to running consumer groups 
> using static membership
> ---
>
> Key: KAFKA-10513
> URL: https://issues.apache.org/jira/browse/KAFKA-10513
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.6.0
>Reporter: Marlon Ou
>Priority: Major
>
> If consumers are polling messages from a certain topic with static membership 
> and we add new partitions to this topic while the consumers are running, no 
> partition reassignment is ever triggered (and hence messages published into 
> the new partitions are never consumed). 
> To reproduce, simply set group instance IDs on the consumers: 
> {code:java}
> props.setProperty("group.instance.id", instanceId);
> {code}
> And then while the static consumers are running, use Kafka's admin client to 
> add more partitions to the topic:
> {code:java}
> adminClient.createPartitions(...)
> {code}
>  
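For completeness, a hedged sketch of that admin call (topic name and target partition count are illustrative):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitions;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
try (Admin adminClient = Admin.create(props)) {
    // Grow the topic to 8 partitions while the static consumers keep polling.
    adminClient.createPartitions(
        Collections.singletonMap("test-topic", NewPartitions.increaseTo(8))
    ).all().get();
}
{code}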



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10530) kafka-streams-application-reset misses some internal topics

2020-09-28 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17203616#comment-17203616
 ] 

John Roesler commented on KAFKA-10530:
--

Thanks for the report [~oweiler], and sorry for the trouble.

This report looks like https://issues.apache.org/jira/browse/KAFKA-9859, which 
should have been _fixed_ in 2.6.0. Can you confirm that you still see this 
issue in 2.6.0?

Thanks,

-John

> kafka-streams-application-reset misses some internal topics
> ---
>
> Key: KAFKA-10530
> URL: https://issues.apache.org/jira/browse/KAFKA-10530
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Oliver Weiler
>Priority: Major
>
> While the {{kafka-streams-application-reset}} tool works in most cases, it 
> misses some internal topics when using {{Foreign Key Table-Table Joins}}.
> After execution, there are still two internal topics left which were not 
> deleted:
> {code}
> bv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic
> bbv4-indexer-717e6cc5-acb2-498d-9d08-4814aaa71c81-StreamThread-1-consumer 
> bbv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-14-topic
> {code}
> The reason seems to be the {{StreamsResetter.isInternalTopic}} which requires 
> the internal topic to end with {{-changelog}} or {{-repartition}} (which the 
> mentioned topics don't).
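For context, the suffix heuristic in question looks roughly like this (a paraphrase, not the exact {{StreamsResetter}} source):

{code:java}
private boolean isInternalTopic(final String topicName, final String applicationId) {
    // The KTABLE-FK-JOIN SUBSCRIPTION topics end in "-topic", so this
    // suffix check never matches them and they are left behind.
    return topicName.startsWith(applicationId + "-")
        && (topicName.endsWith("-changelog") || topicName.endsWith("-repartition"));
}
{code}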



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`

2020-09-28 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17203601#comment-17203601
 ] 

Chia-Ping Tsai commented on KAFKA-8266:
---

flaky again :(

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9284/6/testReport/kafka.api/ConsumerBounceTest/Build___JDK_11___testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/

> Improve 
> `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
> 
>
> Key: KAFKA-8266
> URL: https://issues.apache.org/jira/browse/KAFKA-8266
> Project: Kafka
>  Issue Type: Test
>Reporter: Jason Gustafson
>Assignee: David Jacot
>Priority: Major
>
> Some additional validation could be done after the member gets kicked out. 
> The main thing is showing that the group can continue to consume data and 
> commit offsets.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-28 Thread GitBox


chia7712 commented on a change in pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#discussion_r496340632



##
File path: docs/upgrade.html
##
@@ -27,6 +27,14 @@ Notable changes in 2
 default.api.timeout.ms, and Kafka Streams' new 
task.timeout.ms parameters instead.
 Note that parameter retry.backoff.ms is not impacted by 
this change.
 
+Altering non-reconfigurable configs of existent listeners causes 
InvalidRequestException.
+By contrast, the previous behavior would have caused the updated 
configuration to be persisted, but it wouldn't
+take effect until the broker was restarted. This change breaks 
behavior compatibility but the old behavior is not

Review comment:
   I have deleted the sentence about behavior compatibility :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-09-28 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r496335557



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -201,26 +213,24 @@ public void close() {
 
 @Test
 public void testReplication() throws InterruptedException {
+String consumerGroupName = "consumer-group-testReplication";
+Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+put("group.id", consumerGroupName);
+put("auto.offset.reset", "latest");

Review comment:
   Oops, spoke too soon.
   Even though `latest` is Kafka's default, 
`EmbeddedKafkaCluster::createConsumer` [defaults it to 
`earliest`](https://github.com/apache/kafka/blob/48b56e533b3ff22ae0e2cf7fcc649e7df19f2b06/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L444)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-09-28 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r496326988



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -201,26 +213,24 @@ public void close() {
 
 @Test
 public void testReplication() throws InterruptedException {
+String consumerGroupName = "consumer-group-testReplication";
+Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+put("group.id", consumerGroupName);
+put("auto.offset.reset", "latest");

Review comment:
   Yep, unnecessary. I'll remove this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-28 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496093216



##
File path: clients/src/main/resources/common/message/ApiVersionsResponse.json
##
@@ -55,8 +55,8 @@
   "about": "The maximum supported version for the feature." }
   ]
 },
-{"name": "FinalizedFeaturesEpoch", "type": "int32", "versions": "3+",
-  "tag": 1, "taggedVersions": "3+", "default": "-1",
+{"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",

Review comment:
   Space before "name".

##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 57,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": 
"6",

Review comment:
   This is not included in the KIP. Should we update the KIP?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {
+private final short minVersionLevel;
+
+private final short maxVersionLevel;
+
+/**
+ * Raises an exception unless the following condition is met:
+ * minVersionLevel >= 1 and maxVersionLevel >= 1 and maxVersionLevel >= 
minVersionLevel.
+ *
+ * @param minVersionLevel   The minimum version level value.
+ * @param maxVersionLevel   The maximum version level value.
+ *
+ * @throws IllegalArgumentException   Raised when the condition described 
above is not met.
+ */
+public FinalizedVersionRange(final short minVersionLevel, final short 
maxVersionLevel) {

Review comment:
   Since the user is not expected to instantiate this, should we make the 
constructor non-public?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result 

[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-09-28 Thread GitBox


abbccdda commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496321312



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlteration> entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Review comment:
   Yea, you are right, I think this comment belongs to updateFeatures





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10511) Fix minor behavior difference in `MockLog`

2020-09-28 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10511.
-
Resolution: Fixed

> Fix minor behavior difference in `MockLog`
> --
>
> Key: KAFKA-10511
> URL: https://issues.apache.org/jira/browse/KAFKA-10511
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Fix minor difference in the implementation of the epoch cache in MockLog. In 
> `LeaderEpochFileCache`, we ensure new entries increase both start offset and 
> epoch monotonically. We also do not allow duplicates.
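A hedged sketch of the monotonicity rule described above (type and method names assumed, not the actual MockLog code):

{code:java}
private final List<EpochEntry> epochEntries = new ArrayList<>();

private void maybeAppendEpochEntry(final int epoch, final long startOffset) {
    final EpochEntry last = epochEntries.isEmpty()
        ? null
        : epochEntries.get(epochEntries.size() - 1);
    // Append only when both the epoch and the start offset strictly increase;
    // this also rejects duplicate entries, matching LeaderEpochFileCache.
    if (last == null || (epoch > last.epoch && startOffset > last.startOffset)) {
        epochEntries.add(new EpochEntry(epoch, startOffset));
    }
}
{code}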



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-09-28 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r496306777



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -283,49 +296,140 @@ public void testReplication() throws 
InterruptedException {
 
 waitForCondition(() -> {
 try {
-return primaryClient.remoteConsumerOffsets("consumer-group-1", 
"backup",
+return primaryClient.remoteConsumerOffsets(consumerGroupName, 
"backup",
 Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("test-topic-1", 0));
 } catch (Throwable e) {
 return false;
 }
 }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary 
cluster.");
 
-Map<TopicPartition, OffsetAndMetadata> primaryOffsets = 
primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+Map<TopicPartition, OffsetAndMetadata> primaryOffsets = 
primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
 // Failback consumer group to primary cluster
-consumer2 = 
primary.kafka().createConsumer(Collections.singletonMap("group.id", 
"consumer-group-1"));
-consumer2.assign(primaryOffsets.keySet());
-primaryOffsets.forEach(consumer2::seek);
-consumer2.poll(Duration.ofMillis(500));
-
-assertTrue("Consumer failedback to zero upstream offset.", 
consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
-assertTrue("Consumer failedback to zero downstream offset.", 
consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
-assertTrue("Consumer failedback beyond expected upstream offset.", 
consumer2.position(
-new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-assertTrue("Consumer failedback beyond expected downstream offset.", 
consumer2.position(
-new TopicPartition("backup.test-topic-1", 0)) <= 
NUM_RECORDS_PRODUCED);
-
-consumer2.close();
-  
+primaryConsumer = primary.kafka().createConsumer(consumerProps);
+primaryConsumer.assign(allPartitions("test-topic-1", 
"backup.test-topic-1"));
+seek(primaryConsumer, primaryOffsets);
+consumeAllMessages(primaryConsumer, 0);
+
+assertTrue("Consumer failedback to zero upstream offset.", 
primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
+assertTrue("Consumer failedback to zero downstream offset.", 
primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
+assertTrue("Consumer failedback beyond expected upstream offset.", 
primaryConsumer.position(
+new TopicPartition("test-topic-1", 0)) <= 
NUM_RECORDS_PER_PARTITION);
+assertTrue("Consumer failedback beyond expected downstream offset.", 
primaryConsumer.position(
+new TopicPartition("backup.test-topic-1", 0)) <= 
NUM_RECORDS_PER_PARTITION);
+
+Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages2 = 
consumeAllMessages(primaryConsumer, 0);
+// If offset translation was successful we expect no messages to be 
consumed after failback
+assertEquals("Data was consumed from partitions: " + 
messages2.keySet() + ".", 0, messages2.size());
+primaryConsumer.close();
+
 // create more matching topics
 primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
 backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
 
-for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + 
i);
-backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i);
+produceMessages(primary, "test-topic-2", "message-3-", 1);
+produceMessages(backup, "test-topic-3", "message-4-", 1);
+
+assertEquals("Records were not produced to primary cluster.", 
NUM_RECORDS_PER_PARTITION,
+primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 
RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+assertEquals("Records were not produced to backup cluster.", 
NUM_RECORDS_PER_PARTITION,
+backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 
RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
+
+assertEquals("New topic was not replicated to primary cluster.", 
NUM_RECORDS_PER_PARTITION,
+primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * 
RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
+assertEquals("New topic was not replicated to backup cluster.", 
NUM_RECORDS_PER_PARTITION,
+backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * 
RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
+}
+
+@Test
+public void testReplicationWithEmptyPartition() throws 
InterruptedException {

Review comment:
   Thanks, @mimaison. I've changed the 

[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-09-28 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r496306298



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -283,49 +296,140 @@ public void testReplication() throws 
InterruptedException {
 
 waitForCondition(() -> {
 try {
-return primaryClient.remoteConsumerOffsets("consumer-group-1", 
"backup",
+return primaryClient.remoteConsumerOffsets(consumerGroupName, 
"backup",
 Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new 
TopicPartition("test-topic-1", 0));
 } catch (Throwable e) {
 return false;
 }
 }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary 
cluster.");
 
-Map<TopicPartition, OffsetAndMetadata> primaryOffsets = 
primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+Map<TopicPartition, OffsetAndMetadata> primaryOffsets = 
primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
 // Failback consumer group to primary cluster
-consumer2 = 
primary.kafka().createConsumer(Collections.singletonMap("group.id", 
"consumer-group-1"));
-consumer2.assign(primaryOffsets.keySet());
-primaryOffsets.forEach(consumer2::seek);
-consumer2.poll(Duration.ofMillis(500));
-
-assertTrue("Consumer failedback to zero upstream offset.", 
consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
-assertTrue("Consumer failedback to zero downstream offset.", 
consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
-assertTrue("Consumer failedback beyond expected upstream offset.", 
consumer2.position(
-new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-assertTrue("Consumer failedback beyond expected downstream offset.", 
consumer2.position(
-new TopicPartition("backup.test-topic-1", 0)) <= 
NUM_RECORDS_PRODUCED);
-
-consumer2.close();
-  
+primaryConsumer = primary.kafka().createConsumer(consumerProps);
+primaryConsumer.assign(allPartitions("test-topic-1", 
"backup.test-topic-1"));
+seek(primaryConsumer, primaryOffsets);
+consumeAllMessages(primaryConsumer, 0);
+
+assertTrue("Consumer failedback to zero upstream offset.", 
primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
+assertTrue("Consumer failedback to zero downstream offset.", 
primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
+assertTrue("Consumer failedback beyond expected upstream offset.", 
primaryConsumer.position(
+new TopicPartition("test-topic-1", 0)) <= 
NUM_RECORDS_PER_PARTITION);
+assertTrue("Consumer failedback beyond expected downstream offset.", 
primaryConsumer.position(
+new TopicPartition("backup.test-topic-1", 0)) <= 
NUM_RECORDS_PER_PARTITION);
+
+Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages2 = 
consumeAllMessages(primaryConsumer, 0);
+// If offset translation was successful we expect no messages to be 
consumed after failback
+assertEquals("Data was consumed from partitions: " + 
messages2.keySet() + ".", 0, messages2.size());
+primaryConsumer.close();
+
 // create more matching topics
 primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
 backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
 
-for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + 
i);
-backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i);
+produceMessages(primary, "test-topic-2", "message-3-", 1);
+produceMessages(backup, "test-topic-3", "message-4-", 1);
+
+assertEquals("Records were not produced to primary cluster.", 
NUM_RECORDS_PER_PARTITION,
+primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 
RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+assertEquals("Records were not produced to backup cluster.", 
NUM_RECORDS_PER_PARTITION,
+backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 
RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
+
+assertEquals("New topic was not replicated to primary cluster.", 
NUM_RECORDS_PER_PARTITION,
+primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * 
RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
+assertEquals("New topic was not replicated to backup cluster.", 
NUM_RECORDS_PER_PARTITION,
+backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * 
RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
+}
+
+@Test
+public void testReplicationWithEmptyPartition() throws 
InterruptedException {
+String consumerGroupName = 

[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-09-28 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r496306398



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -136,10 +141,19 @@ public void setup() throws InterruptedException {
 backup.kafka().createTopic("primary.test-topic-1", 1);
 backup.kafka().createTopic("heartbeats", 1);
 
-for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", 
"message-1-" + i);
-backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", 
"message-2-" + i);
-}
+// produce to all partitions of test-topic-1
+produceMessages(primary, "test-topic-1", "message-1-");
+produceMessages(backup, "test-topic-1", "message-2-");
+
+// Generate some consumer activity on both clusters to ensure the 
checkpoint connector always starts promptly
+Map<String, Object> dummyProps = new HashMap<>();

Review comment:
   Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #8134: KAFKA-9546 Allow custom tasks through configuration

2020-09-28 Thread GitBox


rhauch commented on pull request #8134:
URL: https://github.com/apache/kafka/pull/8134#issuecomment-700351105


   As noted on https://issues.apache.org/jira/browse/KAFKA-9546, I'm not 
convinced this is a useful change, since it would complicate these example 
connectors that ship with AK and would make customizing them very impractical 
from a build, release, and installation perspective.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9546) Make FileStreamSourceTask extendable with generic streams

2020-09-28 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17203563#comment-17203563
 ] 

Randall Hauch commented on KAFKA-9546:
--

[~galyo], thanks for the suggestion and the PR.

I've added the `needs-kip` label, because the `FileStreamSourceConnector` is 
part of the Connect API, even though it is intentionally just an example 
connector that helps demonstrate Connect. Because a KIP is required, I question 
whether changing this connector is really worth it. And because these file 
connectors are the only ones that ship with AK, extending them will undoubtedly 
create issues if your extension is installed into a different version of AK 
than the one with which it is compiled.

If you're providing a customized task class, could you not just provide your 
own `SourceConnector` class? You'd have a lot more control, and you'd have 
much more freedom to deploy your connector into nearly any version of a Kafka 
Connect cluster installation. (The only limitation would be which of the 
Connect APIs you chose to use, such as the use of headers.)

As such, I think it's not worth the complication to the examples nor to your 
connector to make this change.
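A minimal sketch of that alternative (ZipFileSourceConnector and ZipFileSourceTask are hypothetical names, and the config handling is simplified):

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

public class ZipFileSourceConnector extends SourceConnector {
    private Map<String, String> config;

    @Override public void start(Map<String, String> props) { this.config = props; }

    // Point Connect at your own task (hypothetical here) instead of FileStreamSourceTask.
    @Override public Class<? extends Task> taskClass() { return ZipFileSourceTask.class; }

    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
        // A single file is read by a single task.
        return Collections.singletonList(config);
    }

    @Override public void stop() { }
    @Override public ConfigDef config() { return new ConfigDef(); }
    @Override public String version() { return "0.1.0"; }
}
{code}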

> Make FileStreamSourceTask extendable with generic streams
> -
>
> Key: KAFKA-9546
> URL: https://issues.apache.org/jira/browse/KAFKA-9546
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Csaba Galyo
>Assignee: Csaba Galyo
>Priority: Major
>  Labels: connect-api, needs-kip
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> Use case: I want to read a ZIP compressed text file with a file connector and 
> send it to Kafka.
> Currently, we have FileStreamSourceConnector which reads a \n delimited text 
> file. This connector always returns a task of type FileStreamSourceTask.
> The FileStreamSourceTask reads from stdio or opens a file InputStream. The 
> issue with this approach is that the input needs to be a text file, otherwise 
> it won't work. 
> The code should be modified so that users could change the default 
> InputStream to e.g. ZipInputStream, or any other format. The code is currently 
> written in such a way that it's not possible to extend it; we cannot use a 
> different input stream. 
> See example here where the code got copy-pasted just so it could read from a 
> ZstdInputStream (which reads ZSTD compressed files): 
> [https://github.com/gcsaba2/kafka-zstd/tree/master/src/main/java/org/apache/kafka/connect/file]
>  
> I suggest 2 changes:
>  # FileStreamSourceConnector should be extendable to return tasks of 
> different types. These types would be input by the user through the 
> configuration map
>  # FileStreamSourceTask should be modified so it could be extended and child 
> classes could define different input streams.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-28 Thread GitBox


hachikuji commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-700349997


   @nym3r0s Take a look at 
`SenderTest.testForceShutdownWithIncompleteTransaction`. I think we could add a 
similar test which aborts the transaction directly before the Produce request 
has been sent. Let us know if you don't have time within the next couple weeks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9332: KAFKA-10511; Ensure monotonic start epoch/offset updates in `MockLog`

2020-09-28 Thread GitBox


hachikuji merged pull request #9332:
URL: https://github.com/apache/kafka/pull/9332


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9546) Make FileStreamSourceTask extendable with generic streams

2020-09-28 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9546:
-
Labels: connect-api needs-kip  (was: connect-api)

> Make FileStreamSourceTask extendable with generic streams
> -
>
> Key: KAFKA-9546
> URL: https://issues.apache.org/jira/browse/KAFKA-9546
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Csaba Galyo
>Assignee: Csaba Galyo
>Priority: Major
>  Labels: connect-api, needs-kip
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> Use case: I want to read a ZIP compressed text file with a file connector and 
> send it to Kafka.
> Currently, we have FileStreamSourceConnector which reads a \n delimited text 
> file. This connector always returns a task of type FileStreamSourceTask.
> The FileStreamSourceTask reads from stdio or opens a file InputStream. The 
> issue with this approach is that the input needs to be a text file, otherwise 
> it won't work. 
> The code should be modified so that users could change the default 
> InputStream to e.g. ZipInputStream, or any other format. The code is currently 
> written in such a way that it's not possible to extend it; we cannot use a 
> different input stream. 
> See example here where the code got copy-pasted just so it could read from a 
> ZstdInputStream (which reads ZSTD compressed files): 
> [https://github.com/gcsaba2/kafka-zstd/tree/master/src/main/java/org/apache/kafka/connect/file]
>  
> I suggest 2 changes:
>  # FileStreamSourceConnector should be extendable to return tasks of 
> different types. These types would be input by the user through the 
> configuration map
>  # FileStreamSourceTask should be modified so it could be extended and child 
> classes could define different input streams.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch commented on a change in pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-09-28 Thread GitBox


rhauch commented on a change in pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#discussion_r496296182



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -260,7 +259,7 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
 // Do not revoke resources for re-assignment while a delayed rebalance 
is active
 // Also we do not revoke in two consecutive rebalances by the same 
leader
 canRevoke = delay == 0 && canRevoke;
-
+log.debug("Connector and task to revoke assgn post lb calculation: 
{}", toRevoke);

Review comment:
   Can you please make this log message more readable?

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -577,15 +596,14 @@ private void resetDelay() {
 numToRevoke = floorTasks;
 for (WorkerLoad existing : existingWorkers) {
Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
+numToRevoke = existing.tasksSize() - ceilTasks;

Review comment:
   Isn't it possible that `numToRevoke` might be negative?
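   
   For illustration, the clamp being hinted at could look like this (a sketch against the diff above, not a committed suggestion):
   
   ```java
   // A worker already at or below the ceiling average has nothing to give up,
   // so the difference must not be allowed to go negative.
   numToRevoke = Math.max(0, existing.tasksSize() - ceilTasks);
   ```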

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
 if (scheduledRebalance > 0 && now >= scheduledRebalance) {
 // delayed rebalance expired and it's time to assign resources
 log.debug("Delayed rebalance expired. Reassigning lost tasks");
-Optional<WorkerLoad> candidateWorkerLoad = Optional.empty();
+List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
 if (!candidateWorkersForReassignment.isEmpty()) {
 candidateWorkerLoad = 
pickCandidateWorkerForReassignment(completeWorkerAssignment);
 }
 
-if (candidateWorkerLoad.isPresent()) {
-WorkerLoad workerLoad = candidateWorkerLoad.get();
-log.debug("A candidate worker has been found to assign lost 
tasks: {}", workerLoad.worker());
-lostAssignments.connectors().forEach(workerLoad::assign);
-lostAssignments.tasks().forEach(workerLoad::assign);
+if (!candidateWorkerLoad.isEmpty()) {
+log.debug("A list of candidate workers has been found to 
assign lost tasks: {}", 
candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(",")));
+Iterator<WorkerLoad> candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+for (String connector : lostAssignments.connectors()) {
+// Loop over the candidate workers as many times as it 
takes
+if (!candidateWorkerIterator.hasNext()) {
+candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+}
+WorkerLoad worker = candidateWorkerIterator.next();
+log.debug("Assigning connector id {} to member {}", 
connector, worker.worker());
+worker.assign(connector);
+log.debug("Assigned connector id {} to member {}", 
connector, worker.worker());
+}
+candidateWorkerIterator = candidateWorkerLoad.iterator();
+for (ConnectorTaskId task : lostAssignments.tasks()) {
+if (!candidateWorkerIterator.hasNext()) {
+candidateWorkerIterator = 
candidateWorkerLoad.iterator();
+}
+WorkerLoad worker = candidateWorkerIterator.next();
+log.debug("Assigning task id {} to member {}", task, 
worker.worker());
+worker.assign(task);
+log.debug("Assigned task id {} to member {}", task, 
worker.worker());

Review comment:
   ```suggestion
   ```
   Do we need both of these debug messages? After all, `worker.assign(...)` is 
just adding a `ConnectorTaskId` to a collection. How about keeping the first 
one, since at this point this is an ongoing process and we haven't actually 
assigned anything to the actual worker node.

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##
@@ -559,6 +576,8 @@ private void resetDelay() {
 log.debug("Previous rounded down (floor) average number of tasks per 
worker {}", totalActiveTasksNum / existingWorkersNum);
 int floorTasks = totalActiveTasksNum / totalWorkersNum;
 log.debug("New rounded down (floor) average number of tasks per worker 
{}", floorTasks);
+int ceilTasks = (int) Math.ceil((float) totalActiveTasksNum / 
totalWorkersNum);
+log.debug("New rounded down (ceil) average number of tasks per 

[GitHub] [kafka] rhauch commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

2020-09-28 Thread GitBox


rhauch commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r496239771



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id,
 this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
 this.consumer = consumer;
 this.isTopicTrackingEnabled = 
workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+this.taskStopped = false;

Review comment:
   Shouldn't this be volatile?
   
   Yes, it's true that `WorkerSinkTask.close()` is always and only called from 
within the `WorkerTask.doRun()` after the task determines it will stop. 
However, the `onPartitionsRevoked(...)` method is called from the consumer 
thread, and making the field volatile is the only way to ensure that the 
consumer thread reads a non-cached value.
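   
   In sketch form, the declaration being requested (only the field shown, as it would appear in `WorkerSinkTask`):
   
   ```java
   // volatile: written on the task thread during close(), read on the consumer
   // thread in onPartitionsRevoked(), so cross-thread visibility must be guaranteed.
   private volatile boolean taskStopped;
   ```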





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch merged pull request #8973: KAFKA-10218: Stop reading config topic in every subsequent tick if catchup fails once

2020-09-28 Thread GitBox


rhauch merged pull request #8973:
URL: https://github.com/apache/kafka/pull/8973


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-28 Thread GitBox


hachikuji commented on a change in pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#discussion_r496287274



##
File path: docs/upgrade.html
##
@@ -27,6 +27,14 @@ Notable changes in 2
 default.api.timeout.ms, and Kafka Streams' new 
task.timeout.ms parameters instead.
 Note that parameter retry.backoff.ms is not impacted by 
this change.
 
+Altering non-reconfigurable configs of existent listeners causes 
InvalidRequestException.
+By contrast, the previous behavior would have caused the updated 
configuration to be persisted, but it wouldn't
+take effect until the broker was restarted. This change breaks 
behavior compatibility but the old behavior is not

Review comment:
   nit: can we take out the sentence about behavior compatibility? I think 
it is enough to say that the previous behavior was unintended.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

2020-09-28 Thread GitBox


C0urante commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r446490310



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -689,6 +692,10 @@ else if (!context.pausedPartitions().isEmpty())
 
 @Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) 
{
+if (taskStopped) {

Review comment:
   The callback gets invoked on the same thread as the one that 
`KafkaConsumer::close` is invoked on, so `volatile` isn't strictly necessary. 
If you (or others) think it'd be good to include it just in case that changes, 
or in case this callback gets invoked on a different thread after the task is 
stopped (which afaik is not possible atm), I don't have any major objections 
to adding it. I just didn't want to add it unnecessarily, as it might be 
misleading to people reading the code base down the road. LMKWYT





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10532) Do not wipe state store under EOS when closing-dirty a RESTORING active or RUNNING standby task

2020-09-28 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10532:
--
Description: Today whenever we close a task dirty, we always wipe out the 
state stores if we are under EOS. But when the closing task was a RESTORING 
active, or a RUNNING standby, we may actually not need to wipe out the stores, 
since we know that upon resuming we would still continue restoring the task 
before ever transitioning to processing (assuming the LEO offset would not be 
truncated), i.e. when it resumes it does not matter if the same records get 
applied twice during the continued restoration.  (was: Today whenever we close 
a task dirty, we always wipe out the state stores if we are under EOS. But 
when the closing task was a RESTORING active, or a RUNNING standby, we may 
actually not need to wipe out the stores, since we know that upon resuming we 
would still restore the task before transitioning to processing (assuming the 
LEO offset would not be truncated), i.e. when it resumes it does not matter if 
the same records get applied twice during the continued restoration.)

> Do not wipe state store under EOS when closing-dirty a RESTORING active or 
> RUNNING standby task
> ---
>
> Key: KAFKA-10532
> URL: https://issues.apache.org/jira/browse/KAFKA-10532
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Today whenever we close a task dirty, we always wipe out the state 
> stores if we are under EOS. But when the closing task was a RESTORING active, 
> or a RUNNING standby, we may actually not need to wipe out the stores, since 
> we know that upon resuming we would still continue restoring the task before 
> ever transitioning to processing (assuming the LEO offset would not be 
> truncated), i.e. when it resumes it does not matter if the same records get 
> applied twice during the continued restoration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10532) Do not wipe state store under EOS when closing-dirty a RESTORING active or RUNNING standby task

2020-09-28 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10532:
--
Summary: Do not wipe state store under EOS when closing-dirty a RESTORING 
active or RUNNING standby task  (was: Do not wipe state store under EOS when 
closing a RESTORING active or RUNNING standby task)

> Do not wipe state store under EOS when closing-dirty a RESTORING active or 
> RUNNING standby task
> ---
>
> Key: KAFKA-10532
> URL: https://issues.apache.org/jira/browse/KAFKA-10532
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Today whenever we close a task dirty, we always wipe out the state 
> stores if we are under EOS. But when the closing task was a RESTORING active, 
> or a RUNNING standby, we may actually not need to wipe out the stores, since 
> we know that upon resuming we would still restore the task before 
> transitioning to processing (assuming the LEO offset would not be truncated), 
> i.e. when it resumes it does not matter if the same records get applied twice 
> during the continued restoration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10532) Do not wipe state store under EOS when closing a RESTORING active or RUNNING standby task

2020-09-28 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10532:
-

 Summary: Do not wipe state store under EOS when closing a 
RESTORING active or RUNNING standby task
 Key: KAFKA-10532
 URL: https://issues.apache.org/jira/browse/KAFKA-10532
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Today whenever we close a task dirty, we always wipe out the state stores 
if we are under EOS. But when the closing task was a RESTORING active, or a 
RUNNING standby, we may actually not need to wipe out the stores, since we know 
that upon resuming we would still restore the task before transitioning to 
processing (assuming the LEO offset would not be truncated), i.e. when it 
resumes it does not matter if the same records get applied twice during the 
continued restoration.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #9346: KAFKA-8410: Split ProcessorContext into Processor/StateStore/RecordContext

2020-09-28 Thread GitBox


vvcephei commented on a change in pull request #9346:
URL: https://github.com/apache/kafka/pull/9346#discussion_r496227502



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -90,7 +90,7 @@ void register(final StateStore store,
  * @param name The store name
  * @return The state store instance
  */
-StateStore getStateStore(final String name);
+<S extends StateStore> S getStateStore(final String name);

Review comment:
   This is a backward-compatible change. It's necessary if we want to 
capitalize on the opportunity to implement all the new interfaces with a common 
ProcessorContextImpl (which I think we do).
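   
   A hedged usage example of the new signature (store name and types are illustrative): the generic return type removes the cast at the call site.
   
   ```java
   // Before: a cast was required.
   // KeyValueStore<String, Long> store =
   //     (KeyValueStore<String, Long>) context.getStateStore("counts-store");
   final KeyValueStore<String, Long> store = context.getStateStore("counts-store");
   ```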

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * State store context interface.
+ */
+public interface StateStoreContext {

Review comment:
   The new StateStoreContext. It contains only the parts of the 
ProcessorContext that should be appropriate within a StateStore.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+
+public class Record<K, V> {
+private final K key;
+private final V value;
+private final long timestamp;
+private final Headers headers;
+
+public Record(final K key, final V value, final long timestamp, final 
Headers headers) {

Review comment:
   The new data object.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+public interface RecordMetadata {

Review comment:
   Metadata is an interface rather than a data class, since it'll never be 
constructed by users. Note that this also enables us to just re-use the 
ProcessorContextImpl as the implementation for this.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##
@@ -140,76 +125,25 @@ Cancellable schedule(final Duration interval,
  final Punctuator callback);
 
 /**
- * Forwards a key/value 

[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-28 Thread GitBox


soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496275381



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -133,11 +134,14 @@ public void start() {
List<TopicPartition> partitions = new ArrayList<>();
 
 // We expect that the topics will have been created either manually by 
the user or automatically by the herder
-List<PartitionInfo> partitionInfos = null;
-long started = time.milliseconds();
-while (partitionInfos == null && time.milliseconds() - started < 
CREATE_TOPIC_TIMEOUT_MS) {
+List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+long started = time.nanoseconds();
+long maxSleepMs = 1_000;
+long sleepMs = 10;
+while (partitionInfos == null && time.nanoseconds() - started < 
CREATE_TOPIC_TIMEOUT_NS) {
+Utils.sleep(sleepMs);

Review comment:
   Updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop

2020-09-28 Thread GitBox


rhauch commented on a change in pull request #8910:
URL: https://github.com/apache/kafka/pull/8910#discussion_r496239771



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id,
 this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
 this.consumer = consumer;
 this.isTopicTrackingEnabled = 
workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+this.taskStopped = false;

Review comment:
   Shouldn't this be volatile?
   
   Yes, it's true that `WorkerSinkTask.close()` is always and only called from 
within `WorkerTask.doRun()` after the task determines it will stop. 
However, the `onPartitionsRevoked(...)` method is called from the consumer 
thread, and making this volatile is the only way to ensure that the consumer 
thread doesn't read a previously-cached value.
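   
   To make the concern concrete, a minimal sketch of the two-thread interaction,
   with class and method bodies trimmed to just the flag handling:
   
   ```java
   import java.util.Collection;
   import org.apache.kafka.common.TopicPartition;
   
   class StopFlagSketch {
       // without volatile, the consumer thread may keep reading a stale
       // cached value of the flag written by the worker thread
       private volatile boolean taskStopped = false;
   
       void close() {                    // worker thread, once doRun() decides to stop
           taskStopped = true;
       }
   
       void onPartitionsRevoked(final Collection<TopicPartition> partitions) { // consumer thread
           if (taskStopped)
               return;                   // skip preCommit after the task has stopped
           // ... commit offsets ...
       }
   }
   ```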

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##
@@ -315,6 +315,56 @@ public void testPause() throws Exception {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testShutdown() throws Exception {
+createTask(initialState);
+
+expectInitializeTask();
+expectTaskGetTopic(true);
+
+// first iteration
+expectPollInitialAssignment();
+
+// second iteration
+
EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap());
+expectConsumerPoll(1);
+expectConversionAndTransformation(1);
+sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+EasyMock.expectLastCall();
+
+// WorkerSinkTask::stop
+consumer.wakeup();
+PowerMock.expectLastCall();
+sinkTask.stop();
+PowerMock.expectLastCall();
+
+// WorkerSinkTask::close
+consumer.close();
+PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+@Override
+public Object answer() throws Throwable {
+rebalanceListener.getValue().onPartitionsRevoked(
+asList(TOPIC_PARTITION, TOPIC_PARTITION2)
+);
+return null;
+}
+});
+transformationChain.close();
+PowerMock.expectLastCall();
+
+PowerMock.replayAll();
+
+workerTask.initialize(TASK_CONFIG);
+workerTask.initializeAndStart();
+workerTask.iteration();
+sinkTaskContext.getValue().requestCommit(); // Force an offset commit
+workerTask.iteration();
+workerTask.stop();
+workerTask.close();
+
+PowerMock.verifyAll();

Review comment:
   Verified locally that this test fails when the additions to the 
`onPartitionsRevoked(...)` method above are removed locally. Nice work, 
@C0urante.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10455) Probing rebalances are not guaranteed to be triggered by non-leader members

2020-09-28 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-10455:
---

Assignee: Leah Thomas

> Probing rebalances are not guaranteed to be triggered by non-leader members
> ---
>
> Key: KAFKA-10455
> URL: https://issues.apache.org/jira/browse/KAFKA-10455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: Leah Thomas
>Priority: Blocker
> Fix For: 2.7.0, 2.6.1
>
>
> Apparently, if a consumer rejoins the group with the same subscription 
> userdata that it previously sent, it will not trigger a rebalance. The one 
> exception here is that the group leader will always trigger a rebalance when 
> it rejoins the group.
> This has implications for KIP-441, where we rely on asking an arbitrary 
> thread to enforce the followup probing rebalances. Technically we do ask a 
> thread living on the same instance as the leader, so the odds that the leader 
> will be chosen aren't completely abysmal, but for any multithreaded 
> application they are still at best only 50%.
> Of course in general the userdata will have changed within a span of 10 
> minutes, so the actual likelihood of hitting this is much lower –  it can 
> only happen if the member's task offset sums remained unchanged. 
> Realistically, this probably requires that the member only have 
> fully-restored active tasks (encoded with the constant sentinel -2) and that 
> no tasks be added or removed.
>  
> One solution would be to make sure the leader is responsible for the probing 
> rebalance. To do this, we would need to somehow expose the memberId of the 
> thread's main consumer to the partition assignor. I'm actually not sure if 
> that's currently possible to figure out or not. If not, we could just assign 
> the probing rebalance to every thread on the leader's instance. This 
> shouldn't result in multiple followup rebalances as the rebalance schedule 
> will be updated/reset on the first followup rebalance.
> Another solution would be to make sure the userdata is always different. We 
> could encode an extra bit that flip-flops, but then we'd have to persist the 
> latest value somewhere/somehow. Alternatively we could just encode the next 
> probing rebalance time in the subscription userdata, since that is guaranteed 
> to always be different from the previous rebalance. This might get tricky 
> though, and certainly wastes space in the subscription userdata. Also, this 
> would only solve the problem for KIP-441 probing rebalances, meaning we'd 
> have to individually ensure the userdata has changed for every type of 
> followup rebalance (see related issue below). So the first proposal, 
> requiring the leader trigger the rebalance, would be preferable.
> Note that, imho, we should just allow anyone to trigger a rebalance by 
> rejoining the group. But this would presumably require a broker-side change 
> and thus we would still need a workaround for KIP-441 to work with brokers.
>  
> Related issue:
> This also means the Streams workaround for 
> [KAFKA-9821|https://issues.apache.org/jira/browse/KAFKA-9821] is 
> not airtight, as we encode the followup rebalance in the member who is 
> supposed to _receive_ a revoked partition, rather than the member who is 
> actually revoking said partition. While the member doing the revoking will be 
> guaranteed to have different userdata, the member receiving the partition may 
> not. Making it the responsibility of the leader to trigger _any_ type of 
> followup rebalance would solve this issue as well.
> Note that other types of followup rebalance (version probing, static 
> membership with host info change) are guaranteed to have a change in the 
> subscription userdata, and will not hit this bug



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-28 Thread GitBox


soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496272243



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -163,6 +163,18 @@ public void start() {
 log.info("Started KafkaBasedLog for topic " + topic);
 }
 
+/**
+ * Sleep for some time so that topic used for this KafkaBasedLog gets 
created. Note that
+ * {@code System.currentTimeMillis()} is not monotonic, so check for that 
condition.

Review comment:
   In fact we don't need the value from the clock to sleep; we only need it to 
find the elapsed time and the timeout. I have decoupled these two, and also 
fixed a minor issue where it was sleeping the first time even if the topic was 
present.
   
   Please take a look.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-28 Thread GitBox


soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496271194



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
##
@@ -536,6 +536,22 @@ public void onCompletion(RecordMetadata metadata, 
Exception exception) {
 PowerMock.verifyAll();
 }
 
+/**
+ * Check if the waitForTopicCreate method doesn't throw if time moves 
backward, and works
+ * correctly if it increases.
+ */
+@Test
+public void testWatiForTopicCreate() {

Review comment:
   Fixed (actually removed the test)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-28 Thread GitBox


ijuma commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496270860



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -133,11 +134,14 @@ public void start() {
 List<TopicPartition> partitions = new ArrayList<>();
 
 // We expect that the topics will have been created either manually by 
the user or automatically by the herder
-List<PartitionInfo> partitionInfos = null;
-long started = time.milliseconds();
-while (partitionInfos == null && time.milliseconds() - started < 
CREATE_TOPIC_TIMEOUT_MS) {
+List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+long started = time.nanoseconds();
+long maxSleepMs = 1_000;
+long sleepMs = 10;
+while (partitionInfos == null && time.nanoseconds() - started < 
CREATE_TOPIC_TIMEOUT_NS) {
+Utils.sleep(sleepMs);

Review comment:
   `Time` has a `sleep` method too btw.
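   
   For reference, a one-line sketch of that suggestion; `time` is the injected
   `org.apache.kafka.common.utils.Time` already available in `KafkaBasedLog`:
   
   ```java
   // equivalent to Utils.sleep(sleepMs), but honors a MockTime in tests
   // instead of always blocking on the real clock
   time.sleep(sleepMs);
   ```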





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-28 Thread GitBox


ijuma commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496270581



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -163,6 +163,18 @@ public void start() {
 log.info("Started KafkaBasedLog for topic " + topic);
 }
 
+/**
+ * Sleep for some time so that topic used for this KafkaBasedLog gets 
created. Note that
+ * {@code System.currentTimeMillis()} is not monotonic, so check for that 
condition.

Review comment:
   You can use `hiResClockMs` so that you get the value in `ms`.
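   
   A hedged sketch of the loop on top of the monotonic `hiResClockMs()`
   (`nanoTime` converted to milliseconds); `sleepMs` and
   `CREATE_TOPIC_TIMEOUT_MS` are the names from the surrounding code:
   
   ```java
   long deadlineMs = time.hiResClockMs() + CREATE_TOPIC_TIMEOUT_MS;
   List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
   while (partitionInfos == null && time.hiResClockMs() < deadlineMs) {
       time.sleep(sleepMs);                           // bounded, never negative
       partitionInfos = consumer.partitionsFor(topic);
   }
   ```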





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-28 Thread GitBox


soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496267175



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -163,6 +163,18 @@ public void start() {
 log.info("Started KafkaBasedLog for topic " + topic);
 }
 
+/**
+ * Sleep for some time so that topic used for this KafkaBasedLog gets 
created. Note that
+ * {@code System.currentTimeMillis()} is not monotonic, so check for that 
condition.

Review comment:
   In fact I don't like this loop altogether. Going to rewrite it so that 
it doesn't use these constructs.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-28 Thread GitBox


soondenana commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496263965



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -163,6 +163,18 @@ public void start() {
 log.info("Started KafkaBasedLog for topic " + topic);
 }
 
+/**
+ * Sleep for some time so that topic used for this KafkaBasedLog gets 
created. Note that
+ * {@code System.currentTimeMillis()} is not monotonic, so check for that 
condition.

Review comment:
   Wanted to make the least amount of change. I can update the code to use the 
monotonic `nanoTime` instead (`nanoseconds` in the Time interface). We will also 
need to convert that to millis before passing it to sleep (unless we want to add 
nano variants to those interfaces too, like Utils.sleep).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-28 Thread GitBox


ijuma commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496240453



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##
@@ -163,6 +163,18 @@ public void start() {
 log.info("Started KafkaBasedLog for topic " + topic);
 }
 
+/**
+ * Sleep for some time so that topic used for this KafkaBasedLog gets 
created. Note that
+ * {@code System.currentTimeMillis()} is not monotonic, so check for that 
condition.

Review comment:
   Why don't we use a monotonic timer?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-28 Thread GitBox


splett2 commented on a change in pull request #9347:
URL: https://github.com/apache/kafka/pull/9347#discussion_r496238384



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
##
@@ -536,6 +536,22 @@ public void onCompletion(RecordMetadata metadata, 
Exception exception) {
 PowerMock.verifyAll();
 }
 
+/**
+ * Check if the waitForTopicCreate method doesn't throw if time moves 
backward, and works
+ * correctly if it increases.
+ */
+@Test
+public void testWatiForTopicCreate() {

Review comment:
   nit: typo in `Wati`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Issue Comment Deleted] (KAFKA-9370) Return UNKNOWN_TOPIC_OR_PARTITION if topic deletion is in progress

2020-09-28 Thread Vikas Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vikas Singh updated KAFKA-9370:
---
Comment: was deleted

(was: https://github.com/apache/kafka/pull/9347)

> Return UNKNOWN_TOPIC_OR_PARTITION if topic deletion is in progress
> --
>
> Key: KAFKA-9370
> URL: https://issues.apache.org/jira/browse/KAFKA-9370
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vikas Singh
>Assignee: Vikas Singh
>Priority: Major
>
> `KafkaApis::handleCreatePartitionsRequest` returns `INVALID_TOPIC_EXCEPTION` 
> if the topic is getting deleted. Change it to return 
> `UNKNOWN_TOPIC_OR_PARTITION` instead. After the delete topic API returns, the 
> client should see the topic as deleted. The fact that we are processing the 
> deletion in the background shouldn't have any impact.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9370) Return UNKNOWN_TOPIC_OR_PARTITION if topic deletion is in progress

2020-09-28 Thread Vikas Singh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203527#comment-17203527
 ] 

Vikas Singh commented on KAFKA-9370:


https://github.com/apache/kafka/pull/9347

> Return UNKNOWN_TOPIC_OR_PARTITION if topic deletion is in progress
> --
>
> Key: KAFKA-9370
> URL: https://issues.apache.org/jira/browse/KAFKA-9370
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vikas Singh
>Assignee: Vikas Singh
>Priority: Major
>
> `KafkaApis::handleCreatePartitionsRequest` returns `INVALID_TOPIC_EXCEPTION` 
> if the topic is getting deleted. Change it to return 
> `UNKNOWN_TOPIC_OR_PARTITION` instead. After the delete topic API returns, the 
> client should see the topic as deleted. The fact that we are processing the 
> deletion in the background shouldn't have any impact.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] soondenana opened a new pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call

2020-09-28 Thread GitBox


soondenana opened a new pull request #9347:
URL: https://github.com/apache/kafka/pull/9347


   System.currentTimeMillis() is not monotonic, so using that to calculate
   time to sleep can result in negative values. That will throw
   IllegalArgumentException.
   
   This change checks for that and sleeps for a second (to avoid a tight
   loop) if the value returned is negative.
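   
   A minimal sketch of the guard, reusing the names from the Jira snippet
   (clamping into [0, 1000] ms keeps Thread.sleep from ever seeing a
   negative timeout):
   
   ```java
   long elapsedMs = time.milliseconds() - started;
   Utils.sleep(Math.min(Math.max(elapsedMs, 0), 1000));
   ```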
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #9346: KAFKA-8410: Split ProcessorContext into Processor/StateStore/RecordContext

2020-09-28 Thread GitBox


vvcephei opened a new pull request #9346:
URL: https://github.com/apache/kafka/pull/9346


   Split up the ProcessorContext into separate containers more appropriate
   for usage in `Processor#init` (`api.ProcessorContext`), 
   `StateStore#init` (`StateStoreContext`), and `Processor#process`
   (the new `Record` and `RecordMetadata` classes).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9584) Removing headers causes ConcurrentModificationException

2020-09-28 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-9584:

Fix Version/s: 2.6.1
   2.5.2
   2.7.0

> Removing headers causes ConcurrentModificationException
> ---
>
> Key: KAFKA-9584
> URL: https://issues.apache.org/jira/browse/KAFKA-9584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Micah Ramos
>Assignee: Micah Ramos
>Priority: Minor
> Fix For: 2.7.0, 2.5.2, 2.6.1
>
>
> The consumer record that is used during punctuate is static; this can cause 
> a java.util.ConcurrentModificationException when modifying the headers. 
> Using a single instance of ConsumerRecord for all punctuates causes other 
> strange behavior:
>  # Headers are shared across partitions.
>  # A topology that adds a single header could append an infinite number of 
> headers (one per punctuate iteration), causing memory problems in the current 
> topology as well as in downstream consumers, since the headers are written with 
> the record when it is produced to a topic.
>  
> I would expect that each invocation of punctuate would be initialized with a 
> new header object.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-28 Thread GitBox


vvcephei commented on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-700274192


   Cherry-picked to 2.6 and 2.5.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9027: KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing

2020-09-28 Thread GitBox


mjsax commented on pull request #9027:
URL: https://github.com/apache/kafka/pull/9027#issuecomment-700260133


   Cherry-picked to `2.6`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9308: MINOR: streams docs fixes

2020-09-28 Thread GitBox


mjsax commented on pull request #9308:
URL: https://github.com/apache/kafka/pull/9308#issuecomment-700260093


   Cherry-picked to `2.6`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10531) KafkaBasedLog can sleep for negative values

2020-09-28 Thread Vikas Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vikas Singh reassigned KAFKA-10531:
---

Assignee: Vikas Singh

> KafkaBasedLog can sleep for negative values
> ---
>
> Key: KAFKA-10531
> URL: https://issues.apache.org/jira/browse/KAFKA-10531
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Vikas Singh
>Assignee: Vikas Singh
>Priority: Major
> Fix For: 2.6.1
>
>
> {{time.milliseconds}} is not monotonic, so this code can throw:
> {{java.lang.IllegalArgumentException: timeout value is negative}}
>  
> {code:java}
> long started = time.milliseconds();
> while (partitionInfos == null && time.milliseconds() - started < 
> CREATE_TOPIC_TIMEOUT_MS) {
> partitionInfos = consumer.partitionsFor(topic);
> Utils.sleep(Math.min(time.milliseconds() - started, 1000));
> }
> {code}
> We need to check for a negative value before sleeping.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tahseen447 commented on pull request #5876: KAFKA-7509: Avoid passing most non-applicable properties to producer, consumer, and admin client

2020-09-28 Thread GitBox


tahseen447 commented on pull request #5876:
URL: https://github.com/apache/kafka/pull/5876#issuecomment-700252065


   just checking in, has this been resolved?
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10531) KafkaBasedLog can sleep for negative values

2020-09-28 Thread Vikas Singh (Jira)
Vikas Singh created KAFKA-10531:
---

 Summary: KafkaBasedLog can sleep for negative values
 Key: KAFKA-10531
 URL: https://issues.apache.org/jira/browse/KAFKA-10531
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.6.0
Reporter: Vikas Singh
 Fix For: 2.6.1


{{time.milliseconds}} is not monotonic, so this code can throw:

{{java.lang.IllegalArgumentException: timeout value is negative}}

 
{code:java}
long started = time.milliseconds();
while (partitionInfos == null && time.milliseconds() - started < 
CREATE_TOPIC_TIMEOUT_MS) {
partitionInfos = consumer.partitionsFor(topic);
Utils.sleep(Math.min(time.milliseconds() - started, 1000));
}
{code}

We need to check for a negative value before sleeping.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei merged pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-28 Thread GitBox


vvcephei merged pull request #8181:
URL: https://github.com/apache/kafka/pull/8181


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-28 Thread GitBox


vvcephei commented on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-700246317


   Thanks, @MicahRam !
   
   That failure looks unrelated: `Build / JDK 11 / 
kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram merged pull request #9317: KAFKA-10509: Added throttle connection accept rate metric (KIP-612)

2020-09-28 Thread GitBox


rajinisivaram merged pull request #9317:
URL: https://github.com/apache/kafka/pull/9317


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9344: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599

2020-09-28 Thread GitBox


rajinisivaram commented on a change in pull request #9344:
URL: https://github.com/apache/kafka/pull/9344#discussion_r496173659



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1554,6 +1575,11 @@ private ConfigEntry configEntry(CreatableTopicConfigs 
config) {
 
 @Override
 void handleFailure(Throwable throwable) {
+// If there were any topics retries due to a quota exceeded 
exception, we propagate
+// the initial error back to the caller.
+completeQuotaExceededException(futures, 
quotaExceededExceptions,

Review comment:
   Do we want to return quota exceeded in all cases? Apart from timeouts, 
it seems like we should propagate failures rather than an earlier quota exceeded 
exception?  That is, if we were throttled for 5 millis and then see a failure, 
the failure is more useful than the fact that we were throttled for 5 millis?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -733,7 +733,9 @@ public void 
testCreateTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO
 time.sleep(defaultApiTimeout + 1);
 
 assertNull(result.values().get("topic1").get());
-TestUtils.assertFutureThrows(result.values().get("topic2"), 
TimeoutException.class);
+ThrottlingQuotaExceededException e = 
TestUtils.assertFutureThrows(result.values().get("topic2"),
+ThrottlingQuotaExceededException.class);
+assertEquals(0, e.throttleTimeMs());

Review comment:
   I can see why we return the delta, but this looks odd when it says 
throttled with a time of zero.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram opened a new pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)

2020-09-28 Thread GitBox


rajinisivaram opened a new pull request #9345:
URL: https://github.com/apache/kafka/pull/9345


   Adds support for SSL key and trust stores to be specified in PEM format 
either as files or directly as configuration values.
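   
   For illustration, a hedged sketch of what the PEM-based configuration could
   look like (property names follow KIP-651; the inline PEM values are elided
   placeholders):
   
   ```
   ssl.keystore.type=PEM
   ssl.keystore.key=-----BEGIN ENCRYPTED PRIVATE KEY-----...-----END ENCRYPTED PRIVATE KEY-----
   ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE-----...-----END CERTIFICATE-----
   ssl.truststore.type=PEM
   ssl.truststore.certificates=-----BEGIN CERTIFICATE-----...-----END CERTIFICATE-----
   ```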
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10493) Ktable out-of-order updates are not being ignored

2020-09-28 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203430#comment-17203430
 ] 

John Roesler commented on KAFKA-10493:
--

I've marked this for 3.0 so that we could consider adding this behavior change 
(with or without an opt-out config) in the major version release.
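
To make the expected semantics concrete, a conceptual sketch (not the actual
ValueAndTimestampSerializer code) of the drop-if-older check a materialized
KTable should apply:

{code:java}
// ignore an update whose timestamp is older than what the store already holds
static boolean shouldIgnore(final long newTimestamp, final long storedTimestamp) {
    return newTimestamp < storedTimestamp; // out-of-order => drop
}
{code}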

> Ktable out-of-order updates are not being ignored
> -
>
> Key: KAFKA-10493
> URL: https://issues.apache.org/jira/browse/KAFKA-10493
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Pedro Gontijo
>Priority: Critical
> Fix For: 3.0.0
>
> Attachments: KTableOutOfOrderBug.java
>
>
> On a materialized KTable, out-of-order records for a given key (records whose 
> timestamp is older than the current value in the store) are not being ignored 
> but are used to update the local store value and are also forwarded.
> I believe the bug is here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
>  It should return true, not false (see javadoc)
> The bug impacts here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10493) Ktable out-of-order updates are not being ignored

2020-09-28 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10493:
-
Fix Version/s: 3.0.0

> Ktable out-of-order updates are not being ignored
> -
>
> Key: KAFKA-10493
> URL: https://issues.apache.org/jira/browse/KAFKA-10493
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Pedro Gontijo
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: KTableOutOfOrderBug.java
>
>
> On a materialized KTable, out-of-order records for a given key (records whose 
> timestamp is older than the current value in the store) are not being ignored 
> but are used to update the local store value and are also forwarded.
> I believe the bug is here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
>  It should return true, not false (see javadoc)
> The bug impacts here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10493) Ktable out-of-order updates are not being ignored

2020-09-28 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10493:
-
Priority: Critical  (was: Major)

> Ktable out-of-order updates are not being ignored
> -
>
> Key: KAFKA-10493
> URL: https://issues.apache.org/jira/browse/KAFKA-10493
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Pedro Gontijo
>Priority: Critical
> Fix For: 3.0.0
>
> Attachments: KTableOutOfOrderBug.java
>
>
> On a materialized KTable, out-of-order records for a given key (records whose 
> timestamp is older than the current value in the store) are not being ignored 
> but are used to update the local store value and are also forwarded.
> I believe the bug is here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
>  It should return true, not false (see javadoc)
> The bug impacts here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch commented on pull request #9306: KAFKA-10477: Enabling the same behavior of NULL JsonNodeType to MISSI…

2020-09-28 Thread GitBox


rhauch commented on pull request #9306:
URL: https://github.com/apache/kafka/pull/9306#issuecomment-700182723


   @shaikzakiriitm, you mentioned above:
   > From v2.10.0 onwards, in jackson lib, ObjectMapper.readTree(input) started 
to return JsonNode of type MISSING for empty input, as mentioned in the issue: 
[FasterXML/jackson-databind#2211](https://github.com/FasterXML/jackson-databind/issues/2211).
   
   However, the 
[FasterXML/jackson-databind#2211](https://github.com/FasterXML/jackson-databind/issues/2211)
 you mention above talks about this behavior changing in 2.9 relative to 2.8. 
This does not seem to align with your assertion in the 
[KAFKA-10477](https://issues.apache.org/jira/browse/KAFKA-10477) that:
   > Things were working fine when the dependency on jackson lib was of version 
 v2.9.10.3
   
   Can you clarify?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ramesh-muthusamy commented on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…

2020-09-28 Thread GitBox


ramesh-muthusamy commented on pull request #9319:
URL: https://github.com/apache/kafka/pull/9319#issuecomment-700171024


   @kkonstantine can you help review this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…

2020-09-28 Thread GitBox


chia7712 commented on pull request #9102:
URL: https://github.com/apache/kafka/pull/9102#issuecomment-700149221


   rebase to trigger new QA



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10513) Newly added topic or partitions are not assigned to running consumer groups using static membership

2020-09-28 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203342#comment-17203342
 ] 

Boyang Chen commented on KAFKA-10513:
-

My understanding is that the consumer was supposed to check the metadata for 
topic partitions, and rejoin as necessary when the partition number changes?
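
For reference, a hedged sketch of the repro described below using the Admin
client (the topic name, partition count, and adminProps are illustrative):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitions;

class AddPartitionsRepro {
    // grow "my-topic" to 12 partitions while static-membership consumers run;
    // per this report, no rebalance follows and the new partitions go unread
    static void grow(final Properties adminProps) throws Exception {
        try (Admin admin = Admin.create(adminProps)) {
            admin.createPartitions(
                Collections.singletonMap("my-topic", NewPartitions.increaseTo(12)))
                .all().get();
        }
    }
}
{code}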

> Newly added topic or partitions are not assigned to running consumer groups 
> using static membership
> ---
>
> Key: KAFKA-10513
> URL: https://issues.apache.org/jira/browse/KAFKA-10513
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.6.0
>Reporter: Marlon Ou
>Priority: Major
>
> If consumers are polling messages from a certain topic with static membership 
> and we add new partitions to this topic while the consumers are running, no 
> partition reassignment is ever triggered (and hence messages published into 
> the new partitions are never consumed). 
> To reproduce, simply set group instance IDs on the consumers: 
> {code:java}
> props.setProperty("group.instance.id", instanceId);
> {code}
> And then while the static consumers are running, use Kafka's admin client to 
> add more partitions to the topic:
> {code:java}
> adminClient.createPartitions(...)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on a change in pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-28 Thread GitBox


tombentley commented on a change in pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#discussion_r496048334



##
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##
@@ -2061,11 +2062,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   @Test
   def testDescribeConfigsForLog4jLogLevels(): Unit = {
 client = Admin.create(createConfig)
-
+LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create 
the logger")
 val loggerConfig = describeBrokerLoggers()
-val rootLogLevel = loggerConfig.get(Log4jController.ROOT_LOGGER).value()
+val kafkaLogLevel = loggerConfig.get("kafka").value()
 val logCleanerLogLevelConfig = loggerConfig.get("kafka.cluster.Replica")
-assertEquals(rootLogLevel, logCleanerLogLevelConfig.value()) // we expect 
an undefined log level to be the same as the root logger
+// we expect an undefined log level to be the same as its first ancestor 
logger

Review comment:
   It would, but that's not configured in the `log4j.properties` used for 
this test, so we inherit from the 2nd ancestor, which is ERROR, whereas the 
root logger was OFF. That is why I had to fix this test. Let me fix the 
comment.
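   
   To make the inheritance concrete, a hedged sketch of the relevant
   log4j.properties shape (the levels are illustrative):
   
   ```
   # nothing is configured for kafka.cluster or kafka.cluster.Replica, so
   # kafka.cluster.Replica resolves hierarchically to ERROR (its nearest
   # configured ancestor) rather than to the root logger's OFF
   log4j.rootLogger=OFF, stdout
   log4j.logger.kafka=ERROR
   ```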





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-28 Thread GitBox


chia7712 commented on a change in pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#discussion_r496013137



##
File path: docs/upgrade.html
##
@@ -27,6 +27,14 @@ Notable changes in 2
 default.api.timeout.ms, and Kafka Streams' new 
task.timeout.ms parameters instead.
 Note that parameter retry.backoff.ms is not impacted by 
this change.
 
+Altering non-reconfigurable configs of existent listeners causes 
InvalidRequestException.
+By contrast, the previous behavior would have caused the updated 
configuration to be persisted, but it wouldn't
+take effect until the broker was restarted. This change breaks 
behavior compatibility but the old behavior is not
+worth keeping since that behavior is unintended. see
<a href="https://github.com/apache/kafka/pull/9284">KAFKA-10479</a> 
for more discussion.
+see DynamicBrokerConfig.DynamicSecurityConfigs and 
SocketServer.ListenerReconfigurableConfigs
+for reconfigurable configs of existent listeners.
+

Review comment:
   > I think we should keep the comment, I was suggesting we tweak the 
description.
   
   got it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-28 Thread GitBox


ijuma commented on a change in pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#discussion_r496011734



##
File path: docs/upgrade.html
##
@@ -27,6 +27,14 @@ Notable changes in 2
 default.api.timeout.ms, and Kafka Streams' new 
task.timeout.ms parameters instead.
 Note that parameter retry.backoff.ms is not impacted by 
this change.
 
+Altering non-reconfigurable configs of existent listeners causes 
InvalidRequestException.
+By contrast, the previous behavior would have caused the updated 
configuration to be persisted, but it wouldn't
+take effect until the broker was restarted. This change breaks 
behavior compatibility but the old behavior is not
+worth keeping since that behavior is unintended. see
<a href="https://github.com/apache/kafka/pull/9284">KAFKA-10479</a> 
for more discussion.
+see DynamicBrokerConfig.DynamicSecurityConfigs and 
SocketServer.ListenerReconfigurableConfigs
+for reconfigurable configs of existent listeners.
+

Review comment:
   I think we should keep the comment, I was suggesting we tweak the 
description.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

2020-09-28 Thread GitBox


scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495995224



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -199,8 +199,8 @@
 
 protected static final String SOURCE_CLUSTER_PREFIX = 
MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
 protected static final String TARGET_CLUSTER_PREFIX = 
MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+protected static final String PRODUCER_CLIENT_PREFIX = 
SOURCE_CLUSTER_PREFIX + "producer.";
+protected static final String CONSUMER_CLIENT_PREFIX = 
SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
   Gotcha. I just peeked at it again and you're right. It seems the proper 
fix would be to change the way mm2 populates the MirrorConnectorConfig to avoid 
this change. WDYT @ryannedolan ? Not sure how involved this change would be.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-28 Thread GitBox


chia7712 commented on a change in pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#discussion_r495992169



##
File path: docs/upgrade.html
##
@@ -27,6 +27,14 @@ Notable changes in 2
 default.api.timeout.ms, and Kafka Streams' new 
task.timeout.ms parameters instead.
 Note that parameter retry.backoff.ms is not impacted by 
this change.
 
+Altering non-reconfigurable configs of existent listeners causes 
InvalidRequestException.
+By contrast, the previous behavior would have caused the updated 
configuration to be persisted, but it wouldn't
+take effect until the broker was restarted. This change breaks 
behavior compatibility but the old behavior is not
+worth keeping since that behavior is unintended. see
<a href="https://github.com/apache/kafka/pull/9284">KAFKA-10479</a> 
for more discussion.
+see DynamicBrokerConfig.DynamicSecurityConfigs and 
SocketServer.ListenerReconfigurableConfigs
+for reconfigurable configs of existent listeners.
+

Review comment:
   > The previous behavior was a bug as it failed to report an error even 
though it did not change the config dynamically.
   
   ok. it makes sense. I will remove this comment from ```docs/upgrade.html```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-28 Thread GitBox


ijuma commented on a change in pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#discussion_r495988153



##
File path: docs/upgrade.html
##
@@ -27,6 +27,14 @@ Notable changes in 2
 default.api.timeout.ms, and Kafka Streams' new 
task.timeout.ms parameters instead.
 Note that parameter retry.backoff.ms is not impacted by 
this change.
 
+Altering non-reconfigurable configs of existent listeners causes 
InvalidRequestException.
+By contrast, the previous behavior would have caused the updated 
configuration to be persisted, but it wouldn't
+take effect until the broker was restarted. This change breaks 
behavior compatibility but the old behavior is not
+worth keeping since that behavior is unintended. see
<a href="https://github.com/apache/kafka/pull/9284">KAFKA-10479</a> 
for more discussion.
+see DynamicBrokerConfig.DynamicSecurityConfigs and 
SocketServer.ListenerReconfigurableConfigs
+for reconfigurable configs of existent listeners.
+

Review comment:
   I know what you're saying, but this change aligns the implementation 
with the specified behavior. The previous behavior was a bug as it failed to 
report an error even though it did not change the config dynamically.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-28 Thread GitBox


chia7712 commented on a change in pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#discussion_r495983768



##
File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
##
@@ -327,10 +327,12 @@ class DynamicBrokerConfigTest {
 EasyMock.replay(kafkaServer)
 
 props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
-val newConfig = KafkaConfig(props)
+new 
DynamicListenerConfig(kafkaServer).validateReconfiguration(KafkaConfig(props))
 
+// it is illegal to update configs of existent listeners
+props.put("listener.name.plaintext.you.should.not.pass", "failure")

Review comment:
   you are right. I will revise the comment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-28 Thread GitBox


chia7712 commented on a change in pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#discussion_r495983283



##
File path: docs/upgrade.html
##
@@ -27,6 +27,14 @@ Notable changes in 2
 default.api.timeout.ms, and Kafka Streams' new 
task.timeout.ms parameters instead.
 Note that parameter retry.backoff.ms is not impacted by 
this change.
 
+Altering non-reconfigurable configs of existent listeners causes 
InvalidRequestException.
+By contrast, the previous behavior would have caused the updated 
configuration to be persisted, but it wouldn't
+take effect until the broker was restarted. This change breaks 
behavior compatibility but the old behavior is not
+worth keeping since that behavior is unintended. see
<a href="https://github.com/apache/kafka/pull/9284">KAFKA-10479</a> 
for more discussion.
+see DynamicBrokerConfig.DynamicSecurityConfigs and 
SocketServer.ListenerReconfigurableConfigs
+for reconfigurable configs of existent listeners.
+

Review comment:
   > Does it really break compatibility? It seems like the previous 
behavior wasn't working as intended anyway.
   
   It seems to me the API's behavior is changed, so compatibility is broken.
   
   For example, code which is used to change non-reconfigurable configs gets 
an exception now, so users have to write something to handle the "new" 
exception. Of course, that case should be very rare, so mentioning this in 
docs/upgrade.html is enough for this incompatible change.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-28 Thread GitBox


ijuma commented on a change in pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#discussion_r495975325



##
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##
@@ -2061,11 +2062,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   @Test
   def testDescribeConfigsForLog4jLogLevels(): Unit = {
 client = Admin.create(createConfig)
-
+LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create 
the logger")
 val loggerConfig = describeBrokerLoggers()
-val rootLogLevel = loggerConfig.get(Log4jController.ROOT_LOGGER).value()
+val kafkaLogLevel = loggerConfig.get("kafka").value()
 val logCleanerLogLevelConfig = loggerConfig.get("kafka.cluster.Replica")
-assertEquals(rootLogLevel, logCleanerLogLevelConfig.value()) // we expect 
an undefined log level to be the same as the root logger
+// we expect an undefined log level to be the same as its first ancestor 
logger

Review comment:
   Wouldn't the first ancestor be `kafka.cluster`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

2020-09-28 Thread GitBox


mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495975721



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -199,8 +199,8 @@
 
 protected static final String SOURCE_CLUSTER_PREFIX = 
MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
 protected static final String TARGET_CLUSTER_PREFIX = 
MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+protected static final String PRODUCER_CLIENT_PREFIX = 
SOURCE_CLUSTER_PREFIX + "producer.";
+protected static final String CONSUMER_CLIENT_PREFIX = 
SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
   Yes I'm running MM2 in a Connect cluster. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-28 Thread GitBox


ijuma commented on a change in pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#discussion_r495974098



##
File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
##
@@ -327,10 +327,12 @@ class DynamicBrokerConfigTest {
 EasyMock.replay(kafkaServer)
 
 props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
-val newConfig = KafkaConfig(props)
+new 
DynamicListenerConfig(kafkaServer).validateReconfiguration(KafkaConfig(props))
 
+// it is illegal to update configs of existent listeners
+props.put("listener.name.plaintext.you.should.not.pass", "failure")

Review comment:
   It is possible to update configs of existing listeners if they are 
dynamic, right?
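
   For context, a minimal sketch of such a dynamic update (broker id, listener name, and keystore path are placeholders; only configs that are listener-reconfigurable, such as per-listener keystores, can be changed this way):

```
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DynamicListenerConfigUpdate {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            // Dynamically reconfigurable listener configs take effect without a
            // restart; non-reconfigurable ones are what this PR now rejects up front.
            AlterConfigOp setKeystore = new AlterConfigOp(
                new ConfigEntry("listener.name.external.ssl.keystore.location",
                                "/etc/kafka/ssl/new-keystore.jks"),
                AlterConfigOp.OpType.SET);
            Collection<AlterConfigOp> ops = Collections.singletonList(setKeystore);
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Collections.singletonMap(broker, ops);
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
```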





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…

2020-09-28 Thread GitBox


ijuma commented on a change in pull request #9284:
URL: https://github.com/apache/kafka/pull/9284#discussion_r495973438



##
File path: docs/upgrade.html
##
@@ -27,6 +27,14 @@ Notable changes in 2
 default.api.timeout.ms, and Kafka Streams' new 
task.timeout.ms parameters instead.
 Note that parameter retry.backoff.ms is not impacted by 
this change.
 
+Altering non-reconfigurable configs of existing listeners causes InvalidRequestException.
+By contrast, the previous behavior would have caused the updated configuration to be persisted, but it
+wouldn't take effect until the broker was restarted. This change breaks behavior compatibility, but the
+old behavior is not worth keeping since it was unintended. See
+KAFKA-10479 (https://github.com/apache/kafka/pull/9284) for more discussion.
+See DynamicBrokerConfig.DynamicSecurityConfigs and SocketServer.ListenerReconfigurableConfigs
+for the reconfigurable configs of existing listeners.
+

Review comment:
   Does it really break compatibility? It seems like the previous behavior 
wasn't working as intended anyway.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-28 Thread GitBox


chia7712 commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r495966600



##
File path: core/src/main/resources/common/message/GroupMetadataResponse.json
##
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "GroupMetadataResponse",
+  "validVersions": "0-3",
+  "fields": [
+{
+  "name": "protocolType",
+  "versions": "0+",
+  "type": "string"
+},
+{
+  "name": "generation",
+  "versions": "0+",
+  "type": "int32"
+},
+{
+  "name": "protocol",
+  "versions": "0+",
+  "type": "string",
+  "nullableVersions": "0+"
+},
+{
+  "name": "leader",
+  "versions": "0+",
+  "type": "string",
+  "nullableVersions": "0+"
+},
+{
+  "name": "members",
+  "versions": "0+",
+  "type": "[]MemberMetadata"
+},
+{
+  "name": "currentStateTimestamp",
+  "versions": "2+",
+  "type": "int64"
+}
+  ],
+  "commonStructs": [
+{
+  "name": "MemberMetadata",
+  "versions": "0-3",
+  "fields": [
+{
+  "name": "memberId",
+  "versions": "0+",
+  "type": "string"
+},
+{
+  "name": "clientId",
+  "versions": "0+",
+  "type": "string"
+},
+{
+  "name": "clientHost",
+  "versions": "0+",
+  "type": "string"
+},
+{
+  "name": "sessionTimeout",
+  "versions": "0+",
+  "type": "int32"
+},
+{
+  "name": "subscription",
+  "versions": "0+",
+  "type": "bytes"

Review comment:
   Could I address it in a separate PR? The type actually used is ```byte[]``` rather than ```ByteBuffer```, so it may bring a lot of changes.
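
   For context, a tiny illustration (hypothetical code, not the actual generated classes) of why flipping the representation ripples through call sites:

```
import java.nio.ByteBuffer;

final class MemberSubscription {
    // The generated accessor exposes the "bytes" field as byte[] ...
    private final byte[] subscription;

    MemberSubscription(byte[] subscription) {
        this.subscription = subscription;
    }

    // ... so every caller that previously held a ByteBuffer needs an
    // explicit wrap (or copy) at the boundary, one call site at a time.
    ByteBuffer subscriptionBuffer() {
        return ByteBuffer.wrap(subscription);
    }
}
```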





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-28 Thread GitBox


chia7712 commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r495940420



##
File path: core/src/main/resources/common/message/OffsetCommitRequest.json
##
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "OffsetCommitRequest",
+  "validVersions": "0-1",
+  "fields": [
+{
+  "name": "group",
+  "versions": "0+",

Review comment:
   > I suggest to fix the versions to 0-1 for all fields. 
OffsetCommitRequest and GroupMetadataRequest are in the same topic and we use 
the version to differentiate the two, so it sounds better to use fixed ranges here 
for the time being. The same for GroupMetadataRequest.
   
   I'd like to merge GroupMetadataRequest and OffsetCommitRequest so we can fix 
the version issue in generated code. WDYT?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-28 Thread GitBox


chia7712 commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r495932581



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -997,173 +996,45 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-new Field("topic", STRING),
-new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = 
OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64),
-new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", 
INT64),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-new Field("offset", INT64),
-new Field("leader_epoch", INT32),
-new Field("metadata", STRING, "Associated metadata.", ""),
-new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", 
STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-new Field(MEMBER_ID_KEY, STRING),
-new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-new Field(CLIENT_ID_KEY, STRING),
-new Field(CLIENT_HOST_KEY, STRING),
-new Field(REBALANCE_TIMEOUT_KEY, INT32),
-new Field(SESSION_TIMEOUT_KEY, INT32),
-new Field(SUBSCRIPTION_KEY, BYTES),
-new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-  

[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-28 Thread GitBox


chia7712 commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r495919910



##
File path: core/src/main/resources/common/message/OffsetCommitResponse.json
##
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "OffsetCommitResponse",
+  "validVersions": "0-3",
+  "fields": [
+{
+  "name": "offset",
+  "versions": "0+",
+  "type": "int64"
+},
+{
+  "name": "metadata",
+  "versions": "0+",
+  "type": "string"
+},
+{
+  "name": "timestamp",
+  "versions": "0",
+  "type": "int64"
+},
+{
+  "name": "commitTimestamp",
+  "versions": "1+",
+  "type": "int64"
+},

Review comment:
   good idea!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-28 Thread GitBox


chia7712 commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r495918349



##
File path: core/src/main/resources/common/message/GroupMetadataRequest.json
##
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "GroupMetadataRequest",

Review comment:
   Sorry for misleading your comment :(
   
   will follow the format in next commit
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9747) No tasks created for a connector

2020-09-28 Thread Igor (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203203#comment-17203203
 ] 

Igor commented on KAFKA-9747:
-

Same issue for Debezium (SqlServer connector)

> No tasks created for a connector
> 
>
> Key: KAFKA-9747
> URL: https://issues.apache.org/jira/browse/KAFKA-9747
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
> Environment: OS: Ubuntu 18.04 LTS
> Platform: Confluent Platform 5.4
> HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge
>Reporter: Vit Koma
>Priority: Major
> Attachments: connect-distributed.properties, connect.log
>
>
> We are running Kafka Connect in a distributed mode on 3 nodes using Debezium 
> (MongoDB) and Confluent S3 connectors. When adding a new connector via the 
> REST API the connector is created in RUNNING state, but no tasks are created 
> for the connector.
> Pausing and resuming the connector does not help. When we stop all workers 
> and then start them again, the tasks are created and everything runs as it 
> should.
> The issue does not show up if we run only a single node.
> The issue is not caused by the connector plugins, because we see the same 
> behaviour for both Debezium and S3 connectors. Also in debug logs I can see 
> that Debezium is correctly returning a task configuration from the 
> Connector.taskConfigs() method.
> Connector configuration examples
> Debezium:
> {
>   "name": "qa-mongodb-comp-converter-task|1",
>   "config": {
> "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
> "mongodb.hosts": 
> "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
> "mongodb.name": "qa-debezium-comp",
> "mongodb.ssl.enabled": true,
> "collection.whitelist": "converter[.]task",
> "tombstones.on.delete": true
>   }
> }
> S3 Connector:
> {
>   "name": "qa-s3-sink-task|1",
>   "config": {
> "connector.class": "io.confluent.connect.s3.S3SinkConnector",
> "topics": "qa-debezium-comp.converter.task",
> "topics.dir": "data/env/qa",
> "s3.region": "eu-west-1",
> "s3.bucket.name": "",
> "flush.size": "15000",
> "rotate.interval.ms": "360",
> "storage.class": "io.confluent.connect.s3.storage.S3Storage",
> "format.class": 
> "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
> "schema.generator.class": 
> "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.DefaultPartitioner",
> "schema.compatibility": "NONE",
> "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> "key.converter.schemas.enable": false,
> "value.converter.schemas.enable": false,
> "transforms": "ExtractDocument",
> 
> "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
>   }
> }
> The connectors are created using curl: {{curl -X POST -H "Content-Type: 
> application/json" --data @ http:/:10083/connectors}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-28 Thread GitBox


dajac commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r495882544



##
File path: core/src/main/resources/common/message/GroupMetadataResponse.json
##
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "GroupMetadataResponse",
+  "validVersions": "0-3",
+  "fields": [
+{
+  "name": "protocolType",
+  "versions": "0+",
+  "type": "string"
+},
+{
+  "name": "generation",
+  "versions": "0+",
+  "type": "int32"
+},
+{
+  "name": "protocol",
+  "versions": "0+",
+  "type": "string",
+  "nullableVersions": "0+"
+},
+{
+  "name": "leader",
+  "versions": "0+",
+  "type": "string",
+  "nullableVersions": "0+"
+},
+{
+  "name": "members",
+  "versions": "0+",
+  "type": "[]MemberMetadata"
+},
+{
+  "name": "currentStateTimestamp",

Review comment:
   I think that we should keep `currentStateTimestamp` before `members` as 
we can't reorder fields.
   
   ```
 private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
   new Field(PROTOCOL_TYPE_KEY, STRING),
   new Field(GENERATION_KEY, INT32),
   new Field(PROTOCOL_KEY, NULLABLE_STRING),
   new Field(LEADER_KEY, NULLABLE_STRING),
   new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
   new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
   ```

##
File path: core/src/main/resources/common/message/OffsetCommitResponse.json
##
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "OffsetCommitResponse",
+  "validVersions": "0-3",
+  "fields": [
+{
+  "name": "offset",
+  "versions": "0+",
+  "type": "int64"
+},
+{
+  "name": "metadata",
+  "versions": "0+",
+  "type": "string"
+},
+{
+  "name": "timestamp",
+  "versions": "0",
+  "type": "int64"
+},
+{
+  "name": "commitTimestamp",
+  "versions": "1+",
+  "type": "int64"
+},
+{
+  "name": "expireTimestamp",
+  "versions": "1",
+  "type": "int64"
+},
+{
+  "name": "leaderEpoch",

Review comment:
   `leaderEpoch` should be before `metadata`.

##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -1321,22 +1159,15 @@ object GroupMetadataManager {
*/
   def readMessageKey(buffer: ByteBuffer): BaseKey = {
 val version = buffer.getShort
-val keySchema = schemaForKey(version)
-val key = keySchema.read(buffer)
 
 if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
   // version 0 and 1 refer to offset
-  val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String]
-  val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
-  val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]
-
-  OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, 
partition)))
-
+  val key = new GenOffsetCommitRequest(new ByteBufferAccessor(buffer), version)
+  OffsetKey(version, GroupTopicPartition(key.group(), new TopicPartition(key.topic(), key.partition())))

Review comment:
   nit: The parentheses may not be necessary for all the getters. It is worth checking the other cases below as well.

##
File path: core/src/main/resources/common/message/GroupMetadataRequest.json
##
@@ -0,0 +1,27 @@

[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

2020-09-28 Thread GitBox


scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495871017



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -199,8 +199,8 @@
 
 protected static final String SOURCE_CLUSTER_PREFIX = 
MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
 protected static final String TARGET_CLUSTER_PREFIX = 
MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+protected static final String PRODUCER_CLIENT_PREFIX = 
SOURCE_CLUSTER_PREFIX + "producer.";
+protected static final String CONSUMER_CLIENT_PREFIX = 
SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
   I'm not sure I follow. The "source_cluster" prefix is not user specified; it is added somewhere else by MM2 (I can get the lines later if you want). The config specified 
[here](https://github.com/apache/kafka/tree/trunk/connect/mirror#producer--consumer--admin-config-used-by-mm2)
 will actually be honored, and it does not imply changes on the MirrorMaker config side, at least. Are you talking about the case of running MirrorMaker in a Connect cluster rather than as a dedicated cluster? I didn't check that one, TBH. Maybe @ryannedolan has more insight on this one?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #9344: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599)

2020-09-28 Thread GitBox


dajac opened a new pull request #9344:
URL: https://github.com/apache/kafka/pull/9344


   KIP-599 proposed to keep returning the `ThrottlingQuotaExceededException` to the caller even when the request times out due to reaching `default.api.timeout.ms`. The current implementation does not cover this yet.
   
   From KIP-599:
   > Once `default.api.timeout.ms` has been reached, the topics which were 
throttled will return the `ThrottlingQuotaExceededException` to the caller.
   
   This PR adds the logic to preserve the `ThrottlingQuotaExceededException` 
when topics are retried. The `throttleTimeMs` is also adjusted accordingly as 
the request could remain pending or in-flight for quite a long time.
   
   I have run various tests on clusters with quotas enabled and, indeed, I find it better to preserve the exception. Otherwise, the caller does not really understand what is going on. This allows the caller to take the appropriate measures and to take the `throttleTimeMs` into consideration.
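
   For illustration, a rough sketch of how a caller might handle the preserved exception (topic name and bootstrap address are placeholders; assumes the `throttleTimeMs()` accessor described in KIP-599):

```
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;

public class CreateTopicWithQuotaBackoff {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("demo-topic", 3, (short) 1);
            try {
                admin.createTopics(Collections.singletonList(topic)).all().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof ThrottlingQuotaExceededException) {
                    ThrottlingQuotaExceededException tqe =
                        (ThrottlingQuotaExceededException) e.getCause();
                    // Back off for the advertised throttle time, then retry once.
                    Thread.sleep(tqe.throttleTimeMs());
                    admin.createTopics(Collections.singletonList(topic)).all().get();
                } else {
                    throw e;
                }
            }
        }
    }
}
```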
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #9343: KAFKA-10332: Update MM2 refreshTopicPartitions() logic

2020-09-28 Thread GitBox


mimaison commented on pull request #9343:
URL: https://github.com/apache/kafka/pull/9343#issuecomment-699932205


   cc @ryannedolan 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison opened a new pull request #9343: KAFKA-10332: Update MM2 refreshTopicPartitions() logic

2020-09-28 Thread GitBox


mimaison opened a new pull request #9343:
URL: https://github.com/apache/kafka/pull/9343


   Trigger task reconfiguration when:
   - topic-partitions are created or deleted on source cluster
   - topic-partitions are missing on target cluster
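
   A rough illustration of the comparison this implies (hypothetical helper, not the PR's actual code; it ignores MM2's topic renaming on the target side for simplicity):

```
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

final class RefreshCheck {
    // Reconfigure tasks when the source set changed (creations/deletions
    // upstream) or when some source partitions have no counterpart on the
    // target cluster yet.
    static boolean shouldReconfigureTasks(Set<TopicPartition> previousSource,
                                          Set<TopicPartition> currentSource,
                                          Set<TopicPartition> knownOnTarget) {
        if (!previousSource.equals(currentSource)) {
            return true;
        }
        Set<TopicPartition> missingOnTarget = new HashSet<>(currentSource);
        missingOnTarget.removeAll(knownOnTarget);
        return !missingOnTarget.isEmpty();
    }
}
```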
   
   Co-authored-by: Mickael Maison 
   Co-authored-by: Edoardo Comar 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10530) kafka-streams-application-reset misses some internal topics

2020-09-28 Thread Oliver Weiler (Jira)
Oliver Weiler created KAFKA-10530:
-

 Summary: kafka-streams-application-reset misses some internal 
topics
 Key: KAFKA-10530
 URL: https://issues.apache.org/jira/browse/KAFKA-10530
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: Oliver Weiler


While the {{kafka-streams-application-reset}} tool works in most cases, it 
misses some internal topics when using {{Foreign Key Table-Table Joins}}.

After execution, there are still two internal topics left which were not deleted

{code}
bv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic
bbv4-indexer-717e6cc5-acb2-498d-9d08-4814aaa71c81-StreamThread-1-consumer 
bbv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-14-topic
{code}

The reason seems to be the {{StreamsResetter.isInternalTopic}} which requires 
the internal topic to end with {{-changelog}} or {{-repartition}} (which the 
mentioned topics don't).
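
A minimal sketch of a broader check (illustrative only; matching a bare {{-topic}} suffix is deliberately coarse and would need to be narrowed to the foreign-key join naming scheme):

{code}
final class InternalTopicMatcher {
    // The FK-join subscription topics in the listing above end in "-topic",
    // which the current "-changelog"/"-repartition" check does not cover.
    static boolean isInternalTopic(String topicName, String applicationId) {
        return topicName.startsWith(applicationId + "-")
            && (topicName.endsWith("-changelog")
                || topicName.endsWith("-repartition")
                || topicName.endsWith("-topic"));
    }
}
{code}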



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] edenhill commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-28 Thread GitBox


edenhill commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r495836397



##
File path: tests/docker/Dockerfile
##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python-pip python-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal 
libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib
+RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean
+RUN python3 -m pip install -U pip==20.2.2;
+RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34
+RUN pip3 install git+https://github.com/confluentinc/ducktape

Review comment:
   I'll see what I can do.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-28 Thread GitBox


nizhikov commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r495820772



##
File path: tests/docker/Dockerfile
##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python-pip python-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal 
libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib
+RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean
+RUN python3 -m pip install -U pip==20.2.2;
+RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34
+RUN pip3 install git+https://github.com/confluentinc/ducktape

Review comment:
   Hello, @edenhill 
   
   Yes, but I'm a bit confused about the ducktape release.
   I can't get much feedback from the maintainers.
   
   https://github.com/confluentinc/ducktape/issues/245





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override

2020-09-28 Thread GitBox


mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495817863



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
##
@@ -119,4 +120,41 @@ public void testNonMutationOfConfigDef() {
 connectorConfigDef.names().contains(taskSpecificProperty)
 ));
 }
+
+@Test
+public void testSourceConsumerConfig() {
+Map<String, String> connectorProps = makeProps(
+MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + 
"max.poll.interval.ms", "12"
+);
+MirrorConnectorConfig config = new 
MirrorConnectorConfig(connectorProps);
+Map<String, String> connectorConsumerProps = config.sourceConsumerConfig();
+Map<String, String> expectedConsumerProps = new HashMap<>();
+expectedConsumerProps.put("enable.auto.commit", "false");
+expectedConsumerProps.put("auto.offset.reset", "earliest");
+expectedConsumerProps.put("max.poll.interval.ms", "12");
+assertEquals(expectedConsumerProps, connectorConsumerProps);
+
+// checking auto.offset.reset override works
+connectorProps = makeProps(
+MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + 
"auto.offset.reset", "latest"
+);
+config = new MirrorConnectorConfig(connectorProps);
+connectorConsumerProps = config.sourceConsumerConfig();
+expectedConsumerProps.put("auto.offset.reset", "latest");
+expectedConsumerProps.remove("max.poll.interval.ms");
+assertEquals(expectedConsumerProps, connectorConsumerProps);
+}
+
+@Test
+public void testSourceProducerConfig() {
+Map<String, String> connectorProps = makeProps(
+MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX + "acks", "1"
+);
+MirrorConnectorConfig config = new 
MirrorConnectorConfig(connectorProps);
+Map<String, String> connectorProducerProps = config.sourceProducerConfig();
+Map<String, String> expectedProducerProps = new HashMap<>();
+expectedProducerProps.put("acks", "1");
+assertEquals(expectedProducerProps, connectorProducerProps);
+}
+

Review comment:
   While we're at it, could we also add tests for `targetAdminConfig()` and 
`sourceAdminConfig()`?
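
   Presumably something along these lines (a sketch; it assumes an `ADMIN_CLIENT_PREFIX` constant analogous to the consumer/producer ones and reuses the test's `makeProps` helper):

```
@Test
public void testSourceAdminConfig() {
    Map<String, String> connectorProps = makeProps(
        MirrorConnectorConfig.ADMIN_CLIENT_PREFIX + "retries", "5"
    );
    MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
    Map<String, String> connectorAdminProps = config.sourceAdminConfig();
    Map<String, String> expectedAdminProps = new HashMap<>();
    expectedAdminProps.put("retries", "5");
    assertEquals(expectedAdminProps, connectorAdminProps);
}
```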

##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
##
@@ -199,8 +199,8 @@
 
 protected static final String SOURCE_CLUSTER_PREFIX = 
MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
 protected static final String TARGET_CLUSTER_PREFIX = 
MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
-protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+protected static final String PRODUCER_CLIENT_PREFIX = 
SOURCE_CLUSTER_PREFIX + "producer.";
+protected static final String CONSUMER_CLIENT_PREFIX = 
SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
   These changes are going to break existing users. For example, I have 
connectors with a few settings prefixed with `consumer.`. I wonder if we could 
keep the old behaviour (even if partially broken) while adding the proper 
prefixes
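
   One way to keep the old behaviour while adding the proper prefixes might look like this (hypothetical helper, not the actual MM2 code): resolve the bare legacy prefix first and let the cluster-qualified prefix win on conflicts:

```
import java.util.HashMap;
import java.util.Map;

final class ClientConfigs {
    static Map<String, String> sourceConsumerConfigs(Map<String, String> props) {
        Map<String, String> result = new HashMap<>();
        result.putAll(withPrefixStripped(props, "consumer."));                 // legacy
        result.putAll(withPrefixStripped(props, "source.cluster.consumer.")); // preferred
        return result;
    }

    private static Map<String, String> withPrefixStripped(Map<String, String> props,
                                                          String prefix) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }
}
```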





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-09-28 Thread GitBox


chia7712 commented on pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#issuecomment-699902060


   rebase to fix conflicts



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] edenhill commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-28 Thread GitBox


edenhill commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r495804848



##
File path: tests/docker/Dockerfile
##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python-pip python-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal 
libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib
+RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean
+RUN python3 -m pip install -U pip==20.2.2;
+RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34
+RUN pip3 install git+https://github.com/confluentinc/ducktape

Review comment:
   A new ducktape version needs to be released, and this line changed to a 
versioned pypi install, prior to merge, right?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] manijndl7 commented on a change in pull request #9255: MINOR: Consolidate duplicated logic on reset tools

2020-09-28 Thread GitBox


manijndl7 commented on a change in pull request #9255:
URL: https://github.com/apache/kafka/pull/9255#discussion_r495786493



##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
 }
 return map;
 }
+
+/**
+ * Convert an ISO8601 based timestamp to an epoch value
+ * @param timestamp to be converted
+ * @return epoch value of a given timestamp
+ * @throws ParseException for timestamp that doesn't follow ISO8601 format
+ */
+public static long getDateTime(String timestamp) throws ParseException {
+final String[] timestampParts = timestamp.split("T");
+if (timestampParts.length < 2) {
+throw new ParseException("Error parsing timestamp. It does not 
contain a 'T' according to ISO8601 format", timestamp.length());
+}
+
+final String secondPart = timestampParts[1];
+if (secondPart == null || secondPart.isEmpty()) {
+throw new ParseException("Error parsing timestamp. Time part after 
'T' is null or empty", timestamp.length());
+}
+
+if (!(secondPart.contains("+") || secondPart.contains("-") || 
secondPart.contains("Z"))) {

Review comment:
   Hi @mjax, could you please take a look when you get a chance?
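
   For reference, a small usage sketch of the helper under review (the exact accepted formats depend on the final implementation; the inputs here are illustrative):

```
import java.text.ParseException;
import org.apache.kafka.common.utils.Utils;

public class Iso8601Demo {
    public static void main(String[] args) throws ParseException {
        // ISO8601 timestamps with an explicit offset or 'Z' convert to epoch millis.
        long withOffset = Utils.getDateTime("2020-09-28T12:00:00.000+0000");
        long utc = Utils.getDateTime("2020-09-28T12:00:00.000Z");
        System.out.println(withOffset + " " + utc);

        // A timestamp without the 'T' separator should raise ParseException,
        // per the contract documented above.
        try {
            Utils.getDateTime("2020-09-28 12:00:00.000");
        } catch (ParseException expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```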





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] huangyiminghappy commented on pull request #9304: KAFKA-10502:TimestampRouter may occur threadlocal leak

2020-09-28 Thread GitBox


huangyiminghappy commented on pull request #9304:
URL: https://github.com/apache/kafka/pull/9304#issuecomment-699862527


   > Reading on the SO thread I think this fix makes sense. Merging to trunk 
now.
   
   thanks



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thake commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.

2020-09-28 Thread GitBox


thake commented on pull request #9338:
URL: https://github.com/apache/kafka/pull/9338#issuecomment-699857289


   The remaining failing tests do not seem to be related to this PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] xakassi commented on pull request #9211: KAFKA-10426: Deadlock on session key update.

2020-09-28 Thread GitBox


xakassi commented on pull request #9211:
URL: https://github.com/apache/kafka/pull/9211#issuecomment-699805679


   Hi, guys! Hi, @kkonstantine!
   This is my first PR to Kafka and I would like to clarify: is there anything else I need to do, or should I just wait and you will take care of this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10529) Controller should throttle partition reassignment

2020-09-28 Thread Badai Aqrandista (Jira)
Badai Aqrandista created KAFKA-10529:


 Summary: Controller should throttle partition reassignment 
 Key: KAFKA-10529
 URL: https://issues.apache.org/jira/browse/KAFKA-10529
 Project: Kafka
  Issue Type: Improvement
Reporter: Badai Aqrandista


With 
[KIP-455|https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment],
 reassignment can be triggered via the AdminClient API. However, when reassigning a 
large number of topic partitions at once, this can cause a storm of 
LeaderAndIsr and UpdateMetadata requests, which can occupy the Controller thread 
for some time and prevent the Controller from processing other requests. 

So, the Controller should throttle the sending of LeaderAndIsr requests when actioning a 
reassignment request.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

