[GitHub] [kafka] dongjinleekr commented on pull request #10642: KAFKA-12756: Update Zookeeper to 3.6.3 or higher
dongjinleekr commented on pull request #10642: URL: https://github.com/apache/kafka/pull/10642#issuecomment-839446528

@Boojapho Totally agree. Let's keep an eye on the other projects' updates. It would not be too late to act until Zookeeper and the other related projects drop support for the vulnerable versions. +1.

I am now testing the Kafka cluster with Zookeeper 3.6.x. Everything has been working fine so far, except for a classpath conflict in Kafka itself, which I am now working on.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12718) SessionWindows are closed too early
[ https://issues.apache.org/jira/browse/KAFKA-12718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343001#comment-17343001 ] Juan C. Gonzalez-Zurita commented on KAFKA-12718:

Thank you for assigning me. I will begin work on it tomorrow morning. :D

> SessionWindows are closed too early
> ---
>
> Key: KAFKA-12718
> URL: https://issues.apache.org/jira/browse/KAFKA-12718
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Juan C. Gonzalez-Zurita
> Priority: Major
> Labels: beginner, easy-fix, newbie
> Fix For: 3.0.0
>
> SessionWindows are defined based on a {{gap}} parameter, and also support an
> additional {{grace-period}} configuration to handle out-of-order data.
> To incorporate the session gap, a session window should only be closed at
> {{window-end + gap}}, and to incorporate the grace period, the close time
> should be pushed out further to {{window-end + gap + grace}}.
> However, at the moment we compute the window close time as
> {{window-end + grace}}, omitting the {{gap}} parameter.
> Because the default grace period is 24h, most users might not notice this
> issue. Even if they set a grace period explicitly (eg, when using
> suppress()), they would most likely set a grace period larger than the gap
> time, not hitting the issue (or maybe only realizing it when inspecting the
> behavior closely).
> However, if a user wants to disable the grace period and sets it to zero (or
> any other value smaller than the gap time), sessions might be closed too
> early and users might notice.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
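The close-time arithmetic described in the report can be sketched in plain Java; the class and method names below are illustrative, not Kafka's actual internals:

```java
// Sketch of the close-time computation from KAFKA-12718 (illustrative names,
// not Kafka internals). All times are in epoch milliseconds.
public class SessionWindowCloseTime {

    // Buggy computation: window-end + grace, omitting the gap.
    static long buggyCloseTime(long windowEnd, long gap, long grace) {
        return windowEnd + grace;
    }

    // Intended computation: window-end + gap + grace.
    static long fixedCloseTime(long windowEnd, long gap, long grace) {
        return windowEnd + gap + grace;
    }

    public static void main(String[] args) {
        long windowEnd = 100L;
        long gap = 30L;
        long grace = 0L;  // grace disabled, the scenario that surfaces the bug
        // With grace = 0, the buggy formula closes the session at the window
        // end, 30 ms before the session gap has elapsed.
        System.out.println(buggyCloseTime(windowEnd, gap, grace)); // 100
        System.out.println(fixedCloseTime(windowEnd, gap, grace)); // 130
    }
}
```

With the default 24h grace the two formulas differ by only the gap, which is why the bug goes unnoticed unless grace is small.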
[GitHub] [kafka] ableegoldman removed a comment on pull request #10676: MINOR: rename TopologyDescription.Subtopology to SubtopologyDescription
ableegoldman removed a comment on pull request #10676: URL: https://github.com/apache/kafka/pull/10676#issuecomment-839409037 @wcarlson5 @guozhangwang
[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
vitojeng commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630705257

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ##
@@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception {
             () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
             "Thread never stopped."
         );
-        } finally {
             streams.close();

Review comment: @mjsax Line 532 (original) waits for the streams state to become **ERROR** after the streams instance is closed. Although this test case will pass if we remove `close()`, it seems we might be better off retaining `close()`?
[GitHub] [kafka] ableegoldman commented on pull request #10676: MINOR: rename TopologyDescription.Subtopology to SubtopologyDescription
ableegoldman commented on pull request #10676: URL: https://github.com/apache/kafka/pull/10676#issuecomment-839409037 @wcarlson5 @guozhangwang
[GitHub] [kafka] ableegoldman opened a new pull request #10676: MINOR: rename TopologyDescription.Subtopology to SubtopologyDescription
ableegoldman opened a new pull request #10676: URL: https://github.com/apache/kafka/pull/10676

Literally just renaming a class because I want to use the name `Subtopology` for something else, and the current `Subtopology` class seems to be used solely for the topology description. Split into a separate PR to minimize no-op changes in the main PR
[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
vitojeng commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630701286

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ##
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
             () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
             "Streams never stopped"
         );
-        } finally {
             streams.close();

Review comment: @mjsax We should not remove this line; otherwise, line 498 (original) will throw a timeout exception. The streams state will always be **RUNNING** if we skip `close()`.
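The pattern under discussion in these comments — keeping an explicit `close()` inside a try-with-resources block so the test can wait on the post-close state — can be illustrated with a plain `AutoCloseable` (all names here are illustrative, not Kafka code):

```java
// Illustrative sketch (not Kafka code): the explicit close() drives the state
// transition the test asserts on, and the implicit close at the end of the
// try-with-resources block is just a harmless repeat.
class IdempotentResource implements AutoCloseable {
    int closeCount = 0;
    String state = "RUNNING";

    @Override
    public void close() {
        closeCount++;
        state = "NOT_RUNNING";
    }
}

public class TryWithResourcesDemo {
    public static void main(String[] args) {
        IdempotentResource observed;
        try (IdempotentResource res = new IdempotentResource()) {
            res.close();                    // explicit close: the test can now...
            System.out.println(res.state);  // ...assert on the closed state
            observed = res;
        }                                   // implicit close also runs here
        System.out.println(observed.closeCount); // 2
    }
}
```

This is why retaining the explicit `close()` inside the block is safe: `close()` is expected to be idempotent, so the extra implicit call costs nothing.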
[jira] [Resolved] (KAFKA-12772) Move all TransactionState transition rules into their states
[ https://issues.apache.org/jira/browse/KAFKA-12772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming resolved KAFKA-12772.

Resolution: Fixed

> Move all TransactionState transition rules into their states
>
> Key: KAFKA-12772
> URL: https://issues.apache.org/jira/browse/KAFKA-12772
> Project: Kafka
> Issue Type: Improvement
> Reporter: dengziming
> Assignee: dengziming
> Priority: Minor
[GitHub] [kafka] guozhangwang commented on pull request #10667: KAFKA-12772: Move all transaction state transition rules into their states
guozhangwang commented on pull request #10667: URL: https://github.com/apache/kafka/pull/10667#issuecomment-839359339 LGTM. Merged to trunk.
[GitHub] [kafka] guozhangwang merged pull request #10667: KAFKA-12772: Move all transaction state transition rules into their states
guozhangwang merged pull request #10667: URL: https://github.com/apache/kafka/pull/10667
[GitHub] [kafka] ableegoldman commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic
ableegoldman commented on a change in pull request #10675: URL: https://github.com/apache/kafka/pull/10675#discussion_r630628119

## File path: clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ##
@@ -417,6 +417,7 @@ public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEna
         assertEquals(Collections.singletonList(expectedResult), producer.consumerGroupOffsetsHistory());
     }

+    @SuppressWarnings("deprecation")
     @Test

Review comment: Well, it does call deprecated code. But I didn't think to just deprecate the test itself. That makes more sense, will do
[GitHub] [kafka] ijuma commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic
ijuma commented on a change in pull request #10675: URL: https://github.com/apache/kafka/pull/10675#discussion_r630626735

## File path: clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ##
@@ -417,6 +417,7 @@ public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEna
         assertEquals(Collections.singletonList(expectedResult), producer.consumerGroupOffsetsHistory());
     }

+    @SuppressWarnings("deprecation")
     @Test

Review comment: The suppression is only needed for cases where non-deprecated code has to call deprecated code.
[GitHub] [kafka] ijuma commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic
ijuma commented on a change in pull request #10675: URL: https://github.com/apache/kafka/pull/10675#discussion_r630626586

## File path: clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ##
@@ -417,6 +417,7 @@ public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEna
         assertEquals(Collections.singletonList(expectedResult), producer.consumerGroupOffsetsHistory());
     }

+    @SuppressWarnings("deprecation")
     @Test

Review comment: If their main purpose is to test deprecated functionality, we should deprecate the test methods too, so we can remove them when the deprecated non-test code is removed.
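The pattern suggested here — marking the test itself `@Deprecated` alongside the suppression, so it is deleted together with the deprecated production code — can be sketched in plain Java without JUnit (all names below are illustrative):

```java
// Plain-Java sketch (no JUnit) of the pattern suggested above: a method that
// exists only to exercise deprecated code is itself marked @Deprecated, so it
// gets removed together with the deprecated API; the suppression silences the
// compiler warning at the call site.
public class DeprecationDemo {

    @Deprecated
    static int oldApi() {
        return 42;
    }

    @Deprecated                         // flags the "test" for removal with oldApi()
    @SuppressWarnings("deprecation")    // it intentionally calls deprecated code
    static int exercisesOldApi() {
        return oldApi();
    }

    public static void main(String[] args) {
        System.out.println(exercisesOldApi()); // 42
    }
}
```

In a real test class, the same two annotations would sit on the `@Test` method exercising the deprecated API.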
[GitHub] [kafka] ableegoldman commented on a change in pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata
ableegoldman commented on a change in pull request #10634: URL: https://github.com/apache/kafka/pull/10634#discussion_r630615608

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ##
@@ -1193,6 +1196,11 @@ public void updateCommittedOffsets(final TopicPartition topicPartition, final Lo
         committedOffsets.put(topicPartition, offset);
     }

+    @Override
+    public void updateEndOffsets(final TopicPartition topicPartition, final Long offset) {
+        highWatermark.put(topicPartition, offset);

Review comment: Oh I see, I thought we were making a copy of the end offsets in StreamTask#highWatermark, but it's just an unmodifiableMap
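The distinction in this comment — a defensive copy versus `Collections.unmodifiableMap`, which is a live read-through view of the backing map — can be demonstrated directly (topic names below are placeholders):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Demonstrates why a write to the backing map (like highWatermark.put(...) in
// updateEndOffsets) is visible to readers: Collections.unmodifiableMap returns
// a read-through view, not a snapshot copy.
public class ViewVsCopyDemo {
    public static void main(String[] args) {
        Map<String, Long> backing = new HashMap<>();
        Map<String, Long> view = Collections.unmodifiableMap(backing);
        Map<String, Long> copy = new HashMap<>(backing);

        backing.put("topic-0", 42L);  // later write to the backing map

        System.out.println(view.get("topic-0")); // 42   -- the view sees the update
        System.out.println(copy.get("topic-0")); // null -- the snapshot does not
    }
}
```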
[GitHub] [kafka] ableegoldman commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic
ableegoldman commented on a change in pull request #10675: URL: https://github.com/apache/kafka/pull/10675#discussion_r630614141

## File path: clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ##
@@ -417,6 +417,7 @@ public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEna
         assertEquals(Collections.singletonList(expectedResult), producer.consumerGroupOffsetsHistory());
     }

+    @SuppressWarnings("deprecation")
     @Test

Review comment: These are testing the deprecated method specifically. There is an identical test below for the non-deprecated version of this API. Are you suggesting we should just go ahead and remove the tests for the deprecated functionality?
[GitHub] [kafka] ijuma commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic
ijuma commented on a change in pull request #10675: URL: https://github.com/apache/kafka/pull/10675#discussion_r630612361

## File path: clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java ##
@@ -417,6 +417,7 @@ public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEna
         assertEquals(Collections.singletonList(expectedResult), producer.consumerGroupOffsetsHistory());
     }

+    @SuppressWarnings("deprecation")
     @Test

Review comment: Are these tests only present for testing deprecated functionality, or are they also testing non-deprecated functionality?
[GitHub] [kafka] ableegoldman commented on a change in pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…
ableegoldman commented on a change in pull request #10673: URL: https://github.com/apache/kafka/pull/10673#discussion_r630612173

## File path: tests/kafkatest/services/streams.py ##
@@ -466,6 +466,15 @@ def __init__(self, test_context, kafka, processingMode):
                          "org.apache.kafka.streams.tests.BrokerCompatibilityTest",
                          processingMode)

+    def prop_file(self):
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                      # the old broker (< 2.4) does not support the configuration replication.factor=-1
+                      "replication.factor": 1}

Review comment: Ah, yeah, I bet that's it: we only set it for the EOS tests. It might be better to just set it in the Java code instead, as it's easier to find and read, and I believe most other configs are set there. I think this test runs the `StreamsSmokeTest`?
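The alternative floated here — pinning the config in the Java test code rather than the Python service file — might look like the following. The `StreamsConfig` constants are real Kafka Streams config keys, but the surrounding setup (application id, broker address) is illustrative:

```java
// Hedged sketch: setting replication.factor in the Java test setup instead of
// streams.py. The application id and bootstrap server are placeholders.
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "broker-compatibility-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Brokers older than 2.4 reject replication.factor = -1 (broker-default),
// so the compatibility test must pin an explicit value.
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
```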
[GitHub] [kafka] ijuma commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic
ijuma commented on a change in pull request #10675: URL: https://github.com/apache/kafka/pull/10675#discussion_r630612053

## File path: clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java ##
@@ -88,26 +84,13 @@ public Builder(final String transactionalId,
                 .setMemberId(memberId)
                 .setGenerationId(generationId)
                 .setGroupInstanceId(groupInstanceId.orElse(null));
-        this.autoDowngrade = autoDowngrade;
     }

     @Override
     public TxnOffsetCommitRequest build(short version) {
         if (version < 3 && groupMetadataSet()) {
-            if (autoDowngrade) {
-                log.trace("Downgrade the request by resetting group metadata fields: " +
-                    "[member.id:{}, generation.id:{}, group.instance.id:{}], because broker " +
-                    "only supports TxnOffsetCommit version {}. Need " +
-                    "v3 or newer to enable this feature",
-                    data.memberId(), data.generationId(), data.groupInstanceId(), version);
-
-                data.setGenerationId(JoinGroupRequest.UNKNOWN_GENERATION_ID)
-                    .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
-                    .setGroupInstanceId(null);
-            } else {
-                throw new UnsupportedVersionException("Broker unexpectedly " +
+            throw new UnsupportedVersionException("Broker unexpectedly " +

Review comment: I think we can improve this message a little:
1. Remove `unexpectedly`, since it doesn't add much value and it's a bit confusing (it makes it sound like there's a broker bug).
2. Include the minimum version required (`3`) in the message.
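A sketch of what the reworded message might look like; the constant and helper below are stand-ins for illustration, not the actual patch:

```java
// Illustrative sketch of the suggested rewording: drop "unexpectedly" and
// state the minimum required version. MIN_REQUIRED_VERSION and buildError
// are hypothetical names, not the real Kafka code.
public class VersionMessageDemo {
    static final short MIN_REQUIRED_VERSION = 3;

    static String buildError(short brokerVersion) {
        return "Broker only supports TxnOffsetCommit version " + brokerVersion
            + ", but group metadata was set; version " + MIN_REQUIRED_VERSION
            + " or newer is required";
    }

    public static void main(String[] args) {
        System.out.println(buildError((short) 2));
    }
}
```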
[GitHub] [kafka] mjsax commented on a change in pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…
mjsax commented on a change in pull request #10673: URL: https://github.com/apache/kafka/pull/10673#discussion_r630610600

## File path: tests/kafkatest/services/streams.py ##
@@ -466,6 +466,15 @@ def __init__(self, test_context, kafka, processingMode):
                          "org.apache.kafka.streams.tests.BrokerCompatibilityTest",
                          processingMode)

+    def prop_file(self):
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                      # the old broker (< 2.4) does not support the configuration replication.factor=-1
+                      "replication.factor": 1}

Review comment: I guess the logic is scattered between the Python and Java code... And maybe we need 3 only for the EOS tests? Not sure either. Also ok with me to just merge this PR as-is. Or should we update the Java code instead?
[GitHub] [kafka] ableegoldman commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde
ableegoldman commented on a change in pull request #6592: URL: https://github.com/apache/kafka/pull/6592#discussion_r630605834

## File path: clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java ##
@@ -106,6 +110,190 @@ public void stringSerdeShouldSupportDifferentEncodings() {
         }
     }

+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldReturnEmptyCollection() {
+        List<Integer> testData = Arrays.asList();
+        Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get empty collection after serialization and deserialization on an empty list");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldReturnNull() {
+        List<Integer> testData = null;
+        Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get null after serialization and deserialization on an empty list");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldRoundtripIntPrimitiveInput() {
+        List<Integer> testData = Arrays.asList(1, 2, 3);
+        Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get the original collection of integer primitives after serialization and deserialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForIntPrimitiveInput() {
+        List<Integer> testData = Arrays.asList(1, 2, 3);
+        Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+        assertEquals(21, listSerde.serializer().serialize(topic, testData).length,
+            "Should get length of 21 bytes after serialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldRoundtripShortPrimitiveInput() {
+        List<Short> testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+        Serde<List<Short>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Short());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get the original collection of short primitives after serialization and deserialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForShortPrimitiveInput() {
+        List<Short> testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+        Serde<List<Short>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Short());
+        assertEquals(15, listSerde.serializer().serialize(topic, testData).length,
+            "Should get length of 15 bytes after serialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldRoundtripFloatPrimitiveInput() {
+        List<Float> testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+        Serde<List<Float>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Float());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get the original collection of float primitives after serialization and deserialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForFloatPrimitiveInput() {
+        List<Float> testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+        Serde<List<Float>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Float());
+        assertEquals(21, listSerde.serializer().serialize(topic, testData).length,
+            "Should get length of 21 bytes after serialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldRoundtripLongPrimitiveInput() {
+        List<Long> testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+        Serde<List<Long>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Long());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get the original collection of long primitives after serialization and deserialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForLongPrimitiveInput() {
+        List<Long> testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+        Serde<List<Long>> listSerde = Serdes.ListSerde(ArrayList.class,
[GitHub] [kafka] mjsax commented on pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic
mjsax commented on pull request #10675: URL: https://github.com/apache/kafka/pull/10675#issuecomment-839282578 \cc @abbccdda and @guozhangwang to make sure we don't do anything bad by removing this.
[GitHub] [kafka] cmccabe commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft
cmccabe commented on pull request #10550: URL: https://github.com/apache/kafka/pull/10550#issuecomment-839281487

Thanks for working on this, @rondagostino, and sorry about the delays in reviewing.

> We forward the Create/Remove operations to the controller, but this patch actually short-circuits that if we are using KRaft with the ZooKeeper-based `AclAuthorizer` via the changes to `RaftSupport.maybeForward()`. The reason for short-circuiting it is because the KRaft controller doesn't have the code to create or remove ACLs (`handle{Create,Delete}Acls` in `KafkaApis`). We could add it, of course, in which case the changes to the `maybeForward()` method would be unnecessary. Perhaps it would be simpler to do that instead of delaying it to an additional PR -- is that what you were suggesting?

Yes, I think we should just do this in `ControllerApis.scala` and be done with it. It's kind of annoying to do in a follow-on PR, since we'd have to add a lot of special-case code which we'd later have to undo, which is not usually the right way to go. We might even be able to move this code into `RequestHandlerHelper` or something, since it would be the same between `BrokerApis` and `ControllerApis` (I think?)
[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
vitojeng commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630603482

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ##
@@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception {
             () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
             "Thread never stopped."
         );
-        } finally {
             streams.close();

Review comment: Ok, will do.
[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
vitojeng commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630603389

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ##
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
             () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
             "Streams never stopped"
         );
-        } finally {
             streams.close();

Review comment: Thanks @mjsax for the review. Will do.
[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft
cmccabe commented on a change in pull request #10550: URL: https://github.com/apache/kafka/pull/10550#discussion_r630603137

## File path: core/src/main/scala/kafka/server/MetadataSupport.scala ##
@@ -91,28 +93,42 @@ case class ZkSupport(adminManager: ZkAdminManager,
   override def controllerId: Option[Int] = metadataCache.getControllerId
 }

-case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache)
+case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache, config: KafkaConfig)
   extends MetadataSupport {
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  }
   override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
   override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-    if (config.requiresZookeeper) {
-      throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  override def requireZkAuthorizerOrThrow(createException: => Exception) = {

Review comment: I don't think we need (or want) to special-case the ZK authorizer here. There is a Confluent authorizer that doesn't depend on ZK, and also a Cloudera one. We don't want to break them. Just forward everything
[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft
cmccabe commented on a change in pull request #10550: URL: https://github.com/apache/kafka/pull/10550#discussion_r630602650

## File path: core/src/main/scala/kafka/server/MetadataSupport.scala ##
@@ -91,28 +93,42 @@ case class ZkSupport(adminManager: ZkAdminManager,
   override def controllerId: Option[Int] = metadataCache.getControllerId
 }

-case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache)
+case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache, config: KafkaConfig)
   extends MetadataSupport {
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  }
   override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
   override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-    if (config.requiresZookeeper) {
-      throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  override def requireZkAuthorizerOrThrow(createException: => Exception) = {
+    if (!hasZkAuthorizer) {
+      throw createException
     }
   }
+  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this

   override def maybeForward(request: RequestChannel.Request,
                             handler: RequestChannel.Request => Unit,
                             responseCallback: Option[AbstractResponse] => Unit): Unit = {
     if (!request.isForwarded) {
-      fwdMgr.forwardRequest(request, responseCallback)
+      request.header.apiKey match {
+        case ApiKeys.CREATE_ACLS | ApiKeys.DELETE_ACLS =>
+          if (hasZkAuthorizer) {
+            handler(request)

Review comment: Yeah, I had the same question. It seems like the code is already in place.
[GitHub] [kafka] mjsax commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
mjsax commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630602355

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

```diff
@@ -553,141 +552,152 @@ public void testInitializesAndDestroysMetricsReporters() {
     @Test
     public void testCloseIsIdempotent() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.close();
-        final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.close();
+            final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();

-        streams.close();
-        Assert.assertEquals("subsequent close() calls should do nothing",
-            closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+            streams.close();
+            Assert.assertEquals("subsequent close() calls should do nothing",
+                closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+        }
     }

     @Test
     public void shouldAddThreadWhenRunning() throws InterruptedException {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        final int oldSize = streams.threads.size();
-        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
-        assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
-        assertThat(streams.threads.size(), equalTo(oldSize + 1));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            final int oldSize = streams.threads.size();
+            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+            assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
+            assertThat(streams.threads.size(), equalTo(oldSize + 1));
+        }
     }

     @Test
     public void shouldNotAddThreadWhenCreated() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }

     @Test
     public void shouldNotAddThreadWhenClosed() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        streams.close();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            streams.close();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }

     @Test
     public void shouldNotAddThreadWhenError() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        streams.start();
-        globalStreamThread.shutdown();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            streams.start();
+            globalStreamThread.shutdown();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }

     @Test
     public void shouldNotReturnDeadThreads() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        streamThreadOne.shutdown();
-        final Set<ThreadMetadata> threads = streams.localThreadsMetadata();
+        final Set<ThreadMetadata> threads;
+        try (final KafkaStreams
```
[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
vitojeng commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630602092 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -553,141 +552,152 @@
[GitHub] [kafka] ableegoldman commented on a change in pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…
ableegoldman commented on a change in pull request #10673: URL: https://github.com/apache/kafka/pull/10673#discussion_r630601750

## File path: tests/kafkatest/services/streams.py

```diff
@@ -466,6 +466,15 @@ def __init__(self, test_context, kafka, processingMode):
                          "org.apache.kafka.streams.tests.BrokerCompatibilityTest",
                          processingMode)

+    def prop_file(self):
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                      # the old broker (< 2.4) does not support configuration replication.factor=-1
+                      "replication.factor": 1}
```

Review comment: Sounds like we don't override the default after all? Or we have at least one test where that slipped through 🤷‍♀️

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
mjsax commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630601702

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

```diff
@@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception {
             () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
             "Thread never stopped."
         );
-    } finally {
         streams.close();
```

Review comment: As above. (Maybe similar elsewhere; won't comment on it explicitly below)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
ableegoldman commented on a change in pull request #10573: URL: https://github.com/apache/kafka/pull/10573#discussion_r630601449 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -525,6 +525,11 @@ private TransactionManager configureTransactionState(ProducerConfig config, final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); final boolean autoDowngradeTxnCommit = config.getBoolean(ProducerConfig.AUTO_DOWNGRADE_TXN_COMMIT); +// Only log a warning if being used outside of Streams, which we know includes "StreamThread-" in the client id +if (autoDowngradeTxnCommit && !clientId.contains("StreamThread-")) { Review comment: Done https://github.com/apache/kafka/pull/10675 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
mjsax commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630601302

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

```diff
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
             () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
             "Streams never stopped"
         );
-    } finally {
         streams.close();
```

Review comment: We can remove this line now. `close()` will be called automatically by the try-with-resources clause.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
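The auto-close behavior being relied on in this review thread can be sketched with a plain `AutoCloseable` (class and method names below are illustrative, not from the Kafka test suite): try-with-resources invokes `close()` exactly once when the block exits, so an explicit `close()` inside the block means the resource sees two calls and must itself be idempotent, as `KafkaStreams#close` is.

```java
public class TryWithResourcesDemo {
    // Minimal stand-in for a closeable resource such as KafkaStreams
    static class CountingResource implements AutoCloseable {
        int closeCalls = 0;

        @Override
        public void close() {
            closeCalls++; // an idempotent close() must tolerate repeated calls
        }
    }

    // Returns how many times close() ran: once explicitly inside the
    // block, and once automatically when try-with-resources exits.
    static int closeCallsAfterScope() {
        CountingResource resource = new CountingResource();
        try (resource) { // Java 9+: an effectively-final variable as resource
            resource.close();
        }
        return resource.closeCalls;
    }

    public static void main(String[] args) {
        System.out.println("close() ran " + closeCallsAfterScope() + " times"); // prints "close() ran 2 times"
    }
}
```

This is why the reviewer's point holds both ways: the explicit `close()` inside the block is redundant for cleanup, but keeping the resource idempotent is what makes the doubled call harmless.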
[GitHub] [kafka] ableegoldman opened a new pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic
ableegoldman opened a new pull request #10675: URL: https://github.com/apache/kafka/pull/10675 Minor followup to [#10573](https://github.com/apache/kafka/pull/10573), see in particular [this comment thread](https://github.com/apache/kafka/pull/10573#discussion_r628777848). Removes this internal Producer config which was only ever used to avoid a very minor amount of work to downgrade the consumer group metadata in the txn commit request. Also replaces deprecation warning suppression with missing `@Deprecated` annotation on the MockProducer's deprecated method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft
cmccabe commented on a change in pull request #10550: URL: https://github.com/apache/kafka/pull/10550#discussion_r630601127 ## File path: tests/kafkatest/services/security/kafka_acls.py ## @@ -66,17 +67,46 @@ def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, addit This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled :param additional_cluster_operations_to_grant may be set to ['Alter', 'Create'] if the cluster is secured since these are required to create SCRAM credentials and topics, respectively +:param security_protocol set it to explicitly determine whether we use client or broker credentials, otherwise +we use the the client security protocol unless inter-broker security protocol is PLAINTEXT, in which case we use PLAINTEXT. +Then we use the broker's credentials if the selected security protocol matches the inter-broker security protocol, +otherwise we use the client's credentials. """ node = kafka.nodes[0] for operation in ['ClusterAction'] + additional_cluster_operations_to_grant: cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s --allow-principal=%(principal)s" % { -'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection), +'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection, security_protocol), 'operation': operation, 'principal': principal } kafka.run_cli_tool(node, cmd) +def remove_cluster_acl(self, kafka, principal, force_use_zk_connection=False, additional_cluster_operations_to_remove = [], security_protocol=None): Review comment: Since this is new code, it would be really good to avoid introducing `force_use_zk_connection` here if possible. I can't see anywhere in this PR where it is used, is this really necessary? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft
cmccabe commented on a change in pull request #10550: URL: https://github.com/apache/kafka/pull/10550#discussion_r630598753 ## File path: tests/kafkatest/services/kafka/kafka.py ## @@ -863,7 +878,7 @@ def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_c # configure JAAS to provide the typical client credentials jaas_conf_prop = KafkaService.JAAS_CONF_PROPERTY use_inter_broker_mechanism_for_client = False -optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " % (jaas_conf_prop, KafkaService.KRB5_CONF) +optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " % (jaas_conf_prop, KafkaService.KRB5_CONF) if security_protocol_to_use != "SSL" else "" Review comment: I'm confused by the logic here. If we have security_protocol==SSL then we do not define the jaas properties in KAFKA_OPTS? Seems a bit weird -- why define this when we're using PLAINTEXT or when we're using SASL_SSL, but not when using SSL? Can you add a comment about how this works? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…
mjsax commented on a change in pull request #10673: URL: https://github.com/apache/kafka/pull/10673#discussion_r630595939

## File path: tests/kafkatest/services/streams.py

```diff
@@ -466,6 +466,15 @@ def __init__(self, test_context, kafka, processingMode):
                          "org.apache.kafka.streams.tests.BrokerCompatibilityTest",
                          processingMode)

+    def prop_file(self):
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                      # the old broker (< 2.4) does not support configuration replication.factor=-1
+                      "replication.factor": 1}
```

Review comment: Should we set it to 3 instead? IIRC, we run all system tests with 3 brokers? Wondering why the change broke the system tests, as they should have overwritten the default to 3 anyway?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #10504: KAFKA-12620 Allocate producer ids on the controller
cmccabe commented on pull request #10504: URL: https://github.com/apache/kafka/pull/10504#issuecomment-839261742 We need to gate the new behavior behind an IBP bump, right? Otherwise we won't have access to new producer IDs while rolling a cluster. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
ableegoldman commented on a change in pull request #10573: URL: https://github.com/apache/kafka/pull/10573#discussion_r630593372 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ## @@ -179,10 +179,18 @@ public void beginTransaction() throws ProducerFencedException { this.sentOffsets = false; } +@SuppressWarnings("deprecation") Review comment: Actually no, we do get a warning if we don't have either annotation. I'll change it to `@Deprecated` then -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
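For readers following the annotation back-and-forth: javac emits a deprecation warning when a method overrides a deprecated method, unless the override is itself annotated `@Deprecated` or the warning is suppressed; annotating the override also propagates the deprecation to callers rather than hiding it. A minimal sketch with made-up names (not the real `MockProducer` API):

```java
public class DeprecationDemo {
    static class BaseProducer {
        /** @deprecated illustrative only; retained for compatibility. */
        @Deprecated
        String sendOffsetsToTransaction(String groupId) {
            return "real:" + groupId;
        }
    }

    // Without @Deprecated (or @SuppressWarnings("deprecation")) here, javac
    // warns that this method overrides a deprecated method. Marking it
    // @Deprecated keeps the deprecation visible to callers instead of hiding it.
    static class MockedProducer extends BaseProducer {
        @Deprecated
        @Override
        String sendOffsetsToTransaction(String groupId) {
            return "mock:" + groupId; // record the call instead of doing real work
        }
    }

    public static void main(String[] args) {
        System.out.println(new MockedProducer().sendOffsetsToTransaction("app")); // prints "mock:app"
    }
}
```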
[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller
cmccabe commented on a change in pull request #10504: URL: https://github.com/apache/kafka/pull/10504#discussion_r630587756

## File path: core/src/main/scala/kafka/controller/KafkaController.scala

```diff
@@ -2376,6 +2375,82 @@ class KafkaController(val config: KafkaConfig,
     }
   }

+  def allocateProducerIds(allocateProducerIdsRequest: AllocateProducerIdsRequestData,
+                          callback: AllocateProducerIdsResponseData => Unit): Unit = {
+
+    def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): Unit = {
+      results match {
+        case Left(error) => callback.apply(new AllocateProducerIdsResponseData().setErrorCode(error.code))
+        case Right(pidBlock) => callback.apply(
+          new AllocateProducerIdsResponseData()
+            .setProducerIdStart(pidBlock.producerIdStart())
+            .setProducerIdLen(pidBlock.producerIdLen()))
+      }
+    }
+    eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
+      allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
+  }
+
+  def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = {
+    // Handle a few short-circuits
+    if (!isActive) {
+      callback.apply(Left(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId")
+      callback.apply(Left(Errors.STALE_BROKER_EPOCH))
```

Review comment: It seems like this should be BROKER_ID_NOT_REGISTERED

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9826) Log cleaning repeatedly picks same segment with no effect when first dirty offset is past start of active segment
[ https://issues.apache.org/jira/browse/KAFKA-9826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342904#comment-17342904 ] Jun Rao commented on KAFKA-9826: [~zhangzs] : The issue was fixed in 2.4.2. Could you try that version? > Log cleaning repeatedly picks same segment with no effect when first dirty > offset is past start of active segment > - > > Key: KAFKA-9826 > URL: https://issues.apache.org/jira/browse/KAFKA-9826 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Affects Versions: 2.4.1 >Reporter: Steve Rodrigues >Assignee: Steve Rodrigues >Priority: Major > Fix For: 2.6.0, 2.4.2, 2.5.1 > > > Seen on a system where a given partition had a single segment, and for > whatever reason (deleteRecords?), the logStartOffset was greater than the > base segment of the log, there were a continuous series of > ``` > [2020-03-03 16:56:31,374] WARN Resetting first dirty offset of FOO-3 to log > start offset 55649 since the checkpointed offset 0 is invalid. > (kafka.log.LogCleanerManager$) > ``` > messages (partition name changed, it wasn't really FOO). This was expected to > be resolved by KAFKA-6266 but clearly wasn't. > Further investigation revealed that a few segments were continuously > cleaning and generating messages in the `log-cleaner.log` of the form: > ``` > [2020-03-31 13:34:50,924] INFO Cleaner 1: Beginning cleaning of log FOO-3 > (kafka.log.LogCleaner) > [2020-03-31 13:34:50,924] INFO Cleaner 1: Building offset map for FOO-3... > (kafka.log.LogCleaner) > [2020-03-31 13:34:50,927] INFO Cleaner 1: Building offset map for log FOO-3 > for 0 segments in offset range [55287, 54237). (kafka.log.LogCleaner) > [2020-03-31 13:34:50,927] INFO Cleaner 1: Offset map for log FOO-3 complete. > (kafka.log.LogCleaner) > [2020-03-31 13:34:50,927] INFO Cleaner 1: Cleaning log FOO-3 (cleaning prior > to Wed Dec 31 19:00:00 EST 1969, discarding tombstones prior to Tue Dec 10 > 13:39:08 EST 2019)... 
(kafka.log.LogCleaner) > [2020-03-31 13:34:50,927] INFO [kafka-log-cleaner-thread-1]: Log cleaner > thread 1 cleaned log FOO-3 (dirty section = [55287, 55287]) > 0.0 MB of log processed in 0.0 seconds (0.0 MB/sec). > Indexed 0.0 MB in 0.0 seconds (0.0 Mb/sec, 100.0% of total time) > Buffer utilization: 0.0% > Cleaned 0.0 MB in 0.0 seconds (NaN Mb/sec, 0.0% of total time) > Start size: 0.0 MB (0 messages) > End size: 0.0 MB (0 messages) NaN% size reduction (NaN% fewer messages) > (kafka.log.LogCleaner) > ``` > What seems to have happened here (data determined for a different partition) > is: > There exist a number of partitions here which get relatively low traffic, > including our friend FOO-5. For whatever reason, LogStartOffset of this > partition has moved beyond the baseOffset of the active segment. (Notes in > other issues indicate that this is a reasonable scenario.) So there’s one > segment, starting at 166266, and a log start of 166301. > grabFilthiestCompactedLog runs and reads the checkpoint file. We see that > this topicpartition needs to be cleaned, and call cleanableOffsets on it > which returns an OffsetsToClean with firstDirtyOffset == logStartOffset == > 166301 and firstUncleanableOffset = max(logStart, activeSegment.baseOffset) = > 166301, and forceCheckpoint = true. > The checkpoint file is updated in grabFilthiestCompactedLog (this is the fix > for KAFKA-6266). We then create a LogToClean object based on the > firstDirtyOffset and firstUncleanableOffset of 166301 (past the active > segment’s base offset). > The LogToClean object has cleanBytes = logSegments(-1, > firstDirtyOffset).map(_.size).sum → the size of this segment. It has > firstUncleanableOffset and cleanableBytes determined by > calculateCleanableBytes. 
calculateCleanableBytes returns:
> ```
> val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
> val firstUncleanableOffset = firstUncleanableSegment.baseOffset
> val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
> (firstUncleanableOffset, cleanableBytes)
> ```
> firstUncleanableSegment is activeSegment. firstUncleanableOffset is the base offset, 166266. cleanableBytes is looking for logSegments(166301, max(166301, 166266)) → which _is the active segment_ > So there are "cleanableBytes" > 0. > We then filter out segments with totalBytes (clean + cleanable) > 0. This > segment has totalBytes > 0, and it has cleanableBytes, so great! It's > filthiest. > The cleaner picks it, calls cleanLog on it, which then does cleaner.clean, > which returns nextDirtyOffset and cleaner stats. cleaner.clean calls >
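The offset arithmetic in the report above can be condensed into a runnable sketch (constants taken from the 166266/166301 example; method names are illustrative, not the real LogCleanerManager API): once logStartOffset passes the active segment's base offset, the upper bound of the cleanable range clamps to firstDirtyOffset itself, yet that range still resolves to the active segment, so its bytes are counted as cleanable on every pass.

```java
public class CleanableRangeSketch {
    // From the report: a single segment starting at 166266, logStartOffset 166301
    static final long ACTIVE_SEGMENT_BASE = 166_266L;
    static final long LOG_START_OFFSET = 166_301L;

    // cleanableOffsets: firstDirtyOffset == logStartOffset
    static long firstDirtyOffset() {
        return LOG_START_OFFSET;
    }

    // calculateCleanableBytes: the only segment is the active one,
    // so the "first uncleanable" fallback picks its base offset
    static long firstUncleanableOffset() {
        return ACTIVE_SEGMENT_BASE;
    }

    // Upper bound of the range handed to logSegments(from, to)
    static long cleanableRangeUpperBound() {
        return Math.max(firstDirtyOffset(), firstUncleanableOffset());
    }

    public static void main(String[] args) {
        // The degenerate range [166301, 166301) still maps onto the active
        // segment, so cleanableBytes > 0 and the same log is re-picked as
        // "filthiest" on every cleaner pass, with nothing to clean.
        System.out.println("[" + firstDirtyOffset() + ", " + cleanableRangeUpperBound() + ")");
    }
}
```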
[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller
cmccabe commented on a change in pull request #10504: URL: https://github.com/apache/kafka/pull/10504#discussion_r630569991

## File path: clients/src/main/resources/common/message/AllocateProducerIdsResponse.json

```json
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
  "apiKey": 67,
  "type": "response",
  "name": "AllocateProducerIdsResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level response error code" },
    { "name": "ProducerIdStart", "type": "int64", "versions": "0+",
```

Review comment: I think this should have entity type `producerId`

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
ijuma commented on a change in pull request #10573: URL: https://github.com/apache/kafka/pull/10573#discussion_r630556204 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -525,6 +525,11 @@ private TransactionManager configureTransactionState(ProducerConfig config, final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); final boolean autoDowngradeTxnCommit = config.getBoolean(ProducerConfig.AUTO_DOWNGRADE_TXN_COMMIT); +// Only log a warning if being used outside of Streams, which we know includes "StreamThread-" in the client id +if (autoDowngradeTxnCommit && !clientId.contains("StreamThread-")) { Review comment: @ableegoldman Your suggestion to remove the config altogether seems best to me. We don't have a grace period for internal configs, that's why they're internal. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
ableegoldman commented on a change in pull request #10573: URL: https://github.com/apache/kafka/pull/10573#discussion_r630553249 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ## @@ -179,10 +179,18 @@ public void beginTransaction() throws ProducerFencedException { this.sentOffsets = false; } +@SuppressWarnings("deprecation") Review comment: I guess it doesn't need to have either, I can remove in a followup PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
ableegoldman commented on a change in pull request #10573: URL: https://github.com/apache/kafka/pull/10573#discussion_r630552512 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -525,6 +525,11 @@ private TransactionManager configureTransactionState(ProducerConfig config, final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); final boolean autoDowngradeTxnCommit = config.getBoolean(ProducerConfig.AUTO_DOWNGRADE_TXN_COMMIT); +// Only log a warning if being used outside of Streams, which we know includes "StreamThread-" in the client id +if (autoDowngradeTxnCommit && !clientId.contains("StreamThread-")) { Review comment: @ijuma I came across this after the question was raised around the autodowngrade logic, apparently (according to the config's javadocs) it's an "internal" config that's only used for Streams. The config itself is package-private. Given that, I thought we may want to log a warning to any plain client users that saw this config and didn't notice that it was internal, and thus tried to use it. But I'm happy to do a followup PR to remove this. Alternatively, we can just take this config out -- I actually don't see any reason why it should be necessary, AFAICT it's just a slight convenience config that saves Streams from the ~5 lines of code it would take to do this downgrade itself (basically it just erases the extra consumer group metadata that isn't understood by older brokers). Not sure if this was vestigial from an older iteration of KIP-447, as it seems rather unnecessary.. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
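As a rough sketch of what that "downgrade" amounts to (the `GroupMetadata` type below is a simplified stand-in, not the real `ConsumerGroupMetadata` API): the richer fencing fields are reset to their defaults so the transactional offset-commit request carries only the group id, which older brokers understand.

```java
public class TxnDowngradeSketch {
    // Simplified stand-in for ConsumerGroupMetadata: the group id plus the
    // generation/member fields that newer brokers use for zombie fencing.
    static final class GroupMetadata {
        final String groupId;
        final int generationId;
        final String memberId;

        GroupMetadata(String groupId, int generationId, String memberId) {
            this.groupId = groupId;
            this.generationId = generationId;
            this.memberId = memberId;
        }
    }

    // "Downgrading" just erases the metadata older brokers cannot use,
    // keeping only the group id (defaults assumed here: -1 generation, "" member).
    static GroupMetadata downgrade(GroupMetadata full) {
        return new GroupMetadata(full.groupId, -1, "");
    }

    public static void main(String[] args) {
        GroupMetadata full = new GroupMetadata("my-app", 7, "consumer-1");
        GroupMetadata old = downgrade(full);
        System.out.println(old.groupId + " gen=" + old.generationId); // prints "my-app gen=-1"
    }
}
```

Which is roughly the "~5 lines of code" the comment argues Streams could do itself instead of relying on an internal producer config.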
[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde
yeralin commented on a change in pull request #6592: URL: https://github.com/apache/kafka/pull/6592#discussion_r630550102

## File path: clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java

@@ -106,6 +110,190 @@ public void stringSerdeShouldSupportDifferentEncodings() {
         }
     }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldReturnEmptyCollection() {
+        List<Integer> testData = Arrays.asList();
+        Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get empty collection after serialization and deserialization on an empty list");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldReturnNull() {
+        List<Integer> testData = null;
+        Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get null after serialization and deserialization on a null list");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldRoundtripIntPrimitiveInput() {
+        List<Integer> testData = Arrays.asList(1, 2, 3);
+        Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get the original collection of integer primitives after serialization and deserialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForIntPrimitiveInput() {
+        List<Integer> testData = Arrays.asList(1, 2, 3);
+        Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+        assertEquals(21, listSerde.serializer().serialize(topic, testData).length,
+            "Should get length of 21 bytes after serialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldRoundtripShortPrimitiveInput() {
+        List<Short> testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+        Serde<List<Short>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Short());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get the original collection of short primitives after serialization and deserialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForShortPrimitiveInput() {
+        List<Short> testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+        Serde<List<Short>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Short());
+        assertEquals(15, listSerde.serializer().serialize(topic, testData).length,
+            "Should get length of 15 bytes after serialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldRoundtripFloatPrimitiveInput() {
+        List<Float> testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+        Serde<List<Float>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Float());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get the original collection of float primitives after serialization and deserialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForFloatPrimitiveInput() {
+        List<Float> testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+        Serde<List<Float>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Float());
+        assertEquals(21, listSerde.serializer().serialize(topic, testData).length,
+            "Should get length of 21 bytes after serialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeShouldRoundtripLongPrimitiveInput() {
+        List<Long> testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+        Serde<List<Long>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Long());
+        assertEquals(testData,
+            listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+            "Should get the original collection of long primitives after serialization and deserialization");
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForLongPrimitiveInput() {
+        List<Long> testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+        Serde<List<Long>> listSerde = Serdes.ListSerde(ArrayList.class,
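The 21- and 15-byte expectations in the fixed-size tests above are consistent with a constant-size wire layout of 1 strategy byte + a 4-byte element count + a 4-byte null-index count, followed by the fixed-width elements. A minimal sketch of that arithmetic, under the assumption that this is the layout the PR uses (the class and method names here are illustrative, not part of the PR):

```java
// Sketch (assumption): constant-size list payload =
//   1 byte serialization strategy + 4-byte element count + 4-byte null-index count
//   + (elements * bytes per element), with no null entries present.
public class ListPayloadSize {
    static final int OVERHEAD = 1 + Integer.BYTES + Integer.BYTES; // 9 bytes

    static int constantSizePayload(int elements, int bytesPerElement) {
        return OVERHEAD + elements * bytesPerElement;
    }

    public static void main(String[] args) {
        System.out.println(constantSizePayload(3, Integer.BYTES)); // 21, matching the Integer test
        System.out.println(constantSizePayload(3, Short.BYTES));   // 15, matching the Short test
        System.out.println(constantSizePayload(3, Float.BYTES));   // 21, matching the Float test
    }
}
```

The same 9-byte overhead explains both expectations: 9 + 3×4 = 21 for ints and floats, 9 + 3×2 = 15 for shorts.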
[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde
yeralin commented on a change in pull request #6592: URL: https://github.com/apache/kafka/pull/6592#discussion_r630542548 ## File path: clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java ## @@ -125,6 +126,27 @@ public UUIDSerde() { } } +static public final class ListSerde extends WrapperSerde> { Review comment: The problem is [all of them](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java) list `static public` first. ![image](https://user-images.githubusercontent.com/8620461/117885230-98d8d200-b27b-11eb-849e-afdc43631c89.png) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde
yeralin commented on a change in pull request #6592: URL: https://github.com/apache/kafka/pull/6592#discussion_r630539763

## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java

@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+
+public class ListSerializer<Inner> implements Serializer<List<Inner>> {
+
+    private static final List<Class<? extends Serializer<?>>> FIXED_LENGTH_SERIALIZERS = Arrays.asList(
+        ShortSerializer.class,
+        IntegerSerializer.class,
+        FloatSerializer.class,
+        LongSerializer.class,
+        DoubleSerializer.class,
+        UUIDSerializer.class);
+
+    private Serializer<Inner> inner;
+    private SerializationStrategy serStrategy;
+    private boolean isFixedLength;
+
+    public ListSerializer() {}
+
+    public ListSerializer(Serializer<Inner> inner) {
+        if (inner == null) {
+            throw new IllegalArgumentException("ListSerializer requires \"serializer\" parameter to be provided during initialization");
+        }
+        this.inner = inner;
+        this.isFixedLength = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass());
+        this.serStrategy = this.isFixedLength ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE;
+    }
+
+    public Serializer<Inner> getInnerSerializer() {
+        return inner;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        if (inner != null) {
+            throw new ConfigException("List serializer was already initialized using a non-default constructor");
+        }
+        final String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
+        final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName);
+        if (innerSerdeClassOrName == null) {
+            throw new ConfigException("Not able to determine the serializer class because it was neither passed via the constructor nor set in the config.");
+        }
+        try {
+            if (innerSerdeClassOrName instanceof String) {
+                inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).serializer();
+            } else if (innerSerdeClassOrName instanceof Class) {
+                inner = (Serializer<Inner>) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).serializer();
+            } else {
+                throw new KafkaException("Could not create a serializer class instance using \"" + innerSerdePropertyName + "\" property.");
+            }
+            inner.configure(configs, isKey);
+            isFixedLength = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass());
+            serStrategy = this.isFixedLength ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE;
+        } catch (final ClassNotFoundException e) {
+            throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Serializer class " + innerSerdeClassOrName + " could not be found.");
+        }
+    }
+
+    private void serializeNullIndexList(final DataOutputStream out, List<Inner> data) throws IOException {
+        List<Integer> nullIndexList = IntStream.range(0, data.size())
+            .filter(i -> data.get(i) == null)

Review comment: Would something like this work?
```
int i = 0;
List<Integer> nullIndexList = new ArrayList<>();
for (Iterator<Inner> it = data.listIterator(); it.hasNext(); i++) {
    if (it.next() == null) {
        nullIndexList.add(i);
    }
}
out.writeInt(nullIndexList.size());
for (int
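The iterator-based loop proposed in the review comment above can be sketched as a standalone method. The point of the suggestion is to avoid `data.get(i)`, which is O(n) per call on a non-random-access list such as `LinkedList`, making the `IntStream` version O(n²); iterating is O(n) regardless of the list implementation. The class and method names here are illustrative:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Standalone version of the review suggestion: collect the positions of null
// entries with a single iterator pass instead of IntStream + get(i).
public class NullIndexes {
    static <T> List<Integer> nullIndexes(List<T> data) {
        List<Integer> nullIndexList = new ArrayList<>();
        int i = 0;
        for (Iterator<T> it = data.iterator(); it.hasNext(); i++) {
            if (it.next() == null) {
                nullIndexList.add(i);   // remember where each null sits
            }
        }
        return nullIndexList;
    }

    public static void main(String[] args) {
        System.out.println(nullIndexes(Arrays.asList(1, null, 3, null))); // [1, 3]
    }
}
```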
[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde
yeralin commented on a change in pull request #6592: URL: https://github.com/apache/kafka/pull/6592#discussion_r630523305

## File path: clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java

@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
+
+    private static final Map<Class<? extends Deserializer<?>>, Integer> FIXED_LENGTH_DESERIALIZERS = mkMap(
+        mkEntry(ShortDeserializer.class, Short.BYTES),
+        mkEntry(IntegerDeserializer.class, Integer.BYTES),
+        mkEntry(FloatDeserializer.class, Float.BYTES),
+        mkEntry(LongDeserializer.class, Long.BYTES),
+        mkEntry(DoubleDeserializer.class, Double.BYTES),
+        mkEntry(UUIDDeserializer.class, 36)
+    );
+
+    private Deserializer<Inner> inner;
+    private Class<?> listClass;
+    private Integer primitiveSize;
+
+    public ListDeserializer() {}
+
+    public <L extends List<Inner>> ListDeserializer(Class<L> listClass, Deserializer<Inner> inner) {
+        if (listClass == null || inner == null) {
+            throw new IllegalArgumentException("ListDeserializer requires both \"listClass\" and \"innerDeserializer\" parameters to be provided during initialization");
+        }
+        this.listClass = listClass;
+        this.inner = inner;
+        this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass());
+    }
+
+    public Deserializer<Inner> getInnerDeserializer() {
+        return inner;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        if (listClass != null || inner != null) {
+            throw new ConfigException("List deserializer was already initialized using a non-default constructor");
+        }
+        configureListClass(configs, isKey);
+        configureInnerSerde(configs, isKey);
+    }
+
+    private void configureListClass(Map<String, ?> configs, boolean isKey) {
+        String listTypePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS;
+        final Object listClassOrName = configs.get(listTypePropertyName);
+        if (listClassOrName == null) {
+            throw new ConfigException("Not able to determine the list class because it was neither passed via the constructor nor set in the config.");
+        }
+        try {
+            if (listClassOrName instanceof String) {
+                listClass = Utils.loadClass((String) listClassOrName, Object.class);
+            } else if (listClassOrName instanceof Class) {
+                listClass = (Class<?>) listClassOrName;
+            } else {
+                throw new KafkaException("Could not determine the list class instance using \"" + listTypePropertyName + "\" property.");
+            }
+        } catch (final ClassNotFoundException e) {
+            throw new ConfigException(listTypePropertyName, listClassOrName, "Deserializer's list class \"" + listClassOrName + "\" could not be found.");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void configureInnerSerde(Map<String, ?> configs, boolean isKey) {
+        String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
+        final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName);
+        if (innerSerdeClassOrName == null) {
+            throw new
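The `configure()`-time resolution above accepts a config value that may arrive either as a `String` class name or as a `Class` object, rejecting anything else. A standalone sketch of that pattern (`resolveClass` is an illustrative name, not part of the Kafka API, and plain `Class.forName` stands in for Kafka's `Utils.loadClass`):

```java
// Hypothetical standalone version of the class-or-name resolution pattern used
// by ListDeserializer.configureListClass/configureInnerSerde.
public class ConfigClassResolution {
    static Class<?> resolveClass(String propertyName, Object classOrName) {
        if (classOrName instanceof String) {
            try {
                return Class.forName((String) classOrName); // name -> Class
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException(
                    "Class \"" + classOrName + "\" could not be found.", e);
            }
        } else if (classOrName instanceof Class) {
            return (Class<?>) classOrName; // already a Class object
        }
        throw new IllegalArgumentException(
            "Could not determine a class from \"" + propertyName + "\" property.");
    }

    public static void main(String[] args) {
        System.out.println(resolveClass("list.class", "java.util.ArrayList").getSimpleName());     // ArrayList
        System.out.println(resolveClass("list.class", java.util.LinkedList.class).getSimpleName()); // LinkedList
    }
}
```

Accepting both forms matters because programmatic callers tend to pass `Class` objects while properties files can only supply strings.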
[jira] [Assigned] (KAFKA-12682) Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order
[ https://issues.apache.org/jira/browse/KAFKA-12682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-12682: -- Assignee: Justine Olshan

> Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order
> Key: KAFKA-12682
> URL: https://issues.apache.org/jira/browse/KAFKA-12682
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.8.0
> Reporter: jacky
> Assignee: Justine Olshan
> Priority: Major
> Labels: kip-500
>
> In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and _localRemoved, which record changed and deleted partitions. However, kafka.server.RaftReplicaManager#handleMetadataRecords always processes the _localChanged partitions first and then _localRemoved, which does not respect the original record order. For example:
> 1. migrate partition p1 from b0 to b1;
> 2. change the leader of p1;
> 3. migrate p1 from b1 back to b0.
> Here _localRemoved will delete p1 last.
> I also think MetadataPartition should include the topic UUID, with the topic name optional. For example: create topic t1, delete topic t1, create topic t1, change the leader of p1. After compacting the records this becomes: delete topic t1, change t1-p1. But the current implementation will
> 1. process the change to t1-p1, and then
> 2. process the delete of topic t1.
> Since MetadataPartition only includes the topic name, not the topic UUID, processing cannot find the original topic UUID and instead finds the latest topic id, which is not right; the delete of t1 should happen before the re-create of t1 or the change to p1.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12766) Consider Disabling WAL-related Options in RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-12766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12766: --- Labels: newbie newbie++ (was: )

> Consider Disabling WAL-related Options in RocksDB
> Key: KAFKA-12766
> URL: https://issues.apache.org/jira/browse/KAFKA-12766
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Bruno Cadonna
> Priority: Minor
> Labels: newbie, newbie++
> Fix For: 3.0.0
>
> Streams disables the write-ahead log (WAL) provided by RocksDB since it replicates the data in changelog topics. Hence, it does not make much sense to set WAL-related configs for RocksDB instances within Streams.
> Streams could:
> - disable WAL-related options
> - ignore WAL-related options
> - throw an exception when a WAL-related option is set.
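One of the three options the ticket lists is ignoring WAL-related settings. A minimal sketch of that approach as a plain name-based filter; the option names and the filter itself are illustrative only (Streams actually configures RocksDB through a `RocksDBConfigSetter`, not a string map):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Sketch of the "ignore WAL-related options" alternative from the ticket:
// drop user-supplied option names that match a WAL-related deny list before
// applying them, since Streams disables the WAL anyway.
public class WalOptionFilter {
    static final Set<String> WAL_OPTIONS = new HashSet<>(Arrays.asList(
        "wal_dir", "wal_ttl_seconds", "wal_size_limit_mb", "max_total_wal_size"));

    static Map<String, String> withoutWalOptions(Map<String, String> userOptions) {
        Map<String, String> filtered = new LinkedHashMap<>(userOptions);
        filtered.keySet().removeAll(WAL_OPTIONS); // silently ignore WAL settings
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("wal_ttl_seconds", "60");        // would be ignored
        opts.put("write_buffer_size", "8388608"); // kept
        System.out.println(withoutWalOptions(opts).keySet()); // [write_buffer_size]
    }
}
```

The "throw an exception" alternative would replace the `removeAll` with a check that raises on any intersection, which is noisier but surfaces misconfiguration to the user.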
[GitHub] [kafka] Boojapho commented on pull request #10642: KAFKA-12756: Update Zookeeper to 3.6.3 or higher
Boojapho commented on pull request #10642: URL: https://github.com/apache/kafka/pull/10642#issuecomment-838928359

> Also, we override the netty version, so I am not sure the ZK version needs to be bumped at all.

So long as only one version of netty remains in the release, that should work instead of bumping ZK. I think 2.8 already uses a version that fixes the vulnerability. 2.7 and 2.6 might still need updates.
[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency
[ https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342778#comment-17342778 ] Ming Liu commented on KAFKA-12713: -- Here is the PR for the KIP: https://github.com/apache/kafka/pull/10674/files. Please comment.

> Report "REAL" follower/consumer fetch latency
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
> Issue Type: Bug
> Reporter: Ming Liu
> Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With ACK=ALL, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency does not reflect the true fetch latency, because a fetch sometimes needs to stay in purgatory and wait for replica.fetch.wait.max.ms when data is not available. This greatly affects the real P50, P99, etc.
> I would like to propose a KIP to be able to track the real fetch latency for both broker followers and consumers.
[GitHub] [kafka] mingaliu opened a new pull request #10674: KAFKA-12713: Report the real fetch latency by removing the wait-time in purgatory.
mingaliu opened a new pull request #10674: URL: https://github.com/apache/kafka/pull/10674

This is to help monitor the 'real' fetch latency by removing the wait time a FetchRequest spends in purgatory. The changes include:
1. Add waitTimeMs to FetchResponse().
2. In the Kafka API handler (in the handleFetchRequest() function), when creating the FetchResponse(), set waitTimeMs to the time spent in purgatory.
3. In the follower broker's processFetchRequest(), track the real latency of fetch requests by subtracting waitTimeMs from the FetchResponse.
4. In FetcherStats, add a new histogram to track this calculated "true" fetch latency.
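The latency adjustment this PR describes boils down to subtracting the purgatory wait time from the end-to-end request time before recording it. A minimal sketch of that arithmetic; the class and method names here are illustrative, not the PR's actual identifiers:

```java
// Sketch of the "real" fetch latency computation: total request time minus the
// time the FetchRequest spent parked in purgatory waiting for data.
public class FetchLatency {
    static long realFetchLatencyMs(long totalTimeMs, long waitTimeMs) {
        // Clamp at zero in case rounding or clock skew makes waitTimeMs exceed the total.
        return Math.max(0, totalTimeMs - waitTimeMs);
    }

    public static void main(String[] args) {
        // A 520 ms fetch that waited 500 ms for data did only ~20 ms of real work,
        // which is the value worth feeding into the new histogram.
        System.out.println(realFetchLatencyMs(520, 500)); // 20
    }
}
```

This is why the unadjusted metric distorts P50/P99: with `replica.fetch.wait.max.ms` at its default 500 ms, idle-partition fetches dominate the distribution with wait time rather than processing time.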
[GitHub] [kafka] dejan2609 commented on pull request #10626: KAFKA-12744: Breaking change dependency upgrade: "argparse4j" 0.7.0 -->> 0.9.0
dejan2609 commented on pull request #10626: URL: https://github.com/apache/kafka/pull/10626#issuecomment-838902447

Changing status to draft (until I test the CLI, as mentioned in the comments above).
[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)
dejan2609 edited a comment on pull request #10606: URL: https://github.com/apache/kafka/pull/10606#issuecomment-838776681

> Looks like there is a change in behavior in Gradle 7 related to resource files that's causing a bunch of tests to fail

Indeed... on my local machine around 1% of tests were broken (and, if I recall correctly, they were all related to SSL/TLS, so I was hoping for better test results here). Back to the drawing board, I guess (I will execute the tests again on my side, with and without the gradle shadow upgrade, and then compare with the test results here). Changing PR status to **_draft_** (again).
[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model
jsancio commented on a change in pull request #10431: URL: https://github.com/apache/kafka/pull/10431#discussion_r630370013 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -242,85 +248,116 @@ final class KafkaMetadataLog private ( } override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = { -try { - if (snapshotIds.contains(snapshotId)) { -Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) - } else { -Optional.empty() +snapshots synchronized { + val reader = snapshots.get(snapshotId) match { +case None => + // Snapshot doesn't exists + None +case Some(None) => + // Snapshot exists but has never been read before + try { +val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) +snapshots.put(snapshotId, snapshotReader) +snapshotReader + } catch { +case _: NoSuchFileException => + // Snapshot doesn't exists in the data dir; remove + val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId) + warn(s"Couldn't read $snapshotId; expected to find snapshot file $path") + snapshots.remove(snapshotId) + None + } +case Some(value) => + // Snapshot exists and it is already open; do nothing + value } -} catch { - case _: NoSuchFileException => -Optional.empty() + + reader.asJava.asInstanceOf[Optional[RawSnapshotReader]] } } override def latestSnapshotId(): Optional[OffsetAndEpoch] = { -val descending = snapshotIds.descendingIterator -if (descending.hasNext) { - Optional.of(descending.next) -} else { - Optional.empty() +snapshots synchronized { + snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava } } override def earliestSnapshotId(): Optional[OffsetAndEpoch] = { -val ascendingIterator = snapshotIds.iterator -if (ascendingIterator.hasNext) { - Optional.of(ascendingIterator.next) -} else { - Optional.empty() +snapshots synchronized { + snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava } } override def onSnapshotFrozen(snapshotId: 
OffsetAndEpoch): Unit = { -snapshotIds.add(snapshotId) +snapshots synchronized { + snapshots.put(snapshotId, None) +} } override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = { -latestSnapshotId().asScala match { - case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) && -startOffset < logStartSnapshotId.offset && -logStartSnapshotId.offset <= snapshotId.offset && -log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => -log.deleteOldSegments() +val (deleted, forgottenSnapshots) = snapshots synchronized { + latestSnapshotId().asScala match { +case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) && + startOffset < logStartSnapshotId.offset && + logStartSnapshotId.offset <= snapshotId.offset && + log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => + + // Delete all segments that have a "last offset" less than the log start offset + log.deleteOldSegments() -// Delete snapshot after increasing LogStartOffset -removeSnapshotFilesBefore(logStartSnapshotId) + // Forget snapshots less than the log start offset + (true, forgetSnapshotsBefore(logStartSnapshotId)) +case _ => + (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]) + } +} -true +removeSnapshots(forgottenSnapshots) +deleted + } - case _ => false -} + /** + * Forget the snapshots earlier than a given snapshot id and return the associated + * snapshot readers. + * + * This method assumes that the lock for `snapshots` is ready held. Review comment: Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
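The ownership model in the diff above replaces a bare set of snapshot ids with a sorted map from snapshot id to an optional reader, where an empty value means "snapshot is known (frozen) but not opened yet" and a present value is the cached open reader. A standalone Java model of that scheme, with `String` standing in for `FileRawSnapshotReader` and `openReader` as a hypothetical hook:

```java
import java.util.Optional;
import java.util.TreeMap;

// Standalone model of KafkaMetadataLog's snapshot cache: readSnapshot opens a
// frozen snapshot lazily on first access and caches the reader; unknown ids
// yield Optional.empty() without touching the filesystem.
public class SnapshotCache {
    private final TreeMap<Long, Optional<String>> snapshots = new TreeMap<>();

    synchronized void onSnapshotFrozen(long snapshotId) {
        snapshots.put(snapshotId, Optional.empty()); // known, not yet opened
    }

    synchronized Optional<String> readSnapshot(long snapshotId) {
        Optional<String> cached = snapshots.get(snapshotId);
        if (cached == null) {
            return Optional.empty();                      // snapshot doesn't exist
        }
        if (!cached.isPresent()) {
            cached = Optional.of(openReader(snapshotId)); // open once, then cache
            snapshots.put(snapshotId, cached);
        }
        return cached;                                    // already open; do nothing
    }

    String openReader(long snapshotId) {
        return "reader-" + snapshotId;
    }

    public static void main(String[] args) {
        SnapshotCache cache = new SnapshotCache();
        cache.onSnapshotFrozen(100L);
        System.out.println(cache.readSnapshot(100L).get());       // reader-100, opened lazily
        System.out.println(cache.readSnapshot(200L).isPresent()); // false, never frozen
    }
}
```

Caching the open reader is what lets the log, rather than each caller, own the reader's lifecycle, which is the point of the PR title.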
[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model
jsancio commented on a change in pull request #10431: URL: https://github.com/apache/kafka/pull/10431#discussion_r630369878 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -242,85 +248,116 @@ final class KafkaMetadataLog private ( } override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = { -try { - if (snapshotIds.contains(snapshotId)) { -Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) - } else { -Optional.empty() +snapshots synchronized { + val reader = snapshots.get(snapshotId) match { +case None => + // Snapshot doesn't exists + None +case Some(None) => + // Snapshot exists but has never been read before + try { +val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) +snapshots.put(snapshotId, snapshotReader) +snapshotReader + } catch { +case _: NoSuchFileException => + // Snapshot doesn't exists in the data dir; remove + val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId) + warn(s"Couldn't read $snapshotId; expected to find snapshot file $path") + snapshots.remove(snapshotId) + None + } +case Some(value) => + // Snapshot exists and it is already open; do nothing + value } -} catch { - case _: NoSuchFileException => -Optional.empty() + + reader.asJava.asInstanceOf[Optional[RawSnapshotReader]] } } override def latestSnapshotId(): Optional[OffsetAndEpoch] = { -val descending = snapshotIds.descendingIterator -if (descending.hasNext) { - Optional.of(descending.next) -} else { - Optional.empty() +snapshots synchronized { + snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava } } override def earliestSnapshotId(): Optional[OffsetAndEpoch] = { -val ascendingIterator = snapshotIds.iterator -if (ascendingIterator.hasNext) { - Optional.of(ascendingIterator.next) -} else { - Optional.empty() +snapshots synchronized { + snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava } } override def onSnapshotFrozen(snapshotId: 
OffsetAndEpoch): Unit = { -snapshotIds.add(snapshotId) +snapshots synchronized { + snapshots.put(snapshotId, None) +} } override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = { -latestSnapshotId().asScala match { - case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) && -startOffset < logStartSnapshotId.offset && -logStartSnapshotId.offset <= snapshotId.offset && -log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => -log.deleteOldSegments() +val (deleted, forgottenSnapshots) = snapshots synchronized { + latestSnapshotId().asScala match { +case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) && + startOffset < logStartSnapshotId.offset && + logStartSnapshotId.offset <= snapshotId.offset && + log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => + + // Delete all segments that have a "last offset" less than the log start offset + log.deleteOldSegments() -// Delete snapshot after increasing LogStartOffset -removeSnapshotFilesBefore(logStartSnapshotId) + // Forget snapshots less than the log start offset + (true, forgetSnapshotsBefore(logStartSnapshotId)) +case _ => + (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]) + } +} -true +removeSnapshots(forgottenSnapshots) +deleted + } - case _ => false -} + /** + * Forget the snapshots earlier than a given snapshot id and return the associated + * snapshot readers. + * + * This method assumes that the lock for `snapshots` is ready held. + */ + @nowarn("cat=deprecation") // Needed for TreeMap.until + private def forgetSnapshotsBefore( +logStartSnapshotId: OffsetAndEpoch + ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = { +val expiredSnapshots = snapshots.until(logStartSnapshotId).clone() +snapshots --= expiredSnapshots.keys + +expiredSnapshots } /** - * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset. 
+ * Rename the given snapshots on the log directory. Asynchronously, close and delete the given + * snapshots. Review comment: Added your suggestion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model
jsancio commented on a change in pull request #10431: URL: https://github.com/apache/kafka/pull/10431#discussion_r630369703 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -242,85 +248,116 @@ final class KafkaMetadataLog private ( } override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = { -try { - if (snapshotIds.contains(snapshotId)) { -Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) - } else { -Optional.empty() +snapshots synchronized { + val reader = snapshots.get(snapshotId) match { +case None => + // Snapshot doesn't exists + None +case Some(None) => + // Snapshot exists but has never been read before + try { +val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) +snapshots.put(snapshotId, snapshotReader) +snapshotReader + } catch { +case _: NoSuchFileException => + // Snapshot doesn't exists in the data dir; remove + val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId) + warn(s"Couldn't read $snapshotId; expected to find snapshot file $path") + snapshots.remove(snapshotId) + None + } +case Some(value) => + // Snapshot exists and it is already open; do nothing + value } -} catch { - case _: NoSuchFileException => -Optional.empty() + + reader.asJava.asInstanceOf[Optional[RawSnapshotReader]] } } override def latestSnapshotId(): Optional[OffsetAndEpoch] = { -val descending = snapshotIds.descendingIterator -if (descending.hasNext) { - Optional.of(descending.next) -} else { - Optional.empty() +snapshots synchronized { + snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava } } override def earliestSnapshotId(): Optional[OffsetAndEpoch] = { -val ascendingIterator = snapshotIds.iterator -if (ascendingIterator.hasNext) { - Optional.of(ascendingIterator.next) -} else { - Optional.empty() +snapshots synchronized { + snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava } } override def onSnapshotFrozen(snapshotId: 
OffsetAndEpoch): Unit = { -snapshotIds.add(snapshotId) +snapshots synchronized { + snapshots.put(snapshotId, None) +} } override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = { -latestSnapshotId().asScala match { - case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) && -startOffset < logStartSnapshotId.offset && -logStartSnapshotId.offset <= snapshotId.offset && -log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => -log.deleteOldSegments() +val (deleted, forgottenSnapshots) = snapshots synchronized { + latestSnapshotId().asScala match { +case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) && + startOffset < logStartSnapshotId.offset && + logStartSnapshotId.offset <= snapshotId.offset && + log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => + + // Delete all segments that have a "last offset" less than the log start offset + log.deleteOldSegments() -// Delete snapshot after increasing LogStartOffset -removeSnapshotFilesBefore(logStartSnapshotId) + // Forget snapshots less than the log start offset + (true, forgetSnapshotsBefore(logStartSnapshotId)) +case _ => + (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]) + } +} -true +removeSnapshots(forgottenSnapshots) +deleted + } - case _ => false -} + /** + * Forget the snapshots earlier than a given snapshot id and return the associated + * snapshot readers. + * + * This method assumes that the lock for `snapshots` is ready held. + */ + @nowarn("cat=deprecation") // Needed for TreeMap.until + private def forgetSnapshotsBefore( +logStartSnapshotId: OffsetAndEpoch + ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = { +val expiredSnapshots = snapshots.until(logStartSnapshotId).clone() +snapshots --= expiredSnapshots.keys + +expiredSnapshots } /** - * Removes all snapshots on the log directory whose epoch and end offset is less than the giving epoch and end offset. 
+ * Rename the given snapshots on the log directory. Asynchronously, close and delete the given + * snapshots. */ - private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): Unit = { -val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, false).iterator -while
[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model
jsancio commented on a change in pull request #10431: URL: https://github.com/apache/kafka/pull/10431#discussion_r630367487

## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ##

@@ -161,19 +162,24 @@ final class KafkaMetadataLog private (
   override def truncateToLatestSnapshot(): Boolean = {
     val latestEpoch = log.latestEpoch.getOrElse(0)
-    latestSnapshotId().asScala match {
-      case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
-        (snapshotId.epoch == latestEpoch && snapshotId.offset > endOffset().offset)) =>
+    val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match {

Review comment: Synchronizing `snapshots` is only needed when accessing that object. In `deleteBeforeSnapshot` it is grabbed because the `match` expression accesses `snapshots` in one of the `case` branches. In this method I think it is safe to only grab the lock where we currently do.
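The locking discipline discussed in these review comments — mutate the shared snapshots map while holding its monitor, but defer the slow cleanup (file deletion, in Kafka's case) until after the lock is released — can be sketched in miniature. The names below are hypothetical illustrations, not Kafka's actual classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of the pattern in deleteBeforeSnapshot: collect the entries to
// remove while synchronized on the shared map, then perform the slow
// cleanup outside the critical section.
class SnapshotIndex {
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    void add(long id, String path) {
        synchronized (snapshots) {
            snapshots.put(id, path);
        }
    }

    // Returns the paths that were forgotten so the caller can delete the
    // underlying files without holding the lock.
    List<String> forgetBefore(long id) {
        List<String> forgotten;
        synchronized (snapshots) {
            // headMap(id) is a live view of all entries strictly below id;
            // copy it, then clear the view to remove them from the map.
            forgotten = new ArrayList<>(snapshots.headMap(id).values());
            snapshots.headMap(id).clear();
        }
        // File deletion would happen here, after the monitor is released.
        return forgotten;
    }
}
```

Keeping IO out of the synchronized block bounds how long other threads (readers calling `latestSnapshotId()`, for example) can be blocked.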
[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future
[ https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342739#comment-17342739 ] Moses Nakamura commented on KAFKA-3539: --- We've been wrestling with this problem for a while. I think my company would like to start the process to try to patch it. However, it's definitely thorny, so we had some issues we wanted to talk about! The main questions seem to be:
1. Which thread should wait for the partition metadata?
2. How can we preserve the ordering semantics that send gives us today?
I think the ideal answer to (1) would be, "no thread" and that we should do it asynchronously. This would be a pretty invasive change though, so I think it's probably more expedient to pass in a threadpool and do it there. How would you feel about optionally passing in a threadpool to KafkaProducer#send, or else on construction to KafkaProducer? For (2), it seems like we have two plausible approaches, either pass async data to the RecordAccumulator, or else layer another queue on top. I think layering another queue on top would be less invasive, so I would lean toward that approach. We could have a central queue that orders all of the work, conditioned on the metadata that we're waiting for being ready. Does that sound good to you? Should I put up a patch so we can talk about it? > KafkaProducer.send() may block even though it returns the Future > > > Key: KAFKA-3539 > URL: https://issues.apache.org/jira/browse/KAFKA-3539 > Project: Kafka > Issue Type: Bug > Components: producer >Reporter: Oleg Zhurakousky >Priority: Critical > Labels: needs-discussion, needs-kip > > You can get more details from the us...@kafka.apache.org by searching on the > thread with the subject "KafkaProducer block on send".
> The bottom line is that method that returns Future must never block, since it > essentially violates the Future contract as it was specifically designed to > return immediately passing control back to the user to check for completion, > cancel etc. -- This message was sent by Atlassian Jira (v8.3.4#803005)
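The threadpool idea in the comment above can be illustrated without Kafka at all: hand the potentially blocking work (the metadata wait) to a caller-supplied executor and return a future immediately. This is a hypothetical sketch of the proposal, not KafkaProducer's actual API; `AsyncSendWrapper` and `blockingSend` are invented names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical wrapper: offload the potentially blocking part of a send
// (waiting for partition metadata) to a caller-supplied executor, so the
// returned future is created and handed back immediately.
class AsyncSendWrapper {
    private final ExecutorService metadataExecutor;

    AsyncSendWrapper(ExecutorService metadataExecutor) {
        this.metadataExecutor = metadataExecutor;
    }

    // blockingSend stands in for the code path that may block today;
    // supplyAsync returns at once while the work runs on the executor.
    <T> CompletableFuture<T> send(Supplier<T> blockingSend) {
        return CompletableFuture.supplyAsync(blockingSend, metadataExecutor);
    }
}
```

A single-threaded executor also addresses question (2) in a crude way: tasks submitted from one caller run in submission order, which is the "layer another queue on top" behavior the comment leans toward.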
[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model
jsancio commented on a change in pull request #10431: URL: https://github.com/apache/kafka/pull/10431#discussion_r630364576

## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java ##

@@ -54,8 +54,12 @@ public Records records() {
   }

   @Override
-  public void close() throws IOException {
-    fileRecords.close();
+  public void close() {

Review comment: I created https://issues.apache.org/jira/browse/KAFKA-12773
[jira] [Created] (KAFKA-12773) Use UncheckedIOException when wrapping IOException
Jose Armando Garcia Sancio created KAFKA-12773: -- Summary: Use UncheckedIOException when wrapping IOException Key: KAFKA-12773 URL: https://issues.apache.org/jira/browse/KAFKA-12773 Project: Kafka Issue Type: Sub-task Reporter: Jose Armando Garcia Sancio Use UncheckedIOException when wrapping IOException instead of RuntimeException.
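The convention this ticket asks for fits in a few lines: wrap the checked `IOException` in `java.io.UncheckedIOException` instead of a bare `RuntimeException`, so callers can still catch IO failures specifically and recover the original cause. A minimal sketch (the helper name is invented, not Kafka code):

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

// Wrapping a checked IOException in UncheckedIOException (rather than a bare
// RuntimeException) keeps the failure's IO nature visible in the type system:
// callers can catch UncheckedIOException and unwrap getCause().
class IoWrapping {
    static void closeUnchecked(Closeable c) {
        try {
            c.close();
        } catch (IOException e) {
            throw new UncheckedIOException("failed to close resource", e);
        }
    }
}
```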
[jira] [Resolved] (KAFKA-12758) Create a new `server-common` module and move ApiMessageAndVersion, RecordSerde, AbstractApiMessageSerde, and BytesApiMessageSerde to that module.
[ https://issues.apache.org/jira/browse/KAFKA-12758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-12758. - Fix Version/s: 3.0.0 Resolution: Fixed merged the PR to trunk > Create a new `server-common` module and move ApiMessageAndVersion, > RecordSerde, AbstractApiMessageSerde, and BytesApiMessageSerde to that module. > - > > Key: KAFKA-12758 > URL: https://issues.apache.org/jira/browse/KAFKA-12758 > Project: Kafka > Issue Type: Sub-task >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Major > Fix For: 3.0.0 > > > Create a new `server-common` module and move ApiMessageAndVersion, > RecordSerde, AbstractApiMessageSerde, and BytesApiMessageSerde to that module.
[GitHub] [kafka] junrao merged pull request #10638: KAFKA-12758 Added `server-common` module to have server side common classes.
junrao merged pull request #10638: URL: https://github.com/apache/kafka/pull/10638
[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model
jsancio commented on a change in pull request #10431: URL: https://github.com/apache/kafka/pull/10431#discussion_r628550053 ## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java ## @@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws } /** - * Delete the snapshot from the filesystem, the caller may firstly rename snapshot file to - * ${file}.deleted, so we try to delete the file as well as the renamed file if exists. + * Delete the snapshot from the filesystem. */ -public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch snapshotId) { -Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId); -Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId); +public static boolean deleteIfExists(Path logDir, OffsetAndEpoch snapshotId) { +Path immutablePath = snapshotPath(logDir, snapshotId); +Path deletedPath = deleteRename(immutablePath, snapshotId); try { -return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletingPath); +return Files.deleteIfExists(immutablePath) | Files.deleteIfExists(deletedPath); } catch (IOException e) { -log.error("Error deleting snapshot file " + deletingPath, e); +log.error("Error deleting snapshot files {} and {}", immutablePath, deletedPath, e); return false; } } +/** + * Mark a snapshot for deletion by renaming with the deleted suffix + */ +public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) { +Path immutablePath = snapshotPath(logDir, snapshotId); +Path deletedPath = deleteRename(immutablePath, snapshotId); +try { +Files.move(immutablePath, deletedPath, StandardCopyOption.ATOMIC_MOVE); Review comment: I filed an issue regarding `atomicMoveWithFallback` and its used on `FileRecords`. `FileRecords` is used by snapshots and log segments. For now, I'll revert to `atomicMoveWithFallback` and address this issue in that Jira. -- This is an automated message from the Apache Git Service. 
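The atomic-rename-with-fallback the reviewer refers to can be sketched in plain NIO: attempt `Files.move` with `ATOMIC_MOVE` and fall back to a regular replacing move when the filesystem rejects it. This mirrors the intent of the utility under discussion, not Kafka's exact implementation:

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch of "atomic move with fallback": prefer an atomic rename so readers
// never observe a half-renamed file, but degrade gracefully on filesystems
// that do not support it.
class MoveUtil {
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Non-atomic fallback: still replaces the target, but the move
            // may be observable in progress.
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

In the snapshot-deletion flow above, this is the step that renames `<snapshot>` to `<snapshot>.deleted` before the asynchronous delete.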
[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model
jsancio commented on a change in pull request #10431: URL: https://github.com/apache/kafka/pull/10431#discussion_r628546403 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -242,85 +246,116 @@ final class KafkaMetadataLog private ( } override def readSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotReader] = { -try { - if (snapshotIds.contains(snapshotId)) { -Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) - } else { -Optional.empty() +snapshots synchronized { + val reader = snapshots.get(snapshotId) match { +case None => + // Snapshot doesn't exists + None +case Some(None) => + // Snapshot exists but has never been read before + try { +val snapshotReader = Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId)) +snapshots.put(snapshotId, snapshotReader) +snapshotReader + } catch { +case _: NoSuchFileException => + // Snapshot doesn't exists in the data dir; remove + val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId) + warn(s"Couldn't read $snapshotId; expected to find snapshot file $path") + snapshots.remove(snapshotId) + None + } +case Some(value) => + // Snapshot exists and it is already open; do nothing + value } -} catch { - case _: NoSuchFileException => -Optional.empty() + + reader.asJava.asInstanceOf[Optional[RawSnapshotReader]] } } override def latestSnapshotId(): Optional[OffsetAndEpoch] = { -val descending = snapshotIds.descendingIterator -if (descending.hasNext) { - Optional.of(descending.next) -} else { - Optional.empty() +snapshots synchronized { + snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava } } override def earliestSnapshotId(): Optional[OffsetAndEpoch] = { -val ascendingIterator = snapshotIds.iterator -if (ascendingIterator.hasNext) { - Optional.of(ascendingIterator.next) -} else { - Optional.empty() +snapshots synchronized { + snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava } } override def onSnapshotFrozen(snapshotId: 
OffsetAndEpoch): Unit = { -snapshotIds.add(snapshotId) +snapshots synchronized { + snapshots.put(snapshotId, None) +} } override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = { -latestSnapshotId().asScala match { - case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) && -startOffset < logStartSnapshotId.offset && -logStartSnapshotId.offset <= snapshotId.offset && -log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => -log.deleteOldSegments() +val (deleted, forgottenSnapshots) = snapshots synchronized { + latestSnapshotId().asScala match { +case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) && + startOffset < logStartSnapshotId.offset && + logStartSnapshotId.offset <= snapshotId.offset && + log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, SnapshotGenerated)) => + + // Delete all segments that have a "last offset" less than the log start offset + log.deleteOldSegments() -// Delete snapshot after increasing LogStartOffset -removeSnapshotFilesBefore(logStartSnapshotId) + // Forget snapshots less than the log start offset + (true, forgetSnapshotsBefore(logStartSnapshotId)) +case _ => + (false, mutable.TreeMap.empty[OffsetAndEpoch, Option[FileRawSnapshotReader]]) + } +} -true +removeSnapshots(forgottenSnapshots) +deleted + } - case _ => false -} + /** + * Forget the snapshots earlier than a given snapshot id and return the associated + * snapshot readers. + * + * This method assumes that the lock for `snapshots` is ready held. + */ + @nowarn("cat=deprecation") // Needed for TreeMap.until Review comment: The issue is that Kafka needs to compile against both Scala 2.12 and 2.13. In Scala 2.13 a lot of the collection methods were deprecated. The community created [scala-collection-compat](https://github.com/scala/scala-collection-compat) to allow the use of 2.13 functionality in 2.12. Apache Kafka depends on that project. 
Unfortunately, there is a pretty annoying bug in the latest stable version of `scala-collection-compat` that generates "unused import warning" when used in 2.13. The Kafka project turns those warnings into errors and the Scala compiler doesn't allow the use of `nowarn` in imports. The best solution I can find is to use 2.12 methods that are deprecated and add this nowarn flag. It looks
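Scala's `@nowarn` as used here plays the same role as Java's `@SuppressWarnings`: silence a known, deliberate use of a deprecated API at the narrowest possible scope instead of disabling deprecation warnings build-wide. A minimal Java illustration of that principle (the deprecated method is invented for the example):

```java
import java.util.TreeMap;

// Analogue of the @nowarn approach described above: the deprecated call is
// suppressed only on the single method that makes it, so new deprecations
// elsewhere still surface as warnings (or errors, as in Kafka's build).
class DeprecationExample {
    @Deprecated
    static int legacyHeadCount(TreeMap<Integer, String> m, int to) {
        return m.headMap(to).size();
    }

    @SuppressWarnings("deprecation") // narrowest scope: this method only
    static int callLegacy(TreeMap<Integer, String> m, int to) {
        return legacyHeadCount(m, to);
    }
}
```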
[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)
dejan2609 commented on pull request #10606: URL: https://github.com/apache/kafka/pull/10606#issuecomment-838776681 > Looks like there is a change in behavior in Gradle 7 related to resource files that's causing a bunch of tests to fail Indeed... on my local machine around 1% of the tests failed (if I recall correctly they were all related to SSL/TLS, so I was hoping for better test results here). Back to the drawing board, I guess (I will execute the tests again on my side with and without the gradle shadow upgrade and/or compare with the test results here). Changing PR status to **_draft_** (again).
[GitHub] [kafka] ijuma commented on a change in pull request #10647: MINOR Removed copying storage libraries specifically as the task already copies them.
ijuma commented on a change in pull request #10647: URL: https://github.com/apache/kafka/pull/10647#discussion_r630330236 ## File path: build.gradle ## @@ -1003,10 +1003,6 @@ project(':core') { from(project(':connect:mirror').configurations.runtimeClasspath) { into("libs/") } from(project(':connect:mirror-client').jar) { into("libs/") } from(project(':connect:mirror-client').configurations.runtimeClasspath) { into("libs/") } -from(project(':storage').jar) { into("libs/") } -from(project(':storage').configurations.runtimeClasspath) { into("libs/") } -from(project(':storage:api').jar) { into("libs/") } -from(project(':storage:api').configurations.runtimeClasspath) { into("libs/") } Review comment: Makes sense, we only need to add modules that are not a core dependency already. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao merged pull request #10647: MINOR Removed copying storage libraries specifically as the task already copies them.
junrao merged pull request #10647: URL: https://github.com/apache/kafka/pull/10647
[GitHub] [kafka] mumrah commented on a change in pull request #10638: KAFKA-12758 Added `server-common` module to have server side common classes.
mumrah commented on a change in pull request #10638: URL: https://github.com/apache/kafka/pull/10638#discussion_r630302538 ## File path: build.gradle ## @@ -1345,6 +1349,62 @@ project(':raft') { } } +project(':server-common') { + archivesBaseName = "kafka-server-common" + + dependencies { +api project(':clients') +implementation libs.slf4jApi + +testImplementation project(':clients') +testImplementation project(':clients').sourceSets.test.output +testImplementation libs.junitJupiter +testImplementation libs.mockitoCore + +testRuntimeOnly libs.slf4jlog4j + } + + task createVersionFile(dependsOn: determineCommitId) { +ext.receiptFile = file("$buildDir/kafka/$buildVersionFileName") +outputs.file receiptFile +outputs.upToDateWhen { false } +doLast { + def data = [ + commitId: commitId, + version: version, + ] + + receiptFile.parentFile.mkdirs() + def content = data.entrySet().collect { "$it.key=$it.value" }.sort().join("\n") + receiptFile.setText(content, "ISO-8859-1") +} + } + + sourceSets { +main { + java { Review comment: Are we only going to allow Java sources in this new module? (I think that's probably a good idea) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #10669: KAFKA-12769: Backport too 2.8 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…
jlprat commented on pull request #10669: URL: https://github.com/apache/kafka/pull/10669#issuecomment-838708152 Thanks
[GitHub] [kafka] jlprat commented on pull request #10672: KAFKA-12769: Backport to 2.7 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…
jlprat commented on pull request #10672: URL: https://github.com/apache/kafka/pull/10672#issuecomment-838708838 Thanks
[GitHub] [kafka] jlprat closed pull request #10672: KAFKA-12769: Backport to 2.7 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…
jlprat closed pull request #10672: URL: https://github.com/apache/kafka/pull/10672
[GitHub] [kafka] jlprat closed pull request #10669: KAFKA-12769: Backport too 2.8 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…
jlprat closed pull request #10669: URL: https://github.com/apache/kafka/pull/10669
[jira] [Updated] (KAFKA-8562) SASL_SSL still performs reverse DNS lookup despite KAFKA-5051
[ https://issues.apache.org/jira/browse/KAFKA-8562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-8562: - Fix Version/s: 2.8.1 2.7.2 > SASL_SSL still performs reverse DNS lookup despite KAFKA-5051 > - > > Key: KAFKA-8562 > URL: https://issues.apache.org/jira/browse/KAFKA-8562 > Project: Kafka > Issue Type: Bug >Reporter: Badai Aqrandista >Assignee: Davor Poldrugo >Priority: Minor > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > When using SASL_SSL, the Kafka client performs a reverse DNS lookup to > resolve IP to DNS. So, this circumvent the security fix made in KAFKA-5051. > This is the line of code from AK 2.2 where it performs the lookup: > https://github.com/apache/kafka/blob/2.2.0/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java#L205 > Following log messages show that consumer initially tried to connect with IP > address 10.0.2.15. Then suddenly it created SaslClient with a hostname: > {code:java} > [2019-06-18 06:23:36,486] INFO Kafka commitId: 00d486623990ed9d > (org.apache.kafka.common.utils.AppInfoParser) > [2019-06-18 06:23:36,487] DEBUG [Consumer > clientId=KafkaStore-reader-_schemas, groupId=schema-registry-10.0.2.15-18081] > Kafka consumer initialized (org.apache.kafka.clients.consumer.KafkaConsumer) > [2019-06-18 06:23:36,505] DEBUG [Consumer > clientId=KafkaStore-reader-_schemas, groupId=schema-registry-10.0.2.15-18081] > Initiating connection to node 10.0.2.15:19094 (id: -1 rack: null) using > address /10.0.2.15 (org.apache.kafka.clients.NetworkClient) > [2019-06-18 06:23:36,512] DEBUG Set SASL client state to > SEND_APIVERSIONS_REQUEST > (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator) > [2019-06-18 06:23:36,515] DEBUG Creating SaslClient: > client=null;service=kafka;serviceHostname=quickstart.confluent.io;mechs=[PLAIN] > (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator) > {code} > Thanks > Badai -- This message was sent by Atlassian Jira 
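The JDK distinction underlying this bug is worth spelling out: `InetSocketAddress.getHostName()` may trigger a reverse DNS lookup, while `getHostString()` returns the originally supplied hostname, or the IP literal, without resolving anything. A small demonstration:

```java
import java.net.InetSocketAddress;

// getHostString() never performs a reverse DNS lookup: for an address built
// from an IP literal it returns the literal itself. getHostName() on the same
// address may block on a PTR query, which is the behavior KAFKA-8562 avoids.
class HostStringDemo {
    static String peerHost(InetSocketAddress addr) {
        return addr.getHostString(); // lookup-free
    }
}
```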
[GitHub] [kafka] omkreddy commented on pull request #10672: KAFKA-12769: Backport to 2.7 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…
omkreddy commented on pull request #10672: URL: https://github.com/apache/kafka/pull/10672#issuecomment-838657103 @jlprat Thanks for PR. I will directly push the original commit to 2.8 and 2.7 branches. We can close this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] omkreddy commented on pull request #10669: KAFKA-12769: Backport too 2.8 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…
omkreddy commented on pull request #10669: URL: https://github.com/apache/kafka/pull/10669#issuecomment-838656757 @jlprat Thanks for PR. I will directly push the original commit to 2.8 and 2.7 branches. We can close this PR.
[GitHub] [kafka] chia7712 commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
chia7712 commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630238771

## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java

```diff
@@ -553,141 +552,152 @@ public void testInitializesAndDestroysMetricsReporters() {
     @Test
     public void testCloseIsIdempotent() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.close();
-        final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.close();
+            final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-        streams.close();
-        Assert.assertEquals("subsequent close() calls should do nothing",
-            closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+            streams.close();
+            Assert.assertEquals("subsequent close() calls should do nothing",
+                closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+        }
     }
 
     @Test
     public void shouldAddThreadWhenRunning() throws InterruptedException {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        final int oldSize = streams.threads.size();
-        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
-        assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
-        assertThat(streams.threads.size(), equalTo(oldSize + 1));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            final int oldSize = streams.threads.size();
+            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+            assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
+            assertThat(streams.threads.size(), equalTo(oldSize + 1));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenCreated() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
    }
 
     @Test
     public void shouldNotAddThreadWhenClosed() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        streams.close();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            streams.close();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenError() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        streams.start();
-        globalStreamThread.shutdown();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            streams.start();
+            globalStreamThread.shutdown();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotReturnDeadThreads() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        streamThreadOne.shutdown();
-        final Set threads = streams.localThreadsMetadata();
+        final Set threads;
+        try (final KafkaStreams
```
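The pattern applied throughout this diff can be isolated: try-with-resources guarantees `close()` runs on block exit, so an explicit `close()` inside the block means `close()` runs twice and must be idempotent — exactly what `testCloseIsIdempotent` verifies. A standalone sketch (hypothetical `Resource` class standing in for `KafkaStreams`):

```java
public class TryWithResourceDemo {
    // Minimal stand-in for a closeable resource like KafkaStreams
    // (hypothetical class, for illustration only).
    static class Resource implements AutoCloseable {
        static int closeCount = 0;
        @Override
        public void close() {
            closeCount++;
        }
    }

    public static void main(String[] args) {
        try (Resource r = new Resource()) {
            r.close(); // explicit close inside the block is fine...
        } // ...and try-with-resources closes again on exit

        // close() ran twice, so it must be safe to call repeatedly.
        System.out.println(Resource.closeCount); // prints 2
    }
}
```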
[GitHub] [kafka] chia7712 opened a new pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…
chia7712 opened a new pull request #10673: URL: https://github.com/apache/kafka/pull/10673

Related to #10532: the default value of `replication.factor` was changed from `1` to `-1`. Older brokers (< 2.4) do not support that configuration, so this PR sets `replication.factor` to `1` to fix `streams_broker_compatibility_test.py`.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
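For context, a Streams application targeting brokers older than 2.4 can pin the replication factor explicitly. A minimal sketch (the property keys are real Kafka Streams config names; the application id and bootstrap server are placeholders):

```java
import java.util.Properties;

public class StreamsReplicationConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        // application.id and bootstrap.servers are placeholder values
        props.put("application.id", "compat-test-app");
        props.put("bootstrap.servers", "localhost:9092");
        // replication.factor now defaults to -1, which asks the broker to
        // apply its own default -- brokers older than 2.4 reject that, so
        // compatibility tests pin it to 1 explicitly.
        props.put("replication.factor", "1");
        System.out.println(props.getProperty("replication.factor")); // prints 1
    }
}
```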
[GitHub] [kafka] vitojeng commented on pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest
vitojeng commented on pull request #10668: URL: https://github.com/apache/kafka/pull/10668#issuecomment-838538304 @mjsax, @ableegoldman Please take a look. :)
[jira] [Commented] (KAFKA-9826) Log cleaning repeatedly picks same segment with no effect when first dirty offset is past start of active segment
[ https://issues.apache.org/jira/browse/KAFKA-9826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342571#comment-17342571 ] zhangzhisheng commented on KAFKA-9826:
--
We have the same problem on Kafka 2.12-2.4.1:
{code:java}
[2021-05-11 18:31:33,329] WARN Resetting first dirty offset of __consumer_offsets-30 to log start offset 4187979634 since the checkpointed offset 4187569609 is invalid. (kafka.log.LogCleanerManager$)
{code}

> Log cleaning repeatedly picks same segment with no effect when first dirty offset is past start of active segment
> -----------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-9826
> URL: https://issues.apache.org/jira/browse/KAFKA-9826
> Project: Kafka
> Issue Type: Bug
> Components: log cleaner
> Affects Versions: 2.4.1
> Reporter: Steve Rodrigues
> Assignee: Steve Rodrigues
> Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
> Seen on a system where a given partition had a single segment, and for whatever reason (deleteRecords?) the logStartOffset was greater than the base segment of the log, there was a continuous series of
> ```
> [2020-03-03 16:56:31,374] WARN Resetting first dirty offset of FOO-3 to log start offset 55649 since the checkpointed offset 0 is invalid. (kafka.log.LogCleanerManager$)
> ```
> messages (the partition name was changed; it wasn't really FOO). This was expected to be resolved by KAFKA-6266 but clearly wasn't.
> Further investigation revealed that a few segments were continuously cleaning and generating messages in the `log-cleaner.log` of the form:
> ```
> [2020-03-31 13:34:50,924] INFO Cleaner 1: Beginning cleaning of log FOO-3 (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,924] INFO Cleaner 1: Building offset map for FOO-3... (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO Cleaner 1: Building offset map for log FOO-3 for 0 segments in offset range [55287, 54237). (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO Cleaner 1: Offset map for log FOO-3 complete. (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO Cleaner 1: Cleaning log FOO-3 (cleaning prior to Wed Dec 31 19:00:00 EST 1969, discarding tombstones prior to Tue Dec 10 13:39:08 EST 2019)... (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO [kafka-log-cleaner-thread-1]: Log cleaner thread 1 cleaned log FOO-3 (dirty section = [55287, 55287])
> 0.0 MB of log processed in 0.0 seconds (0.0 MB/sec).
> Indexed 0.0 MB in 0.0 seconds (0.0 Mb/sec, 100.0% of total time)
> Buffer utilization: 0.0%
> Cleaned 0.0 MB in 0.0 seconds (NaN Mb/sec, 0.0% of total time)
> Start size: 0.0 MB (0 messages)
> End size: 0.0 MB (0 messages) NaN% size reduction (NaN% fewer messages)
> (kafka.log.LogCleaner)
> ```
> What seems to have happened here (data determined for a different partition) is:
> There exist a number of partitions here which get relatively low traffic, including our friend FOO-5. For whatever reason, the LogStartOffset of this partition has moved beyond the baseOffset of the active segment. (Notes in other issues indicate that this is a reasonable scenario.) So there's one segment, starting at 166266, and a log start of 166301.
> grabFilthiestCompactedLog runs and reads the checkpoint file. We see that this topic-partition needs to be cleaned, and call cleanableOffsets on it, which returns an OffsetsToClean with firstDirtyOffset == logStartOffset == 166301, firstUncleanableOffset = max(logStart, activeSegment.baseOffset) = 166301, and forceCheckpoint = true.
> The checkpoint file is updated in grabFilthiestCompactedLog (this is the fix for KAFKA-6266). We then create a LogToClean object based on the firstDirtyOffset and firstUncleanableOffset of 166301 (past the active segment's base offset).
> The LogToClean object has cleanBytes = logSegments(-1, firstDirtyOffset).map(_.size).sum → the size of this segment. It has firstUncleanableOffset and cleanableBytes determined by calculateCleanableBytes. calculateCleanableBytes returns:
> {code:scala}
> val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
> val firstUncleanableOffset = firstUncleanableSegment.baseOffset
> val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
> (firstUncleanableOffset, cleanableBytes)
> {code}
> firstUncleanableSegment is activeSegment. firstUncleanableOffset is its base offset, 166266. cleanableBytes is looking for logSegments(166301, max(166301, 166266)) → which _is the active segment_.
> So there are "cleanableBytes" > 0.
> We then filter out segments with totalbytes (clean + cleanable) > 0. This >
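The failure mode above reduces to a few lines of arithmetic. A toy model of the quoted selection logic using the offsets from the report (`Segment` and `logSegments` are simplified stand-ins, not the real Scala code):

```java
import java.util.ArrayList;
import java.util.List;

public class CleanableBytesSketch {
    record Segment(long baseOffset, long size) {}

    // Rough stand-in for Log.logSegments(from, to): the last segment is
    // active and unbounded above, so it overlaps any range whose upper
    // bound is at or beyond its base offset.
    static List<Segment> logSegments(List<Segment> segments, long from, long to) {
        List<Segment> out = new ArrayList<>();
        for (Segment s : segments) {
            if (s.baseOffset() <= Math.max(from, to)) out.add(s);
        }
        return out;
    }

    public static void main(String[] args) {
        // one segment, which is also the active segment (size is made up)
        List<Segment> segments = List.of(new Segment(166266, 1024));
        long logStartOffset = 166301;           // moved past the base offset
        long firstDirtyOffset = logStartOffset; // 166301
        long firstUncleanableOffset = segments.get(0).baseOffset(); // 166266

        long cleanableBytes = logSegments(segments, firstDirtyOffset,
                Math.max(firstDirtyOffset, firstUncleanableOffset))
                .stream().mapToLong(Segment::size).sum();

        // Non-zero, so the cleaner keeps picking this log as "filthiest"
        // even though the dirty section [166301, 166301] is empty.
        System.out.println(cleanableBytes); // prints 1024
    }
}
```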
[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on pull request #10552: URL: https://github.com/apache/kafka/pull/10552#issuecomment-838339697 I saw some cooperative sticky tests failed. I'll update them and add more tests tomorrow or later. Thanks.
[GitHub] [kafka] dengziming commented on pull request #10667: KAFKA-12772: Move all transaction state transition rules into their states
dengziming commented on pull request #10667: URL: https://github.com/apache/kafka/pull/10667#issuecomment-838294186 Hello @guozhangwang, PTAL.
[GitHub] [kafka] jlprat commented on pull request #10672: KAFKA-12769: Backport to 2.7 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…
jlprat commented on pull request #10672: URL: https://github.com/apache/kafka/pull/10672#issuecomment-838221652 Test failure was: `Build / JDK 8 and Scala 2.12 / kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()`. This flaky test is already fixed (https://issues.apache.org/jira/browse/KAFKA-12384), but only on `trunk`, not on the `2.7` branch.
[GitHub] [kafka] DuongPTIT commented on pull request #10670: KAFKA-10273 Connect Converters should produce actionable error messages
DuongPTIT commented on pull request #10670: URL: https://github.com/apache/kafka/pull/10670#issuecomment-838210303 retest this please
[GitHub] [kafka] jlprat commented on pull request #10669: KAFKA-12769: Backport too 2.8 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…
jlprat commented on pull request #10669: URL: https://github.com/apache/kafka/pull/10669#issuecomment-838186948 Test failure was: `Build / JDK 8 and Scala 2.12 / kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()`. This flaky test is already fixed (https://issues.apache.org/jira/browse/KAFKA-12384), but only on `trunk`, not on the `2.8` branch.
[GitHub] [kafka] showuon edited a comment on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon edited a comment on pull request #10552: URL: https://github.com/apache/kafka/pull/10552#issuecomment-838136488 @ableegoldman, I've done the code refinement and refactoring. It's basically what we discussed in the constrained assignor PR. Please take a look when available. cc @guozhangwang, welcome to take another look. Thank you!
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r629994499

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

```diff
@@ -469,73 +426,184 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
-        balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired);
+        balance(currentAssignment, prevAssignment, sortedAllPartitions, unassignedPartitions, sortedCurrentSubscriptions,
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionsCount);
+
+        if (log.isDebugEnabled()) {
+            log.debug("final assignment: {}", currentAssignment);
+        }
+
         return currentAssignment;
     }
 
+    /**
+     * Get the unassigned partition list by computing the difference set of sortedAllPartitions (all partitions)
+     * and sortedAssignedPartitions. If there are no assigned partitions, just return all sorted topic partitions.
+     * This is used in the generalAssign method.
+     *
+     * We loop over sortedAllPartitions and compare with the ith element in sortedAssignedPartitions (i starts from 0):
+     * - if not equal to the ith element, add to unassignedPartitions
+     * - if equal to the ith element, get the next element from sortedAssignedPartitions
+     *
+     * @param sortedAllPartitions sorted list of all partitions
+     * @param sortedAssignedPartitions sorted assigned partitions, all included in sortedAllPartitions
+     * @param topic2AllPotentialConsumers topics mapped to all consumers that subscribed to them
+     * @return the partitions not assigned to any current consumer
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedAllPartitions,
+                                                         List<TopicPartition> sortedAssignedPartitions,
+                                                         Map<String, List<String>> topic2AllPotentialConsumers) {
+        if (sortedAssignedPartitions.isEmpty()) {
+            return sortedAllPartitions;
+        }
+
+        List<TopicPartition> unassignedPartitions = new ArrayList<>();
+
+        Collections.sort(sortedAssignedPartitions, new PartitionComparator(topic2AllPotentialConsumers));
+
+        boolean shouldAddDirectly = false;
+        Iterator<TopicPartition> sortedAssignedPartitionsIter = sortedAssignedPartitions.iterator();
+        TopicPartition nextAssignedPartition = sortedAssignedPartitionsIter.next();
+
+        for (TopicPartition topicPartition : sortedAllPartitions) {
+            if (shouldAddDirectly || !nextAssignedPartition.equals(topicPartition)) {
+                unassignedPartitions.add(topicPartition);
+            } else {
+                // this partition is in assignedPartitions; don't add it, just get the next assigned partition
+                if (sortedAssignedPartitionsIter.hasNext()) {
+                    nextAssignedPartition = sortedAssignedPartitionsIter.next();
+                } else {
+                    // add the remaining partitions directly since sortedAssignedPartitions is exhausted
+                    shouldAddDirectly = true;
+                }
+            }
+        }
+        return unassignedPartitions;
+    }
+
+    /**
+     * Get the unassigned partition list by computing the difference set of all sorted partitions
+     * and sortedAssignedPartitions. If there are no assigned partitions, just return all sorted topic partitions.
+     * This is used in the constrainedAssign method.
+     *
+     * To compute the difference set, we use the two-pointers technique:
+     *
+     * We loop through all sorted topics, then iterate over each topic's partitions,
+     * comparing with the ith element in sortedAssignedPartitions (i starts from 0):
+     * - if not equal to the ith element, add to unassignedPartitions
+     * - if equal to the ith element, get the next element from sortedAssignedPartitions
+     *
+     * @param totalPartitionsCount all partition counts in this assignment
+     * @param partitionsPerTopic the number of partitions for each subscribed topic
+     * @param sortedAssignedPartitions sorted assigned partitions, all included in sortedPartitions
+     * @return the partitions not yet assigned to any consumers
+     */
+    private List<TopicPartition> getUnassignedPartitions(int totalPartitionsCount,
+                                                         Map<String, Integer> partitionsPerTopic,
```
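The two-pointers difference-set idea described in the javadoc can be distilled to plain sorted lists (a standalone sketch with toy data, not the assignor code — both inputs must share one ordering and the second must be a subset of the first, as in the PR):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SortedDifferenceDemo {
    // Returns elements of sortedAll that are not in sortedAssigned,
    // in a single pass over both lists.
    static List<String> unassigned(List<String> sortedAll, List<String> sortedAssigned) {
        if (sortedAssigned.isEmpty()) return sortedAll;
        List<String> result = new ArrayList<>();
        Iterator<String> it = sortedAssigned.iterator();
        String nextAssigned = it.next();
        boolean addDirectly = false;
        for (String p : sortedAll) {
            if (addDirectly || !nextAssigned.equals(p)) {
                result.add(p); // not assigned
            } else if (it.hasNext()) {
                nextAssigned = it.next(); // matched, advance the second pointer
            } else {
                addDirectly = true; // assigned list exhausted; the rest are unassigned
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> all = List.of("t-0", "t-1", "t-2", "t-3");
        List<String> assigned = List.of("t-1", "t-3");
        System.out.println(unassigned(all, assigned)); // prints [t-0, t-2]
    }
}
```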
[GitHub] [kafka] jlprat edited a comment on pull request #10651: MINOR: Kafka Streams code samples formating unification
jlprat edited a comment on pull request #10651: URL: https://github.com/apache/kafka/pull/10651#issuecomment-838123810

Most of the changes are purely cosmetic:
- using the right tags for embedding a code snippet
- escaping `<` and `>` characters to HTML-encoded strings so they are properly rendered
- removing extra indentation
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r629991853

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

```diff
@@ -469,73 +426,184 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
         sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
-        balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
-            consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired);
+        balance(currentAssignment, prevAssignment, sortedAllPartitions, unassignedPartitions, sortedCurrentSubscriptions,
+            consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired,
+            partitionsPerTopic, totalPartitionsCount);
+
+        if (log.isDebugEnabled()) {
+            log.debug("final assignment: {}", currentAssignment);
+        }
+
         return currentAssignment;
     }
 
+    /**
+     * Get the unassigned partition list by computing the difference set of sortedAllPartitions (all partitions)
+     * and sortedAssignedPartitions. If there are no assigned partitions, just return all sorted topic partitions.
+     * This is used in the generalAssign method.
+     *
+     * We loop over sortedAllPartitions and compare with the ith element in sortedAssignedPartitions (i starts from 0):
+     * - if not equal to the ith element, add to unassignedPartitions
+     * - if equal to the ith element, get the next element from sortedAssignedPartitions
+     *
+     * @param sortedAllPartitions sorted list of all partitions
+     * @param sortedAssignedPartitions sorted assigned partitions, all included in sortedAllPartitions
+     * @param topic2AllPotentialConsumers topics mapped to all consumers that subscribed to them
+     * @return the partitions not assigned to any current consumer
+     */
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedAllPartitions,
```

Review comment: put the two `getUnassignedPartitions` overloads (this one and the following one) next to each other for readability.
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r629990802

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

```diff
@@ -428,14 +372,18 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         for (TopicPartition topicPartition : entry.getValue())
             currentPartitionConsumer.put(topicPartition, entry.getKey());
 
-        List<TopicPartition> sortedPartitions = sortPartitions(partition2AllPotentialConsumers);
+        int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+        List<String> sortedAllTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
+        Collections.sort(sortedAllTopics, new TopicComparator(topic2AllPotentialConsumers));
+        List<TopicPartition> sortedAllPartitions = getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, totalPartitionsCount);
```

Review comment: reuse `getAllTopicPartitions` from `constrainedAssign`.
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r629990096

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

```diff
@@ -106,9 +104,9 @@
         // initialize the subscribed topics set if this is the first subscription
         if (subscribedTopics.isEmpty()) {
             subscribedTopics.addAll(subscription.topics());
-        } else if (!(subscription.topics().size() == subscribedTopics.size()
-            && subscribedTopics.containsAll(subscription.topics()))) {
-            return false;
+        } else if (isAllSubscriptionsEqual && !(subscription.topics().size() == subscribedTopics.size()
+            && subscribedTopics.containsAll(subscription.topics()))) {
+            isAllSubscriptionsEqual = false;
         }
```

Review comment: Now we run through all the `subscriptions`, since the collected `consumerToOwnedPartitions` data is also passed into `generalAssign`.
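The shape of that change — latching a boolean instead of returning early, so the loop still collects data every path needs — can be sketched standalone (toy data and a hypothetical `allEqual` helper, not the real assignor code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class LatchedFlagDemo {
    // Returns whether all consumers have identical subscriptions, while
    // always filling countsOut (topic -> subscriber count) -- the general
    // assignment path needs those counts even when subscriptions differ.
    static boolean allEqual(Map<String, Set<String>> subscriptions,
                            Map<String, Integer> countsOut) {
        boolean allSubscriptionsEqual = true;
        Set<String> reference = null;
        for (Map.Entry<String, Set<String>> e : subscriptions.entrySet()) {
            if (reference == null) {
                reference = e.getValue();
            } else if (allSubscriptionsEqual && !reference.equals(e.getValue())) {
                // latch the flag instead of returning early: keep iterating
                // so the collected data stays complete
                allSubscriptionsEqual = false;
            }
            for (String t : e.getValue()) countsOut.merge(t, 1, Integer::sum);
        }
        return allSubscriptionsEqual;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        boolean equal = allEqual(Map.of(
                "c1", Set.of("a", "b"),
                "c2", Set.of("a", "b"),
                "c3", Set.of("a")), counts); // c3 differs
        System.out.println(equal);           // prints false
        System.out.println(counts.get("a")); // prints 3
    }
}
```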
[GitHub] [kafka] jlprat commented on pull request #10651: MINOR: Kafka Streams code samples formating unification
jlprat commented on pull request #10651: URL: https://github.com/apache/kafka/pull/10651#issuecomment-838120807 @cadonna I'll do it next time. I was torn between providing a PR per file or a PR per folder (I ended up doing one PR for the Streams folder).