[GitHub] [kafka] dongjinleekr commented on pull request #10642: KAFKA-12756: Update Zookeeper to 3.6.3 or higher

2021-05-11 Thread GitBox


dongjinleekr commented on pull request #10642:
URL: https://github.com/apache/kafka/pull/10642#issuecomment-839446528


   @Boojapho Totally agree. Let's keep an eye on the other projects' updates. 
It would not be too late until Zookeeper and the other related projects drop 
support for the security vulnerability.
   
   +1. I am now testing the Kafka cluster with Zookeeper 3.6.x. Everything has 
been working fine so far, except for a classpath conflict in Kafka itself, 
which I am now working on.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12718) SessionWindows are closed too early

2021-05-11 Thread Juan C. Gonzalez-Zurita (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17343001#comment-17343001
 ] 

Juan C. Gonzalez-Zurita commented on KAFKA-12718:
-

Thank you for assigning me. I will begin work on it tomorrow morning. :D

> SessionWindows are closed too early
> ---
>
> Key: KAFKA-12718
> URL: https://issues.apache.org/jira/browse/KAFKA-12718
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Juan C. Gonzalez-Zurita
>Priority: Major
>  Labels: beginner, easy-fix, newbie
> Fix For: 3.0.0
>
>
> SessionWindows are defined based on a {{gap}} parameter, and also support an 
> additional {{grace-period}} configuration to handle out-of-order data.
> To incorporate the session-gap a session window should only be closed at 
> {{window-end + gap}} and to incorporate grace-period, the close time should 
> be pushed out further to {{window-end + gap + grace}}.
> However, atm we compute the window close time as {{window-end + grace}} 
> omitting the {{gap}} parameter.
> Because the default grace period is 24h, most users might not notice this issue. 
> Even if they set a grace period explicitly (eg, when using suppress()), they 
> would most likely set a grace period larger than the gap time and not hit the 
> issue (or maybe only realize it when inspecting the behavior closely).
> However, if a user wants to disable the grace period and sets it to zero (or 
> any other value smaller than the gap time), sessions might be closed too early 
> and users might notice.
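
For illustration, a minimal sketch of the close-time computation described above (the helper methods are hypothetical, not Kafka Streams API):

```java
// Hypothetical helpers, not actual Kafka Streams code.
final class SessionWindowCloseTime {
    // Intended behavior: close only after both the session gap and the grace period have passed.
    static long intendedCloseTime(final long windowEnd, final long gapMs, final long graceMs) {
        return windowEnd + gapMs + graceMs;
    }

    // Behavior described in the report: the gap is omitted, so windows can close too early whenever grace < gap.
    static long currentCloseTime(final long windowEnd, final long graceMs) {
        return windowEnd + graceMs;
    }
}
```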



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman removed a comment on pull request #10676: MINOR: rename TopologyDescription.Subtopology to SubtopologyDescription

2021-05-11 Thread GitBox


ableegoldman removed a comment on pull request #10676:
URL: https://github.com/apache/kafka/pull/10676#issuecomment-839409037


   @wcarlson5 @guozhangwang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630705257



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception 
{
 () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
 "Thread never stopped."
 );
-} finally {
 streams.close();

Review comment:
   @mjsax Line 532 (original) waits for the streams state to equal **ERROR** 
after the streams instance is closed. 
   Although this test case would still pass if we removed `close()`, it seems 
better to retain `close()`?
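
   In other words (a rough sketch reusing the setup shown in the quoted test, not the actual test code), the explicit `close()` stays inside the try block so the test can still wait for the post-close state; try-with-resources simply closes the instance again on exit:

```java
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
    streams.start();
    // ... trigger the global-thread failure so the state reaches PENDING_ERROR (elided) ...
    streams.close();  // retained so the test can observe the post-close state, per the discussion above
    TestUtils.waitForCondition(
        () -> streams.state() == KafkaStreams.State.ERROR,
        "Streams never reached ERROR");
}   // try-with-resources calls close() again here; close() is idempotent
```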
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10676: MINOR: rename TopologyDescription.Subtopology to SubtopologyDescription

2021-05-11 Thread GitBox


ableegoldman commented on pull request #10676:
URL: https://github.com/apache/kafka/pull/10676#issuecomment-839409037


   @wcarlson5 @guozhangwang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #10676: MINOR: rename TopologyDescription.Subtopology to SubtopologyDescription

2021-05-11 Thread GitBox


ableegoldman opened a new pull request #10676:
URL: https://github.com/apache/kafka/pull/10676


   Literally just renaming a class because I want to use the name `Subtopology` 
for something else, and the current `Subtopology` class seems to be used solely 
for the topology description. Split into a separate PR to minimize no-op 
changes in the main PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630701286



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
 () -> streams.localThreadsMetadata().stream().allMatch(t -> 
t.threadState().equals("DEAD")),
 "Streams never stopped"
 );
-} finally {
 streams.close();

Review comment:
   @mjsax We should not remove this line; otherwise line 498 (original) will 
throw a timeout exception.
   The streams state will always stay **RUNNING** if we skip `close()`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12772) Move all TransactionState transition rules into their states

2021-05-11 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming resolved KAFKA-12772.

Resolution: Fixed

> Move all TransactionState transition rules into their states
> 
>
> Key: KAFKA-12772
> URL: https://issues.apache.org/jira/browse/KAFKA-12772
> Project: Kafka
>  Issue Type: Improvement
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #10667: KAFKA-12772: Move all transaction state transition rules into their states

2021-05-11 Thread GitBox


guozhangwang commented on pull request #10667:
URL: https://github.com/apache/kafka/pull/10667#issuecomment-839359339


   LGTM. Merged to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #10667: KAFKA-12772: Move all transaction state transition rules into their states

2021-05-11 Thread GitBox


guozhangwang merged pull request #10667:
URL: https://github.com/apache/kafka/pull/10667


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic

2021-05-11 Thread GitBox


ableegoldman commented on a change in pull request #10675:
URL: https://github.com/apache/kafka/pull/10675#discussion_r630628119



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
##
@@ -417,6 +417,7 @@ public void 
shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEna
 assertEquals(Collections.singletonList(expectedResult), 
producer.consumerGroupOffsetsHistory());
 }
 
+@SuppressWarnings("deprecation")
 @Test

Review comment:
   Well, it does call deprecated code. But I didn't think to just deprecate 
the test itself. That makes more sense, will do




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic

2021-05-11 Thread GitBox


ijuma commented on a change in pull request #10675:
URL: https://github.com/apache/kafka/pull/10675#discussion_r630626735



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
##
@@ -417,6 +417,7 @@ public void 
shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEna
 assertEquals(Collections.singletonList(expectedResult), 
producer.consumerGroupOffsetsHistory());
 }
 
+@SuppressWarnings("deprecation")
 @Test

Review comment:
   The suppression is only needed for cases where non-deprecated code has 
to call deprecated code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic

2021-05-11 Thread GitBox


ijuma commented on a change in pull request #10675:
URL: https://github.com/apache/kafka/pull/10675#discussion_r630626586



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
##
@@ -417,6 +417,7 @@ public void 
shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEna
 assertEquals(Collections.singletonList(expectedResult), 
producer.consumerGroupOffsetsHistory());
 }
 
+@SuppressWarnings("deprecation")
 @Test

Review comment:
   If their main purpose is to test deprecated functionality, we should 
deprecate the test methods too, so we can remove them when the deprecated 
non-test code is removed.
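
   A small, self-contained illustration of that convention (hypothetical class and method names; JUnit 5 assumed):

```java
import org.junit.jupiter.api.Test;

class DeprecatedApiTest {

    // The test exists only to cover the deprecated path, so it is itself marked
    // @Deprecated and can be deleted together with the deprecated production code.
    // javac does not warn about uses of deprecated members from inside a deprecated member,
    // so no @SuppressWarnings("deprecation") is needed here.
    @Deprecated
    @Test
    void shouldStillSupportLegacyOperation() {
        legacyOperation();
    }

    // Stand-in for the deprecated production method under test.
    @Deprecated
    private void legacyOperation() { }
}
```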




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10634: KAFKA-12754: Improve endOffsets for TaskMetadata

2021-05-11 Thread GitBox


ableegoldman commented on a change in pull request #10634:
URL: https://github.com/apache/kafka/pull/10634#discussion_r630615608



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -1193,6 +1196,11 @@ public void updateCommittedOffsets(final TopicPartition 
topicPartition, final Lo
 committedOffsets.put(topicPartition, offset);
 }
 
+@Override
+public void updateEndOffsets(final TopicPartition topicPartition, final 
Long offset) {
+highWatermark.put(topicPartition, offset);

Review comment:
   Oh I see, I thought we were making a copy of the end offsets in 
StreamTask#highWatermark, but it's just an unmodifiableMap
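
   As a quick standalone refresher on that distinction (not StreamTask code): `Collections.unmodifiableMap` is a live, read-only view of the backing map, so later updates to the backing map remain visible through it, while a copy is a frozen snapshot:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableViewDemo {
    public static void main(final String[] args) {
        final Map<String, Long> endOffsets = new HashMap<>();
        final Map<String, Long> view = Collections.unmodifiableMap(endOffsets); // read-only view
        final Map<String, Long> copy = new HashMap<>(endOffsets);               // independent snapshot

        endOffsets.put("topic-0", 42L);

        System.out.println(view.get("topic-0")); // 42   -> the view sees the update
        System.out.println(copy.get("topic-0")); // null -> the copy does not
    }
}
```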




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic

2021-05-11 Thread GitBox


ableegoldman commented on a change in pull request #10675:
URL: https://github.com/apache/kafka/pull/10675#discussion_r630614141



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
##
@@ -417,6 +417,7 @@ public void 
shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEna
 assertEquals(Collections.singletonList(expectedResult), 
producer.consumerGroupOffsetsHistory());
 }
 
+@SuppressWarnings("deprecation")
 @Test

Review comment:
   These are testing the deprecated method specifically. There is an 
identical test below for the non-deprecated version of this API. Are you 
suggesting we should just go ahead and remove the tests for the deprecated 
functionality?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic

2021-05-11 Thread GitBox


ijuma commented on a change in pull request #10675:
URL: https://github.com/apache/kafka/pull/10675#discussion_r630612361



##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
##
@@ -417,6 +417,7 @@ public void 
shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEna
 assertEquals(Collections.singletonList(expectedResult), 
producer.consumerGroupOffsetsHistory());
 }
 
+@SuppressWarnings("deprecation")
 @Test

Review comment:
   Are these tests only present for testing deprecated functionality or are 
they also testing non deprecated functionality?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…

2021-05-11 Thread GitBox


ableegoldman commented on a change in pull request #10673:
URL: https://github.com/apache/kafka/pull/10673#discussion_r630612173



##
File path: tests/kafkatest/services/streams.py
##
@@ -466,6 +466,15 @@ def __init__(self, test_context, kafka, processingMode):
 
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
 processingMode)
 
+def prop_file(self):
+properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+  streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
+  # the old broker (< 2.4) does not support configuration 
replication.factor=-1
+  "replication.factor": 1}

Review comment:
   Ah, yeah, I bet that's it: we only set it for the EOS tests. Might be 
better to just set it in the Java code instead as it's easier to find and read, 
and I believe most other configs are set there. I think this test runs the 
`StreamsSmokeTest`?
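
   If it were moved to the Java side as suggested, the test driver could pin the config roughly like this (a sketch only; the class and method names are hypothetical):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final class BrokerCompatibilityTestProps {
    // Sketch: old brokers (< 2.4) do not support replication.factor = -1,
    // so the compatibility test pins it to 1 explicitly.
    static Properties streamsProperties(final String bootstrapServers, final String stateDir) {
        final Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        return props;
    }
}
```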




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic

2021-05-11 Thread GitBox


ijuma commented on a change in pull request #10675:
URL: https://github.com/apache/kafka/pull/10675#discussion_r630612053



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
##
@@ -88,26 +84,13 @@ public Builder(final String transactionalId,
 .setMemberId(memberId)
 .setGenerationId(generationId)
 .setGroupInstanceId(groupInstanceId.orElse(null));
-this.autoDowngrade = autoDowngrade;
 }
 
 @Override
 public TxnOffsetCommitRequest build(short version) {
 if (version < 3 && groupMetadataSet()) {
-if (autoDowngrade) {
-log.trace("Downgrade the request by resetting group 
metadata fields: " +
-  "[member.id:{}, generation.id:{}, 
group.instance.id:{}], because broker " +
-  "only supports TxnOffsetCommit version {}. 
Need " +
-  "v3 or newer to enable this feature",
-data.memberId(), data.generationId(), 
data.groupInstanceId(), version);
-
-
data.setGenerationId(JoinGroupRequest.UNKNOWN_GENERATION_ID)
-.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
-.setGroupInstanceId(null);
-} else {
-throw new UnsupportedVersionException("Broker unexpectedly 
" +
+throw new UnsupportedVersionException("Broker unexpectedly " +

Review comment:
   I think we can improve this message a little:
   1. Remove `unexpectedly` since it doesn't add much value and it's a bit 
confusing (it makes it sound like there's a broker bug)
   2. Include the minimum version required (`3`) in the message.
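
   Concretely, the reworded exception inside `build(short version)` might look something like this (the exact wording is only a suggestion):

```java
throw new UnsupportedVersionException("Broker does not support TxnOffsetCommit with consumer group " +
    "metadata fields (member.id, generation.id, group.instance.id); version 3 or newer is required, " +
    "but the broker only supports up to version " + version);
```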




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…

2021-05-11 Thread GitBox


mjsax commented on a change in pull request #10673:
URL: https://github.com/apache/kafka/pull/10673#discussion_r630610600



##
File path: tests/kafkatest/services/streams.py
##
@@ -466,6 +466,15 @@ def __init__(self, test_context, kafka, processingMode):
 
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
 processingMode)
 
+def prop_file(self):
+properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+  streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
+  # the old broker (< 2.4) does not support configuration 
replication.factor=-1
+  "replication.factor": 1}

Review comment:
   I guess the logic is scattered between the Python and Java code... And maybe 
we need 3 only for the EOS tests? Not sure either.
   
   I am also ok with just merging this PR as-is; or should we update the Java 
code instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-11 Thread GitBox


ableegoldman commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r630605834



##
File path: 
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
##
@@ -106,6 +110,190 @@ public void stringSerdeShouldSupportDifferentEncodings() {
 }
 }
 
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldReturnEmptyCollection() {
+List<Integer> testData = Arrays.asList();
+Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+"Should get empty collection after serialization and deserialization on an empty list");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldReturnNull() {
+List<Integer> testData = null;
+Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+"Should get null after serialization and deserialization on an empty list");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldRoundtripIntPrimitiveInput() {
+List<Integer> testData = Arrays.asList(1, 2, 3);
+Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+"Should get the original collection of integer primitives after serialization and deserialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForIntPrimitiveInput() {
+List<Integer> testData = Arrays.asList(1, 2, 3);
+Serde<List<Integer>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer());
+assertEquals(21, listSerde.serializer().serialize(topic, testData).length,
+"Should get length of 21 bytes after serialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldRoundtripShortPrimitiveInput() {
+List<Short> testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+Serde<List<Short>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Short());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+"Should get the original collection of short primitives after serialization and deserialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForShortPrimitiveInput() {
+List<Short> testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+Serde<List<Short>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Short());
+assertEquals(15, listSerde.serializer().serialize(topic, testData).length,
+"Should get length of 15 bytes after serialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldRoundtripFloatPrimitiveInput() {
+List<Float> testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+Serde<List<Float>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Float());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+"Should get the original collection of float primitives after serialization and deserialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForFloatPrimitiveInput() {
+List<Float> testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+Serde<List<Float>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Float());
+assertEquals(21, listSerde.serializer().serialize(topic, testData).length,
+"Should get length of 21 bytes after serialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldRoundtripLongPrimitiveInput() {
+List<Long> testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+Serde<List<Long>> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Long());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)),
+"Should get the original collection of long primitives after serialization and deserialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForLongPrimitiveInput() {
+List<Long> testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+Serde<List<Long>> listSerde = Serdes.ListSerde(ArrayList.class, 

[GitHub] [kafka] mjsax commented on pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic

2021-05-11 Thread GitBox


mjsax commented on pull request #10675:
URL: https://github.com/apache/kafka/pull/10675#issuecomment-839282578


   \cc @abbccdda and @guozhangwang to make sure we don't do anything bad by 
removing this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-05-11 Thread GitBox


cmccabe commented on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-839281487


   Thanks for working on this, @rondagostino , and sorry about the delays in 
reviewing.
   
   > We forward the Create/Remove operations to the controller, but this patch 
actually short-circuits that if we are using KRaft with the ZooKeeper-based 
`AclAuthorizer` via the changes to `RaftSupport.maybeForward()`. The reason for 
short-circuiting it is because the KRaft controller doesn't have the code to 
create or remove ACLs (`handle{Create,Delete}Acls` in `KafkaApis`). We could 
add it, of course, in which case the changes to the `maybeForward()` method 
would be unnecessary. Perhaps it would be simpler to do that instead of 
delaying it to an additional PR -- is that what you were suggesting?
   
   Yes, I think we should just do this in `ControllerApis.scala` and be done 
with it.  It's kind of annoying to do in a follow-on PR since we'd have to add 
a lot of special-case code which we'd later have to undo, which is not usually 
the right way to go.  We might even be able to move this code into 
`RequestHandlerHelper` or something since it would be the same between 
`BrokerApis` and `ControllerApis` (I think?)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630603482



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception 
{
 () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
 "Thread never stopped."
 );
-} finally {
 streams.close();

Review comment:
   Ok, will do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630603389



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
 () -> streams.localThreadsMetadata().stream().allMatch(t -> 
t.threadState().equals("DEAD")),
 "Streams never stopped"
 );
-} finally {
 streams.close();

Review comment:
   Thanks @mjsax for review. Will do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-05-11 Thread GitBox


cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r630603137



##
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##
@@ -91,28 +93,42 @@ case class ZkSupport(adminManager: ZkAdminManager,
   override def controllerId: Option[Int] =  metadataCache.getControllerId
 }
 
-case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: 
RaftMetadataCache, quotaCache: ClientQuotaCache)
+case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: 
RaftMetadataCache, quotaCache: ClientQuotaCache, config: KafkaConfig)
 extends MetadataSupport {
+  if (config.requiresZookeeper) {
+throw new IllegalStateException("Config specifies ZooKeeper but metadata 
support instance is for Raft")
+  }
   override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
   override def requireZkOrThrow(createException: => Exception): ZkSupport = 
throw createException
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport 
= this
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-if (config.requiresZookeeper) {
-  throw new IllegalStateException("Config specifies ZooKeeper but metadata 
support instance is for Raft")
+  override def requireZkAuthorizerOrThrow(createException: => Exception) = {

Review comment:
   I don't think we need (or want) to special-case the ZK authorizer here.  
There is a Confluent authorizer that doesn't depend on ZK, and also a Cloudera 
one.   We don't want to break them.  Just forward everything




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-05-11 Thread GitBox


cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r630602650



##
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##
@@ -91,28 +93,42 @@ case class ZkSupport(adminManager: ZkAdminManager,
   override def controllerId: Option[Int] =  metadataCache.getControllerId
 }
 
-case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: 
RaftMetadataCache, quotaCache: ClientQuotaCache)
+case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: 
RaftMetadataCache, quotaCache: ClientQuotaCache, config: KafkaConfig)
 extends MetadataSupport {
+  if (config.requiresZookeeper) {
+throw new IllegalStateException("Config specifies ZooKeeper but metadata 
support instance is for Raft")
+  }
   override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
   override def requireZkOrThrow(createException: => Exception): ZkSupport = 
throw createException
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport 
= this
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-if (config.requiresZookeeper) {
-  throw new IllegalStateException("Config specifies ZooKeeper but metadata 
support instance is for Raft")
+  override def requireZkAuthorizerOrThrow(createException: => Exception) = {
+if (!hasZkAuthorizer) {
+  throw createException
 }
   }
+  override def requireRaftOrThrow(createException: => Exception): RaftSupport 
= this
 
   override def maybeForward(request: RequestChannel.Request,
 handler: RequestChannel.Request => Unit,
 responseCallback: Option[AbstractResponse] => 
Unit): Unit = {
 if (!request.isForwarded) {
-  fwdMgr.forwardRequest(request, responseCallback)
+  request.header.apiKey match {
+case ApiKeys.CREATE_ACLS | ApiKeys.DELETE_ACLS =>
+  if (hasZkAuthorizer) {
+handler(request)

Review comment:
   Yeah, I had the same question.  It seems like the code is already in 
place.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


mjsax commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630602355



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -553,141 +552,152 @@ public void 
testInitializesAndDestroysMetricsReporters() {
 
 @Test
 public void testCloseIsIdempotent() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-streams.close();
-final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+streams.close();
+final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-streams.close();
-Assert.assertEquals("subsequent close() calls should do nothing",
-closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+streams.close();
+Assert.assertEquals("subsequent close() calls should do nothing",
+closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+}
 }
 
 @Test
 public void shouldAddThreadWhenRunning() throws InterruptedException {
 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-streams.start();
-final int oldSize = streams.threads.size();
-TestUtils.waitForCondition(() -> streams.state() == 
KafkaStreams.State.RUNNING, 15L, "wait until running");
-assertThat(streams.addStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 2)));
-assertThat(streams.threads.size(), equalTo(oldSize + 1));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+streams.start();
+final int oldSize = streams.threads.size();
+TestUtils.waitForCondition(() -> streams.state() == 
KafkaStreams.State.RUNNING, 15L, "wait until running");
+assertThat(streams.addStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 2)));
+assertThat(streams.threads.size(), equalTo(oldSize + 1));
+}
 }
 
 @Test
 public void shouldNotAddThreadWhenCreated() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-final int oldSize = streams.threads.size();
-assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-assertThat(streams.threads.size(), equalTo(oldSize));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+final int oldSize = streams.threads.size();
+assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+assertThat(streams.threads.size(), equalTo(oldSize));
+}
 }
 
 @Test
 public void shouldNotAddThreadWhenClosed() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-final int oldSize = streams.threads.size();
-streams.close();
-assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-assertThat(streams.threads.size(), equalTo(oldSize));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+final int oldSize = streams.threads.size();
+streams.close();
+assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+assertThat(streams.threads.size(), equalTo(oldSize));
+}
 }
 
 @Test
 public void shouldNotAddThreadWhenError() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-final int oldSize = streams.threads.size();
-streams.start();
-globalStreamThread.shutdown();
-assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-assertThat(streams.threads.size(), equalTo(oldSize));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+final int oldSize = streams.threads.size();
+streams.start();
+globalStreamThread.shutdown();
+assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+assertThat(streams.threads.size(), equalTo(oldSize));
+}
 }
 
 @Test
 public void shouldNotReturnDeadThreads() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-streams.start();
-streamThreadOne.shutdown();
-final Set<ThreadMetadata> threads = streams.localThreadsMetadata();
+final Set<ThreadMetadata> threads;
+try (final KafkaStreams 

[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630602092



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -553,141 +552,152 @@ public void 
testInitializesAndDestroysMetricsReporters() {
 
 @Test
 public void testCloseIsIdempotent() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-streams.close();
-final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+streams.close();
+final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-streams.close();
-Assert.assertEquals("subsequent close() calls should do nothing",
-closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+streams.close();
+Assert.assertEquals("subsequent close() calls should do nothing",
+closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+}
 }
 
 @Test
 public void shouldAddThreadWhenRunning() throws InterruptedException {
 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-streams.start();
-final int oldSize = streams.threads.size();
-TestUtils.waitForCondition(() -> streams.state() == 
KafkaStreams.State.RUNNING, 15L, "wait until running");
-assertThat(streams.addStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 2)));
-assertThat(streams.threads.size(), equalTo(oldSize + 1));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+streams.start();
+final int oldSize = streams.threads.size();
+TestUtils.waitForCondition(() -> streams.state() == 
KafkaStreams.State.RUNNING, 15L, "wait until running");
+assertThat(streams.addStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 2)));
+assertThat(streams.threads.size(), equalTo(oldSize + 1));
+}
 }
 
 @Test
 public void shouldNotAddThreadWhenCreated() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-final int oldSize = streams.threads.size();
-assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-assertThat(streams.threads.size(), equalTo(oldSize));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+final int oldSize = streams.threads.size();
+assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+assertThat(streams.threads.size(), equalTo(oldSize));
+}
 }
 
 @Test
 public void shouldNotAddThreadWhenClosed() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-final int oldSize = streams.threads.size();
-streams.close();
-assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-assertThat(streams.threads.size(), equalTo(oldSize));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+final int oldSize = streams.threads.size();
+streams.close();
+assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+assertThat(streams.threads.size(), equalTo(oldSize));
+}
 }
 
 @Test
 public void shouldNotAddThreadWhenError() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-final int oldSize = streams.threads.size();
-streams.start();
-globalStreamThread.shutdown();
-assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-assertThat(streams.threads.size(), equalTo(oldSize));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+final int oldSize = streams.threads.size();
+streams.start();
+globalStreamThread.shutdown();
+assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+assertThat(streams.threads.size(), equalTo(oldSize));
+}
 }
 
 @Test
 public void shouldNotReturnDeadThreads() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-streams.start();
-streamThreadOne.shutdown();
-final Set<ThreadMetadata> threads = streams.localThreadsMetadata();
+final Set<ThreadMetadata> threads;
+try (final KafkaStreams 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…

2021-05-11 Thread GitBox


ableegoldman commented on a change in pull request #10673:
URL: https://github.com/apache/kafka/pull/10673#discussion_r630601750



##
File path: tests/kafkatest/services/streams.py
##
@@ -466,6 +466,15 @@ def __init__(self, test_context, kafka, processingMode):
 
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
 processingMode)
 
+def prop_file(self):
+properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+  streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
+  # the old broker (< 2.4) does not support configuration 
replication.factor=-1
+  "replication.factor": 1}

Review comment:
   Sounds like we don't override the default after all? Or we have at least 
one test where that slipped through.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


mjsax commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630601702



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception 
{
 () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
 "Thread never stopped."
 );
-} finally {
 streams.close();

Review comment:
   As above. (Maybe similar elsewhere; won't comment on it explicitly below)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-05-11 Thread GitBox


ableegoldman commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r630601449



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -525,6 +525,11 @@ private TransactionManager 
configureTransactionState(ProducerConfig config,
 final int transactionTimeoutMs = 
config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
 final long retryBackoffMs = 
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
 final boolean autoDowngradeTxnCommit = 
config.getBoolean(ProducerConfig.AUTO_DOWNGRADE_TXN_COMMIT);
+// Only log a warning if being used outside of Streams, which we 
know includes "StreamThread-" in the client id
+if (autoDowngradeTxnCommit && !clientId.contains("StreamThread-")) 
{

Review comment:
   Done https://github.com/apache/kafka/pull/10675




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


mjsax commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630601302



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
 () -> streams.localThreadsMetadata().stream().allMatch(t -> 
t.threadState().equals("DEAD")),
 "Streams never stopped"
 );
-} finally {
 streams.close();

Review comment:
   We can remove this line now. `close()` will be called automatically by the 
try-with-resources clause.
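
   For reference, a tiny generic sketch (not the actual test) of what the try-with-resources form gives us:

```java
// KafkaStreams implements AutoCloseable, so the try-with-resources block below
// calls streams.close() automatically when it exits, even on an exception.
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
    streams.start();
    // ... assertions ...
}   // equivalent to calling streams.close() in a finally block
```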




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #10675: KAFKA-12574: remove internal Producer config and auto downgrade logic

2021-05-11 Thread GitBox


ableegoldman opened a new pull request #10675:
URL: https://github.com/apache/kafka/pull/10675


   Minor followup to [#10573](https://github.com/apache/kafka/pull/10573), see 
in particular [this comment 
thread](https://github.com/apache/kafka/pull/10573#discussion_r628777848). 
Removes this internal Producer config which was only ever used to avoid a very 
minor amount of work to downgrade the consumer group metadata in the txn commit 
request.
   
   Also replaces the deprecation warning suppression with the missing 
`@Deprecated` annotation on MockProducer's deprecated method.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-05-11 Thread GitBox


cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r630601127



##
File path: tests/kafkatest/services/security/kafka_acls.py
##
@@ -66,17 +67,46 @@ def add_cluster_acl(self, kafka, principal, 
force_use_zk_connection=False, addit
This is necessary for the case where we are bootstrapping ACLs 
before Kafka is started or before authorizer is enabled
 :param additional_cluster_operations_to_grant may be set to ['Alter', 
'Create'] if the cluster is secured since these are required
to create SCRAM credentials and topics, respectively
+:param security_protocol set it to explicitly determine whether we use 
client or broker credentials, otherwise
+we use the the client security protocol unless inter-broker 
security protocol is PLAINTEXT, in which case we use PLAINTEXT.
+Then we use the broker's credentials if the selected security 
protocol matches the inter-broker security protocol,
+otherwise we use the client's credentials.
 """
 node = kafka.nodes[0]
 
 for operation in ['ClusterAction'] + 
additional_cluster_operations_to_grant:
 cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s 
--allow-principal=%(principal)s" % {
-'cmd_prefix': 
kafka.kafka_acls_cmd_with_optional_security_settings(node, 
force_use_zk_connection),
+'cmd_prefix': 
kafka.kafka_acls_cmd_with_optional_security_settings(node, 
force_use_zk_connection, security_protocol),
 'operation': operation,
 'principal': principal
 }
 kafka.run_cli_tool(node, cmd)
 
+def remove_cluster_acl(self, kafka, principal, 
force_use_zk_connection=False, additional_cluster_operations_to_remove = [], 
security_protocol=None):

Review comment:
   Since this is new code, it would be really good to avoid introducing 
`force_use_zk_connection` here if possible.  I can't see anywhere in this PR 
where it is used; is this really necessary?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

2021-05-11 Thread GitBox


cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r630598753



##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -863,7 +878,7 @@ def kafka_configs_cmd_with_optional_security_settings(self, 
node, force_use_zk_c
 # configure JAAS to provide the typical client credentials
 jaas_conf_prop = KafkaService.JAAS_CONF_PROPERTY
 use_inter_broker_mechanism_for_client = False
-optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " 
% (jaas_conf_prop, KafkaService.KRB5_CONF)
+optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " 
% (jaas_conf_prop, KafkaService.KRB5_CONF) if security_protocol_to_use != "SSL" 
else ""

Review comment:
   I'm confused by the logic here.  If we have security_protocol==SSL then 
we do not define the jaas properties in KAFKA_OPTS?  Seems a bit weird -- why 
define this when we're using PLAINTEXT or when we're using SASL_SSL, but not 
when using SSL?  Can you add a comment about how this works?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…

2021-05-11 Thread GitBox


mjsax commented on a change in pull request #10673:
URL: https://github.com/apache/kafka/pull/10673#discussion_r630595939



##
File path: tests/kafkatest/services/streams.py
##
@@ -466,6 +466,15 @@ def __init__(self, test_context, kafka, processingMode):
 
"org.apache.kafka.streams.tests.BrokerCompatibilityTest",
 processingMode)
 
+def prop_file(self):
+properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+  streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
+  # the old broker (< 2.4) does not support configuration 
replication.factor=-1
+  "replication.factor": 1}

Review comment:
   Should we set it to 3 instead? IIRC, we run all system tests with 3 
brokers? Wondering why the change broke the system tests, as they should have 
overridden the default to 3 anyway?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-11 Thread GitBox


cmccabe commented on pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#issuecomment-839261742


   We need to gate the new behavior behind an IBP bump, right?  Otherwise we 
won't have access to new producer IDs while rolling a cluster.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-05-11 Thread GitBox


ableegoldman commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r630593372



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
##
@@ -179,10 +179,18 @@ public void beginTransaction() throws 
ProducerFencedException {
 this.sentOffsets = false;
 }
 
+@SuppressWarnings("deprecation")

Review comment:
   Actually no, we do get a warning if we don't have either annotation. 
I'll change it to `@Deprecated` then




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-11 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r630587756



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -2376,6 +2375,82 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def allocateProducerIds(allocateProducerIdsRequest: 
AllocateProducerIdsRequestData,
+  callback: AllocateProducerIdsResponseData => Unit): 
Unit = {
+
+def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): Unit 
= {
+  results match {
+case Left(error) => callback.apply(new 
AllocateProducerIdsResponseData().setErrorCode(error.code))
+case Right(pidBlock) => callback.apply(
+  new AllocateProducerIdsResponseData()
+.setProducerIdStart(pidBlock.producerIdStart())
+.setProducerIdLen(pidBlock.producerIdLen()))
+  }
+}
+eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
+  allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
+  }
+
+  def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: 
Either[Errors, ProducerIdsBlock] => Unit): Unit = {
+// Handle a few short-circuits
+if (!isActive) {
+  callback.apply(Left(Errors.NOT_CONTROLLER))
+  return
+}
+
+val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+if (brokerEpochOpt.isEmpty) {
+  warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId")
+  callback.apply(Left(Errors.STALE_BROKER_EPOCH))

Review comment:
   It seems like this should be BROKER_ID_NOT_REGISTERED




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9826) Log cleaning repeatedly picks same segment with no effect when first dirty offset is past start of active segment

2021-05-11 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342904#comment-17342904
 ] 

Jun Rao commented on KAFKA-9826:


[~zhangzs] : The issue was fixed in 2.4.2. Could you try that version?

> Log cleaning repeatedly picks same segment with no effect when first dirty 
> offset is past start of active segment
> -
>
> Key: KAFKA-9826
> URL: https://issues.apache.org/jira/browse/KAFKA-9826
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.4.1
>Reporter: Steve Rodrigues
>Assignee: Steve Rodrigues
>Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> Seen on a system where a given partition had a single segment, and for 
> whatever reason (deleteRecords?), the logStartOffset was greater than the 
> base segment of the log, there were a continuous series of 
> ```
> [2020-03-03 16:56:31,374] WARN Resetting first dirty offset of  FOO-3 to log 
> start offset 55649 since the checkpointed offset 0 is invalid. 
> (kafka.log.LogCleanerManager$)
> ```
> messages (partition name changed, it wasn't really FOO). This was expected to 
> be resolved by KAFKA-6266 but clearly wasn't. 
> Further investigation revealed that  a few segments were continuously 
> cleaning and generating messages in the `log-cleaner.log` of the form:
> ```
> [2020-03-31 13:34:50,924] INFO Cleaner 1: Beginning cleaning of log FOO-3 
> (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,924] INFO Cleaner 1: Building offset map for FOO-3... 
> (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO Cleaner 1: Building offset map for log FOO-3 
> for 0 segments in offset range [55287, 54237). (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO Cleaner 1: Offset map for log FOO-3 complete. 
> (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO Cleaner 1: Cleaning log FOO-3 (cleaning prior 
> to Wed Dec 31 19:00:00 EST 1969, discarding tombstones prior to Tue Dec 10 
> 13:39:08 EST 2019)... (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO [kafka-log-cleaner-thread-1]: Log cleaner 
> thread 1 cleaned log FOO-3 (dirty section = [55287, 55287])
> 0.0 MB of log processed in 0.0 seconds (0.0 MB/sec).
> Indexed 0.0 MB in 0.0 seconds (0.0 Mb/sec, 100.0% of total time)
> Buffer utilization: 0.0%
> Cleaned 0.0 MB in 0.0 seconds (NaN Mb/sec, 0.0% of total time)
> Start size: 0.0 MB (0 messages)
> End size: 0.0 MB (0 messages) NaN% size reduction (NaN% fewer messages) 
> (kafka.log.LogCleaner)
> ```
> What seems to have happened here (data determined for a different partition) 
> is:
> There exist a number of partitions here which get relatively low traffic, 
> including our friend FOO-5. For whatever reason, LogStartOffset of this 
> partition has moved beyond the baseOffset of the active segment. (Notes in 
> other issues indicate that this is a reasonable scenario.) So there’s one 
> segment, starting at 166266, and a log start of 166301.
> grabFilthiestCompactedLog runs and reads the checkpoint file. We see that 
> this topicpartition needs to be cleaned, and call cleanableOffsets on it 
> which returns an OffsetsToClean with firstDirtyOffset == logStartOffset == 
> 166301 and firstUncleanableOffset = max(logStart, activeSegment.baseOffset) = 
> 166301, and forceCheckpoint = true.
> The checkpoint file is updated in grabFilthiestCompactedLog (this is the fix 
> for KAFKA-6266). We then create a LogToClean object based on the 
> firstDirtyOffset and firstUncleanableOffset of 166301 (past the active 
> segment’s base offset).
> The LogToClean object has cleanBytes = logSegments(-1, 
> firstDirtyOffset).map(_.size).sum → the size of this segment. It has 
> firstUncleanableOffset and cleanableBytes determined by 
> calculateCleanableBytes. calculateCleanableBytes returns:
> ```
> val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
> val firstUncleanableOffset = firstUncleanableSegment.baseOffset
> val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
> (firstUncleanableOffset, cleanableBytes)
> ```
> firstUncleanableSegment is activeSegment. firstUncleanableOffset is the base 
> offset, 166266. cleanableBytes is looking for logSegments(166301, max(166301, 
> 166266)) → which _is the active segment_
> So there are “cleanableBytes” > 0.
> We then keep only logs whose totalBytes (clean + cleanable) > 0. This 
> segment has totalBytes > 0, and it has cleanableBytes, so great! It's the 
> filthiest.
> The cleaner picks it, calls cleanLog on it, which then does cleaner.clean, 
> which returns nextDirtyOffset and cleaner stats. cleaner.clean calls 
> 

[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

2021-05-11 Thread GitBox


cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r630569991



##
File path: 
clients/src/main/resources/common/message/AllocateProducerIdsResponse.json
##
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 67,
+  "type": "response",
+  "name": "AllocateProducerIdsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+  "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The top level response error code" },
+{ "name": "ProducerIdStart", "type": "int64", "versions": "0+",

Review comment:
   I think this should have entity type `producerId`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-05-11 Thread GitBox


ijuma commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r630556204



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -525,6 +525,11 @@ private TransactionManager 
configureTransactionState(ProducerConfig config,
 final int transactionTimeoutMs = 
config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
 final long retryBackoffMs = 
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
 final boolean autoDowngradeTxnCommit = 
config.getBoolean(ProducerConfig.AUTO_DOWNGRADE_TXN_COMMIT);
+// Only log a warning if being used outside of Streams, which we 
know includes "StreamThread-" in the client id
+if (autoDowngradeTxnCommit && !clientId.contains("StreamThread-")) 
{

Review comment:
   @ableegoldman Your suggestion to remove the config altogether seems best 
to me. We don't have a grace period for internal configs, that's why they're 
internal. :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-05-11 Thread GitBox


ableegoldman commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r630553249



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
##
@@ -179,10 +179,18 @@ public void beginTransaction() throws 
ProducerFencedException {
 this.sentOffsets = false;
 }
 
+@SuppressWarnings("deprecation")

Review comment:
   I guess it doesn't need to have either; I can remove it in a followup PR




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-05-11 Thread GitBox


ableegoldman commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r630552512



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -525,6 +525,11 @@ private TransactionManager 
configureTransactionState(ProducerConfig config,
 final int transactionTimeoutMs = 
config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
 final long retryBackoffMs = 
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
 final boolean autoDowngradeTxnCommit = 
config.getBoolean(ProducerConfig.AUTO_DOWNGRADE_TXN_COMMIT);
+// Only log a warning if being used outside of Streams, which we 
know includes "StreamThread-" in the client id
+if (autoDowngradeTxnCommit && !clientId.contains("StreamThread-")) 
{

Review comment:
   @ijuma I came across this after the question was raised around the 
autodowngrade logic, apparently (according to the config's javadocs) it's an 
"internal" config that's only used for Streams. The config itself is 
package-private.
   
   Given that, I thought we may want to log a warning to any plain client users 
that saw this config and didn't notice that it was internal, and thus tried to 
use it. But I'm happy to do a followup PR to remove this. Alternatively, we can 
just take this config out -- I actually don't see any reason why it should be 
necessary, AFAICT it's just a slight convenience config that saves Streams from 
the ~5 lines of code it would take to do this downgrade itself (basically it 
just erases the extra consumer group metadata that isn't understood by older 
brokers). Not sure if this was vestigial from an older iteration of KIP-447, as 
it seems rather unnecessary..
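   
   To make the "erases the extra consumer group metadata" point concrete, here is a minimal sketch of 
the ~5 lines Streams could use instead of the internal config. It is only an illustration: the helper 
class, the boolean flag, and their wiring are assumptions, not actual Streams code; the producer and 
ConsumerGroupMetadata calls are the public KIP-447 APIs.
   
   ```
   import java.util.Map;
   import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.common.TopicPartition;
   
   // Hypothetical helper: when the brokers are too old to understand the extra group metadata
   // (generation, member id), fall back to sending only the group id, which is effectively
   // what the internal auto-downgrade config does today.
   final class TxnOffsetCommitHelper {
       static void sendOffsets(final Producer<byte[], byte[]> producer,
                               final Map<TopicPartition, OffsetAndMetadata> offsets,
                               final ConsumerGroupMetadata groupMetadata,
                               final boolean brokersUnderstandGroupMetadata) {
           if (brokersUnderstandGroupMetadata) {
               producer.sendOffsetsToTransaction(offsets, groupMetadata);
           } else {
               // keep only the group id; drop the generation/member id that older brokers reject
               producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(groupMetadata.groupId()));
           }
       }
   }
   ```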




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-11 Thread GitBox


yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r630550102



##
File path: 
clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
##
@@ -106,6 +110,190 @@ public void stringSerdeShouldSupportDifferentEncodings() {
 }
 }
 
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldReturnEmptyCollection() {
+List testData = Arrays.asList();
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Integer());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, 
listSerde.serializer().serialize(topic, testData)),
+"Should get empty collection after serialization and 
deserialization on an empty list");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldReturnNull() {
+List testData = null;
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Integer());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, 
listSerde.serializer().serialize(topic, testData)),
+"Should get null after serialization and deserialization on an 
empty list");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldRoundtripIntPrimitiveInput() {
+List testData = Arrays.asList(1, 2, 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Integer());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, 
listSerde.serializer().serialize(topic, testData)),
+"Should get the original collection of integer primitives after 
serialization and deserialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void 
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForIntPrimitiveInput() {
+List testData = Arrays.asList(1, 2, 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Integer());
+assertEquals(21, listSerde.serializer().serialize(topic, 
testData).length,
+"Should get length of 21 bytes after serialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldRoundtripShortPrimitiveInput() {
+List testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Short());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, 
listSerde.serializer().serialize(topic, testData)),
+"Should get the original collection of short primitives after 
serialization and deserialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void 
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForShortPrimitiveInput() {
+List testData = Arrays.asList((short) 1, (short) 2, (short) 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Short());
+assertEquals(15, listSerde.serializer().serialize(topic, 
testData).length,
+"Should get length of 15 bytes after serialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldRoundtripFloatPrimitiveInput() {
+List testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Float());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, 
listSerde.serializer().serialize(topic, testData)),
+"Should get the original collection of float primitives after 
serialization and deserialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void 
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForFloatPrimitiveInput() {
+List testData = Arrays.asList((float) 1, (float) 2, (float) 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Float());
+assertEquals(21, listSerde.serializer().serialize(topic, 
testData).length,
+"Should get length of 21 bytes after serialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void listSerdeShouldRoundtripLongPrimitiveInput() {
+List testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 
Serdes.Long());
+assertEquals(testData,
+listSerde.deserializer().deserialize(topic, 
listSerde.serializer().serialize(topic, testData)),
+"Should get the original collection of long primitives after 
serialization and deserialization");
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void 
listSerdeSerializerShouldReturnByteArrayOfFixedSizeForLongPrimitiveInput() {
+List testData = Arrays.asList((long) 1, (long) 2, (long) 3);
+Serde> listSerde = Serdes.ListSerde(ArrayList.class, 

[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-11 Thread GitBox


yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r630542548



##
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
##
@@ -125,6 +126,27 @@ public UUIDSerde() {
 }
 }
 
+static public final class ListSerde extends 
WrapperSerde> {

Review comment:
   The problem is [all of 
them](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java)
 list `static public` first.
   
   
![image](https://user-images.githubusercontent.com/8620461/117885230-98d8d200-b27b-11eb-849e-afdc43631c89.png)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-11 Thread GitBox


yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r630542548



##
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
##
@@ -125,6 +126,27 @@ public UUIDSerde() {
 }
 }
 
+static public final class ListSerde extends 
WrapperSerde> {

Review comment:
   The problem is all of them list `static public` first.
   
   
![image](https://user-images.githubusercontent.com/8620461/117885230-98d8d200-b27b-11eb-849e-afdc43631c89.png)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-11 Thread GitBox


yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r630539763



##
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+
+public class ListSerializer implements Serializer> {
+
+private static final List>> 
FIXED_LENGTH_SERIALIZERS = Arrays.asList(
+ShortSerializer.class,
+IntegerSerializer.class,
+FloatSerializer.class,
+LongSerializer.class,
+DoubleSerializer.class,
+UUIDSerializer.class);
+
+private Serializer inner;
+private SerializationStrategy serStrategy;
+private boolean isFixedLength;
+
+public ListSerializer() {}
+
+public ListSerializer(Serializer inner) {
+if (inner == null) {
+throw new IllegalArgumentException("ListSerializer requires 
\"serializer\" parameter to be provided during initialization");
+}
+this.inner = inner;
+this.isFixedLength = 
FIXED_LENGTH_SERIALIZERS.contains(inner.getClass());
+this.serStrategy = this.isFixedLength ? 
SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE;
+}
+
+public Serializer getInnerSerializer() {
+return inner;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void configure(Map configs, boolean isKey) {
+if (inner != null) {
+throw new ConfigException("List serializer was already initialized 
using a non-default constructor");
+}
+final String innerSerdePropertyName = isKey ? 
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : 
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
+final Object innerSerdeClassOrName = 
configs.get(innerSerdePropertyName);
+if (innerSerdeClassOrName == null) {
+throw new ConfigException("Not able to determine the serializer 
class because it was neither passed via the constructor nor set in the 
config.");
+}
+try {
+if (innerSerdeClassOrName instanceof String) {
+inner = Utils.newInstance((String) innerSerdeClassOrName, 
Serde.class).serializer();
+} else if (innerSerdeClassOrName instanceof Class) {
+inner = (Serializer) ((Serde) Utils.newInstance((Class) 
innerSerdeClassOrName)).serializer();
+} else {
+throw new KafkaException("Could not create a serializer class 
instance using \"" + innerSerdePropertyName + "\" property.");
+}
+inner.configure(configs, isKey);
+isFixedLength = 
FIXED_LENGTH_SERIALIZERS.contains(inner.getClass());
+serStrategy = this.isFixedLength ? 
SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE;
+} catch (final ClassNotFoundException e) {
+throw new ConfigException(innerSerdePropertyName, 
innerSerdeClassOrName, "Serializer class " + innerSerdeClassOrName + " could 
not be found.");
+}
+}
+
+private void serializeNullIndexList(final DataOutputStream out, 
List data) throws IOException {
+List nullIndexList = IntStream.range(0, data.size())
+.filter(i -> data.get(i) == null)

Review comment:
   Would something like this work?
   ```
   int i = 0;
   List nullIndexList = new ArrayList<>();
   for (Iterator it = data.listIterator(); it.hasNext(); i++) {
   if (it.next() == null) {
   nullIndexList.add(i); 
   }
   }
   out.writeInt(nullIndexList.size());
   for (int 
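   // (the snippet above is cut off by the mail archive; the lines below are an assumed
   // completion that writes out each collected null index -- not necessarily the author's exact code)
   for (int nullIndex : nullIndexList) {
   out.writeInt(nullIndex);
   }
   ```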

[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-11 Thread GitBox


yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r630539285



##
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+
+public class ListSerializer implements Serializer> {
+
+private static final List>> 
FIXED_LENGTH_SERIALIZERS = Arrays.asList(
+ShortSerializer.class,
+IntegerSerializer.class,
+FloatSerializer.class,
+LongSerializer.class,
+DoubleSerializer.class,
+UUIDSerializer.class);
+
+private Serializer inner;
+private SerializationStrategy serStrategy;
+private boolean isFixedLength;
+
+public ListSerializer() {}
+
+public ListSerializer(Serializer inner) {
+if (inner == null) {
+throw new IllegalArgumentException("ListSerializer requires 
\"serializer\" parameter to be provided during initialization");
+}
+this.inner = inner;
+this.isFixedLength = 
FIXED_LENGTH_SERIALIZERS.contains(inner.getClass());
+this.serStrategy = this.isFixedLength ? 
SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE;
+}
+
+public Serializer getInnerSerializer() {
+return inner;
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void configure(Map configs, boolean isKey) {
+if (inner != null) {
+throw new ConfigException("List serializer was already initialized 
using a non-default constructor");
+}
+final String innerSerdePropertyName = isKey ? 
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : 
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
+final Object innerSerdeClassOrName = 
configs.get(innerSerdePropertyName);
+if (innerSerdeClassOrName == null) {
+throw new ConfigException("Not able to determine the serializer 
class because it was neither passed via the constructor nor set in the 
config.");
+}
+try {
+if (innerSerdeClassOrName instanceof String) {
+inner = Utils.newInstance((String) innerSerdeClassOrName, 
Serde.class).serializer();
+} else if (innerSerdeClassOrName instanceof Class) {
+inner = (Serializer) ((Serde) Utils.newInstance((Class) 
innerSerdeClassOrName)).serializer();
+} else {
+throw new KafkaException("Could not create a serializer class 
instance using \"" + innerSerdePropertyName + "\" property.");
+}
+inner.configure(configs, isKey);
+isFixedLength = 
FIXED_LENGTH_SERIALIZERS.contains(inner.getClass());
+serStrategy = this.isFixedLength ? 
SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE;
+} catch (final ClassNotFoundException e) {
+throw new ConfigException(innerSerdePropertyName, 
innerSerdeClassOrName, "Serializer class " + innerSerdeClassOrName + " could 
not be found.");
+}
+}
+
+private void serializeNullIndexList(final DataOutputStream out, 
List data) throws IOException {
+List nullIndexList = IntStream.range(0, data.size())
+.filter(i -> data.get(i) == null)

Review comment:
   Would something like this work?
   ```
   int i = 0;
   List nullIndexList = new ArrayList<>();
   for (Iterator it = data.listIterator(); it.hasNext(); i++) {
   if (it.next() == null) {
   nullIndexList.add(i); 
   }
   }
   out.writeInt(nullIndexList.size());
   for (int 

[GitHub] [kafka] yeralin commented on a change in pull request #6592: KAFKA-8326: Introduce List Serde

2021-05-11 Thread GitBox


yeralin commented on a change in pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#discussion_r630523305



##
File path: 
clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class ListDeserializer implements Deserializer> {
+
+private static final Map>, Integer> 
FIXED_LENGTH_DESERIALIZERS = mkMap(
+mkEntry(ShortDeserializer.class, Short.BYTES),
+mkEntry(IntegerDeserializer.class, Integer.BYTES),
+mkEntry(FloatDeserializer.class, Float.BYTES),
+mkEntry(LongDeserializer.class, Long.BYTES),
+mkEntry(DoubleDeserializer.class, Double.BYTES),
+mkEntry(UUIDDeserializer.class, 36)
+);
+
+private Deserializer inner;
+private Class listClass;
+private Integer primitiveSize;
+
+public ListDeserializer() {}
+
+public > ListDeserializer(Class listClass, 
Deserializer inner) {
+if (listClass == null || inner == null) {
+throw new IllegalArgumentException("ListDeserializer requires both 
\"listClass\" and \"innerDeserializer\" parameters to be provided during 
initialization");
+}
+this.listClass = listClass;
+this.inner = inner;
+this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass());
+}
+
+public Deserializer getInnerDeserializer() {
+return inner;
+}
+
+@Override
+public void configure(Map configs, boolean isKey) {
+if (listClass != null || inner != null) {
+throw new ConfigException("List deserializer was already 
initialized using a non-default constructor");
+}
+configureListClass(configs, isKey);
+configureInnerSerde(configs, isKey);
+}
+
+private void configureListClass(Map configs, boolean isKey) {
+String listTypePropertyName = isKey ? 
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : 
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS;
+final Object listClassOrName = configs.get(listTypePropertyName);
+if (listClassOrName == null) {
+throw new ConfigException("Not able to determine the list class 
because it was neither passed via the constructor nor set in the config.");
+}
+try {
+if (listClassOrName instanceof String) {
+listClass = Utils.loadClass((String) listClassOrName, 
Object.class);
+} else if (listClassOrName instanceof Class) {
+listClass = (Class) listClassOrName;
+} else {
+throw new KafkaException("Could not determine the list class 
instance using \"" + listTypePropertyName + "\" property.");
+}
+} catch (final ClassNotFoundException e) {
+throw new ConfigException(listTypePropertyName, listClassOrName, 
"Deserializer's list class \"" + listClassOrName + "\" could not be found.");
+}
+}
+
+@SuppressWarnings("unchecked")
+private void configureInnerSerde(Map configs, boolean isKey) {
+String innerSerdePropertyName = isKey ? 
CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : 
CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS;
+final Object innerSerdeClassOrName = 
configs.get(innerSerdePropertyName);
+if (innerSerdeClassOrName == null) {
+throw new 

[jira] [Assigned] (KAFKA-12682) Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order

2021-05-11 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-12682:
--

Assignee: Justine Olshan

> Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order 
> -
>
> Key: KAFKA-12682
> URL: https://issues.apache.org/jira/browse/KAFKA-12682
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
>Reporter: jacky
>Assignee: Justine Olshan
>Priority: Major
>  Labels: kip-500
>
> In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and 
> _localRemoved, which record the changed and deleted partitions. However, 
> kafka.server.RaftReplicaManager#handleMetadataRecords always processes the 
> _localChanged partitions first and then _localRemoved, which does not respect 
> the original order. For example:
> 1. migrate partition p1 from b0 to b1;
> 2. change the leader of p1;
> 3. migrate p1 from b1 to b0;
> here _localRemoved ends up deleting p1 last.
> I also think MetadataPartition should include the topic uuid, with the topic 
> name optional. For example:
> create topic t1, delete topic t1, create topic t1, change leader of p1
> and then compact the records to
> delete topic t1, change t1, p1
> but the current implementation will
> 1. process change t1, p1
> 2. process delete topic t1
> Because MetadataPartition doesn't include the topic uuid and only includes the 
> topic name, when processing it can't find the original topic uuid and instead 
> resolves the latest topic id, which is not right. The delete of topic t1 should 
> be applied before the create of t1 or the change of p1.
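
For illustration only (the names and shapes below are made up, this is not RaftReplicaManager code),
the ordering concern boils down to the difference between these two processing shapes:

```
import java.util.List;
import java.util.function.Consumer;

final class OrderingSketch {
    // Problematic shape: handling all changed partitions first and all removed partitions
    // afterwards drops the interleaving of the original metadata records.
    static <E> void twoPass(final List<E> changed, final List<E> removed, final Consumer<E> apply) {
        changed.forEach(apply);
        removed.forEach(apply);
    }

    // Order-preserving shape: apply each record in the order it appears in the metadata log.
    static <E> void inOrder(final List<E> recordsInLogOrder, final Consumer<E> apply) {
        recordsInLogOrder.forEach(apply);
    }
}
```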



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12766) Consider Disabling WAL-related Options in RocksDB

2021-05-11 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12766:
---
Labels: newbie newbie++  (was: )

> Consider Disabling WAL-related Options in RocksDB
> -
>
> Key: KAFKA-12766
> URL: https://issues.apache.org/jira/browse/KAFKA-12766
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Minor
>  Labels: newbie, newbie++
> Fix For: 3.0.0
>
>
> Streams disables the write-ahead log (WAL) provided by RocksDB since it 
> replicates the data in changelog topics. Hence, it does not make much sense 
> to set WAL-related configs for RocksDB instances within Streams.
> Streams could:
> - disable WAL-related options
> - ignore WAL-related options
> - throw an exception when a WAL-related option is set.
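
As a concrete illustration of what a "WAL-related option" looks like from the user side (this
sketch is not part of the ticket; the class name and the particular options chosen are only
assumptions), a RocksDBConfigSetter could set options such as these, which Streams would then
have to disable, ignore, or reject:

```
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// A user config setter that touches WAL-related options even though Streams disables the WAL.
public class WalTouchingConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        options.setWalTtlSeconds(60);    // WAL-related: has no effect once the WAL is disabled
        options.setWalSizeLimitMB(16);   // WAL-related: has no effect once the WAL is disabled
    }

    @Override
    public void close(final String storeName, final Options options) {
        // nothing to close in this sketch
    }
}
```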



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] Boojapho commented on pull request #10642: KAFKA-12756: Update Zookeeper to 3.6.3 or higher

2021-05-11 Thread GitBox


Boojapho commented on pull request #10642:
URL: https://github.com/apache/kafka/pull/10642#issuecomment-838928359


   > Also, we override the netty version, so I am not sure the ZK version needs 
to be bumped at all.
   
   So long as only one version of netty remains in the release, that should work 
instead of bumping ZK.  I think 2.8 already uses a version that fixes the 
vulnerability.  2.7 and 2.6 might still need updates.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-05-11 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342778#comment-17342778
 ] 

Ming Liu commented on KAFKA-12713:
--

Here is PR [https://github.com/apache/kafka/pull/10674/files] for the KIP. 
Please comment.

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With 
> acks=all, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency doesn't reflect the true fetch 
> latency, because a fetch request sometimes has to stay in purgatory and wait 
> for replica.fetch.wait.max.ms when data is not available. This greatly distorts 
> the real P50, P99, etc. 
> I'd like to propose a KIP to track the real fetch latency for both broker 
> followers and consumers. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mingaliu opened a new pull request #10674: KAFKA-12713: Report the real fetch latency by removing the wait-time in purgatory.

2021-05-11 Thread GitBox


mingaliu opened a new pull request #10674:
URL: https://github.com/apache/kafka/pull/10674


   This is to help monitor the 'real' fetch latency by removing the wait time a 
FetchRequest spends in purgatory.
   The changes include:
   1. Add waitTimeMs to FetchResponse().
   2. In the Kafka API handler (handleFetchRequest()), set waitTimeMs to the time 
spent in purgatory when creating the FetchResponse().
   3. In the follower broker's processFetchRequest(), track the real latency of 
fetch requests by subtracting the waitTimeMs from the FetchResponse.
   4. In FetcherStats, add a new histogram to track this calculated "true" fetch 
latency.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10626: KAFKA-12744: Breaking change dependency upgrade: "argparse4j" 0.7.0 -->> 0.9.0

2021-05-11 Thread GitBox


dejan2609 commented on pull request #10626:
URL: https://github.com/apache/kafka/pull/10626#issuecomment-838902447


   Changing status to draft (until I test the CLI, as mentioned in the comments above).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 edited a comment on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-11 Thread GitBox


dejan2609 edited a comment on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-838776681


   > Looks like there is a change in behavior in Gradle 7 related to resource 
files that's causing a bunch of tests to fail
   
   Indeed... on my local machine around 1% of the tests were broken (and if I 
recall correctly they were all related to SSL/TLS, so I was hoping for better 
test results here). 
   
   Back to the drawing board, I guess (I will execute the tests again on my side with 
and without the gradle shadow upgrade and then compare with the test results here). 
   
   Changing PR status to **_draft_** (again).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-11 Thread GitBox


jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630370013



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
-try {
-  if (snapshotIds.contains(snapshotId)) {
-Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-  } else {
-Optional.empty()
+snapshots synchronized {
+  val reader = snapshots.get(snapshotId) match {
+case None =>
+  // Snapshot doesn't exists
+  None
+case Some(None) =>
+  // Snapshot exists but has never been read before
+  try {
+val snapshotReader = 
Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+snapshots.put(snapshotId, snapshotReader)
+snapshotReader
+  } catch {
+case _: NoSuchFileException =>
+  // Snapshot doesn't exists in the data dir; remove
+  val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+  warn(s"Couldn't read $snapshotId; expected to find snapshot file 
$path")
+  snapshots.remove(snapshotId)
+  None
+  }
+case Some(value) =>
+  // Snapshot exists and it is already open; do nothing
+  value
   }
-} catch {
-  case _: NoSuchFileException =>
-Optional.empty()
+
+  reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
 }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-val descending = snapshotIds.descendingIterator
-if (descending.hasNext) {
-  Optional.of(descending.next)
-} else {
-  Optional.empty()
+snapshots synchronized {
+  snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
 }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-val ascendingIterator = snapshotIds.iterator
-if (ascendingIterator.hasNext) {
-  Optional.of(ascendingIterator.next)
-} else {
-  Optional.empty()
+snapshots synchronized {
+  snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
 }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-snapshotIds.add(snapshotId)
+snapshots synchronized {
+  snapshots.put(snapshotId, None)
+}
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
-latestSnapshotId().asScala match {
-  case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-startOffset < logStartSnapshotId.offset &&
-logStartSnapshotId.offset <= snapshotId.offset &&
-log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
-log.deleteOldSegments()
+val (deleted, forgottenSnapshots) = snapshots synchronized {
+  latestSnapshotId().asScala match {
+case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+  startOffset < logStartSnapshotId.offset &&
+  logStartSnapshotId.offset <= snapshotId.offset &&
+  log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
+
+  // Delete all segments that have a "last offset" less than the log 
start offset
+  log.deleteOldSegments()
 
-// Delete snapshot after increasing LogStartOffset
-removeSnapshotFilesBefore(logStartSnapshotId)
+  // Forget snapshots less than the log start offset
+  (true, forgetSnapshotsBefore(logStartSnapshotId))
+case _ =>
+  (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
+  }
+}
 
-true
+removeSnapshots(forgottenSnapshots)
+deleted
+  }
 
-  case _ => false
-}
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the 
associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.

Review comment:
   Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-11 Thread GitBox


jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630369878



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
-try {
-  if (snapshotIds.contains(snapshotId)) {
-Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-  } else {
-Optional.empty()
+snapshots synchronized {
+  val reader = snapshots.get(snapshotId) match {
+case None =>
+  // Snapshot doesn't exists
+  None
+case Some(None) =>
+  // Snapshot exists but has never been read before
+  try {
+val snapshotReader = 
Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+snapshots.put(snapshotId, snapshotReader)
+snapshotReader
+  } catch {
+case _: NoSuchFileException =>
+  // Snapshot doesn't exists in the data dir; remove
+  val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+  warn(s"Couldn't read $snapshotId; expected to find snapshot file 
$path")
+  snapshots.remove(snapshotId)
+  None
+  }
+case Some(value) =>
+  // Snapshot exists and it is already open; do nothing
+  value
   }
-} catch {
-  case _: NoSuchFileException =>
-Optional.empty()
+
+  reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
 }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-val descending = snapshotIds.descendingIterator
-if (descending.hasNext) {
-  Optional.of(descending.next)
-} else {
-  Optional.empty()
+snapshots synchronized {
+  snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
 }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-val ascendingIterator = snapshotIds.iterator
-if (ascendingIterator.hasNext) {
-  Optional.of(ascendingIterator.next)
-} else {
-  Optional.empty()
+snapshots synchronized {
+  snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
 }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-snapshotIds.add(snapshotId)
+snapshots synchronized {
+  snapshots.put(snapshotId, None)
+}
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
-latestSnapshotId().asScala match {
-  case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-startOffset < logStartSnapshotId.offset &&
-logStartSnapshotId.offset <= snapshotId.offset &&
-log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
-log.deleteOldSegments()
+val (deleted, forgottenSnapshots) = snapshots synchronized {
+  latestSnapshotId().asScala match {
+case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+  startOffset < logStartSnapshotId.offset &&
+  logStartSnapshotId.offset <= snapshotId.offset &&
+  log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
+
+  // Delete all segments that have a "last offset" less than the log 
start offset
+  log.deleteOldSegments()
 
-// Delete snapshot after increasing LogStartOffset
-removeSnapshotFilesBefore(logStartSnapshotId)
+  // Forget snapshots less than the log start offset
+  (true, forgetSnapshotsBefore(logStartSnapshotId))
+case _ =>
+  (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
+  }
+}
 
-true
+removeSnapshots(forgottenSnapshots)
+deleted
+  }
 
-  case _ => false
-}
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the 
associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def forgetSnapshotsBefore(
+logStartSnapshotId: OffsetAndEpoch
+  ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
+val expiredSnapshots = snapshots.until(logStartSnapshotId).clone()
+snapshots --= expiredSnapshots.keys
+
+expiredSnapshots
   }
 
   /**
-   * Removes all snapshots on the log directory whose epoch and end offset is 
less than the giving epoch and end offset.
+   * Rename the given snapshots on the log directory. Asynchronously, close 
and delete the given
+   * snapshots.

Review comment:
   Added your suggestion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-11 Thread GitBox


jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630369703



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -242,85 +248,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
-try {
-  if (snapshotIds.contains(snapshotId)) {
-Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-  } else {
-Optional.empty()
+snapshots synchronized {
+  val reader = snapshots.get(snapshotId) match {
+case None =>
+  // Snapshot doesn't exists
+  None
+case Some(None) =>
+  // Snapshot exists but has never been read before
+  try {
+val snapshotReader = 
Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+snapshots.put(snapshotId, snapshotReader)
+snapshotReader
+  } catch {
+case _: NoSuchFileException =>
+  // Snapshot doesn't exists in the data dir; remove
+  val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+  warn(s"Couldn't read $snapshotId; expected to find snapshot file 
$path")
+  snapshots.remove(snapshotId)
+  None
+  }
+case Some(value) =>
+  // Snapshot exists and it is already open; do nothing
+  value
   }
-} catch {
-  case _: NoSuchFileException =>
-Optional.empty()
+
+  reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
 }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-val descending = snapshotIds.descendingIterator
-if (descending.hasNext) {
-  Optional.of(descending.next)
-} else {
-  Optional.empty()
+snapshots synchronized {
+  snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
 }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-val ascendingIterator = snapshotIds.iterator
-if (ascendingIterator.hasNext) {
-  Optional.of(ascendingIterator.next)
-} else {
-  Optional.empty()
+snapshots synchronized {
+  snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
 }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-snapshotIds.add(snapshotId)
+snapshots synchronized {
+  snapshots.put(snapshotId, None)
+}
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
-latestSnapshotId().asScala match {
-  case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-startOffset < logStartSnapshotId.offset &&
-logStartSnapshotId.offset <= snapshotId.offset &&
-log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
-log.deleteOldSegments()
+val (deleted, forgottenSnapshots) = snapshots synchronized {
+  latestSnapshotId().asScala match {
+case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+  startOffset < logStartSnapshotId.offset &&
+  logStartSnapshotId.offset <= snapshotId.offset &&
+  log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
+
+  // Delete all segments that have a "last offset" less than the log 
start offset
+  log.deleteOldSegments()
 
-// Delete snapshot after increasing LogStartOffset
-removeSnapshotFilesBefore(logStartSnapshotId)
+  // Forget snapshots less than the log start offset
+  (true, forgetSnapshotsBefore(logStartSnapshotId))
+case _ =>
+  (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
+  }
+}
 
-true
+removeSnapshots(forgottenSnapshots)
+deleted
+  }
 
-  case _ => false
-}
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the 
associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until
+  private def forgetSnapshotsBefore(
+logStartSnapshotId: OffsetAndEpoch
+  ): mutable.TreeMap[OffsetAndEpoch, Option[FileRawSnapshotReader]] = {
+val expiredSnapshots = snapshots.until(logStartSnapshotId).clone()
+snapshots --= expiredSnapshots.keys
+
+expiredSnapshots
   }
 
   /**
-   * Removes all snapshots on the log directory whose epoch and end offset is 
less than the giving epoch and end offset.
+   * Rename the given snapshots on the log directory. Asynchronously, close 
and delete the given
+   * snapshots.
*/
-  private def removeSnapshotFilesBefore(logStartSnapshotId: OffsetAndEpoch): 
Unit = {
-val expiredSnapshotIdsIter = snapshotIds.headSet(logStartSnapshotId, 
false).iterator
-while 

[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-11 Thread GitBox


jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630367487



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -161,19 +162,24 @@ final class KafkaMetadataLog private (
 
   override def truncateToLatestSnapshot(): Boolean = {
 val latestEpoch = log.latestEpoch.getOrElse(0)
-latestSnapshotId().asScala match {
-  case Some(snapshotId) if (snapshotId.epoch > latestEpoch ||
-(snapshotId.epoch == latestEpoch && snapshotId.offset > 
endOffset().offset)) =>
+val (truncated, forgottenSnapshots) = latestSnapshotId().asScala match {

Review comment:
   Synchronizing `snapshots` is only needed when accessing that object. In 
`deleteBeforeSnapshot` the lock is grabbed because the `match` expression accesses 
`snapshots` in one of the `case` branches.
   
   In this method I think it is safe to only grab the lock where we currently do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future

2021-05-11 Thread Moses Nakamura (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342739#comment-17342739
 ] 

Moses Nakamura commented on KAFKA-3539:
---

We've been wrestling with this problem for a while.  I think my company would 
like to start the process to try to patch it.  However, it's definitely thorny, 
so we had some issues we wanted to talk about!  The main questions seem to be:
 # Which thread should wait for the partition metadata?
 # How can we preserve the ordering semantics that send gives us today?

I think the ideal answer to (1) would be, "no thread" and that we should do it 
asynchronously.  This would be a pretty invasive change though, so I think it's 
probably more expedient to pass in a threadpool and do it there.  How would you 
feel about optionally passing in a threadpool to KafkaProducer#send, or else on 
construction to KafkaProducer?

For (2), it seems like we have two plausible approaches, either pass async data 
to the RecordAccumulator, or else layer another queue on top.  I think layering 
another queue on top would be less invasive, so I would lean toward that 
approach.  We could have a central queue that orders all of the work, 
conditioned on the metadata that we're waiting for being ready.

Does that sound good to you?  Should I put up a patch so we can talk about it?
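
To make option (1) concrete, a minimal sketch of the "pass in a threadpool" idea (an illustration of
the approach under discussion, not a proposed patch; the wrapper class and its name are assumptions)
could look like the following; note that per-partition ordering is only preserved if the pool is
single-threaded or keyed, which is exactly concern (2):

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hands the send (and therefore the potential metadata wait) to the supplied pool,
// so the caller thread never blocks inside KafkaProducer#send.
final class AsyncSender {
    static CompletableFuture<RecordMetadata> send(final Producer<byte[], byte[]> producer,
                                                  final ProducerRecord<byte[], byte[]> record,
                                                  final ExecutorService pool) {
        final CompletableFuture<RecordMetadata> result = new CompletableFuture<>();
        pool.execute(() -> producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(metadata);
            }
        }));
        return result;
    }
}
```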

> KafkaProducer.send() may block even though it returns the Future
> 
>
> Key: KAFKA-3539
> URL: https://issues.apache.org/jira/browse/KAFKA-3539
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Oleg Zhurakousky
>Priority: Critical
>  Labels: needs-discussion, needs-kip
>
> You can get more details from the us...@kafka.apache.org by searching on the 
> thread with the subject "KafkaProducer block on send".
> The bottom line is that a method that returns a Future must never block, since 
> blocking essentially violates the Future contract: it was specifically designed to 
> return immediately, passing control back to the user to check for completion, 
> cancel, etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-11 Thread GitBox


jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r630364576



##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java
##
@@ -54,8 +54,12 @@ public Records records() {
 }
 
 @Override
-public void close() throws IOException {
-fileRecords.close();
+public void close() {

Review comment:
   I created https://issues.apache.org/jira/browse/KAFKA-12773




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12773) Use UncheckedIOException when wrapping IOException

2021-05-11 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12773:
--

 Summary: Use UncheckedIOException when wrapping IOException
 Key: KAFKA-12773
 URL: https://issues.apache.org/jira/browse/KAFKA-12773
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio


Use UncheckedIOException when wrapping IOException instead of RuntimeException.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12758) Create a new `server-common` module and move ApiMessageAndVersion, RecordSerde, AbstractApiMessageSerde, and BytesApiMessageSerde to that module.

2021-05-11 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-12758.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

merged the PR to trunk

> Create a new `server-common` module and move ApiMessageAndVersion, 
> RecordSerde, AbstractApiMessageSerde, and BytesApiMessageSerde to that module.
> -
>
> Key: KAFKA-12758
> URL: https://issues.apache.org/jira/browse/KAFKA-12758
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.0.0
>
>
> Create a new `server-common` module and move ApiMessageAndVersion, 
> RecordSerde, AbstractApiMessageSerde, and BytesApiMessageSerde to that module.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao merged pull request #10638: KAFKA-12758 Added `server-common` module to have server side common classes.

2021-05-11 Thread GitBox


junrao merged pull request #10638:
URL: https://github.com/apache/kafka/pull/10638


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-11 Thread GitBox


jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628550053



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -104,18 +105,29 @@ public static Path createTempFile(Path logDir, 
OffsetAndEpoch snapshotId) throws
 }
 
 /**
- * Delete the snapshot from the filesystem, the caller may firstly rename 
snapshot file to
- * ${file}.deleted, so we try to delete the file as well as the renamed 
file if exists.
+ * Delete the snapshot from the filesystem.
  */
-public static boolean deleteSnapshotIfExists(Path logDir, OffsetAndEpoch 
snapshotId) {
-Path immutablePath = Snapshots.snapshotPath(logDir, snapshotId);
-Path deletingPath = Snapshots.deleteRename(immutablePath, snapshotId);
+public static boolean deleteIfExists(Path logDir, OffsetAndEpoch 
snapshotId) {
+Path immutablePath = snapshotPath(logDir, snapshotId);
+Path deletedPath = deleteRename(immutablePath, snapshotId);
 try {
-return Files.deleteIfExists(immutablePath) | 
Files.deleteIfExists(deletingPath);
+return Files.deleteIfExists(immutablePath) | 
Files.deleteIfExists(deletedPath);
 } catch (IOException e) {
-log.error("Error deleting snapshot file " + deletingPath, e);
+log.error("Error deleting snapshot files {} and {}", 
immutablePath, deletedPath, e);
 return false;
 }
 }
 
+/**
+ * Mark a snapshot for deletion by renaming with the deleted suffix
+ */
+public static void markForDelete(Path logDir, OffsetAndEpoch snapshotId) {
+Path immutablePath = snapshotPath(logDir, snapshotId);
+Path deletedPath = deleteRename(immutablePath, snapshotId);
+try {
+Files.move(immutablePath, deletedPath, 
StandardCopyOption.ATOMIC_MOVE);

Review comment:
   I filed an issue regarding `atomicMoveWithFallback` and its use in 
`FileRecords`. `FileRecords` is used by snapshots and log segments. For now, 
I'll revert to `atomicMoveWithFallback` and address this issue in that Jira.
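   
   For reference, the fallback pattern being discussed looks roughly like the sketch below; it is not the actual `Utils.atomicMoveWithFallback` implementation, just an illustration of the idea:
   
```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class Moves {
    // Try an atomic rename first; if the filesystem does not support it,
    // fall back to a regular (replace-existing) move.
    public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```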




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-11 Thread GitBox


jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628546403



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -242,85 +246,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
-try {
-  if (snapshotIds.contains(snapshotId)) {
-Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-  } else {
-Optional.empty()
+snapshots synchronized {
+  val reader = snapshots.get(snapshotId) match {
+case None =>
+  // Snapshot doesn't exists
+  None
+case Some(None) =>
+  // Snapshot exists but has never been read before
+  try {
+val snapshotReader = 
Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+snapshots.put(snapshotId, snapshotReader)
+snapshotReader
+  } catch {
+case _: NoSuchFileException =>
+  // Snapshot doesn't exists in the data dir; remove
+  val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+  warn(s"Couldn't read $snapshotId; expected to find snapshot file 
$path")
+  snapshots.remove(snapshotId)
+  None
+  }
+case Some(value) =>
+  // Snapshot exists and it is already open; do nothing
+  value
   }
-} catch {
-  case _: NoSuchFileException =>
-Optional.empty()
+
+  reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
 }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-val descending = snapshotIds.descendingIterator
-if (descending.hasNext) {
-  Optional.of(descending.next)
-} else {
-  Optional.empty()
+snapshots synchronized {
+  snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
 }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-val ascendingIterator = snapshotIds.iterator
-if (ascendingIterator.hasNext) {
-  Optional.of(ascendingIterator.next)
-} else {
-  Optional.empty()
+snapshots synchronized {
+  snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
 }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-snapshotIds.add(snapshotId)
+snapshots synchronized {
+  snapshots.put(snapshotId, None)
+}
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
-latestSnapshotId().asScala match {
-  case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-startOffset < logStartSnapshotId.offset &&
-logStartSnapshotId.offset <= snapshotId.offset &&
-log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
-log.deleteOldSegments()
+val (deleted, forgottenSnapshots) = snapshots synchronized {
+  latestSnapshotId().asScala match {
+case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+  startOffset < logStartSnapshotId.offset &&
+  logStartSnapshotId.offset <= snapshotId.offset &&
+  log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
+
+  // Delete all segments that have a "last offset" less than the log 
start offset
+  log.deleteOldSegments()
 
-// Delete snapshot after increasing LogStartOffset
-removeSnapshotFilesBefore(logStartSnapshotId)
+  // Forget snapshots less than the log start offset
+  (true, forgetSnapshotsBefore(logStartSnapshotId))
+case _ =>
+  (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
+  }
+}
 
-true
+removeSnapshots(forgottenSnapshots)
+deleted
+  }
 
-  case _ => false
-}
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the 
associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until

Review comment:
   The issue is that Kafka needs to compile against both Scala 2.12 and 
2.13. In Scala 2.13 a lot of the collection methods were deprecated. The 
community created 
[scala-collection-compat](https://github.com/scala/scala-collection-compat) to 
allow the use of 2.13 functionality in 2.12. Apache Kafka depends on that 
project. Unfortunately, there is a pretty annoying bug in the latest stable 
version of `scala-collection-compat` that generates "unused import warning" 
when used in 2.13. The Kafka project turns those warnings into errors and the 
Scala compiler doesn't allow the use of `nowarn` in imports.
   
   The best solution I can find is to use 2.12 methods that are deprecated and 
add this nowarn flag.
   
   It looks 

[GitHub] [kafka] jsancio commented on a change in pull request #10431: KAFKA-12543: Change RawSnapshotReader ownership model

2021-05-11 Thread GitBox


jsancio commented on a change in pull request #10431:
URL: https://github.com/apache/kafka/pull/10431#discussion_r628546403



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -242,85 +246,116 @@ final class KafkaMetadataLog private (
   }
 
   override def readSnapshot(snapshotId: OffsetAndEpoch): 
Optional[RawSnapshotReader] = {
-try {
-  if (snapshotIds.contains(snapshotId)) {
-Optional.of(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
-  } else {
-Optional.empty()
+snapshots synchronized {
+  val reader = snapshots.get(snapshotId) match {
+case None =>
+  // Snapshot doesn't exists
+  None
+case Some(None) =>
+  // Snapshot exists but has never been read before
+  try {
+val snapshotReader = 
Some(FileRawSnapshotReader.open(log.dir.toPath, snapshotId))
+snapshots.put(snapshotId, snapshotReader)
+snapshotReader
+  } catch {
+case _: NoSuchFileException =>
+  // Snapshot doesn't exists in the data dir; remove
+  val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+  warn(s"Couldn't read $snapshotId; expected to find snapshot file 
$path")
+  snapshots.remove(snapshotId)
+  None
+  }
+case Some(value) =>
+  // Snapshot exists and it is already open; do nothing
+  value
   }
-} catch {
-  case _: NoSuchFileException =>
-Optional.empty()
+
+  reader.asJava.asInstanceOf[Optional[RawSnapshotReader]]
 }
   }
 
   override def latestSnapshotId(): Optional[OffsetAndEpoch] = {
-val descending = snapshotIds.descendingIterator
-if (descending.hasNext) {
-  Optional.of(descending.next)
-} else {
-  Optional.empty()
+snapshots synchronized {
+  snapshots.lastOption.map { case (snapshotId, _) => snapshotId }.asJava
 }
   }
 
   override def earliestSnapshotId(): Optional[OffsetAndEpoch] = {
-val ascendingIterator = snapshotIds.iterator
-if (ascendingIterator.hasNext) {
-  Optional.of(ascendingIterator.next)
-} else {
-  Optional.empty()
+snapshots synchronized {
+  snapshots.headOption.map { case (snapshotId, _) => snapshotId }.asJava
 }
   }
 
   override def onSnapshotFrozen(snapshotId: OffsetAndEpoch): Unit = {
-snapshotIds.add(snapshotId)
+snapshots synchronized {
+  snapshots.put(snapshotId, None)
+}
   }
 
   override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): 
Boolean = {
-latestSnapshotId().asScala match {
-  case Some(snapshotId) if (snapshotIds.contains(logStartSnapshotId) &&
-startOffset < logStartSnapshotId.offset &&
-logStartSnapshotId.offset <= snapshotId.offset &&
-log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
-log.deleteOldSegments()
+val (deleted, forgottenSnapshots) = snapshots synchronized {
+  latestSnapshotId().asScala match {
+case Some(snapshotId) if (snapshots.contains(logStartSnapshotId) &&
+  startOffset < logStartSnapshotId.offset &&
+  logStartSnapshotId.offset <= snapshotId.offset &&
+  log.maybeIncrementLogStartOffset(logStartSnapshotId.offset, 
SnapshotGenerated)) =>
+
+  // Delete all segments that have a "last offset" less than the log 
start offset
+  log.deleteOldSegments()
 
-// Delete snapshot after increasing LogStartOffset
-removeSnapshotFilesBefore(logStartSnapshotId)
+  // Forget snapshots less than the log start offset
+  (true, forgetSnapshotsBefore(logStartSnapshotId))
+case _ =>
+  (false, mutable.TreeMap.empty[OffsetAndEpoch, 
Option[FileRawSnapshotReader]])
+  }
+}
 
-true
+removeSnapshots(forgottenSnapshots)
+deleted
+  }
 
-  case _ => false
-}
+  /**
+   * Forget the snapshots earlier than a given snapshot id and return the 
associated
+   * snapshot readers.
+   *
+   * This method assumes that the lock for `snapshots` is ready held.
+   */
+  @nowarn("cat=deprecation") // Needed for TreeMap.until

Review comment:
   The issue is that Kafka needs to compile against both Scala 2.12 and 
2.13. In Scala 2.13 a lot of the collection methods were deprecated. The 
community created 
[scala-collection-compat](https://github.com/scala/scala-collection-compat) to 
allow the use of 2.13 functionality in 2.12. Apache Kafka depends on that 
project. Unfortunately, there is a pretty annoying bug in the latest stable 
version of `scala-collection-compat` that generates "unused import warning" 
when used in 2.13. The Kafka project turns those warnings into errors and the 
Scala compiler doesn't allow the use of `nowarn` in imports.
   
   The best solution I can find is to use 2.12 methods that are deprecated and 
add this nowarn flag.
   
   It looks like 

[GitHub] [kafka] dejan2609 commented on pull request #10606: KAFKA-12728: version upgrades: gradle (6.8.3 -->> 7.0.1) and gradle shadow plugin (6.1.0 -->> 7.0.0)

2021-05-11 Thread GitBox


dejan2609 commented on pull request #10606:
URL: https://github.com/apache/kafka/pull/10606#issuecomment-838776681


   > Looks like there is a change in behavior in Gradle 7 related to resource 
files that's causing a bunch of tests to fail
   
   Indeed... on my local machine around 1% of the tests were broken (and if I 
recall correctly they were all related to SSL/TLS, so I was hoping for better 
test results here). 
   
   Back to the drawing board, I guess (I will execute the tests again on my side 
with or without the Gradle Shadow upgrade and/or compare with the test results here). 
   
   Changing PR status to **_draft_** (again).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10647: MINOR Removed copying storage libraries specifically as the task already copies them.

2021-05-11 Thread GitBox


ijuma commented on a change in pull request #10647:
URL: https://github.com/apache/kafka/pull/10647#discussion_r630330236



##
File path: build.gradle
##
@@ -1003,10 +1003,6 @@ project(':core') {
 from(project(':connect:mirror').configurations.runtimeClasspath) { 
into("libs/") }
 from(project(':connect:mirror-client').jar) { into("libs/") }
 from(project(':connect:mirror-client').configurations.runtimeClasspath) { 
into("libs/") }
-from(project(':storage').jar) { into("libs/") }
-from(project(':storage').configurations.runtimeClasspath) { into("libs/") }
-from(project(':storage:api').jar) { into("libs/") }
-from(project(':storage:api').configurations.runtimeClasspath) { 
into("libs/") }

Review comment:
   Makes sense; we only need to add modules that are not already a core 
dependency.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao merged pull request #10647: MINOR Removed copying storage libraries specifically as the task already copies them.

2021-05-11 Thread GitBox


junrao merged pull request #10647:
URL: https://github.com/apache/kafka/pull/10647


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10638: KAFKA-12758 Added `server-common` module to have server side common classes.

2021-05-11 Thread GitBox


mumrah commented on a change in pull request #10638:
URL: https://github.com/apache/kafka/pull/10638#discussion_r630302538



##
File path: build.gradle
##
@@ -1345,6 +1349,62 @@ project(':raft') {
   }
 }
 
+project(':server-common') {
+  archivesBaseName = "kafka-server-common"
+
+  dependencies {
+api project(':clients')
+implementation libs.slf4jApi
+
+testImplementation project(':clients')
+testImplementation project(':clients').sourceSets.test.output
+testImplementation libs.junitJupiter
+testImplementation libs.mockitoCore
+
+testRuntimeOnly libs.slf4jlog4j
+  }
+
+  task createVersionFile(dependsOn: determineCommitId) {
+ext.receiptFile = file("$buildDir/kafka/$buildVersionFileName")
+outputs.file receiptFile
+outputs.upToDateWhen { false }
+doLast {
+  def data = [
+  commitId: commitId,
+  version: version,
+  ]
+
+  receiptFile.parentFile.mkdirs()
+  def content = data.entrySet().collect { "$it.key=$it.value" 
}.sort().join("\n")
+  receiptFile.setText(content, "ISO-8859-1")
+}
+  }
+
+  sourceSets {
+main {
+  java {

Review comment:
   Are we only going to allow Java sources in this new module? (I think 
that's probably a good idea)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10638: KAFKA-12758 Added `server-common` module to have server side common classes.

2021-05-11 Thread GitBox


mumrah commented on a change in pull request #10638:
URL: https://github.com/apache/kafka/pull/10638#discussion_r630302538



##
File path: build.gradle
##
@@ -1345,6 +1349,62 @@ project(':raft') {
   }
 }
 
+project(':server-common') {
+  archivesBaseName = "kafka-server-common"
+
+  dependencies {
+api project(':clients')
+implementation libs.slf4jApi
+
+testImplementation project(':clients')
+testImplementation project(':clients').sourceSets.test.output
+testImplementation libs.junitJupiter
+testImplementation libs.mockitoCore
+
+testRuntimeOnly libs.slf4jlog4j
+  }
+
+  task createVersionFile(dependsOn: determineCommitId) {
+ext.receiptFile = file("$buildDir/kafka/$buildVersionFileName")
+outputs.file receiptFile
+outputs.upToDateWhen { false }
+doLast {
+  def data = [
+  commitId: commitId,
+  version: version,
+  ]
+
+  receiptFile.parentFile.mkdirs()
+  def content = data.entrySet().collect { "$it.key=$it.value" 
}.sort().join("\n")
+  receiptFile.setText(content, "ISO-8859-1")
+}
+  }
+
+  sourceSets {
+main {
+  java {

Review comment:
   Are we only going to allow Java sources in this new module? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10669: KAFKA-12769: Backport too 2.8 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…

2021-05-11 Thread GitBox


jlprat commented on pull request #10669:
URL: https://github.com/apache/kafka/pull/10669#issuecomment-838708152


   Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10672: KAFKA-12769: Backport to 2.7 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…

2021-05-11 Thread GitBox


jlprat commented on pull request #10672:
URL: https://github.com/apache/kafka/pull/10672#issuecomment-838708838


   Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat closed pull request #10672: KAFKA-12769: Backport to 2.7 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…

2021-05-11 Thread GitBox


jlprat closed pull request #10672:
URL: https://github.com/apache/kafka/pull/10672


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat closed pull request #10669: KAFKA-12769: Backport too 2.8 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…

2021-05-11 Thread GitBox


jlprat closed pull request #10669:
URL: https://github.com/apache/kafka/pull/10669


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-8562) SASL_SSL still performs reverse DNS lookup despite KAFKA-5051

2021-05-11 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-8562:
-
Fix Version/s: 2.8.1
   2.7.2

> SASL_SSL still performs reverse DNS lookup despite KAFKA-5051
> -
>
> Key: KAFKA-8562
> URL: https://issues.apache.org/jira/browse/KAFKA-8562
> Project: Kafka
>  Issue Type: Bug
>Reporter: Badai Aqrandista
>Assignee: Davor Poldrugo
>Priority: Minor
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> When using SASL_SSL, the Kafka client performs a reverse DNS lookup to 
> resolve the IP address to a hostname. So, this circumvents the security fix made in KAFKA-5051. 
> This is the line of code from AK 2.2 where it performs the lookup:
> https://github.com/apache/kafka/blob/2.2.0/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java#L205
> Following log messages show that consumer initially tried to connect with IP 
> address 10.0.2.15. Then suddenly it created SaslClient with a hostname:
> {code:java}
> [2019-06-18 06:23:36,486] INFO Kafka commitId: 00d486623990ed9d 
> (org.apache.kafka.common.utils.AppInfoParser)
> [2019-06-18 06:23:36,487] DEBUG [Consumer 
> clientId=KafkaStore-reader-_schemas, groupId=schema-registry-10.0.2.15-18081] 
> Kafka consumer initialized (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2019-06-18 06:23:36,505] DEBUG [Consumer 
> clientId=KafkaStore-reader-_schemas, groupId=schema-registry-10.0.2.15-18081] 
> Initiating connection to node 10.0.2.15:19094 (id: -1 rack: null) using 
> address /10.0.2.15 (org.apache.kafka.clients.NetworkClient)
> [2019-06-18 06:23:36,512] DEBUG Set SASL client state to 
> SEND_APIVERSIONS_REQUEST 
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2019-06-18 06:23:36,515] DEBUG Creating SaslClient: 
> client=null;service=kafka;serviceHostname=quickstart.confluent.io;mechs=[PLAIN]
>  (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> {code}
> Thanks
> Badai
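
For context, the distinction that matters in the description above is `InetSocketAddress.getHostName()`, which may trigger a reverse DNS lookup, versus `getHostString()`, which does not. A minimal, hedged illustration (not the actual SaslChannelBuilder code):

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;

public class ReverseDnsDemo {
    public static void main(String[] args) throws Exception {
        InetSocketAddress address =
            new InetSocketAddress(InetAddress.getByName("10.0.2.15"), 19094);

        // May perform a reverse DNS lookup to resolve the IP to a hostname.
        String viaLookup = address.getHostName();

        // Returns the hostname if one was supplied, otherwise the literal IP,
        // without ever hitting DNS.
        String noLookup = address.getHostString();

        System.out.println(viaLookup + " vs " + noLookup);
    }
}
```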



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] omkreddy commented on pull request #10672: KAFKA-12769: Backport to 2.7 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…

2021-05-11 Thread GitBox


omkreddy commented on pull request #10672:
URL: https://github.com/apache/kafka/pull/10672#issuecomment-838657103


   @jlprat Thanks for the PR. I will push the original commit directly to the 
2.8 and 2.7 branches. We can close this PR.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #10669: KAFKA-12769: Backport too 2.8 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…

2021-05-11 Thread GitBox


omkreddy commented on pull request #10669:
URL: https://github.com/apache/kafka/pull/10669#issuecomment-838656757


   @jlprat Thanks for the PR. I will push the original commit directly to the 
2.8 and 2.7 branches. We can close this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


chia7712 commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630238771



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -553,141 +552,152 @@ public void 
testInitializesAndDestroysMetricsReporters() {
 
 @Test
 public void testCloseIsIdempotent() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-streams.close();
-final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+streams.close();
+final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-streams.close();
-Assert.assertEquals("subsequent close() calls should do nothing",
-closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+streams.close();
+Assert.assertEquals("subsequent close() calls should do nothing",
+closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+}
 }
 
 @Test
 public void shouldAddThreadWhenRunning() throws InterruptedException {
 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-streams.start();
-final int oldSize = streams.threads.size();
-TestUtils.waitForCondition(() -> streams.state() == 
KafkaStreams.State.RUNNING, 15L, "wait until running");
-assertThat(streams.addStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 2)));
-assertThat(streams.threads.size(), equalTo(oldSize + 1));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+streams.start();
+final int oldSize = streams.threads.size();
+TestUtils.waitForCondition(() -> streams.state() == 
KafkaStreams.State.RUNNING, 15L, "wait until running");
+assertThat(streams.addStreamThread(), 
equalTo(Optional.of("processId-StreamThread-" + 2)));
+assertThat(streams.threads.size(), equalTo(oldSize + 1));
+}
 }
 
 @Test
 public void shouldNotAddThreadWhenCreated() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-final int oldSize = streams.threads.size();
-assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-assertThat(streams.threads.size(), equalTo(oldSize));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+final int oldSize = streams.threads.size();
+assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+assertThat(streams.threads.size(), equalTo(oldSize));
+}
 }
 
 @Test
 public void shouldNotAddThreadWhenClosed() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-final int oldSize = streams.threads.size();
-streams.close();
-assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-assertThat(streams.threads.size(), equalTo(oldSize));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+final int oldSize = streams.threads.size();
+streams.close();
+assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+assertThat(streams.threads.size(), equalTo(oldSize));
+}
 }
 
 @Test
 public void shouldNotAddThreadWhenError() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-final int oldSize = streams.threads.size();
-streams.start();
-globalStreamThread.shutdown();
-assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-assertThat(streams.threads.size(), equalTo(oldSize));
+try (final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+final int oldSize = streams.threads.size();
+streams.start();
+globalStreamThread.shutdown();
+assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+assertThat(streams.threads.size(), equalTo(oldSize));
+}
 }
 
 @Test
 public void shouldNotReturnDeadThreads() {
-final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-streams.start();
-streamThreadOne.shutdown();
-final Set threads = streams.localThreadsMetadata();
+final Set threads;
+try (final KafkaStreams 

[GitHub] [kafka] chia7712 opened a new pull request #10673: MINOR: set replication.factor to 1 to make StreamsBrokerCompatibility…

2021-05-11 Thread GitBox


chia7712 opened a new pull request #10673:
URL: https://github.com/apache/kafka/pull/10673


   related to #10532
   
   The default value of `replication.factor` was changed from `1` to `-1`. Old 
brokers (< 2.4) do not support such a configuration, so this PR sets 
`replication.factor` to `1` to fix `streams_broker_compatibility_test.py`. 
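   
   For reference, pinning the replication factor from a Streams application's configuration looks roughly like this (illustrative values, not taken from the system test):
   
```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ReplicationFactorExample {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "broker-compatibility-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Brokers older than 2.4 do not understand the "use the broker default"
        // value of -1, so pin the internal topic replication factor explicitly.
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        return props;
    }
}
```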
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vitojeng commented on pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

2021-05-11 Thread GitBox


vitojeng commented on pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#issuecomment-838538304


   @mjsax, @ableegoldman 
   Please take a look. :)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9826) Log cleaning repeatedly picks same segment with no effect when first dirty offset is past start of active segment

2021-05-11 Thread zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342571#comment-17342571
 ] 

zhangzhisheng commented on KAFKA-9826:
--

We have the same problem; Kafka version 2.12_2.4.1:
{code:java}
[2021-05-11 18:31:33,329] WARN Resetting first dirty offset of 
__consumer_offsets-30 to log start offset 4187979634 since the checkpointed 
offset 4187569609 is invalid. (kafka.log.LogCleanerManager$)
{code}
 

> Log cleaning repeatedly picks same segment with no effect when first dirty 
> offset is past start of active segment
> -
>
> Key: KAFKA-9826
> URL: https://issues.apache.org/jira/browse/KAFKA-9826
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.4.1
>Reporter: Steve Rodrigues
>Assignee: Steve Rodrigues
>Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> Seen on a system where a given partition had a single segment and, for 
> whatever reason (deleteRecords?), the logStartOffset was greater than the 
> base offset of that segment. There was a continuous series of 
> ```
> [2020-03-03 16:56:31,374] WARN Resetting first dirty offset of  FOO-3 to log 
> start offset 55649 since the checkpointed offset 0 is invalid. 
> (kafka.log.LogCleanerManager$)
> ```
> messages (partition name changed, it wasn't really FOO). This was expected to 
> be resolved by KAFKA-6266 but clearly wasn't. 
> Further investigation revealed that  a few segments were continuously 
> cleaning and generating messages in the `log-cleaner.log` of the form:
> ```
> [2020-03-31 13:34:50,924] INFO Cleaner 1: Beginning cleaning of log FOO-3 
> (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,924] INFO Cleaner 1: Building offset map for FOO-3... 
> (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO Cleaner 1: Building offset map for log FOO-3 
> for 0 segments in offset range [55287, 54237). (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO Cleaner 1: Offset map for log FOO-3 complete. 
> (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO Cleaner 1: Cleaning log FOO-3 (cleaning prior 
> to Wed Dec 31 19:00:00 EST 1969, discarding tombstones prior to Tue Dec 10 
> 13:39:08 EST 2019)... (kafka.log.LogCleaner)
> [2020-03-31 13:34:50,927] INFO [kafka-log-cleaner-thread-1]: Log cleaner 
> thread 1 cleaned log FOO-3 (dirty section = [55287, 55287])
> 0.0 MB of log processed in 0.0 seconds (0.0 MB/sec).
> Indexed 0.0 MB in 0.0 seconds (0.0 Mb/sec, 100.0% of total time)
> Buffer utilization: 0.0%
> Cleaned 0.0 MB in 0.0 seconds (NaN Mb/sec, 0.0% of total time)
> Start size: 0.0 MB (0 messages)
> End size: 0.0 MB (0 messages) NaN% size reduction (NaN% fewer messages) 
> (kafka.log.LogCleaner)
> ```
> What seems to have happened here (data determined for a different partition) 
> is:
> There exist a number of partitions here which get relatively low traffic, 
> including our friend FOO-5. For whatever reason, LogStartOffset of this 
> partition has moved beyond the baseOffset of the active segment. (Notes in 
> other issues indicate that this is a reasonable scenario.) So there’s one 
> segment, starting at 166266, and a log start of 166301.
> grabFilthiestCompactedLog runs and reads the checkpoint file. We see that 
> this topicpartition needs to be cleaned, and call cleanableOffsets on it 
> which returns an OffsetsToClean with firstDirtyOffset == logStartOffset == 
> 166301 and firstUncleanableOffset = max(logStart, activeSegment.baseOffset) = 
> 116301, and forceCheckpoint = true.
> The checkpoint file is updated in grabFilthiestCompactedLog (this is the fix 
> for KAFKA-6266). We then create a LogToClean object based on the 
> firstDirtyOffset and firstUncleanableOffset of 166301 (past the active 
> segment’s base offset).
> The LogToClean object has cleanBytes = logSegments(-1, 
> firstDirtyOffset).map(_.size).sum → the size of this segment. It has 
> firstUncleanableOffset and cleanableBytes determined by 
> calculateCleanableBytes. calculateCleanableBytes returns:
> {{}}
> {{val firstUncleanableSegment = 
> log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)}}
> {{val firstUncleanableOffset = firstUncleanableSegment.baseOffset}}
> {{val cleanableBytes = log.logSegments(firstDirtyOffset, 
> math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size.toLong).sum
> (firstUncleanableOffset, cleanableBytes)}}
> firstUncleanableSegment is activeSegment. firstUncleanableOffset is the base 
> offset, 166266. cleanableBytes is looking for logSegments(166301, max(166301, 
> 166266) → which _is the active segment_
> So there are “cleanableBytes” > 0.
> We then filter out segments with totalbytes (clean + cleanable) > 0. This 
> 

[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-11 Thread GitBox


showuon commented on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-838339697


   I saw that some cooperative sticky tests failed. I'll update them and add 
more tests tomorrow or later. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #10667: KAFKA-12772: Move all transaction state transition rules into their states

2021-05-11 Thread GitBox


dengziming commented on pull request #10667:
URL: https://github.com/apache/kafka/pull/10667#issuecomment-838294186


   Hello @guozhangwang, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10672: KAFKA-12769: Backport to 2.7 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…

2021-05-11 Thread GitBox


jlprat commented on pull request #10672:
URL: https://github.com/apache/kafka/pull/10672#issuecomment-838221652


   Test failure was:
   `Build / JDK 8 and Scala 2.12 / 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()`
   This flaky test is already fixed 
(https://issues.apache.org/jira/browse/KAFKA-12384) but only on `trunk` not on 
the `2.7` branch


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] DuongPTIT commented on pull request #10670: KAFKA-10273 Connect Converters should produce actionable error messages

2021-05-11 Thread GitBox


DuongPTIT commented on pull request #10670:
URL: https://github.com/apache/kafka/pull/10670#issuecomment-838210303


   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10669: KAFKA-12769: Backport too 2.8 of KAFKA-8562; SaslChannelBuilder - Avoid (reverse) DNS lookup while bui…

2021-05-11 Thread GitBox


jlprat commented on pull request #10669:
URL: https://github.com/apache/kafka/pull/10669#issuecomment-838186948


   Test failure was:
   `Build / JDK 8 and Scala 2.12 / 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()`
   This flaky test is already fixed 
(https://issues.apache.org/jira/browse/KAFKA-12384) but only on `trunk` not on 
the `2.8` branch


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-11 Thread GitBox


showuon edited a comment on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-838136488


   @ableegoldman, I've done the code refinement and refactoring. It is basically 
what we've discussed in the constrained assignor PR. Please take a look when 
available. 
   cc @guozhangwang, welcome to take another look. 
   Thank you! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-11 Thread GitBox


showuon commented on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-838136488


   @ableegoldman, I've done the code refinement and refactoring. It is basically 
what we've discussed in the constrained assignor PR. Please take a look when 
available. Thanks. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-11 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r629994499



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -469,73 +426,184 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 TreeSet sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
 sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
-balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+balance(currentAssignment, prevAssignment, sortedAllPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
+consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+partitionsPerTopic, totalPartitionsCount);
+
+if (log.isDebugEnabled()) {
+log.debug("final assignment: {}", currentAssignment);
+}
+
 return currentAssignment;
 }
 
+/**
+ * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+ * and sortedAssignedPartitions. If no assigned partitions, we'll just 
return all sorted topic partitions.
+ * This is used in generalAssign method
+ *
+ * We loop the sortedPartition, and compare the ith element in 
sortedAssignedPartitions(i start from 0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the the ith element, get next element from 
sortedAssignedPartitions
+ *
+ * @param sortedAllPartitions:  sorted all partitions
+ * @param sortedAssignedPartitions: sorted partitions, all are 
included in the sortedPartitions
+ * @param topic2AllPotentialConsumers:  topics mapped to all consumers 
that subscribed to it
+ * @return  the partitions don't assign to any 
current consumers
+ */
+private List getUnassignedPartitions(List 
sortedAllPartitions,
+ List 
sortedAssignedPartitions,
+ Map> topic2AllPotentialConsumers) {
+if (sortedAssignedPartitions.isEmpty()) {
+return sortedAllPartitions;
+}
+
+List unassignedPartitions = new ArrayList<>();
+
+Collections.sort(sortedAssignedPartitions, new 
PartitionComparator(topic2AllPotentialConsumers));
+
+boolean shouldAddDirectly = false;
+Iterator sortedAssignedPartitionsIter = 
sortedAssignedPartitions.iterator();
+TopicPartition nextAssignedPartition = 
sortedAssignedPartitionsIter.next();
+
+for (TopicPartition topicPartition : sortedAllPartitions) {
+if (shouldAddDirectly || 
!nextAssignedPartition.equals(topicPartition)) {
+unassignedPartitions.add(topicPartition);
+} else {
+// this partition is in assignedPartitions, don't add to 
unassignedPartitions, just get next assigned partition
+if (sortedAssignedPartitionsIter.hasNext()) {
+nextAssignedPartition = 
sortedAssignedPartitionsIter.next();
+} else {
+// add the remaining directly since there is no more 
sortedAssignedPartitions
+shouldAddDirectly = true;
+}
+}
+}
+return unassignedPartitions;
+}
+
+/**
+ * get the unassigned partition list by computing the difference set of 
all sorted partitions
+ * and sortedAssignedPartitions. If no assigned partitions, we'll just 
return all sorted topic partitions.
+ * This is used in constrainedAssign method
+ *
+ * To compute the difference set, we use two pointers technique here:
+ *
+ * We loop through the all sorted topics, and then iterate all partitions 
the topic has,
+ * compared with the ith element in sortedAssignedPartitions(i starts from 
0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the the ith element, get next element from 
sortedAssignedPartitions
+ *
+ * @param totalPartitionsCount  all partitions counts in this 
assignment
+ * @param partitionsPerTopicthe number of partitions for each 
subscribed topic.
+ * @param sortedAssignedPartitions  sorted partitions, all are included in 
the sortedPartitions
+ * @return  the partitions not yet assigned to any 
consumers
+ */
+private List getUnassignedPartitions(int 
totalPartitionsCount,
+ Map 
partitionsPerTopic,
+  

[GitHub] [kafka] jlprat edited a comment on pull request #10651: MINOR: Kafka Streams code samples formating unification

2021-05-11 Thread GitBox


jlprat edited a comment on pull request #10651:
URL: https://github.com/apache/kafka/pull/10651#issuecomment-838123810


   Most of the changes are purely cosmetic:
   - using the right tags for embedding a code snippet
   - escaping `<` and `>` characters to HTML encoded strings so they are 
properly rendered
   - removing extra indentation


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-11 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r629991853



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -469,73 +426,184 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 TreeSet sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
 sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
-balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+balance(currentAssignment, prevAssignment, sortedAllPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
+consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+partitionsPerTopic, totalPartitionsCount);
+
+if (log.isDebugEnabled()) {
+log.debug("final assignment: {}", currentAssignment);
+}
+
 return currentAssignment;
 }
 
+/**
+ * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+ * and sortedAssignedPartitions. If no assigned partitions, we'll just 
return all sorted topic partitions.
+ * This is used in generalAssign method
+ *
+ * We loop the sortedPartition, and compare the ith element in 
sortedAssignedPartitions(i start from 0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the the ith element, get next element from 
sortedAssignedPartitions
+ *
+ * @param sortedAllPartitions:  sorted all partitions
+ * @param sortedAssignedPartitions: sorted partitions, all are 
included in the sortedPartitions
+ * @param topic2AllPotentialConsumers:  topics mapped to all consumers 
that subscribed to it
+ * @return  the partitions don't assign to any 
current consumers
+ */
+private List getUnassignedPartitions(List 
sortedAllPartitions,

Review comment:
   Put the two `getUnassignedPartitions` overloads (this one and the following 
one) together for readability.
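
   For readers following along, the two-pointer idea described in the Javadoc above boils down to something like the following standalone sketch (names are illustrative; this is not the actual assignor code):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public final class SortedDifference {
    /**
     * Given two lists sorted with the same comparator, where {@code assigned} is
     * a subset of {@code all}, return the elements of {@code all} that are not in
     * {@code assigned}, walking both lists once with two pointers.
     */
    public static <T> List<T> difference(List<T> all, List<T> assigned) {
        List<T> unassigned = new ArrayList<>();
        Iterator<T> assignedIter = assigned.iterator();
        T nextAssigned = assignedIter.hasNext() ? assignedIter.next() : null;
        for (T element : all) {
            if (nextAssigned != null && nextAssigned.equals(element)) {
                // element is assigned; advance the second pointer
                nextAssigned = assignedIter.hasNext() ? assignedIter.next() : null;
            } else {
                unassigned.add(element);
            }
        }
        return unassigned;
    }
}
```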




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-11 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r629991853



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -469,73 +426,184 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 TreeSet sortedCurrentSubscriptions = new TreeSet<>(new 
SubscriptionComparator(currentAssignment));
 sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
-balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+balance(currentAssignment, prevAssignment, sortedAllPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
+consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+partitionsPerTopic, totalPartitionsCount);
+
+if (log.isDebugEnabled()) {
+log.debug("final assignment: {}", currentAssignment);
+}
+
 return currentAssignment;
 }
 
+/**
+ * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+ * and sortedAssignedPartitions. If no assigned partitions, we'll just 
return all sorted topic partitions.
+ * This is used in generalAssign method
+ *
+ * We loop the sortedPartition, and compare the ith element in 
sortedAssignedPartitions(i start from 0):
+ *   - if not equal to the ith element, add to unassignedPartitions
+ *   - if equal to the the ith element, get next element from 
sortedAssignedPartitions
+ *
+ * @param sortedAllPartitions:  sorted all partitions
+ * @param sortedAssignedPartitions: sorted partitions, all are 
included in the sortedPartitions
+ * @param topic2AllPotentialConsumers:  topics mapped to all consumers 
that subscribed to it
+ * @return  the partitions don't assign to any 
current consumers
+ */
+private List getUnassignedPartitions(List 
sortedAllPartitions,

Review comment:
   Put the two `getUnassignedPartitions` overloads together for readability.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-11 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r629990802



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -428,14 +372,18 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 for (TopicPartition topicPartition: entry.getValue())
 currentPartitionConsumer.put(topicPartition, entry.getKey());
 
-List sortedPartitions = 
sortPartitions(partition2AllPotentialConsumers);
+int totalPartitionsCount = 
partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+List sortedAllTopics = new 
ArrayList<>(topic2AllPotentialConsumers.keySet());
+Collections.sort(sortedAllTopics, new 
TopicComparator(topic2AllPotentialConsumers));
+List sortedAllPartitions = 
getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, 
totalPartitionsCount);

Review comment:
   Reuse the `getAllTopicPartitions` helper from `constrainedAssign` (see the sketch below).
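
   For context, a sketch of what the shared helper is expected to do (illustrative only, not the exact implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public final class AssignorSketch {
    // Rough idea of the shared helper: expand (topic -> partition count) into a
    // flat TopicPartition list following the pre-sorted topic order.
    public static List<TopicPartition> getAllTopicPartitions(Map<String, Integer> partitionsPerTopic,
                                                             List<String> sortedAllTopics,
                                                             int totalPartitionsCount) {
        List<TopicPartition> allPartitions = new ArrayList<>(totalPartitionsCount);
        for (String topic : sortedAllTopics) {
            int partitionCount = partitionsPerTopic.get(topic);
            for (int partition = 0; partition < partitionCount; partition++) {
                allPartitions.add(new TopicPartition(topic, partition));
            }
        }
        return allPartitions;
    }
}
```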




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-11 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r629990096



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -106,9 +104,9 @@ private boolean allSubscriptionsEqual(Set allTopics,
 // initialize the subscribed topics set if this is the first 
subscription
 if (subscribedTopics.isEmpty()) {
 subscribedTopics.addAll(subscription.topics());
-} else if (!(subscription.topics().size() == 
subscribedTopics.size()
+} else if (isAllSubscriptionsEqual && 
!(subscription.topics().size() == subscribedTopics.size()
 && subscribedTopics.containsAll(subscription.topics( {
-return false;
+isAllSubscriptionsEqual = false;

Review comment:
   Now, we'll run through all the `subscriptions`, since the resulting 
`consumerToOwnedPartitions` will also be passed into `generalAssign`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10651: MINOR: Kafka Streams code samples formating unification

2021-05-11 Thread GitBox


jlprat commented on pull request #10651:
URL: https://github.com/apache/kafka/pull/10651#issuecomment-838123810


   Most of the changes are purely cosmetic:
   - using the right tags for embedding a code snippet
   - escaping `<` and `>` characters to HTML encoded strings so they are 
properly rendered


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10651: MINOR: Kafka Streams code samples formating unification

2021-05-11 Thread GitBox


jlprat commented on pull request #10651:
URL: https://github.com/apache/kafka/pull/10651#issuecomment-838120807


   @cadonna I'll do it next time. I was torn between providing a PR per 
file and a PR per folder (I ended up doing a PR for the Streams folder).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



