[jira] [Commented] (KAFKA-9220) TimeoutException when using kafka-preferred-replica-election
[ https://issues.apache.org/jira/browse/KAFKA-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203648#comment-17203648 ] Or Shemesh commented on KAFKA-9220: --- [~akumar] Go for it and take it :) Thanks!!! > TimeoutException when using kafka-preferred-replica-election > > > Key: KAFKA-9220 > URL: https://issues.apache.org/jira/browse/KAFKA-9220 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.3.0 >Reporter: Or Shemesh >Priority: Major > > When running kafka-preferred-replica-election --bootstrap-server xxx:9092 I'm getting this error: > Timeout waiting for election results > Timeout waiting for election results > Exception in thread "main" kafka.common.AdminCommandFailedException > at kafka.admin.PreferredReplicaLeaderElectionCommand$AdminClientCommand.electPreferredLeaders(PreferredReplicaLeaderElectionCommand.scala:246) > at kafka.admin.PreferredReplicaLeaderElectionCommand$.run(PreferredReplicaLeaderElectionCommand.scala:78) > at kafka.admin.PreferredReplicaLeaderElectionCommand$.main(PreferredReplicaLeaderElectionCommand.scala:42) > at kafka.admin.PreferredReplicaLeaderElectionCommand.main(PreferredReplicaLeaderElectionCommand.scala) > Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout. > > This happens because we have a big cluster, and getting all the data from ZooKeeper takes more than 30 seconds. > > After searching the code I saw that the 30-second timeout is hard-coded. Could you enable us to set the timeout as a parameter? > [https://github.com/confluentinc/kafka/blob/master/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 opened a new pull request #9349: MINOR: add proper checks to KafkaConsumer.groupMetadata
chia7712 opened a new pull request #9349: URL: https://github.com/apache/kafka/pull/9349 Add the following checks to ```KafkaConsumer.groupMetadata```: 1. A null check of the coordinator (replacing the NPE with ```InvalidGroupIdException```, consistent with the other methods). 2. A concurrency check (```groupMetadata``` is not thread-safe, so a concurrency check is necessary). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
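The two checks described in this pull request can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from the PR: `GuardedConsumerSketch` and `InvalidGroupIdSketchException` are hypothetical stand-ins, the guard is a simple non-reentrant flag, and the real client's guard is likely more elaborate.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Kafka's InvalidGroupIdException, so the sketch needs no Kafka dependency.
class InvalidGroupIdSketchException extends RuntimeException {
    InvalidGroupIdSketchException(String message) {
        super(message);
    }
}

// Hypothetical, heavily simplified consumer; this is not the real KafkaConsumer.
final class GuardedConsumerSketch {
    private final AtomicBoolean inUse = new AtomicBoolean(false);
    private final String groupId; // null models a consumer configured without a group id

    GuardedConsumerSketch(String groupId) {
        this.groupId = groupId;
    }

    // Concurrency check: fail fast if another caller is already inside the consumer.
    void acquire() {
        if (!inUse.compareAndSet(false, true)) {
            throw new ConcurrentModificationException("consumer is not safe for multi-threaded access");
        }
    }

    void release() {
        inUse.set(false);
    }

    String groupMetadata() {
        acquire();
        try {
            // Null check: report a descriptive error instead of a NullPointerException.
            if (groupId == null) {
                throw new InvalidGroupIdSketchException("a valid group.id is required to obtain group metadata");
            }
            return groupId;
        } finally {
            release();
        }
    }
}
```

Taking the guard before the null check means a concurrent caller is rejected without touching any consumer state, and the `finally` block frees the guard even when the null check throws.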
[GitHub] [kafka] hachikuji opened a new pull request #9348: KAFKA-10527; Voters should not reinitialize as leader in same epoch
hachikuji opened a new pull request #9348: URL: https://github.com/apache/kafka/pull/9348 One of the invariants that the raft replication protocol attempts to ensure is that each record is uniquely identified by leader epoch and offset. This can be violated if a leader remains elected with the same epoch between restarts since unflushed data could be lost.
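To make the violation concrete, here is a small sketch of one way it could arise. It assumes a follower has already copied a record that the leader had not yet flushed; `EpochOffsetSketch` is a hypothetical in-memory log invented for this illustration and does not reflect the raft implementation in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory log used only to illustrate the invariant; not Kafka's raft code.
final class EpochOffsetSketch {
    private static final class Entry {
        final int epoch;
        final String value;

        Entry(int epoch, String value) {
            this.epoch = epoch;
            this.value = value;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private int flushedCount = 0;

    // Appends a record and returns the offset it was written at.
    long append(int epoch, String value) {
        entries.add(new Entry(epoch, value));
        return entries.size() - 1L;
    }

    void flush() {
        flushedCount = entries.size();
    }

    // A restart keeps only the records that were flushed.
    void restart() {
        while (entries.size() > flushedCount) {
            entries.remove(entries.size() - 1);
        }
    }

    // True when both logs hold a record with the same (epoch, offset) but different contents.
    static boolean sameIdDifferentRecord(EpochOffsetSketch a, EpochOffsetSketch b, long offset) {
        if (offset >= a.entries.size() || offset >= b.entries.size()) {
            return false;
        }
        Entry left = a.entries.get((int) offset);
        Entry right = b.entries.get((int) offset);
        return left.epoch == right.epoch && !left.value.equals(right.value);
    }
}
```

If the leader appends a record in epoch 5, loses it on restart, and then writes a different record at the same offset while still in epoch 5, `sameIdDifferentRecord` returns true for that offset. A new epoch after the restart would give the second record a distinct identity.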
[jira] [Commented] (KAFKA-10513) Newly added topic or partitions are not assigned to running consumer groups using static membership
[ https://issues.apache.org/jira/browse/KAFKA-10513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203622#comment-17203622 ] Guozhang Wang commented on KAFKA-10513: --- [~mou@ea] could you try setting `metadata.max.age.ms` to a very small value and retry this scenario? [~bchen225242] and I looked at both the client-side and broker-side code in 2.6, and we think it should still trigger a rebalance. > Newly added topic or partitions are not assigned to running consumer groups > using static membership > --- > > Key: KAFKA-10513 > URL: https://issues.apache.org/jira/browse/KAFKA-10513 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.6.0 >Reporter: Marlon Ou >Priority: Major > > If consumers are polling messages from a certain topic with static membership > and we add new partitions to this topic while the consumers are running, no > partition reassignment is ever triggered (and hence messages published into > the new partitions are never consumed). > To reproduce, simply set group instance IDs on the consumers: > {code:java} > props.setProperty("group.instance.id", instanceId); > {code} > And then while the static consumers are running, use Kafka's admin client to > add more partitions to the topic: > {code:java} > adminClient.createPartitions(...) > {code}
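For anyone retrying the scenario with the suggested setting, the consumer configuration could be assembled roughly as below. This is only a sketch: the helper name and the example values are assumptions, and only the property keys `group.id`, `group.instance.id` and `metadata.max.age.ms` come from Kafka's consumer configuration.

```java
import java.util.Properties;

// Hypothetical helper; only the property keys are real Kafka consumer configs.
final class StaticMemberConfigSketch {
    static Properties staticMemberProps(String groupId, String instanceId, long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // Static membership: a stable instance id per consumer process.
        props.setProperty("group.instance.id", instanceId);
        // A small value makes the client refresh topic metadata (and notice new partitions) sooner.
        props.setProperty("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }

    public static void main(String[] args) {
        // Example values are placeholders chosen for this sketch.
        System.out.println(staticMemberProps("example-group", "instance-1", 1000L));
    }
}
```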
[jira] [Commented] (KAFKA-10530) kafka-streams-application-reset misses some internal topics
[ https://issues.apache.org/jira/browse/KAFKA-10530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203616#comment-17203616 ] John Roesler commented on KAFKA-10530: -- Thanks for the report [~oweiler], and sorry for the trouble. This report looks like https://issues.apache.org/jira/browse/KAFKA-9859, which should have been _fixed_ in 2.6.0. Can you confirm that you still see this issue in 2.6.0? Thanks, -John > kafka-streams-application-reset misses some internal topics > --- > > Key: KAFKA-10530 > URL: https://issues.apache.org/jira/browse/KAFKA-10530 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Oliver Weiler >Priority: Major > > While the {{kafka-streams-application-reset}} tool works in most cases, it > misses some internal topics when using {{Foreign Key Table-Table Joins}}. > After execution, there are still two internal topics left which were not > deleted > {code} > bv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic > bbv4-indexer-717e6cc5-acb2-498d-9d08-4814aaa71c81-StreamThread-1-consumer > bbv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-14-topic > {code} > The reason seems to be the {{StreamsResetter.isInternalTopic}} which requires > the internal topic to end with {{-changelog}} or {{-repartition}} (which the > mentioned topics don't).
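The suffix rule described in the report can be sketched as a standalone predicate. This mirrors only what the reporter states; the actual `StreamsResetter.isInternalTopic` may apply further conditions, and `InternalTopicSuffixSketch` is a hypothetical name used for this illustration.

```java
// Hypothetical predicate that mirrors the suffix rule as described in the report.
final class InternalTopicSuffixSketch {
    static boolean endsWithKnownInternalSuffix(String topicName) {
        return topicName.endsWith("-changelog") || topicName.endsWith("-repartition");
    }

    public static void main(String[] args) {
        // One of the leftover topic names quoted in the report.
        String leftover = "bbv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-14-topic";
        System.out.println(leftover + " matched: " + endsWithKnownInternalSuffix(leftover));
    }
}
```

Under this rule the foreign-key join topics quoted above end in `-topic`, so they are not recognized as internal and would be skipped by a reset that relies on the predicate.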
[jira] [Commented] (KAFKA-8266) Improve `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
[ https://issues.apache.org/jira/browse/KAFKA-8266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203601#comment-17203601 ] Chia-Ping Tsai commented on KAFKA-8266: --- flaky again :( https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9284/6/testReport/kafka.api/ConsumerBounceTest/Build___JDK_11___testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/ > Improve > `testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup` > > > Key: KAFKA-8266 > URL: https://issues.apache.org/jira/browse/KAFKA-8266 > Project: Kafka > Issue Type: Test >Reporter: Jason Gustafson >Assignee: David Jacot >Priority: Major > > Some additional validation could be done after the member gets kicked out. > The main thing is showing that the group can continue to consume data and > commit offsets.
[GitHub] [kafka] chia7712 commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
chia7712 commented on a change in pull request #9284: URL: https://github.com/apache/kafka/pull/9284#discussion_r496340632 ## File path: docs/upgrade.html ## @@ -27,6 +27,14 @@ Notable changes in 2 default.api.timeout.ms, and Kafka Streams' new task.timeout.ms parameters instead. Note that parameter retry.backoff.ms is not impacted by this change. +Altering non-reconfigurable configs of existent listeners causes InvalidRequestException. +By contrast, the previous behavior would have caused the updated configuration to be persisted, but it wouldn't +take effect until the broker was restarted. This change breaks behavior compatibility but the old behavior is not Review comment: I had deleted the sentence about behavior compatibility :)
[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
asdaraujo commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r496335557 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -201,26 +213,24 @@ public void close() { @Test public void testReplication() throws InterruptedException { +String consumerGroupName = "consumer-group-testReplication"; +Map consumerProps = new HashMap() {{ +put("group.id", consumerGroupName); +put("auto.offset.reset", "latest"); Review comment: Oops, spoke too soon. Even though `latest` is Kafka's default, `EmbeddedKafkaCluster::createConsumer` [defaults it to `earliest`](https://github.com/apache/kafka/blob/48b56e533b3ff22ae0e2cf7fcc649e7df19f2b06/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L444)
[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
asdaraujo commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r496326988 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -201,26 +213,24 @@ public void close() { @Test public void testReplication() throws InterruptedException { +String consumerGroupName = "consumer-group-testReplication"; +Map consumerProps = new HashMap() {{ +put("group.id", consumerGroupName); +put("auto.offset.reset", "latest"); Review comment: Yep, unnecessary. I'll remove this.
[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
junrao commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r496093216 ## File path: clients/src/main/resources/common/message/ApiVersionsResponse.json ## @@ -55,8 +55,8 @@ "about": "The maximum supported version for the feature." } ] }, -{"name": "FinalizedFeaturesEpoch", "type": "int32", "versions": "3+", - "tag": 1, "taggedVersions": "3+", "default": "-1", +{"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+", Review comment: Space before "name". ## File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json ## @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 57, + "type": "request", + "name": "UpdateFeaturesRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "6", Review comment: This is not included in the KIP. Should we update the KIP? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.Objects; + +/** + * Represents a range of version levels supported by every broker in a cluster for some feature. + */ +public class FinalizedVersionRange { +private final short minVersionLevel; + +private final short maxVersionLevel; + +/** + * Raises an exception unless the following condition is met: + * minVersionLevel >= 1 and maxVersionLevel >= 1 and maxVersionLevel >= minVersionLevel. + * + * @param minVersionLevel The minimum version level value. + * @param maxVersionLevel The maximum version level value. + * + * @throws IllegalArgumentException Raised when the condition described above is not met. + */ +public FinalizedVersionRange(final short minVersionLevel, final short maxVersionLevel) { Review comment: Since the user is not expected to instantiate this, should we make the constructor non-public? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import static java.util.stream.Collectors.joining; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * Encapsulates details about finalized as well as supported features. This is particularly useful + * to hold the result
[GitHub] [kafka] abbccdda commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
abbccdda commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r496321312 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. This is particularly useful if the user requires strongly consistent reads of + * finalized features. + * + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish. + * + * + * @param options the options to use + * + * @return the {@link DescribeFeaturesResult} containing the result + */ +DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); Review comment: Yea, you are right, I think this comment belongs to updateFeatures
[jira] [Resolved] (KAFKA-10511) Fix minor behavior difference in `MockLog`
[ https://issues.apache.org/jira/browse/KAFKA-10511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10511. - Resolution: Fixed > Fix minor behavior difference in `MockLog` > -- > > Key: KAFKA-10511 > URL: https://issues.apache.org/jira/browse/KAFKA-10511 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Fix minor difference in the implementation of the epoch cache in MockLog. In > `LeaderEpochFileCache`, we ensure new entries increase both start offset and > epoch monotonically. We also do not allow duplicates.
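The rule stated in the description can be sketched as an epoch cache that only accepts entries whose epoch and start offset both strictly increase. This is a simplified reading under stated assumptions: `EpochCacheSketch` is hypothetical, and rejecting a conflicting entry is a choice made for this sketch; the real `LeaderEpochFileCache` may resolve conflicts differently, for example by truncating earlier entries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical epoch cache; a simplified reading of the rule, not Kafka's LeaderEpochFileCache.
final class EpochCacheSketch {
    private final List<long[]> entries = new ArrayList<>(); // each entry is {epoch, startOffset}

    // Returns true if the entry was added, false if it would break strict monotonicity
    // (which also rules out duplicates).
    boolean assign(int epoch, long startOffset) {
        if (!entries.isEmpty()) {
            long[] last = entries.get(entries.size() - 1);
            if (epoch <= last[0] || startOffset <= last[1]) {
                return false;
            }
        }
        entries.add(new long[] {epoch, startOffset});
        return true;
    }

    int size() {
        return entries.size();
    }
}
```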
[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
asdaraujo commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r496306777 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -283,49 +296,140 @@ public void testReplication() throws InterruptedException { waitForCondition(() -> { try { -return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup", +return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup", Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0)); } catch (Throwable e) { return false; } }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster."); -Map primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup", +Map primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, "backup", Duration.ofMillis(CHECKPOINT_DURATION_MS)); - + // Failback consumer group to primary cluster -consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1")); -consumer2.assign(primaryOffsets.keySet()); -primaryOffsets.forEach(consumer2::seek); -consumer2.poll(Duration.ofMillis(500)); - -assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0); -assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0); -assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position( -new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED); -assertTrue("Consumer failedback beyond expected downstream offset.", consumer2.position( -new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED); - -consumer2.close(); - +primaryConsumer = primary.kafka().createConsumer(consumerProps); +primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1")); +seek(primaryConsumer, 
primaryOffsets); +consumeAllMessages(primaryConsumer, 0); + +assertTrue("Consumer failedback to zero upstream offset.", primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0); +assertTrue("Consumer failedback to zero downstream offset.", primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0); +assertTrue("Consumer failedback beyond expected upstream offset.", primaryConsumer.position( +new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION); +assertTrue("Consumer failedback beyond expected downstream offset.", primaryConsumer.position( +new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION); + +Map>> messages2 = consumeAllMessages(primaryConsumer, 0); +// If offset translation was successful we expect no messages to be consumed after failback +assertEquals("Data was consumed from partitions: " + messages2.keySet() + ".", 0, messages2.size()); +primaryConsumer.close(); + // create more matching topics primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS); backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS); -for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) { -primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + i); -backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i); +produceMessages(primary, "test-topic-2", "message-3-", 1); +produceMessages(backup, "test-topic-3", "message-4-", 1); + +assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION, +primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count()); +assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PER_PARTITION, +backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count()); + +assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PER_PARTITION, +primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, 
"backup.test-topic-3").count()); +assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION, +backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count()); +} + +@Test +public void testReplicationWithEmptyPartition() throws InterruptedException { Review comment: Thanks, @mimaison . I've changed the
[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
asdaraujo commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r496306298 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -283,49 +296,140 @@ public void testReplication() throws InterruptedException { waitForCondition(() -> { try { -return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup", +return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup", Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0)); } catch (Throwable e) { return false; } }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster."); -Map primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup", +Map primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, "backup", Duration.ofMillis(CHECKPOINT_DURATION_MS)); - + // Failback consumer group to primary cluster -consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1")); -consumer2.assign(primaryOffsets.keySet()); -primaryOffsets.forEach(consumer2::seek); -consumer2.poll(Duration.ofMillis(500)); - -assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0); -assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0); -assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position( -new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED); -assertTrue("Consumer failedback beyond expected downstream offset.", consumer2.position( -new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED); - -consumer2.close(); - +primaryConsumer = primary.kafka().createConsumer(consumerProps); +primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1")); +seek(primaryConsumer, 
primaryOffsets); +consumeAllMessages(primaryConsumer, 0); + +assertTrue("Consumer failedback to zero upstream offset.", primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0); +assertTrue("Consumer failedback to zero downstream offset.", primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0); +assertTrue("Consumer failedback beyond expected upstream offset.", primaryConsumer.position( +new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION); +assertTrue("Consumer failedback beyond expected downstream offset.", primaryConsumer.position( +new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION); + +Map>> messages2 = consumeAllMessages(primaryConsumer, 0); +// If offset translation was successful we expect no messages to be consumed after failback +assertEquals("Data was consumed from partitions: " + messages2.keySet() + ".", 0, messages2.size()); +primaryConsumer.close(); + // create more matching topics primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS); backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS); -for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) { -primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + i); -backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i); +produceMessages(primary, "test-topic-2", "message-3-", 1); +produceMessages(backup, "test-topic-3", "message-4-", 1); + +assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION, +primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count()); +assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PER_PARTITION, +backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count()); + +assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PER_PARTITION, +primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, 
"backup.test-topic-3").count()); +assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION, +backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count()); +} + +@Test +public void testReplicationWithEmptyPartition() throws InterruptedException { +String consumerGroupName =
[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
asdaraujo commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r496306398 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -136,10 +141,19 @@ public void setup() throws InterruptedException { backup.kafka().createTopic("primary.test-topic-1", 1); backup.kafka().createTopic("heartbeats", 1); -for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) { -primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i); -backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i); -} +// produce to all partitions of test-topic-1 +produceMessages(primary, "test-topic-1", "message-1-"); +produceMessages(backup, "test-topic-1", "message-2-"); + +// Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly +Map dummyProps = new HashMap(); Review comment: Fixed.
[GitHub] [kafka] rhauch commented on pull request #8134: KAFKA-9546 Allow custom tasks through configuration
rhauch commented on pull request #8134: URL: https://github.com/apache/kafka/pull/8134#issuecomment-700351105 As noted on https://issues.apache.org/jira/browse/KAFKA-9546, I'm not convinced this is a useful change, since it would complicate these example connectors that ship with AK and would make customizing them very impractical from a build, release, and installation perspective.
[jira] [Commented] (KAFKA-9546) Make FileStreamSourceTask extendable with generic streams
[ https://issues.apache.org/jira/browse/KAFKA-9546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203563#comment-17203563 ] Randall Hauch commented on KAFKA-9546: -- [~galyo], thanks for the suggestion and the PR. I've added the `needs-kip` label, because the `FileStreamSourceConnector` is part of the Connect API, even though it is intentionally just an example connector that helps demonstrate Connect. Because a KIP is required, I question whether changing this connector is really worth it. And because these file connectors are the only ones that ship with AK, extending them will undoubtedly create issues if your extension is installed into a different version of AK than the one with which it is compiled. If you're providing a customized task class, could you not just provide your own `SourceConnector` class? You'd have a lot more control, and you'd have much more freedom to deploy your connector into nearly any version of a Kafka Connect cluster installation. (The only limitation would be which of the Connect APIs you chose to use, such as the use of headers.) As such, I think it's not worth the complication to the examples nor to your connector to make this change. > Make FileStreamSourceTask extendable with generic streams > - > > Key: KAFKA-9546 > URL: https://issues.apache.org/jira/browse/KAFKA-9546 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Csaba Galyo >Assignee: Csaba Galyo >Priority: Major > Labels: connect-api, needs-kip > Original Estimate: 4h > Remaining Estimate: 4h > > Use case: I want to read a ZIP compressed text file with a file connector and > send it to Kafka. > Currently, we have FileStreamSourceConnector which reads a \n delimited text > file. This connector always returns a task of type FileStreamSourceTask. > The FileStreamSourceTask reads from stdio or opens a file InputStream.
The > issue with this approach is that the input needs to be a text file, otherwise > it won't work. > The code should be modified so that users could change the default > InputStream to e.g. ZipInputStream, or any other format. The code is currently > written in such a way that it's not possible to extend it; we cannot use a > different input stream. > See example here where the code got copy-pasted just so it could read from a > ZstdInputStream (which reads ZSTD compressed files): > [https://github.com/gcsaba2/kafka-zstd/tree/master/src/main/java/org/apache/kafka/connect/file] > > I suggest 2 changes: > # FileStreamSourceConnector should be extendable to return tasks of > different types. These types would be input by the user through the > configuration map > # FileStreamSourceTask should be modified so it could be extended and child > classes could define different input streams.
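The second suggested change amounts to an overridable hook for the input stream. A rough sketch of that shape is below; it is not the real `FileStreamSourceTask`, the class and method names are hypothetical, and GZIP is swapped in for ZIP or ZSTD only because it ships with the JDK and keeps the example self-contained.

```java
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical base class; not the real FileStreamSourceTask.
class LineSourceSketch {
    // Extension hook: subclasses may wrap the raw stream. The default treats it as plain text.
    protected InputStream wrap(InputStream raw) throws IOException {
        return raw;
    }

    final List<String> readLines(InputStream raw) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(wrap(raw), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }
}

// Child class that defines a different input stream, as the issue proposes.
class GzipLineSourceSketch extends LineSourceSketch {
    @Override
    protected InputStream wrap(InputStream raw) throws IOException {
        return new GZIPInputStream(raw);
    }

    // Helper that builds compressed sample input in memory.
    static byte[] gzip(String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }
}
```

Keeping the line-reading loop in the base class and exposing only `wrap` means a subclass cannot accidentally change how records are split, which is one way to limit the compatibility surface.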
[GitHub] [kafka] hachikuji commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
hachikuji commented on pull request #9280: URL: https://github.com/apache/kafka/pull/9280#issuecomment-700349997 @nym3r0s Take a look at `SenderTest.testForceShutdownWithIncompleteTransaction`. I think we could add a similar test which aborts the transaction directly before the Produce request has been sent. Let us know if you don't have time within the next couple weeks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #9332: KAFKA-10511; Ensure monotonic start epoch/offset updates in `MockLog`
hachikuji merged pull request #9332: URL: https://github.com/apache/kafka/pull/9332
[jira] [Updated] (KAFKA-9546) Make FileStreamSourceTask extendable with generic streams
[ https://issues.apache.org/jira/browse/KAFKA-9546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9546: - Labels: connect-api needs-kip (was: connect-api) > Make FileStreamSourceTask extendable with generic streams > - > > Key: KAFKA-9546 > URL: https://issues.apache.org/jira/browse/KAFKA-9546 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Csaba Galyo >Assignee: Csaba Galyo >Priority: Major > Labels: connect-api, needs-kip > Original Estimate: 4h > Remaining Estimate: 4h > > Use case: I want to read a ZIP compressed text file with a file connector and > send it to Kafka. > Currently, we have FileStreamSourceConnector which reads a \n delimited text > file. This connector always returns a task of type FileStreamSourceTask. > The FileStreamSourceTask reads from stdio or opens a file InputStream. The > issue with this approach is that the input needs to be a text file, otherwise > it won't work. > The code should be modified so that users could change the default > InputStream to eg. ZipInputStream, or any other format. The code is currently > written in such a way that it's not possible to extend it, we cannot use a > different input stream. > See example here where the code got copy-pasted just so it could read from a > ZstdInputStream (which reads ZSTD compressed files): > [https://github.com/gcsaba2/kafka-zstd/tree/master/src/main/java/org/apache/kafka/connect/file] > > I suggest 2 changes: > # FileStreamSourceConnector should be extendable to return tasks of > different types. These types would be input by the user through the > configuration map > # FileStreamSourceTask should be modified so it could be extended and child > classes could define different input streams. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rhauch commented on a change in pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…
rhauch commented on a change in pull request #9319: URL: https://github.com/apache/kafka/pull/9319#discussion_r496296182 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -260,7 +259,7 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) { // Do not revoke resources for re-assignment while a delayed rebalance is active // Also we do not revoke in two consecutive rebalances by the same leader canRevoke = delay == 0 && canRevoke; - +log.debug("Connector and task to revoke assgn post lb calculation: {}", toRevoke); Review comment: Can you please make this log message more readable? ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -577,15 +596,14 @@ private void resetDelay() { numToRevoke = floorTasks; for (WorkerLoad existing : existingWorkers) { Iterator tasks = existing.tasks().iterator(); +numToRevoke = existing.tasksSize() - ceilTasks; Review comment: Isn't it possible that `numToRevoke` might be negative? ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -445,16 +444,34 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, if (scheduledRebalance > 0 && now >= scheduledRebalance) { // delayed rebalance expired and it's time to assign resources log.debug("Delayed rebalance expired. 
Reassigning lost tasks"); -Optional candidateWorkerLoad = Optional.empty(); +List candidateWorkerLoad = Collections.emptyList(); if (!candidateWorkersForReassignment.isEmpty()) { candidateWorkerLoad = pickCandidateWorkerForReassignment(completeWorkerAssignment); } -if (candidateWorkerLoad.isPresent()) { -WorkerLoad workerLoad = candidateWorkerLoad.get(); -log.debug("A candidate worker has been found to assign lost tasks: {}", workerLoad.worker()); -lostAssignments.connectors().forEach(workerLoad::assign); -lostAssignments.tasks().forEach(workerLoad::assign); +if (!candidateWorkerLoad.isEmpty()) { +log.debug("A list of candidate workers has been found to assign lost tasks: {}", candidateWorkerLoad.stream().map(WorkerLoad::worker).collect(Collectors.joining(","))); +Iterator candidateWorkerIterator = candidateWorkerLoad.iterator(); +for (String connector : lostAssignments.connectors()) { +// Loop over the the candidate workers as many times as it takes +if (!candidateWorkerIterator.hasNext()) { +candidateWorkerIterator = candidateWorkerLoad.iterator(); +} +WorkerLoad worker = candidateWorkerIterator.next(); +log.debug("Assigning connector id {} to member {}", connector, worker.worker()); +worker.assign(connector); +log.debug("Assigned connector id {} to member {}", connector, worker.worker()); +} +candidateWorkerIterator = candidateWorkerLoad.iterator(); +for (ConnectorTaskId task : lostAssignments.tasks()) { +if (!candidateWorkerIterator.hasNext()) { +candidateWorkerIterator = candidateWorkerLoad.iterator(); +} +WorkerLoad worker = candidateWorkerIterator.next(); +log.debug("Assigning task id {} to member {}", task, worker.worker()); +worker.assign(task); +log.debug("Assigned task id {} to member {}", task, worker.worker()); Review comment: ```suggestion ``` Do we need both of these debug messages? After all, `worker.assign(...)` is just adding a `ConnectorTaskId` to a collection. 
How about keeping the first one since this is at this point an on-going process and we've not actually assigned anything to the actual worker node. ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java ## @@ -559,6 +576,8 @@ private void resetDelay() { log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum); int floorTasks = totalActiveTasksNum / totalWorkersNum; log.debug("New rounded down (floor) average number of tasks per worker {}", floorTasks); +int ceilTasks = (int) Math.ceil((float) totalActiveTasksNum / totalWorkersNum); +log.debug("New rounded down (ceil) average number of tasks per
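The round-robin reassignment and the `numToRevoke` question raised in this review can be illustrated with a minimal sketch. It is not the code from the pull request: workers and tasks are plain strings rather than `WorkerLoad` and `ConnectorTaskId`, the names `RoundRobinSketch`, `assignRoundRobin`, `ceilDiv` and `numToRevoke` are hypothetical, and the ceiling is computed with integer arithmetic instead of the `Math.ceil((float) ...)` expression shown in the diff. Clamping at zero is one possible answer to the "might be negative" concern, assumed here for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {

    // Hypothetical helper: spread lost tasks over the candidate workers,
    // restarting the iterator whenever it is exhausted.
    static Map<String, List<String>> assignRoundRobin(List<String> workers, List<String> lostTasks) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String worker : workers) {
            assignment.put(worker, new ArrayList<>());
        }
        if (workers.isEmpty()) {
            return assignment; // nothing to assign to
        }
        Iterator<String> candidates = workers.iterator();
        for (String task : lostTasks) {
            if (!candidates.hasNext()) {
                candidates = workers.iterator(); // loop over the workers again
            }
            assignment.get(candidates.next()).add(task);
        }
        return assignment;
    }

    // Ceiling of total / workers using integers only
    // (assumes total >= 0 and workers > 0).
    static int ceilDiv(int total, int workers) {
        return (total + workers - 1) / workers;
    }

    // A worker holding fewer tasks than the ceiling must not yield a negative count.
    static int numToRevoke(int workerTasks, int ceilTasks) {
        return Math.max(0, workerTasks - ceilTasks);
    }

    public static void main(String[] args) {
        List<String> workers = Arrays.asList("worker-1", "worker-2");
        List<String> lost = Arrays.asList("task-0", "task-1", "task-2");
        System.out.println(assignRoundRobin(workers, lost));
        System.out.println("ceil tasks per worker: " + ceilDiv(7, 3));
        System.out.println("to revoke from a worker with 2 tasks: " + numToRevoke(2, ceilDiv(7, 3)));
    }
}
```

Without the clamp, a worker that already holds fewer tasks than the ceiling would produce a negative revocation count, which is the case the review comment asks about.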
[GitHub] [kafka] rhauch commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop
rhauch commented on a change in pull request #8910: URL: https://github.com/apache/kafka/pull/8910#discussion_r496239771 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ## @@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id, this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno); this.consumer = consumer; this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); +this.taskStopped = false; Review comment: Shouldn't this be volatile? Yes, it's true that `WorkerSinkTask.close()` is always and only called from within `WorkerTask.doRun()` after the task determines it will stop. However, the `onPartitionsRevoked(...)` method is called from the consumer thread, and making the field volatile is the only way to ensure that the consumer thread reads a non-cached value.
[GitHub] [kafka] rhauch merged pull request #8973: KAFKA-10218: Stop reading config topic in every subsequent tick if catchup fails once
rhauch merged pull request #8973: URL: https://github.com/apache/kafka/pull/8973
[GitHub] [kafka] hachikuji commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
hachikuji commented on a change in pull request #9284: URL: https://github.com/apache/kafka/pull/9284#discussion_r496287274 ## File path: docs/upgrade.html ## @@ -27,6 +27,14 @@ Notable changes in 2 default.api.timeout.ms, and Kafka Streams' new task.timeout.ms parameters instead. Note that parameter retry.backoff.ms is not impacted by this change. +Altering non-reconfigurable configs of existent listeners causes InvalidRequestException. +By contrast, the previous behavior would have caused the updated configuration to be persisted, but it wouldn't +take effect until the broker was restarted. This change breaks behavior compatibility but the old behavior is not Review comment: nit: can we take out the sentence about behavior compatibility. I think it is enough to say that the previous behavior was unintended. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop
C0urante commented on a change in pull request #8910: URL: https://github.com/apache/kafka/pull/8910#discussion_r446490310 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ## @@ -689,6 +692,10 @@ else if (!context.pausedPartitions().isEmpty()) @Override public void onPartitionsRevoked(Collection partitions) { +if (taskStopped) { Review comment: The callback gets invoked on the same thread as the one that `KafkaConsumer::close` is invoked on, so `volatile` isn't strictly necessary. If you (or others) think it'd be good to include just in case that changes or this callback gets invoked after the task is stopped on a different thread (which afaik is not possible atm), I don't have any major objections to adding it. Just didn't want to add it unnecessarily as it might be misleading to people reading the code base down the road. LMKWYT This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
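The `volatile` question in this thread only matters if the flag is written on one thread and read on another. A minimal sketch of that cross-thread case follows; it is not the `WorkerSinkTask` code, and the class `StopFlagSketch` and its methods are hypothetical stand-ins for the `taskStopped` field and the `onPartitionsRevoked(...)` callback discussed above.

```java
import java.util.concurrent.TimeUnit;

public class StopFlagSketch {

    // volatile: a write by the stopping thread is guaranteed to become
    // visible to later reads on any other thread.
    private volatile boolean taskStopped = false;

    // Called by the thread that shuts the task down.
    void stop() {
        taskStopped = true;
    }

    // Hypothetical stand-in for a rebalance callback: returns false
    // (does nothing) once the task has been stopped.
    boolean onPartitionsRevoked() {
        if (taskStopped) {
            return false;
        }
        // ... commit offsets, close partitions, etc. would go here ...
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        StopFlagSketch task = new StopFlagSketch();
        // A second thread keeps invoking the callback until it observes the flag.
        Thread poller = new Thread(() -> {
            while (task.onPartitionsRevoked()) {
                Thread.yield();
            }
        });
        poller.start();
        task.stop();
        poller.join(TimeUnit.SECONDS.toMillis(5));
        System.out.println("poller observed stop: " + !poller.isAlive());
    }
}
```

If the callback is only ever invoked on the thread that also sets the flag, as argued in the comment above, the modifier changes nothing at runtime and serves mainly as a guard against future threading changes.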
[jira] [Updated] (KAFKA-10532) Do not wipe state store under EOS when closing-dirty a RESTORING active or RUNNING standby task
[ https://issues.apache.org/jira/browse/KAFKA-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-10532: -- Description: Today whenever we are closing-dirty a task, we always wipe out the state stores if we are under EOS. But when the closing task was a RESTORING active, or a RUNNING standby, we may actually not need to wipe out the stores, since we know that upon resuming we would still continue restoring the task before ever transitioning to processing (assuming the LEO offset would not be truncated); i.e. when the task resumes, it does not matter if the same records get applied twice during the continued restoration. (was: Today whenever we are closing-dirty a task, we always wipe out the state stores if we are under EOS. But when the closing task was a RESTORING active, or a RUNNING standby, we may actually not need to wipe out the stores, since we know that upon resuming we would still restore the task before transitioning to processing (assuming the LEO offset would not be truncated); i.e. when the task resumes, it does not matter if the same records get applied twice during the continued restoration.) > Do not wipe state store under EOS when closing-dirty a RESTORING active or > RUNNING standby task > --- > > Key: KAFKA-10532 > URL: https://issues.apache.org/jira/browse/KAFKA-10532 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > > Today whenever we are closing-dirty a task, we always wipe out the state > stores if we are under EOS. But when the closing task was a RESTORING active, > or a RUNNING standby, we may actually not need to wipe out the stores, since > we know that upon resuming we would still continue restoring the task before > ever transitioning to processing (assuming the LEO offset would not be truncated); > i.e. when the task resumes, it does not matter if the same records get applied > twice during the continued restoration. 
[jira] [Updated] (KAFKA-10532) Do not wipe state store under EOS when closing-dirty a RESTORING active or RUNNING standby task
[ https://issues.apache.org/jira/browse/KAFKA-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-10532: -- Summary: Do not wipe state store under EOS when closing-dirty a RESTORING active or RUNNING standby task (was: Do not wipe state store under EOS when closing a RESTORING active or RUNNING standby task) > Do not wipe state store under EOS when closing-dirty a RESTORING active or > RUNNING standby task > --- > > Key: KAFKA-10532 > URL: https://issues.apache.org/jira/browse/KAFKA-10532 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > > Today whenever we are closing-dirty a task, we always wipe out the state > stores if we are under EOS. But when the closing task was a RESTORING active, > or a RUNNING standby, we may actually not need to wipe out the stores, since > we know that upon resuming we would still restore the task before > transitioning to processing (assuming the LEO offset would not be truncated); > i.e. when the task resumes, it does not matter if the same records get applied > twice during the continued restoration.
[jira] [Created] (KAFKA-10532) Do not wipe state store under EOS when closing a RESTORING active or RUNNING standby task
Guozhang Wang created KAFKA-10532: - Summary: Do not wipe state store under EOS when closing a RESTORING active or RUNNING standby task Key: KAFKA-10532 URL: https://issues.apache.org/jira/browse/KAFKA-10532 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Today whenever we are closing-dirty a task, we always wipe out the state stores if we are under EOS. But when the closing task was a RESTORING active, or a RUNNING standby, we may actually not need to wipe out the stores, since we know that upon resuming we would still restore the task before transitioning to processing (assuming the LEO offset would not be truncated); i.e. when the task resumes, it does not matter if the same records get applied twice during the continued restoration.
[GitHub] [kafka] vvcephei commented on a change in pull request #9346: KAFKA-8410: Split ProcessorContext into Processor/StateStore/RecordContext
vvcephei commented on a change in pull request #9346: URL: https://github.com/apache/kafka/pull/9346#discussion_r496227502 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ## @@ -90,7 +90,7 @@ void register(final StateStore store, * @param name The store name * @return The state store instance */ -StateStore getStateStore(final String name); + S getStateStore(final String name); Review comment: This is a backward-compatible change. It's necessary if we want to capitalize on the opportunity to implement all the new interfaces with a common ProcessorContextImpl (which I think we do). ## File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java ## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.StreamsException; + +import java.io.File; +import java.util.Map; + +/** + * State store context interface. + */ +public interface StateStoreContext { Review comment: The new StateStoreContext. 
It contains only the parts of the ProcessorContext that should be appropriate within a StateStore. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.api; + +import org.apache.kafka.common.header.Headers; + +public class Record { +private final K key; +private final V value; +private final long timestamp; +private final Headers headers; + +public Record(final K key, final V value, final long timestamp, final Headers headers) { Review comment: The new data object. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java ## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.api; + +public interface RecordMetadata { Review comment: Metadata is an interface rather than a data class, since it'll never be constructed by users. Note that this also enables us to just re-use the ProcessorContextImpl as the implementation for this. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java ## @@ -140,76 +125,25 @@ Cancellable schedule(final Duration interval, final Punctuator callback); /** - * Forwards a key/value
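The claim earlier in this review that giving `getStateStore` a generic return type is backward compatible can be illustrated with a small sketch. It does not use the Kafka Streams types: `Store`, `CounterStore`, `Context` and `getStore` are hypothetical stand-ins, and the method signature is an assumption about the shape of the change, since the type parameters in the quoted diff did not survive extraction.

```java
import java.util.HashMap;
import java.util.Map;

public class GenericLookupSketch {

    // Hypothetical stand-in for a state store interface.
    interface Store {
        String name();
    }

    static final class CounterStore implements Store {
        private long count;

        @Override
        public String name() {
            return "counter";
        }

        long increment() {
            return ++count;
        }
    }

    static final class Context {
        private final Map<String, Store> stores = new HashMap<>();

        void register(Store store) {
            stores.put(store.name(), store);
        }

        // Generic return type: the caller's target type selects S.
        // The erasure of this method still returns Store.
        @SuppressWarnings("unchecked")
        <S extends Store> S getStore(String name) {
            return (S) stores.get(name);
        }
    }

    public static void main(String[] args) {
        Context context = new Context();
        context.register(new CounterStore());

        // Call sites written against a plain Store return type still compile.
        Store plain = context.getStore("counter");
        CounterStore viaCast = (CounterStore) context.getStore("counter");

        // New call sites can drop the explicit cast.
        CounterStore inferred = context.getStore("counter");

        System.out.println(plain == viaCast && viaCast == inferred);
        System.out.println(inferred.increment());
    }
}
```

The trade-off is that the cast moves inside the method and becomes unchecked, so asking for the wrong store type fails with a `ClassCastException` at the call site rather than at compile time.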
[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
soondenana commented on a change in pull request #9347: URL: https://github.com/apache/kafka/pull/9347#discussion_r496275381 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -133,11 +134,14 @@ public void start() { List partitions = new ArrayList<>(); // We expect that the topics will have been created either manually by the user or automatically by the herder -List partitionInfos = null; -long started = time.milliseconds(); -while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) { +List partitionInfos = consumer.partitionsFor(topic); +long started = time.nanoseconds(); +long maxSleepMs = 1_000; +long sleepMs = 10; +while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) { +Utils.sleep(sleepMs); Review comment: Updated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on a change in pull request #8910: KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop
rhauch commented on a change in pull request #8910: URL: https://github.com/apache/kafka/pull/8910#discussion_r496239771 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ## @@ -138,6 +139,7 @@ public WorkerSinkTask(ConnectorTaskId id, this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno); this.consumer = consumer; this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG); +this.taskStopped = false; Review comment: Shouldn't this be volatile? Yes, it's true that `WorkerSinkTask.close()` is always and only called from within `WorkerTask.doRun()` after the task determines it will stop. However, the `onPartitionsRevoked(...)` method is called from the consumer thread, and making this volatile is the only way to ensure that the consumer thread doesn't read a previously-cached value. ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java ## @@ -315,6 +315,56 @@ public void testPause() throws Exception { PowerMock.verifyAll(); } +@Test +public void testShutdown() throws Exception { +createTask(initialState); + +expectInitializeTask(); +expectTaskGetTopic(true); + +// first iteration +expectPollInitialAssignment(); + +// second iteration + EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap()); +expectConsumerPoll(1); +expectConversionAndTransformation(1); +sinkTask.put(EasyMock.>anyObject()); +EasyMock.expectLastCall(); + +// WorkerSinkTask::stop +consumer.wakeup(); +PowerMock.expectLastCall(); +sinkTask.stop(); +PowerMock.expectLastCall(); + +// WorkerSinkTask::close +consumer.close(); +PowerMock.expectLastCall().andAnswer(new IAnswer() { +@Override +public Object answer() throws Throwable { +rebalanceListener.getValue().onPartitionsRevoked( +asList(TOPIC_PARTITION, TOPIC_PARTITION2) +); +return null; +} +}); +transformationChain.close(); +PowerMock.expectLastCall(); + +PowerMock.replayAll(); + 
+workerTask.initialize(TASK_CONFIG); +workerTask.initializeAndStart(); +workerTask.iteration(); +sinkTaskContext.getValue().requestCommit(); // Force an offset commit +workerTask.iteration(); +workerTask.stop(); +workerTask.close(); + +PowerMock.verifyAll(); Review comment: Verified locally that this test fails when the additions to the `onPartitionsRevoked(...)` method above are removed locally. Nice work, @C0urante. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10455) Probing rebalances are not guaranteed to be triggered by non-leader members
[ https://issues.apache.org/jira/browse/KAFKA-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-10455: --- Assignee: Leah Thomas > Probing rebalances are not guaranteed to be triggered by non-leader members > --- > > Key: KAFKA-10455 > URL: https://issues.apache.org/jira/browse/KAFKA-10455 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Assignee: Leah Thomas >Priority: Blocker > Fix For: 2.7.0, 2.6.1 > > > Apparently, if a consumer rejoins the group with the same subscription > userdata that it previously sent, it will not trigger a rebalance. The one > exception here is that the group leader will always trigger a rebalance when > it rejoins the group. > This has implications for KIP-441, where we rely on asking an arbitrary > thread to enforce the followup probing rebalances. Technically we do ask a > thread living on the same instance as the leader, so the odds that the leader > will be chosen aren't completely abysmal, but for any multithreaded > application they are still at best only 50%. > Of course in general the userdata will have changed within a span of 10 > minutes, so the actual likelihood of hitting this is much lower – it can > only happen if the member's task offset sums remained unchanged. > Realistically, this probably requires that the member only have > fully-restored active tasks (encoded with the constant sentinel -2) and that > no tasks be added or removed. > > One solution would be to make sure the leader is responsible for the probing > rebalance. To do this, we would need to somehow expose the memberId of the > thread's main consumer to the partition assignor. I'm actually not sure if > that's currently possible to figure out or not. If not, we could just assign > the probing rebalance to every thread on the leader's instance. 
This > shouldn't result in multiple followup rebalances as the rebalance schedule > will be updated/reset on the first followup rebalance. > Another solution would be to make sure the userdata is always different. We > could encode an extra bit that flip-flops, but then we'd have to persist the > latest value somewhere/somehow. Alternatively we could just encode the next > probing rebalance time in the subscription userdata, since that is guaranteed > to always be different from the previous rebalance. This might get tricky > though, and certainly wastes space in the subscription userdata. Also, this > would only solve the problem for KIP-441 probing rebalances, meaning we'd > have to individually ensure the userdata has changed for every type of > followup rebalance (see related issue below). So the first proposal, > requiring the leader trigger the rebalance, would be preferable. > Note that, imho, we should just allow anyone to trigger a rebalance by > rejoining the group. But this would presumably require a broker-side change > and thus we would still need a workaround for KIP-441 to work with brokers. > > Related issue: > This also means the Streams workaround for [KAFKA-9821|http://example.com] is > not airtight, as we encode the followup rebalance in the member who is > supposed to _receive_ a revoked partition, rather than the member who is > actually revoking said partition. While the member doing the revoking will be > guaranteed to have different userdata, the member receiving the partition may > not. Making it the responsibility of the leader to trigger _any_ type of > followup rebalance would solve this issue as well. > Note that other types of followup rebalance (version probing, static > membership with host info change) are guaranteed to have a change in the > subscription userdata, and will not hit this bug -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
soondenana commented on a change in pull request #9347: URL: https://github.com/apache/kafka/pull/9347#discussion_r496272243 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -163,6 +163,18 @@ public void start() { log.info("Started KafkaBasedLog for topic " + topic); } +/** + * Sleep for some time so that topic used for this KafkaBasedLog gets created. Note that + * {@code System.currentTimeMillis()} is not monotonic, so check for that condition. Review comment: In fact we don't need value from clock to sleep, only need it to find elapsed time and timeout. I have decoupled these two, and also fixed a minor issue where it was sleeping first time even if topic was present. Please take a look. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
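The decoupling described in this comment (a fixed sleep interval that never depends on a clock difference, with the clock used only to measure elapsed time against a timeout) can be sketched as below. This is not the `KafkaBasedLog` code: `TopicWaitSketch` and `pollUntilPresent` are hypothetical names, `System.nanoTime()` stands in for the `Time` abstraction used in the pull request, and doubling the sleep up to a cap is an assumption, since the quoted diff does not show how `sleepMs` grows toward `maxSleepMs`.

```java
import java.util.function.Supplier;

public class TopicWaitSketch {

    // Polls lookup until it returns non-null or the timeout elapses.
    // Elapsed time comes from the monotonic System.nanoTime(); the sleep
    // duration is always a positive constant-derived value, never a
    // difference of clock readings, so it cannot go negative.
    static <T> T pollUntilPresent(Supplier<T> lookup, long timeoutNs,
                                  long initialSleepMs, long maxSleepMs) {
        long started = System.nanoTime();
        long sleepMs = initialSleepMs;
        T result = lookup.get(); // check first: no sleep if already present
        while (result == null && System.nanoTime() - started < timeoutNs) {
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return null;
            }
            sleepMs = Math.min(2 * sleepMs, maxSleepMs); // assumed capped backoff
            result = lookup.get();
        }
        return result;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated lookup that succeeds on the third attempt.
        String found = pollUntilPresent(
                () -> ++calls[0] < 3 ? null : "partition-info",
                5_000_000_000L, 10, 1_000);
        System.out.println(found + " after " + calls[0] + " attempts");
    }
}
```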
[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
soondenana commented on a change in pull request #9347: URL: https://github.com/apache/kafka/pull/9347#discussion_r496271194 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java ## @@ -536,6 +536,22 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { PowerMock.verifyAll(); } +/** + * Check if the waitForTopicCreate method doesn't throw if time moves backward, and works + * correctly if it increases. + */ +@Test +public void testWatiForTopicCreate() { Review comment: Fixed (actually removed the test) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
ijuma commented on a change in pull request #9347: URL: https://github.com/apache/kafka/pull/9347#discussion_r496270860 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -133,11 +134,14 @@ public void start() { List partitions = new ArrayList<>(); // We expect that the topics will have been created either manually by the user or automatically by the herder -List partitionInfos = null; -long started = time.milliseconds(); -while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) { +List partitionInfos = consumer.partitionsFor(topic); +long started = time.nanoseconds(); +long maxSleepMs = 1_000; +long sleepMs = 10; +while (partitionInfos == null && time.nanoseconds() - started < CREATE_TOPIC_TIMEOUT_NS) { +Utils.sleep(sleepMs); Review comment: `Time` has a `sleep` method too btw. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
ijuma commented on a change in pull request #9347: URL: https://github.com/apache/kafka/pull/9347#discussion_r496270581 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -163,6 +163,18 @@ public void start() { log.info("Started KafkaBasedLog for topic " + topic); } +/** + * Sleep for some time so that topic used for this KafkaBasedLog gets created. Note that + * {@code System.currentTimeMillis()} is not monotonic, so check for that condition. Review comment: You can use `hiResClockMs` so that you get the value in `ms`.
[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
soondenana commented on a change in pull request #9347: URL: https://github.com/apache/kafka/pull/9347#discussion_r496267175 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -163,6 +163,18 @@ public void start() { log.info("Started KafkaBasedLog for topic " + topic); } +/** + * Sleep for some time so that topic used for this KafkaBasedLog gets created. Note that + * {@code System.currentTimeMillis()} is not monotonic, so check for that condition. Review comment: In fact I don't like this loop altogether. Going to rewrite it so that it doesn't use these constructs.
[GitHub] [kafka] soondenana commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
soondenana commented on a change in pull request #9347: URL: https://github.com/apache/kafka/pull/9347#discussion_r496263965 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -163,6 +163,18 @@ public void start() { log.info("Started KafkaBasedLog for topic " + topic); } +/** + * Sleep for some time so that topic used for this KafkaBasedLog gets created. Note that + * {@code System.currentTimeMillis()} is not monotonic, so check for that condition. Review comment: I wanted to make the least amount of change. I can update the code to use the monotonic `nanoTime` instead (`nanoseconds` in the `Time` interface). We will also need to convert that to milliseconds before passing it to sleep (unless we want to add nanosecond variants to those interfaces too, like `Utils.sleep`).
[GitHub] [kafka] ijuma commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
ijuma commented on a change in pull request #9347: URL: https://github.com/apache/kafka/pull/9347#discussion_r496240453 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ## @@ -163,6 +163,18 @@ public void start() { log.info("Started KafkaBasedLog for topic " + topic); } +/** + * Sleep for some time so that topic used for this KafkaBasedLog gets created. Note that + * {@code System.currentTimeMillis()} is not monotonic, so check for that condition. Review comment: Why don't we use a monotonic timer?
[GitHub] [kafka] splett2 commented on a change in pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
splett2 commented on a change in pull request #9347: URL: https://github.com/apache/kafka/pull/9347#discussion_r496238384 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java ## @@ -536,6 +536,22 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { PowerMock.verifyAll(); } +/** + * Check if the waitForTopicCreate method doesn't throw if time moves backward, and works + * correctly if it increases. + */ +@Test +public void testWatiForTopicCreate() { Review comment: nit: typo in `Wati`
[jira] [Issue Comment Deleted] (KAFKA-9370) Return UNKNOWN_TOPIC_OR_PARTITION if topic deletion is in progress
[ https://issues.apache.org/jira/browse/KAFKA-9370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vikas Singh updated KAFKA-9370: --- Comment: was deleted (was: https://github.com/apache/kafka/pull/9347) > Return UNKNOWN_TOPIC_OR_PARTITION if topic deletion is in progress > -- > > Key: KAFKA-9370 > URL: https://issues.apache.org/jira/browse/KAFKA-9370 > Project: Kafka > Issue Type: Bug >Reporter: Vikas Singh >Assignee: Vikas Singh >Priority: Major > > `KafkaApis::handleCreatePartitionsRequest` returns `INVALID_TOPIC_EXCEPTION` > if the topic is getting deleted. Change it to return > `UNKNOWN_TOPIC_OR_PARTITION` instead. After the delete topic api returns, > client should see the topic as deleted. The fact that we are processing > deletion in background shouldn't have any impact. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9370) Return UNKNOWN_TOPIC_OR_PARTITION if topic deletion is in progress
[ https://issues.apache.org/jira/browse/KAFKA-9370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203527#comment-17203527 ] Vikas Singh commented on KAFKA-9370: https://github.com/apache/kafka/pull/9347
[GitHub] [kafka] soondenana opened a new pull request #9347: KAFKA-10531: Check for negative values to Thread.sleep call
soondenana opened a new pull request #9347: URL: https://github.com/apache/kafka/pull/9347 System.currentTimeMillis() is not monotonic, so using it to calculate the time to sleep can produce a negative value, and passing a negative value to Thread.sleep throws IllegalArgumentException. This change checks for that case and sleeps for one second (to avoid a tight loop) if the computed value is negative. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] vvcephei opened a new pull request #9346: KAFKA-8410: Split ProcessorContext into Processor/StateStore/RecordContext
vvcephei opened a new pull request #9346: URL: https://github.com/apache/kafka/pull/9346 Split up the ProcessorContext into separate containers more appropriate for usage in `Processor#init` (`api.ProcessorContext`), `StateStore#init` (`StateStoreContext`), and `Processor#process` (the new `Record` and `RecordMetadata` classes). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-9584) Removing headers causes ConcurrentModificationException
[ https://issues.apache.org/jira/browse/KAFKA-9584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-9584: Fix Version/s: 2.6.1 2.5.2 2.7.0 > Removing headers causes ConcurrentModificationException > --- > > Key: KAFKA-9584 > URL: https://issues.apache.org/jira/browse/KAFKA-9584 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Micah Ramos >Assignee: Micah Ramos >Priority: Minor > Fix For: 2.7.0, 2.5.2, 2.6.1 > > > The consumer record that is used during punctuate is static, this can cause > java.util.ConcurrentModificationException when modifying the headers. > Using a single instance of ConsumerRecord for all punctuates causes other > strange behavior: > # Headers are shared across partitions. > # A topology that adds a single header could append an infinite number of > headers (one per punctuate iteration), causing memory problems in the current > topology as well as down stream consumers since the headers are written with > the record when it is produced to a topic. > > I would expect that each invocation of punctuate would be initialized with a > new header object. -- This message was sent by Atlassian Jira (v8.3.4#803005)
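The two symptoms reported above (unbounded header growth and `ConcurrentModificationException`) can be reproduced in miniature with a plain `ArrayList`. The sketch below is only an analogy: `SharedHeadersDemo` and its methods are hypothetical names, and a `List<String>` stands in for the headers of the shared `ConsumerRecord`; it is not the Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

class SharedHeadersDemo {
    // Stand-in for the headers of one record instance reused by every punctuate call.
    static final List<String> SHARED_HEADERS = new ArrayList<>();

    /** Adds one header per call; on a shared list the count keeps growing. */
    static int punctuateShared() {
        SHARED_HEADERS.add("trace-id");
        return SHARED_HEADERS.size();
    }

    /** Same logic on a fresh list per call, which is what the report expects. */
    static int punctuateFresh() {
        List<String> headers = new ArrayList<>();
        headers.add("trace-id");
        return headers.size();
    }

    /** Returns true if removing entries while iterating triggers the fail-fast check. */
    static boolean removeWhileIterating(List<String> headers) {
        try {
            for (String header : headers) {
                headers.remove(header); // structural change behind the iterator's back
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        punctuateShared();
        punctuateShared();
        System.out.println("shared after 3 calls: " + punctuateShared());
        System.out.println("fresh on every call:  " + punctuateFresh());
        List<String> headers = new ArrayList<>(List.of("a", "b", "c"));
        System.out.println("CME raised: " + removeWhileIterating(headers));
    }
}
```

Creating a new header container per invocation removes both problems at once, because no two invocations can observe or mutate the same list.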
[GitHub] [kafka] vvcephei commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
vvcephei commented on pull request #8181: URL: https://github.com/apache/kafka/pull/8181#issuecomment-700274192 Cherry-picked to 2.6 and 2.5.
[GitHub] [kafka] mjsax commented on pull request #9027: KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing
mjsax commented on pull request #9027: URL: https://github.com/apache/kafka/pull/9027#issuecomment-700260133 Cherry-picked to `2.6`.
[GitHub] [kafka] mjsax commented on pull request #9308: MINOR: streams docs fixes
mjsax commented on pull request #9308: URL: https://github.com/apache/kafka/pull/9308#issuecomment-700260093 Cherry-picked to `2.6`.
[jira] [Assigned] (KAFKA-10531) KafkaBasedLog can sleep for negative values
[ https://issues.apache.org/jira/browse/KAFKA-10531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vikas Singh reassigned KAFKA-10531: --- Assignee: Vikas Singh > KafkaBasedLog can sleep for negative values > --- > > Key: KAFKA-10531 > URL: https://issues.apache.org/jira/browse/KAFKA-10531 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Vikas Singh >Assignee: Vikas Singh >Priority: Major > Fix For: 2.6.1 > > > {{time.milliseconds}} is not monotonic, so this code can throw : > {{java.lang.IllegalArgumentException: timeout value is negative}} > > {code:java} > long started = time.milliseconds(); > while (partitionInfos == null && time.milliseconds() - started < > CREATE_TOPIC_TIMEOUT_MS) { > partitionInfos = consumer.partitionsFor(topic); > Utils.sleep(Math.min(time.milliseconds() - started, 1000)); > } > {code} > We need to check for negative value before sleeping. -- This message was sent by Atlassian Jira (v8.3.4#803005)
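To make the failure concrete: if the wall clock is stepped backward between the two readings, `time.milliseconds() - started` is negative, `Math.min` keeps it negative, and `Thread.sleep` rejects it. The sketch below isolates that arithmetic in pure functions. The names `SleepClamp`, `naiveSleepMs`, and `safeSleepMs` are hypothetical, and falling back to a one-second sleep is just one possible policy, chosen here to avoid a tight loop.

```java
class SleepClamp {
    static final long MAX_SLEEP_MS = 1000;

    /** The computation from the snippet above; negative when the clock moved backward. */
    static long naiveSleepMs(long startedMs, long nowMs) {
        return Math.min(nowMs - startedMs, MAX_SLEEP_MS);
    }

    /** Guarded variant: a negative elapsed time falls back to MAX_SLEEP_MS. */
    static long safeSleepMs(long startedMs, long nowMs) {
        long elapsed = nowMs - startedMs;
        return elapsed < 0 ? MAX_SLEEP_MS : Math.min(elapsed, MAX_SLEEP_MS);
    }

    public static void main(String[] args) {
        long started = 10_000;
        long now = 9_750; // wall clock stepped back by 250 ms
        System.out.println(naiveSleepMs(started, now)); // -250, illegal for Thread.sleep
        System.out.println(safeSleepMs(started, now));  // 1000
    }
}
```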
[GitHub] [kafka] tahseen447 commented on pull request #5876: KAFKA-7509: Avoid passing most non-applicable properties to producer, consumer, and admin client
tahseen447 commented on pull request #5876: URL: https://github.com/apache/kafka/pull/5876#issuecomment-700252065 Just checking in: has this been resolved?
[jira] [Created] (KAFKA-10531) KafkaBasedLog can sleep for negative values
Vikas Singh created KAFKA-10531: --- Summary: KafkaBasedLog can sleep for negative values Key: KAFKA-10531 URL: https://issues.apache.org/jira/browse/KAFKA-10531 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.6.0 Reporter: Vikas Singh Fix For: 2.6.1 {{time.milliseconds}} is not monotonic, so this code can throw : {{java.lang.IllegalArgumentException: timeout value is negative}} {code:java} long started = time.milliseconds(); while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) { partitionInfos = consumer.partitionsFor(topic); Utils.sleep(Math.min(time.milliseconds() - started, 1000)); } {code} We need to check for negative value before sleeping. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei merged pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
vvcephei merged pull request #8181: URL: https://github.com/apache/kafka/pull/8181
[GitHub] [kafka] vvcephei commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
vvcephei commented on pull request #8181: URL: https://github.com/apache/kafka/pull/8181#issuecomment-700246317 Thanks, @MicahRam ! That failure looks unrelated: `Build / JDK 11 / kafka.network.ConnectionQuotasTest.testNoConnectionLimitsByDefault`
[GitHub] [kafka] rajinisivaram merged pull request #9317: KAFKA-10509: Added throttle connection accept rate metric (KIP-612)
rajinisivaram merged pull request #9317: URL: https://github.com/apache/kafka/pull/9317
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9344: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599
rajinisivaram commented on a change in pull request #9344: URL: https://github.com/apache/kafka/pull/9344#discussion_r496173659 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -1554,6 +1575,11 @@ private ConfigEntry configEntry(CreatableTopicConfigs config) { @Override void handleFailure(Throwable throwable) { +// If there were any topics retries due to a quota exceeded exception, we propagate +// the initial error back to the caller. +completeQuotaExceededException(futures, quotaExceededExceptions, Review comment: Do we want to return quota exceeded in all cases? Apart from timeouts, it seems like we should propagate failures rather than an earlier quota exceeded exception. That is, if we were throttled for 5 millis and then see a failure, the failure is more useful than the fact that we were throttled for 5 millis. ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -733,7 +733,9 @@ public void testCreateTopicsRetryThrottlingExceptionWhenEnabledUntilRequestTimeO time.sleep(defaultApiTimeout + 1); assertNull(result.values().get("topic1").get()); -TestUtils.assertFutureThrows(result.values().get("topic2"), TimeoutException.class); +ThrottlingQuotaExceededException e = TestUtils.assertFutureThrows(result.values().get("topic2"), +ThrottlingQuotaExceededException.class); +assertEquals(0, e.throttleTimeMs()); Review comment: I can see why we return the delta, but this looks odd when it says throttled with a time of zero.
[GitHub] [kafka] rajinisivaram opened a new pull request #9345: KAFKA-10338; Support PEM format for SSL key and trust stores (KIP-651)
rajinisivaram opened a new pull request #9345: URL: https://github.com/apache/kafka/pull/9345 Adds support for SSL key and trust stores to be specified in PEM format either as files or directly as configuration values. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-10493) Ktable out-of-order updates are not being ignored
[ https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203430#comment-17203430 ] John Roesler commented on KAFKA-10493: -- I've marked this for 3.0 so that we could consider adding this behavior change (with or without an opt-out config) in the major version release. > Ktable out-of-order updates are not being ignored > - > > Key: KAFKA-10493 > URL: https://issues.apache.org/jira/browse/KAFKA-10493 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Pedro Gontijo >Priority: Critical > Fix For: 3.0.0 > > Attachments: KTableOutOfOrderBug.java > > > On a materialized KTable, out-of-order records for a given key (records which > timestamp are older than the current value in store) are not being ignored > but used to update the local store value and also being forwarded. > I believe the bug is here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77] > It should return true, not false (see javadoc) > The bug impacts here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148] > I have attached a simple stream app that shows the issue happening. > Thank you! -- This message was sent by Atlassian Jira (v8.3.4#803005)
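The behaviour the reporter expects (an update whose timestamp is older than the stored one is dropped instead of applied and forwarded) can be sketched with a small in-memory store. This is an illustration under assumptions, not the `KTableSource` or `ValueAndTimestampSerializer` code: `TimestampedStore` and its methods are hypothetical, and updates with an equal timestamp are treated as in order.

```java
import java.util.HashMap;
import java.util.Map;

class TimestampedStore {
    /** Value paired with the timestamp of the record that produced it. */
    private static final class Entry {
        final String value;
        final long timestamp;

        Entry(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    /**
     * Applies the update unless it is older than the stored value for the key.
     * Returns true if the update was applied, false if it was ignored.
     */
    boolean put(String key, String value, long timestamp) {
        Entry current = store.get(key);
        if (current != null && timestamp < current.timestamp) {
            return false; // out-of-order record: keep the newer value, forward nothing
        }
        store.put(key, new Entry(value, timestamp));
        return true;
    }

    /** Returns the current value for the key, or null if absent. */
    String get(String key) {
        Entry current = store.get(key);
        return current == null ? null : current.value;
    }

    public static void main(String[] args) {
        TimestampedStore table = new TimestampedStore();
        table.put("k", "v2", 200);
        boolean applied = table.put("k", "v1", 100); // older record arrives late
        System.out.println(applied + " " + table.get("k"));
    }
}
```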
[jira] [Updated] (KAFKA-10493) Ktable out-of-order updates are not being ignored
[ https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10493: - Fix Version/s: 3.0.0
[jira] [Updated] (KAFKA-10493) Ktable out-of-order updates are not being ignored
[ https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10493: - Priority: Critical (was: Major)
[GitHub] [kafka] rhauch commented on pull request #9306: KAFKA-10477: Enabling the same behavior of NULL JsonNodeType to MISSI…
rhauch commented on pull request #9306: URL: https://github.com/apache/kafka/pull/9306#issuecomment-700182723 @shaikzakiriitm, you mentioned above: > From v2.10.0 onwards, in jackson lib, ObjectMapper.readTree(input) started to return JsonNode of type MISSING for empty input, as mentioned in the issue: [FasterXML/jackson-databind#2211](https://github.com/FasterXML/jackson-databind/issues/2211). However, the [FasterXML/jackson-databind#2211](https://github.com/FasterXML/jackson-databind/issues/2211) you mention above talks about this behavior changing in 2.9 relative to 2.8. This does not seem to align with your assertion in [KAFKA-10477](https://issues.apache.org/jira/browse/KAFKA-10477) that: > Things were working fine when the dependency on jackson lib was of version v2.9.10.3 Can you clarify?
[GitHub] [kafka] ramesh-muthusamy commented on pull request #9319: KAFKA-10413 Allow even distribution of lost/new tasks when more than one worker j…
ramesh-muthusamy commented on pull request #9319: URL: https://github.com/apache/kafka/pull/9319#issuecomment-700171024 @kkonstantine can you help review this PR?
[GitHub] [kafka] chia7712 commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…
chia7712 commented on pull request #9102: URL: https://github.com/apache/kafka/pull/9102#issuecomment-700149221 rebase to trigger new QA
[jira] [Commented] (KAFKA-10513) Newly added topic or partitions are not assigned to running consumer groups using static membership
[ https://issues.apache.org/jira/browse/KAFKA-10513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203342#comment-17203342 ] Boyang Chen commented on KAFKA-10513: - My understanding is that the consumer was supposed to check the metadata for topic partition, and rejoin as necessary when the partition number changes? > Newly added topic or partitions are not assigned to running consumer groups > using static membership > --- > > Key: KAFKA-10513 > URL: https://issues.apache.org/jira/browse/KAFKA-10513 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.6.0 >Reporter: Marlon Ou >Priority: Major > > If consumers are polling messages from a certain topic with static membership > and we add new partitions to this topic while the consumers are running, no > partition reassignment is ever triggered (and hence messages published into > the new partitions are never consumed). > To reproduce, simply set group instance IDs on the consumers: > {code:java} > props.setProperty("group.instance.id", instanceId); > {code} > And then while the static consumers are running, use Kafka's admin client to > add more partitions to the topic: > {code:java} > adminClient.createPartitions(...) > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] tombentley commented on a change in pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
tombentley commented on a change in pull request #9266: URL: https://github.com/apache/kafka/pull/9266#discussion_r496048334 ## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala ## @@ -2061,11 +2062,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testDescribeConfigsForLog4jLogLevels(): Unit = { client = Admin.create(createConfig) - +LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create the logger") val loggerConfig = describeBrokerLoggers() -val rootLogLevel = loggerConfig.get(Log4jController.ROOT_LOGGER).value() +val kafkaLogLevel = loggerConfig.get("kafka").value() val logCleanerLogLevelConfig = loggerConfig.get("kafka.cluster.Replica") -assertEquals(rootLogLevel, logCleanerLogLevelConfig.value()) // we expect an undefined log level to be the same as the root logger +// we expect an undefined log level to be the same as its first ancestor logger Review comment: It would, but that's not configured in the `log4j.properties` used for this test, so we inherit from the 2nd ancestor, which is ERROR, whereas the root logger was OFF. That is why I had to fix this test. Let me fix the comment.
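The hierarchical resolution discussed in this review can be sketched without Log4j: walk up the dotted logger name until a configured level is found, and fall back to the root level only when no ancestor has one. `LoggerLevels` and `effectiveLevel` are hypothetical names for illustration; the actual change lives in Kafka's `Log4jController` and may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;

class LoggerLevels {
    /**
     * Returns the level configured for the logger itself or, failing that,
     * for its nearest configured ancestor; rootLevel is the last resort.
     */
    static String effectiveLevel(Map<String, String> configured, String logger, String rootLevel) {
        String name = logger;
        while (true) {
            String level = configured.get(name);
            if (level != null) {
                return level;
            }
            int lastDot = name.lastIndexOf('.');
            if (lastDot < 0) {
                return rootLevel; // no configured ancestor left
            }
            name = name.substring(0, lastDot); // move to the parent logger
        }
    }

    public static void main(String[] args) {
        Map<String, String> configured = new HashMap<>();
        configured.put("kafka", "ERROR"); // mirrors the test setup described in the comment
        // "kafka.cluster" is not configured, so the second ancestor "kafka" wins over root.
        System.out.println(effectiveLevel(configured, "kafka.cluster.Replica", "OFF"));
    }
}
```

With the configuration described in the comment (root at OFF, `kafka` at ERROR), the lookup for `kafka.cluster.Replica` yields ERROR rather than OFF, which is why the original assertion against the root level had to change.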
[GitHub] [kafka] chia7712 commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
chia7712 commented on a change in pull request #9284: URL: https://github.com/apache/kafka/pull/9284#discussion_r496013137 ## File path: docs/upgrade.html ## @@ -27,6 +27,14 @@ Notable changes in 2 default.api.timeout.ms, and Kafka Streams' new task.timeout.ms parameters instead. Note that parameter retry.backoff.ms is not impacted by this change. +Altering non-reconfigurable configs of existent listeners causes InvalidRequestException. +By contrast, the previous behavior would have caused the updated configuration to be persisted, but it wouldn't +take effect until the broker was restarted. This change breaks behavior compatibility but the old behavior is not +worth keeping since that behavior is unintended. see +https://github.com/apache/kafka/pull/9284;>KAFKA-10479 for more discussion. +see DynamicBrokerConfig.DynamicSecurityConfigs and SocketServer.ListenerReconfigurableConfigs +for reconfigurable configs of existent listeners. + Review comment: > I think we should keep the comment, I was suggesting we tweak the description. got it.
[GitHub] [kafka] ijuma commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
ijuma commented on a change in pull request #9284: URL: https://github.com/apache/kafka/pull/9284#discussion_r496011734 ## File path: docs/upgrade.html ## @@ -27,6 +27,14 @@ Notable changes in 2 default.api.timeout.ms, and Kafka Streams' new task.timeout.ms parameters instead. Note that parameter retry.backoff.ms is not impacted by this change. +Altering non-reconfigurable configs of existent listeners causes InvalidRequestException. +By contrast, the previous behavior would have caused the updated configuration to be persisted, but it wouldn't +take effect until the broker was restarted. This change breaks behavior compatibility but the old behavior is not +worth keeping since that behavior is unintended. see +https://github.com/apache/kafka/pull/9284;>KAFKA-10479 for more discussion. +see DynamicBrokerConfig.DynamicSecurityConfigs and SocketServer.ListenerReconfigurableConfigs +for reconfigurable configs of existent listeners. + Review comment: I think we should keep the comment, I was suggesting we tweak the description.
[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override
scanterog commented on a change in pull request #9313: URL: https://github.com/apache/kafka/pull/9313#discussion_r495995224 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ## @@ -199,8 +199,8 @@ protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX; protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX; -protected static final String PRODUCER_CLIENT_PREFIX = "producer."; -protected static final String CONSUMER_CLIENT_PREFIX = "consumer."; +protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer."; +protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer."; Review comment: Gotcha. I just peeked at it again and you're right. It seems the proper fix would be to change the way mm2 populates the MirrorConnectorConfig, which would avoid this change. WDYT @ryannedolan ? Not sure how involved this change would be.
[GitHub] [kafka] chia7712 commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
chia7712 commented on a change in pull request #9284: URL: https://github.com/apache/kafka/pull/9284#discussion_r495992169 ## File path: docs/upgrade.html ## @@ -27,6 +27,14 @@ Notable changes in 2 default.api.timeout.ms, and Kafka Streams' new task.timeout.ms parameters instead. Note that parameter retry.backoff.ms is not impacted by this change. +Altering non-reconfigurable configs of existent listeners causes InvalidRequestException. +By contrast, the previous behavior would have caused the updated configuration to be persisted, but it wouldn't +take effect until the broker was restarted. This change breaks behavior compatibility but the old behavior is not +worth keeping since that behavior is unintended. see +https://github.com/apache/kafka/pull/9284;>KAFKA-10479 for more discussion. +see DynamicBrokerConfig.DynamicSecurityConfigs and SocketServer.ListenerReconfigurableConfigs +for reconfigurable configs of existent listeners. + Review comment: > The previous behavior was a bug as it failed to report an error even though it did not change the config dynamically. OK, that makes sense. I will remove this comment from ```docs/upgrade.html```.
[GitHub] [kafka] ijuma commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
ijuma commented on a change in pull request #9284: URL: https://github.com/apache/kafka/pull/9284#discussion_r495988153 ## File path: docs/upgrade.html ## @@ -27,6 +27,14 @@ Notable changes in 2 default.api.timeout.ms, and Kafka Streams' new task.timeout.ms parameters instead. Note that parameter retry.backoff.ms is not impacted by this change. +Altering non-reconfigurable configs of existent listeners causes InvalidRequestException. +By contrast, the previous behavior would have caused the updated configuration to be persisted, but it wouldn't +take effect until the broker was restarted. This change breaks behavior compatibility but the old behavior is not +worth keeping since that behavior is unintended. see +https://github.com/apache/kafka/pull/9284;>KAFKA-10479 for more discussion. +see DynamicBrokerConfig.DynamicSecurityConfigs and SocketServer.ListenerReconfigurableConfigs +for reconfigurable configs of existent listeners. + Review comment: I know what you're saying, but this change aligns the implementation with the specified behavior. The previous behavior was a bug as it failed to report an error even though it did not change the config dynamically. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
chia7712 commented on a change in pull request #9284: URL: https://github.com/apache/kafka/pull/9284#discussion_r495983768 ## File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala ## @@ -327,10 +327,12 @@ class DynamicBrokerConfigTest { EasyMock.replay(kafkaServer) props.put(KafkaConfig.ListenersProp, "PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093") -val newConfig = KafkaConfig(props) +new DynamicListenerConfig(kafkaServer).validateReconfiguration(KafkaConfig(props)) +// it is illegal to update configs of existent listeners +props.put("listener.name.plaintext.you.should.not.pass", "failure") Review comment: you are right. I will revise the comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
chia7712 commented on a change in pull request #9284: URL: https://github.com/apache/kafka/pull/9284#discussion_r495983283 ## File path: docs/upgrade.html ## @@ -27,6 +27,14 @@ Notable changes in 2 default.api.timeout.ms, and Kafka Streams' new task.timeout.ms parameters instead. Note that parameter retry.backoff.ms is not impacted by this change. +Altering non-reconfigurable configs of existent listeners causes InvalidRequestException. +By contrast, the previous behavior would have caused the updated configuration to be persisted, but it wouldn't +take effect until the broker was restarted. This change breaks behavior compatibility but the old behavior is not +worth keeping since that behavior is unintended. see +https://github.com/apache/kafka/pull/9284;>KAFKA-10479 for more discussion. +see DynamicBrokerConfig.DynamicSecurityConfigs and SocketServer.ListenerReconfigurableConfigs +for reconfigurable configs of existent listeners. + Review comment: > Does it really break compatibility? It seems like the previous behavior wasn't working as intended anyway. It seems to me that the API's behavior has changed, so compatibility is broken. For example, code that changes non-reconfigurable configs now gets an exception, so users have to write something to handle the "new" exception. Of course, that case should be very rare, so mentioning it in docs/upgrade.html is enough for this incompatible change.
[GitHub] [kafka] ijuma commented on a change in pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
ijuma commented on a change in pull request #9266: URL: https://github.com/apache/kafka/pull/9266#discussion_r495975325 ## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala ## @@ -2061,11 +2062,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @Test def testDescribeConfigsForLog4jLogLevels(): Unit = { client = Admin.create(createConfig) - +LoggerFactory.getLogger("kafka.cluster.Replica").trace("Message to create the logger") val loggerConfig = describeBrokerLoggers() -val rootLogLevel = loggerConfig.get(Log4jController.ROOT_LOGGER).value() +val kafkaLogLevel = loggerConfig.get("kafka").value() val logCleanerLogLevelConfig = loggerConfig.get("kafka.cluster.Replica") -assertEquals(rootLogLevel, logCleanerLogLevelConfig.value()) // we expect an undefined log level to be the same as the root logger +// we expect an undefined log level to be the same as its first ancestor logger Review comment: Wouldn't the first ancestor be `kafka.cluster`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
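To illustrate the question about which ancestor supplies the level, here is a minimal sketch of hierarchical resolution over dotted logger names. It is not Kafka's `Log4jController` code; the class `LoggerLevelSketch`, the method `effectiveLevel`, and the `"root"` key are hypothetical names used only for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not Kafka's Log4jController: resolves an effective level
// by walking up the dotted logger name until a configured ancestor is found.
class LoggerLevelSketch {
    // Assumed key for the root logger in this sketch.
    static final String ROOT = "root";

    static String effectiveLevel(Map<String, String> configured, String loggerName) {
        String name = loggerName;
        while (true) {
            String level = configured.get(name);
            if (level != null) {
                return level; // nearest logger that has an explicit level
            }
            int lastDot = name.lastIndexOf('.');
            if (lastDot < 0) {
                break; // no more ancestors in the dotted hierarchy
            }
            name = name.substring(0, lastDot); // e.g. kafka.cluster.Replica -> kafka.cluster
        }
        return configured.get(ROOT); // fall back to the root logger
    }

    public static void main(String[] args) {
        Map<String, String> configured = new HashMap<>();
        configured.put(ROOT, "INFO");
        configured.put("kafka", "DEBUG");
        // kafka.cluster has no explicit level here, so the lookup continues to kafka.
        System.out.println(effectiveLevel(configured, "kafka.cluster.Replica"));
    }
}
```

In this sketch the immediate parent `kafka.cluster` is consulted first, and `kafka` only supplies the level when `kafka.cluster` has none configured, so "nearest configured ancestor" may be a more precise wording than "first ancestor".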
[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override
mimaison commented on a change in pull request #9313: URL: https://github.com/apache/kafka/pull/9313#discussion_r495975721 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ## @@ -199,8 +199,8 @@ protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX; protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX; -protected static final String PRODUCER_CLIENT_PREFIX = "producer."; -protected static final String CONSUMER_CLIENT_PREFIX = "consumer."; +protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer."; +protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer."; Review comment: Yes I'm running MM2 in a Connect cluster. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
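A rough sketch of why changing the prefix constants matters for existing configurations: a prefix-based lookup only sees keys that start with the exact prefix. The code below is not MirrorConnectorConfig's code; `PrefixOverrideSketch` and `withPrefix` are hypothetical, and the value `"source.cluster."` for `SOURCE_CLUSTER_PREFIX` is an assumption, since the thread shows only the constant's name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of prefix-based property extraction.
class PrefixOverrideSketch {
    // Assumed value for illustration only; the diff does not show it.
    static final String SOURCE_CLUSTER_PREFIX = "source.cluster.";

    // Keeps entries whose key starts with the prefix and strips the prefix from the key.
    static Map<String, String> withPrefix(Map<String, String> props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("producer.acks", "all");
        props.put("source.cluster.producer.batch.size", "1000");

        // Old constant: only the bare "producer." override is picked up.
        System.out.println(withPrefix(props, "producer."));
        // New constant: only the cluster-qualified override is picked up.
        System.out.println(withPrefix(props, SOURCE_CLUSTER_PREFIX + "producer."));
    }
}
```

Under these assumptions, a connector config that sets `producer.*` keys directly would stop having them applied once the constant changes, which may be the concern for MM2 running in a Connect cluster.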
[GitHub] [kafka] ijuma commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
ijuma commented on a change in pull request #9284: URL: https://github.com/apache/kafka/pull/9284#discussion_r495974098 ## File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala ## @@ -327,10 +327,12 @@ class DynamicBrokerConfigTest { EasyMock.replay(kafkaServer) props.put(KafkaConfig.ListenersProp, "PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093") -val newConfig = KafkaConfig(props) +new DynamicListenerConfig(kafkaServer).validateReconfiguration(KafkaConfig(props)) +// it is illegal to update configs of existent listeners +props.put("listener.name.plaintext.you.should.not.pass", "failure") Review comment: It is possible to update configs of existing listeners if they are dynamic, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #9284: KAFKA-10479 Throw exception if users try to update configs of existen…
ijuma commented on a change in pull request #9284: URL: https://github.com/apache/kafka/pull/9284#discussion_r495973438 ## File path: docs/upgrade.html ## @@ -27,6 +27,14 @@ Notable changes in 2 default.api.timeout.ms, and Kafka Streams' new task.timeout.ms parameters instead. Note that parameter retry.backoff.ms is not impacted by this change. +Altering non-reconfigurable configs of existent listeners causes InvalidRequestException. +By contrast, the previous behavior would have caused the updated configuration to be persisted, but it wouldn't +take effect until the broker was restarted. This change breaks behavior compatibility but the old behavior is not +worth keeping since that behavior is unintended. see +https://github.com/apache/kafka/pull/9284;>KAFKA-10479 for more discussion. +see DynamicBrokerConfig.DynamicSecurityConfigs and SocketServer.ListenerReconfigurableConfigs +for reconfigurable configs of existent listeners. + Review comment: Does it really break compatibility? It seems like the previous behavior wasn't working as intended anyway. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r495966600 ## File path: core/src/main/resources/common/message/GroupMetadataResponse.json ## @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "name": "GroupMetadataResponse", + "validVersions": "0-3", + "fields": [ +{ + "name": "protocolType", + "versions": "0+", + "type": "string" +}, +{ + "name": "generation", + "versions": "0+", + "type": "int32" +}, +{ + "name": "protocol", + "versions": "0+", + "type": "string", + "nullableVersions": "0+" +}, +{ + "name": "leader", + "versions": "0+", + "type": "string", + "nullableVersions": "0+" +}, +{ + "name": "members", + "versions": "0+", + "type": "[]MemberMetadata" +}, +{ + "name": "currentStateTimestamp", + "versions": "2+", + "type": "int64" +} + ], + "commonStructs": [ +{ + "name": "MemberMetadata", + "versions": "0-3", + "fields": [ +{ + "name": "memberId", + "versions": "0+", + "type": "string" +}, +{ + "name": "clientId", + "versions": "0+", + "type": "string" +}, +{ + "name": "clientHost", + "versions": "0+", + "type": "string" +}, +{ + "name": "sessionTimeout", + "versions": "0+", + "type": "int32" +}, +{ + "name": "subscription", + "versions": "0+", + "type": "bytes" Review comment: Could I address it in a separate PR? The true used type is ```byte[]``` rather than ```ByteBuffer``` so it may bring a lot changes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r495940420 ## File path: core/src/main/resources/common/message/OffsetCommitRequest.json ## @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "name": "OffsetCommitRequest", + "validVersions": "0-1", + "fields": [ +{ + "name": "group", + "versions": "0+", Review comment: > I suggest to fix the versions to 0-1 for all fields. OffsetCommitRequest and GroupMetadataRequest are in the same topic and we use the version to differentiate the two so it sounds better to use fix ranges here for the time being. The same for GroupMetadataRequest. I'd like to merge GroupMetadataRequest and OffsetCommitRequest so we can fix the version issue in generated code. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
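The point about using the version to tell the two key types apart can be sketched as follows. This is a simplified illustration, not the GroupMetadataManager code: `KeyVersionSketch`, `keyKind`, and `keyWithVersion` are hypothetical names, and only the leading version short is modeled. The version constants 1 and 2 mirror those quoted elsewhere in this thread.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: dispatch on the leading version short of a serialized key.
class KeyVersionSketch {
    static final short CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1;
    static final short CURRENT_GROUP_KEY_SCHEMA_VERSION = 2;

    // Reads the version prefix and reports which kind of key follows.
    static String keyKind(ByteBuffer buffer) {
        short version = buffer.getShort();
        if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
            return "offset"; // versions 0 and 1 refer to offset commit keys
        }
        if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
            return "group"; // version 2 refers to group metadata keys
        }
        throw new IllegalStateException("Unknown key version " + version);
    }

    // Builds a buffer that contains only the version prefix, for demonstration.
    static ByteBuffer keyWithVersion(short version) {
        ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES);
        buffer.putShort(version);
        buffer.flip();
        return buffer;
    }

    public static void main(String[] args) {
        System.out.println(keyKind(keyWithVersion((short) 0)));
        System.out.println(keyKind(keyWithVersion((short) 2)));
    }
}
```

Because both key types share one version space in the same topic, an open-ended range such as `0+` on one schema would overlap with the other schema's version, which is presumably why fixed ranges were suggested.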
[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r495932581 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -997,173 +996,45 @@ object GroupMetadataManager { val MetricsGroup: String = "group-coordinator-metrics" val LoadTimeSensor: String = "GroupPartitionLoadTime" - private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort - private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort - - private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING), -new Field("topic", STRING), -new Field("partition", INT32)) - private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group") - private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic") - private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata") - private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64), -new Field("expire_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp") - private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp") - - private val 
OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset") - private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp") - - private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema( -new Field("offset", INT64), -new Field("leader_epoch", INT32), -new Field("metadata", STRING, "Associated metadata.", ""), -new Field("commit_timestamp", INT64)) - private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset") - private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch") - private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata") - private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp") - - private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING)) - private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group") - - private val MEMBER_ID_KEY = "member_id" - private val GROUP_INSTANCE_ID_KEY = "group_instance_id" - private val CLIENT_ID_KEY = "client_id" - private val CLIENT_HOST_KEY = "client_host" - private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout" - private val SESSION_TIMEOUT_KEY = "session_timeout" - private val SUBSCRIPTION_KEY = "subscription" - private val ASSIGNMENT_KEY = "assignment" - - private val MEMBER_METADATA_V0 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V1 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new 
Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1 - - private val MEMBER_METADATA_V3 = new Schema( -new Field(MEMBER_ID_KEY, STRING), -new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING), -new Field(CLIENT_ID_KEY, STRING), -new Field(CLIENT_HOST_KEY, STRING), -new Field(REBALANCE_TIMEOUT_KEY, INT32), -new Field(SESSION_TIMEOUT_KEY, INT32), -new Field(SUBSCRIPTION_KEY, BYTES), -new Field(ASSIGNMENT_KEY, BYTES)) - - private val PROTOCOL_TYPE_KEY = "protocol_type" - private val GENERATION_KEY = "generation" - private val PROTOCOL_KEY = "protocol" - private val LEADER_KEY = "leader" -
[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r495919910 ## File path: core/src/main/resources/common/message/OffsetCommitResponse.json ## @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "name": "OffsetCommitResponse", + "validVersions": "0-3", + "fields": [ +{ + "name": "offset", + "versions": "0+", + "type": "int64" +}, +{ + "name": "metadata", + "versions": "0+", + "type": "string" +}, +{ + "name": "timestamp", + "versions": "0", + "type": "int64" +}, +{ + "name": "commitTimestamp", + "versions": "1+", + "type": "int64" +}, Review comment: good idea! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r495918349 ## File path: core/src/main/resources/common/message/GroupMetadataRequest.json ## @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "name": "GroupMetadataRequest", Review comment: Sorry for misreading your comment :( I will follow the format in the next commit.
[jira] [Commented] (KAFKA-9747) No tasks created for a connector
[ https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203203#comment-17203203 ] Igor commented on KAFKA-9747: - Same issue for Debezium (SqlServer connector) > No tasks created for a connector > > > Key: KAFKA-9747 > URL: https://issues.apache.org/jira/browse/KAFKA-9747 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 > Environment: OS: Ubuntu 18.04 LTS > Platform: Confluent Platform 5.4 > HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge >Reporter: Vit Koma >Priority: Major > Attachments: connect-distributed.properties, connect.log > > > We are running Kafka Connect in a distributed mode on 3 nodes using Debezium > (MongoDB) and Confluent S3 connectors. When adding a new connector via the > REST API the connector is created in RUNNING state, but no tasks are created > for the connector. > Pausing and resuming the connector does not help. When we stop all workers > and then start them again, the tasks are created and everything runs as it > should. > The issue does not show up if we run only a single node. > The issue is not caused by the connector plugins, because we see the same > behaviour for both Debezium and S3 connectors. Also in debug logs I can see > that Debezium is correctly returning a task configuration from the > Connector.taskConfigs() method. 
> Connector configuration examples > Debezium: > { > "name": "qa-mongodb-comp-converter-task|1", > "config": { > "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", > "mongodb.hosts": > "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017", > "mongodb.name": "qa-debezium-comp", > "mongodb.ssl.enabled": true, > "collection.whitelist": "converter[.]task", > "tombstones.on.delete": true > } > } > S3 Connector: > { > "name": "qa-s3-sink-task|1", > "config": { > "connector.class": "io.confluent.connect.s3.S3SinkConnector", > "topics": "qa-debezium-comp.converter.task", > "topics.dir": "data/env/qa", > "s3.region": "eu-west-1", > "s3.bucket.name": "", > "flush.size": "15000", > "rotate.interval.ms": "360", > "storage.class": "io.confluent.connect.s3.storage.S3Storage", > "format.class": > "custom.kafka.connect.s3.format.plaintext.PlaintextFormat", > "schema.generator.class": > "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", > "partitioner.class": > "io.confluent.connect.storage.partitioner.DefaultPartitioner", > "schema.compatibility": "NONE", > "key.converter": "org.apache.kafka.connect.json.JsonConverter", > "value.converter": "org.apache.kafka.connect.json.JsonConverter", > "key.converter.schemas.enable": false, > "value.converter.schemas.enable": false, > "transforms": "ExtractDocument", > > "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value" > } > } > The connectors are created using curl: {{curl -X POST -H "Content-Type: > application/json" --data @ http:/:10083/connectors}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on a change in pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
dajac commented on a change in pull request #9318: URL: https://github.com/apache/kafka/pull/9318#discussion_r495882544 ## File path: core/src/main/resources/common/message/GroupMetadataResponse.json ## @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "name": "GroupMetadataResponse", + "validVersions": "0-3", + "fields": [ +{ + "name": "protocolType", + "versions": "0+", + "type": "string" +}, +{ + "name": "generation", + "versions": "0+", + "type": "int32" +}, +{ + "name": "protocol", + "versions": "0+", + "type": "string", + "nullableVersions": "0+" +}, +{ + "name": "leader", + "versions": "0+", + "type": "string", + "nullableVersions": "0+" +}, +{ + "name": "members", + "versions": "0+", + "type": "[]MemberMetadata" +}, +{ + "name": "currentStateTimestamp", Review comment: I think that we should keep `currentStateTimestamp` before `members` as we can't reorder fields. 
```
private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
  new Field(PROTOCOL_TYPE_KEY, STRING),
  new Field(GENERATION_KEY, INT32),
  new Field(PROTOCOL_KEY, NULLABLE_STRING),
  new Field(LEADER_KEY, NULLABLE_STRING),
  new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
  new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
```

## File path: core/src/main/resources/common/message/OffsetCommitResponse.json ##
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "name": "OffsetCommitResponse",
+  "validVersions": "0-3",
+  "fields": [
+    {
+      "name": "offset",
+      "versions": "0+",
+      "type": "int64"
+    },
+    {
+      "name": "metadata",
+      "versions": "0+",
+      "type": "string"
+    },
+    {
+      "name": "timestamp",
+      "versions": "0",
+      "type": "int64"
+    },
+    {
+      "name": "commitTimestamp",
+      "versions": "1+",
+      "type": "int64"
+    },
+    {
+      "name": "expireTimestamp",
+      "versions": "1",
+      "type": "int64"
+    },
+    {
+      "name": "leaderEpoch",

Review comment:
`leaderEpoch` should be before `metadata`.
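To illustrate why the declared order matters in the two comments above: a minimal sketch, assuming the values are written one after another in declared order with no field names on the wire. `FieldOrderSketch` and its three-field layout are hypothetical and are not the generated message classes from this PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of positional serialization; not Kafka's generated code.
final class FieldOrderSketch {

    // Writes leader (length-prefixed string), currentStateTimestamp (int64), memberCount (int32),
    // in exactly that order.
    static ByteBuffer writeOriginalOrder(String leader, long currentStateTimestamp, int memberCount) {
        byte[] leaderBytes = leader.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(2 + leaderBytes.length + 8 + 4);
        buffer.putShort((short) leaderBytes.length);
        buffer.put(leaderBytes);
        buffer.putLong(currentStateTimestamp);
        buffer.putInt(memberCount);
        buffer.flip();
        return buffer;
    }

    // Reader that agrees with the writer's order.
    static long readTimestampOriginalOrder(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        short leaderLength = view.getShort();
        view.position(view.position() + leaderLength);
        return view.getLong();
    }

    // Reader generated from a definition where the two trailing fields were swapped.
    static long readTimestampReordered(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        short leaderLength = view.getShort();
        view.position(view.position() + leaderLength);
        view.getInt();         // actually consumes the first half of the timestamp
        return view.getLong(); // second half of the timestamp followed by the member count
    }
}
```

With records already written in the old order, the reordered reader does not fail; it silently returns a wrong timestamp, which is the kind of breakage the review comments are guarding against.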
## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ##
@@ -1321,22 +1159,15 @@ object GroupMetadataManager {
    */
   def readMessageKey(buffer: ByteBuffer): BaseKey = {
     val version = buffer.getShort
-    val keySchema = schemaForKey(version)
-    val key = keySchema.read(buffer)
 
     if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
       // version 0 and 1 refer to offset
-      val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String]
-      val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
-      val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]
-
-      OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition)))
-
+      val key = new GenOffsetCommitRequest(new ByteBufferAccessor(buffer), version)
+      OffsetKey(version, GroupTopicPartition(key.group(), new TopicPartition(key.topic(), key.partition(

Review comment:
nit: The parentheses may not be necessary for all the getters. It is worth checking the other cases below as well.

## File path: core/src/main/resources/common/message/GroupMetadataRequest.json ##
@@ -0,0 +1,27 @@
[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override
scanterog commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495871017

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ##
@@ -199,8 +199,8 @@
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
 
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
I'm not sure if I follow it. This prefix "source_cluster" is not user specified. It is prefixed somewhere else by mm2 (I can get the lines if you want later on). The config specified [here](https://github.com/apache/kafka/tree/trunk/connect/mirror#producer--consumer--admin-config-used-by-mm2) will actually be honored. It does not imply changes on the MirrorMaker config side at least. Are you talking about the case of running MirrorMaker in a connect cluster rather than as a dedicated cluster? I didn't check that one TBH. Maybe @ryannedolan has more insight on this one?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac opened a new pull request #9344: MINOR; Preserve ThrottlingQuotaExceededException when request timeouts after being retried due to a quota violation (KIP-599)
dajac opened a new pull request #9344:
URL: https://github.com/apache/kafka/pull/9344

KIP-599 had proposed to keep returning the `ThrottlingQuotaExceededException` to the caller even when the request times out due to reaching `default.api.timeout.ms`. The current implementation does not cover this yet.

From KIP-599:
> Once `default.api.timeout.ms` has been reached, the topics which were throttled will return the `ThrottlingQuotaExceededException` to the caller.

This PR adds the logic to preserve the `ThrottlingQuotaExceededException` when topics are retried. The `throttleTimeMs` is also adjusted accordingly, as the request could remain pending or in-flight for quite a long time.

I have run various tests on clusters with quotas enabled and I do find it better to preserve the exception. Otherwise, the caller does not really understand what is going on. Preserving it allows the caller to take appropriate measures and to take the `throttleTimeMs` into consideration.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
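A rough sketch of the idea described in this PR, for readers skimming the thread. It is not the admin client code: `QuotaExceeded`, `TimedOut` and `failureAtDeadline` are hypothetical names, and the way the throttle time is adjusted (subtracting the time elapsed since the throttled response, floored at zero) is an assumption about what "adjusted accordingly" means.

```java
// Hypothetical sketch; none of these types are Kafka classes.
final class ThrottlePreservingTimeoutSketch {

    // Stand-in for ThrottlingQuotaExceededException.
    static final class QuotaExceeded extends RuntimeException {
        final int throttleTimeMs;

        QuotaExceeded(int throttleTimeMs) {
            super("quota exceeded, retry after " + throttleTimeMs + " ms");
            this.throttleTimeMs = throttleTimeMs;
        }
    }

    // Stand-in for the generic timeout the caller would otherwise see.
    static final class TimedOut extends RuntimeException {
        TimedOut() {
            super("request timed out");
        }
    }

    // Decides which error to surface once the overall API timeout has been reached.
    // lastThrottle is the most recent throttling error seen while retrying, or null if there was none.
    static RuntimeException failureAtDeadline(QuotaExceeded lastThrottle, long lastThrottleAtMs, long nowMs) {
        if (lastThrottle == null) {
            return new TimedOut();
        }
        long elapsedMs = nowMs - lastThrottleAtMs;
        // Assumption: report only the throttle time that is still remaining.
        int remainingMs = (int) Math.max(0L, lastThrottle.throttleTimeMs - elapsedMs);
        return new QuotaExceeded(remainingMs);
    }
}
```

The point of the design is visible in the return type: a caller that receives `QuotaExceeded` can back off for the remaining throttle time, whereas a bare timeout gives no hint that a quota was involved.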
[GitHub] [kafka] mimaison commented on pull request #9343: KAFKA-10332: Update MM2 refreshTopicPartitions() logic
mimaison commented on pull request #9343: URL: https://github.com/apache/kafka/pull/9343#issuecomment-699932205 cc @ryannedolan
[GitHub] [kafka] mimaison opened a new pull request #9343: KAFKA-10332: Update MM2 refreshTopicPartitions() logic
mimaison opened a new pull request #9343:
URL: https://github.com/apache/kafka/pull/9343

Trigger task reconfiguration when:
- topic-partitions are created or deleted on source cluster
- topic-partitions are missing on target cluster

Co-authored-by: Mickael Maison
Co-authored-by: Edoardo Comar

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
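The two trigger conditions listed in this PR description can be sketched as a set comparison. This is only an illustration under stated assumptions: topic-partitions are represented as plain strings, target names are assumed to be already mapped back to their source names, and `needsReconfiguration` and `partitions` are hypothetical helpers, not methods of MirrorSourceConnector.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the reconfiguration trigger; not the MM2 implementation.
final class RefreshTopicPartitionsSketch {

    // Small helper to build a set of "topic-partition" strings.
    static Set<String> partitions(String... names) {
        return new HashSet<>(Arrays.asList(names));
    }

    static boolean needsReconfiguration(Set<String> knownSource,
                                        Set<String> currentSource,
                                        Set<String> currentTarget) {
        // Condition 1: topic-partitions were created or deleted on the source cluster.
        boolean sourceChanged = !knownSource.equals(currentSource);

        // Condition 2: some source topic-partitions are missing on the target cluster.
        Set<String> missingOnTarget = new HashSet<>(currentSource);
        missingOnTarget.removeAll(currentTarget);

        return sourceChanged || !missingOnTarget.isEmpty();
    }
}
```

For example, an unchanged source with a target that lacks `orders-1` would still trigger a reconfiguration, which is the second case in the list above.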
[jira] [Created] (KAFKA-10530) kafka-streams-application-reset misses some internal topics
Oliver Weiler created KAFKA-10530:

Summary: kafka-streams-application-reset misses some internal topics
Key: KAFKA-10530
URL: https://issues.apache.org/jira/browse/KAFKA-10530
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.6.0
Reporter: Oliver Weiler

While the {{kafka-streams-application-reset}} tool works in most cases, it misses some internal topics when using {{Foreign Key Table-Table Joins}}.

After execution, there are still two internal topics left which were not deleted:

{code}
bv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-06-topic
bbv4-indexer-717e6cc5-acb2-498d-9d08-4814aaa71c81-StreamThread-1-consumer
bbv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-14-topic
{code}

The reason seems to be {{StreamsResetter.isInternalTopic}}, which requires the internal topic to end with {{-changelog}} or {{-repartition}} (which the mentioned topics don't).
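The suffix check described in this report can be reproduced with a small sketch. This is not the `StreamsResetter` source: the application-id prefix test, the application id `bbv4-indexer`, and the relaxed variant are all assumptions made for illustration, and the relaxed variant is only one possible shape of a fix.

```java
// Hypothetical sketch of the suffix check described in the report; not the StreamsResetter code.
final class InternalTopicCheckSketch {

    static boolean endsWithKnownSuffix(String topic) {
        return topic.endsWith("-changelog") || topic.endsWith("-repartition");
    }

    // Behaviour as described in the report: only the two suffixes are recognised.
    static boolean isInternalTopicStrict(String applicationId, String topic) {
        return topic.startsWith(applicationId + "-") && endsWithKnownSuffix(topic);
    }

    // One possible relaxation (an assumption, not the adopted fix): also accept
    // foreign-key join subscription topics, which end with "-topic".
    static boolean isInternalTopicRelaxed(String applicationId, String topic) {
        boolean foreignKeyJoinTopic =
            topic.contains("-KTABLE-FK-JOIN-SUBSCRIPTION-") && topic.endsWith("-topic");
        return topic.startsWith(applicationId + "-")
            && (endsWithKnownSuffix(topic) || foreignKeyJoinTopic);
    }
}
```

With the strict check, `bbv4-indexer-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-14-topic` is not recognised as internal and is therefore left behind by the reset; the relaxed check accepts it while still rejecting names such as the `...-StreamThread-1-consumer` entry.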
[GitHub] [kafka] edenhill commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3
edenhill commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r495836397

## File path: tests/docker/Dockerfile ##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib
+RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean
+RUN python3 -m pip install -U pip==20.2.2;
+RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34
+RUN pip3 install git+https://github.com/confluentinc/ducktape

Review comment:
I'll see what I can do.
[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3
nizhikov commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r495820772

## File path: tests/docker/Dockerfile ##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib
+RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean
+RUN python3 -m pip install -U pip==20.2.2;
+RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34
+RUN pip3 install git+https://github.com/confluentinc/ducktape

Review comment:
Hello, @edenhill. Yes, but I'm a bit confused about the ducktape release. I haven't been able to get much feedback from the maintainers. https://github.com/confluentinc/ducktape/issues/245
[GitHub] [kafka] mimaison commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override
mimaison commented on a change in pull request #9313:
URL: https://github.com/apache/kafka/pull/9313#discussion_r495817863

## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java ##
@@ -119,4 +120,41 @@ public void testNonMutationOfConfigDef() {
             connectorConfigDef.names().contains(taskSpecificProperty)
         ));
     }
+
+    @Test
+    public void testSourceConsumerConfig() {
+        Map connectorProps = makeProps(
+            MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "max.poll.interval.ms", "12"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        Map connectorConsumerProps = config.sourceConsumerConfig();
+        Map expectedConsumerProps = new HashMap<>();
+        expectedConsumerProps.put("enable.auto.commit", "false");
+        expectedConsumerProps.put("auto.offset.reset", "earliest");
+        expectedConsumerProps.put("max.poll.interval.ms", "12");
+        assertEquals(expectedConsumerProps, connectorConsumerProps);
+
+        // checking auto.offset.reset override works
+        connectorProps = makeProps(
+            MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + "auto.offset.reset", "latest"
+        );
+        config = new MirrorConnectorConfig(connectorProps);
+        connectorConsumerProps = config.sourceConsumerConfig();
+        expectedConsumerProps.put("auto.offset.reset", "latest");
+        expectedConsumerProps.remove("max.poll.interval.ms");
+        assertEquals(expectedConsumerProps, connectorConsumerProps);
+    }
+
+    @Test
+    public void testSourceProducerConfig() {
+        Map connectorProps = makeProps(
+            MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX + "acks", "1"
+        );
+        MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps);
+        Map connectorProducerProps = config.sourceProducerConfig();
+        Map expectedProducerProps = new HashMap<>();
+        expectedProducerProps.put("acks", "1");
+        assertEquals(expectedProducerProps, connectorProducerProps);
+    }
+

Review comment:
While we're at it, could we also add tests for `targetAdminConfig()` and `sourceAdminConfig()`?
## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java ##
@@ -199,8 +199,8 @@
     protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
     protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
 
-    protected static final String PRODUCER_CLIENT_PREFIX = "producer.";
-    protected static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    protected static final String PRODUCER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "producer.";
+    protected static final String CONSUMER_CLIENT_PREFIX = SOURCE_CLUSTER_PREFIX + "consumer.";

Review comment:
These changes are going to break existing users. For example, I have connectors with a few settings prefixed with `consumer.`. I wonder if we could keep the old behaviour (even if partially broken) while adding the proper prefixes.
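One way to read the suggestion above (keep the old prefix working while adding the new one) is a two-pass lookup in which the new prefix wins on conflict. The sketch below only illustrates that idea: `PrefixFallbackSketch` is hypothetical, it works on plain string maps rather than `MirrorConnectorConfig`, and the literal `source.cluster.` is an assumed value for `SOURCE_CLUSTER_PREFIX`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a backward-compatible prefix lookup; not the MirrorConnectorConfig code.
final class PrefixFallbackSketch {

    static final String LEGACY_CONSUMER_PREFIX = "consumer.";
    // Assumption: SOURCE_CLUSTER_PREFIX resolves to "source.cluster.".
    static final String SOURCE_CONSUMER_PREFIX = "source.cluster.consumer.";

    // Returns the entries whose key starts with the prefix, with the prefix stripped.
    static Map<String, String> withPrefix(Map<String, String> props, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    // Legacy "consumer." settings are applied first, then overridden by the new prefix.
    static Map<String, String> sourceConsumerOverrides(Map<String, String> props) {
        Map<String, String> result = withPrefix(props, LEGACY_CONSUMER_PREFIX);
        result.putAll(withPrefix(props, SOURCE_CONSUMER_PREFIX));
        return result;
    }
}
```

Under this scheme, a connector that still sets `consumer.max.poll.interval.ms` keeps working, and a connector that sets both forms gets the value from the new prefix.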
[GitHub] [kafka] chia7712 commented on pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on pull request #9318: URL: https://github.com/apache/kafka/pull/9318#issuecomment-699902060 rebase to fix conflicts
[GitHub] [kafka] edenhill commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3
edenhill commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r495804848

## File path: tests/docker/Dockerfile ##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib
+RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean
+RUN python3 -m pip install -U pip==20.2.2;
+RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34
+RUN pip3 install git+https://github.com/confluentinc/ducktape

Review comment:
A new ducktape version needs to be released, and this line changed to a versioned pypi install, prior to merge, right?
[GitHub] [kafka] manijndl7 commented on a change in pull request #9255: MINOR: Consolidate duplicated logic on reset tools
manijndl7 commented on a change in pull request #9255:
URL: https://github.com/apache/kafka/pull/9255#discussion_r495786493

## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java ##
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
         }
         return map;
     }
+
+    /**
+     * Convert an ISO8601 based timestamp to an epoch value
+     * @param timestamp to be converted
+     * @return epoch value of a given timestamp
+     * @throws ParseException for timestamp that doesn't follow ISO8601 format
+     */
+    public static long getDateTime(String timestamp) throws ParseException {
+        final String[] timestampParts = timestamp.split("T");
+        if (timestampParts.length < 2) {
+            throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length());
+        }
+
+        final String secondPart = timestampParts[1];
+        if (secondPart == null || secondPart.isEmpty()) {
+            throw new ParseException("Error parsing timestamp. Time part after 'T' is null or empty", timestamp.length());
+        }
+
+        if (!(secondPart.contains("+") || secondPart.contains("-") || secondPart.contains("Z"))) {

Review comment:
Hi @mjax, could you please review this?
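For comparison only, the conversion described in the Javadoc above (ISO8601 timestamp to epoch value) can be sketched with `java.time`. This is a different technique from the string splitting in the quoted diff, whose body is cut off here, so it says nothing about what `getDateTime` does after the last visible line. Treating a timestamp without an offset as UTC is an assumption of this sketch, as is the class name `Iso8601EpochSketch`.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;

// Hypothetical java.time-based alternative; not the Utils.getDateTime implementation from the PR.
final class Iso8601EpochSketch {

    // Returns epoch milliseconds for an ISO8601 date-time such as 2020-09-28T10:15:30.000+02:00.
    static long toEpochMillis(String timestamp) {
        try {
            // Handles timestamps that carry an explicit offset or a trailing 'Z'.
            return OffsetDateTime.parse(timestamp).toInstant().toEpochMilli();
        } catch (DateTimeParseException missingOffset) {
            // Assumption: a timestamp without an offset is interpreted as UTC.
            // A malformed timestamp still fails here with DateTimeParseException.
            return LocalDateTime.parse(timestamp).toInstant(ZoneOffset.UTC).toEpochMilli();
        }
    }
}
```

A date without a time part, such as `2020-09-28`, is rejected by both parsers, which roughly mirrors the "does not contain a 'T'" check in the diff.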
[GitHub] [kafka] huangyiminghappy commented on pull request #9304: KAFKA-10502:TimestampRouter may occur threadlocal leak
huangyiminghappy commented on pull request #9304:
URL: https://github.com/apache/kafka/pull/9304#issuecomment-699862527

> Reading on the SO thread I think this fix makes sense. Merging to trunk now.

thanks
[GitHub] [kafka] thake commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.
thake commented on pull request #9338: URL: https://github.com/apache/kafka/pull/9338#issuecomment-699857289 The remaining failing tests do not seem to be related to this PR.
[GitHub] [kafka] xakassi commented on pull request #9211: KAFKA-10426: Deadlock on session key update.
xakassi commented on pull request #9211:
URL: https://github.com/apache/kafka/pull/9211#issuecomment-699805679

Hi, guys! Hi, @kkonstantine! This is my first PR to Kafka, and I would like to check whether there is anything else I need to do. Or should I just wait for you to take care of this PR?
[jira] [Created] (KAFKA-10529) Controller should throttle partition reassignment
Badai Aqrandista created KAFKA-10529:

Summary: Controller should throttle partition reassignment
Key: KAFKA-10529
URL: https://issues.apache.org/jira/browse/KAFKA-10529
Project: Kafka
Issue Type: Improvement
Reporter: Badai Aqrandista

With [KIP-455|https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment], reassignment can be triggered via the AdminClient API. However, reassigning a large number of topic partitions at once can cause a storm of LeaderAndIsr and UpdateMetadata requests, which can occupy the Controller thread for some time and prevent the Controller from processing other requests.

So, the Controller should throttle sending LeaderAndIsr requests when actioning a reassignment request.