[jira] [Assigned] (KAFKA-12951) Infinite loop while restoring a GlobalKTable

2021-06-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-12951:
---

Assignee: Matthias J. Sax

> Infinite loop while restoring a GlobalKTable
> 
>
> Key: KAFKA-12951
> URL: https://issues.apache.org/jira/browse/KAFKA-12951
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Damien Gasparina
>Assignee: Matthias J. Sax
>Priority: Major
>
> We encountered an issue a few times in some of our Kafka Streams applications.
>  After an unexpected restart of our applications, some instances have not 
> been able to resume operating.
> They got stuck while trying to restore the state store of a GlobalKTable. The 
> only way to resume operating was to manually delete their `state.dir`.
> We observed the following timeline:
>  * After the restart of the Kafka Streams application, it tries to restore 
> its GlobalKTable
>  * It seeks to the last checkpoint available on the state.dir: 382 
> ([https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L259])
>  * The watermark (the {{endOffset}} result) returned offset 383 
> {code:java}
> handling ListOffsetResponse response for XX. Fetched offset 383, timestamp 
> -1{code}
>  * We enter the loop: 
> [https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L279]
>  * Then we invoke {{poll()}}, but the poll returns nothing, so we enter: 
> [https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L306]
>  and we crash (x)
> {code:java}
> Global task did not make progress to restore state within 30 ms.{code}
>  * The pod restarts, and we encounter the same issue until we manually delete 
> the {{state.dir}}
>  
> Regarding the topic, by leveraging the {{DumpLogSegment}} tool, I can see:
>  * {{Offset 381}} - Last business message received
>  * {{Offset 382}} - Txn COMMIT (last message)
> I think the real culprit is that the checkpoint is {{383}} instead of being 
> {{382}}. For information, this is a compacted topic, and just before the 
> outage, we encountered some ISR shrinking and leader changes.
> While experimenting with the API, it seems that the {{consumer.position()}} 
> call is a bit tricky: right after a {{seek()}}, the {{position()}} returns the 
> seek position, but after a {{poll()}} 
> call, even if no data is returned, the {{position()}} returns the LSO. I 
> did an example at 
> [https://gist.github.com/Dabz/9aa0b4d1804397af6e7b6ad8cba82dcb].
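
A minimal sketch of the behaviour described above (placeholder topic name, broker address, and offsets; not the code from the Gist):
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PositionAfterSeekAndPoll {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            TopicPartition tp = new TopicPartition("my-global-topic", 0);
            consumer.assign(Collections.singletonList(tp));

            consumer.seek(tp, 382);
            // Before any poll(), position() simply reflects the seek position.
            System.out.println("position after seek: " + consumer.position(tp));   // 382

            // The poll() may return no records (e.g. only a transaction marker remains),
            // yet position() can still advance to the LSO afterwards.
            consumer.poll(Duration.ofSeconds(1));
            System.out.println("position after poll: " + consumer.position(tp));   // e.g. 383
        }
    }
}
{code}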



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] IgnacioAcunaF edited a comment on pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-15 Thread GitBox


IgnacioAcunaF edited a comment on pull request #10883:
URL: https://github.com/apache/kafka/pull/10883#issuecomment-861696913


   @ijuma @jsancio  Thanks for your comments and review. Updated the PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh

2021-06-15 Thread GitBox


IgnacioAcunaF commented on pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#issuecomment-862046292


   @dajac Thanks again for the comments and support! :)
   Updated the PR. I think I improved the unit test: I reverted some changes to 
keep the unit test isolated from the others while preserving the same logic, 
and tried to cover all the possible scenarios (see the sketch after the list):
   
   - a) Assigned topic partition with no offset data at all  -> Response: None
   - b) Assigned topic partition with a valid OffsetAndMetadata -> Response: 
Long
   - c) Assigned topic partition with an invalid OffsetAndMetadata (null 
case) -> Response: None
   - d) Unassigned topic partition with no offset data at all -> Response: 
nothing, not on the resulting List
   - e) Unassigned topic partition with a valid OffsetAndMetadata -> Response: 
Long
   - f) Unassigned topic partition with an invalid OffsetAndMetadata (null 
case) -> Response: None
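   
   A minimal sketch of the mapping behind these cases (hypothetical helper, not the actual ConsumerGroupCommand code): a null or missing OffsetAndMetadata yields an empty result instead of an NPE, while a valid one yields its offset.
   ```
   import java.util.Optional;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   
   public class OffsetMappingSketch {
       // Null (or absent) committed metadata maps to "None" rather than throwing.
       static Optional<Long> committedOffset(OffsetAndMetadata metadata) {
           return metadata == null ? Optional.empty() : Optional.of(metadata.offset());
       }
   
       public static void main(String[] args) {
           System.out.println(committedOffset(new OffsetAndMetadata(100))); // Optional[100]
           System.out.println(committedOffset(null));                       // Optional.empty
       }
   }
   ```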
   
   Also modified getPartitionOffsets a little bit so that it can call 
collectConsumerAssignment directly, as suggested.
   
   **PS:** As I posted in the comments earlier, I found that the sibling 
test, _testAdminRequestsForDescribeOffsets_, lacks validation for assigned 
topic partitions (it currently only validates unassigned 
topic partitions, basically because no topic is assigned to the 
testing consumer group at its initialization). This is solved in the new test 
_testAdminRequestsForDescribeNegativeOffsets_, and I think that I could 
complement the former one. 
   
   What do you think? Is it worth opening a new PR to approach that separately?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

2021-06-15 Thread GitBox


mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r652355524



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -82,20 +87,23 @@
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
 private WindowStore otherWindowStore;
-private StreamsMetricsImpl metrics;
 private Sensor droppedRecordsSensor;
 private Optional, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
-@SuppressWarnings("unchecked")
 @Override
 public void init(final ProcessorContext context) {
 super.init(context);
-metrics = (StreamsMetricsImpl) context.metrics();
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
 otherWindowStore = context.getStateStore(otherWindowName);
 
-if 
(StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), 
ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
-outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+if (enableSpuriousResultFix

Review comment:
   After updating the code with the `else`, a few tests started to fail. The 
issue is that for left/outer joins we _always_ set the store name (even if the 
feature is disabled...) -- only for inner joins do we get an `Optional.empty()`. 
Thus, we cannot actually verify the `else` case (i.e., that we added a store even 
though we don't need it) at runtime. I guess we need to rely on the added unit 
tests instead to cover this case.
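   
   A hedged paraphrase of the situation described above (names simplified and made up; not the KStreamKStreamJoin code): the store-name Optional is derived from the join type alone, so for left/outer joins the name is present even when the fix is disabled, and the `else` branch has nothing observable to assert on at runtime.
   ```
   import java.util.Optional;
   
   public class OuterJoinStoreNameSketch {
       static Optional<String> outerJoinStoreName(boolean isLeftOrOuterJoin) {
           return isLeftOrOuterJoin ? Optional.of("outer-shared-join-store") : Optional.empty();
       }
   
       public static void main(String[] args) {
           boolean enableSpuriousResultFix = false;
           Optional<String> name = outerJoinStoreName(true);       // left/outer join: name is always set
           boolean storeUsed = enableSpuriousResultFix && name.isPresent();
           System.out.println(name.isPresent() + " " + storeUsed); // true false -> store present but unused
       }
   }
   ```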




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #10733: KAFKA-12816 Added tiered storage related configs including remote log manager configs.

2021-06-15 Thread GitBox


satishd commented on pull request #10733:
URL: https://github.com/apache/kafka/pull/10733#issuecomment-862041961


   Thanks @junrao for the review. Addressed them with the commit 
69aa4f03a266d2d95538df4bfd4eaa8b334db518. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

2021-06-15 Thread GitBox


showuon commented on a change in pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#discussion_r652329369



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##
@@ -123,24 +123,23 @@ public void start() {
 }
 
 private void start(int[] brokerPorts, String[] logDirs) {
-brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), 
zKConnectString());
+brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
 
-putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), 
"localhost");
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 
brokers.length);
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
+putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
+putIfAbsent(brokerConfig, 
KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
+putIfAbsent(brokerConfig, 
KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
+putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), 
false);
 
-Object listenerConfig = 
brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
+Object listenerConfig = 
brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
 if (listenerConfig != null) {
 listenerName = new ListenerName(listenerConfig.toString());
 }
 
 for (int i = 0; i < brokers.length; i++) {
-brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
+brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
 currentBrokerLogDirs[i] = logDirs[i] == null ? createLogDir() : 
currentBrokerLogDirs[i];
-brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), 
currentBrokerLogDirs[i]);
-brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), brokerPorts[i]);
+brokerConfig.put(KafkaConfig.LogDirProp(), 
currentBrokerLogDirs[i]);
+putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), 
"PLAINTEXT://localhost:" + brokerPorts[i]);

Review comment:
   This method is what I'm referring to:
   ```
   /**
    * Starts the Kafka cluster alone using the ports that were assigned during initialization of
    * the harness.
    *
    * @throws ConnectException if a directory to store the data cannot be created
    */
   public void startOnlyKafkaOnSamePorts() {
       start(currentBrokerPorts, currentBrokerLogDirs);
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

2021-06-15 Thread GitBox


showuon commented on a change in pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#discussion_r652329162



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##
@@ -123,24 +123,23 @@ public void start() {
 }
 
 private void start(int[] brokerPorts, String[] logDirs) {
-brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), 
zKConnectString());
+brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
 
-putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), 
"localhost");
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 
brokers.length);
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
+putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
+putIfAbsent(brokerConfig, 
KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
+putIfAbsent(brokerConfig, 
KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
+putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), 
false);
 
-Object listenerConfig = 
brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
+Object listenerConfig = 
brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
 if (listenerConfig != null) {
 listenerName = new ListenerName(listenerConfig.toString());
 }
 
 for (int i = 0; i < brokers.length; i++) {
-brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
+brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
 currentBrokerLogDirs[i] = logDirs[i] == null ? createLogDir() : 
currentBrokerLogDirs[i];
-brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), 
currentBrokerLogDirs[i]);
-brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), brokerPorts[i]);
+brokerConfig.put(KafkaConfig.LogDirProp(), 
currentBrokerLogDirs[i]);
+putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), 
"PLAINTEXT://localhost:" + brokerPorts[i]);

Review comment:
   
   `connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java` it is. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

2021-06-15 Thread GitBox


ijuma commented on a change in pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#discussion_r652327781



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##
@@ -123,24 +123,23 @@ public void start() {
 }
 
 private void start(int[] brokerPorts, String[] logDirs) {
-brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), 
zKConnectString());
+brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
 
-putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), 
"localhost");
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 
brokers.length);
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
+putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
+putIfAbsent(brokerConfig, 
KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
+putIfAbsent(brokerConfig, 
KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
+putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), 
false);
 
-Object listenerConfig = 
brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
+Object listenerConfig = 
brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
 if (listenerConfig != null) {
 listenerName = new ListenerName(listenerConfig.toString());
 }
 
 for (int i = 0; i < brokers.length; i++) {
-brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
+brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
 currentBrokerLogDirs[i] = logDirs[i] == null ? createLogDir() : 
currentBrokerLogDirs[i];
-brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), 
currentBrokerLogDirs[i]);
-brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), brokerPorts[i]);
+brokerConfig.put(KafkaConfig.LogDirProp(), 
currentBrokerLogDirs[i]);
+putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), 
"PLAINTEXT://localhost:" + brokerPorts[i]);

Review comment:
   Thanks for investigating this. The line links don't work for me. Which 
files are the lines referring to?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

2021-06-15 Thread GitBox


showuon commented on a change in pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#discussion_r652324325



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
##
@@ -123,24 +123,23 @@ public void start() {
 }
 
 private void start(int[] brokerPorts, String[] logDirs) {
-brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), 
zKConnectString());
+brokerConfig.put(KafkaConfig.ZkConnectProp(), zKConnectString());
 
-putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), 
"localhost");
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 
brokers.length);
-putIfAbsent(brokerConfig, 
KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
+putIfAbsent(brokerConfig, KafkaConfig.DeleteTopicEnableProp(), true);
+putIfAbsent(brokerConfig, 
KafkaConfig.GroupInitialRebalanceDelayMsProp(), 0);
+putIfAbsent(brokerConfig, 
KafkaConfig.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
+putIfAbsent(brokerConfig, KafkaConfig.AutoCreateTopicsEnableProp(), 
false);
 
-Object listenerConfig = 
brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
+Object listenerConfig = 
brokerConfig.get(KafkaConfig.InterBrokerListenerNameProp());
 if (listenerConfig != null) {
 listenerName = new ListenerName(listenerConfig.toString());
 }
 
 for (int i = 0; i < brokers.length; i++) {
-brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
+brokerConfig.put(KafkaConfig.BrokerIdProp(), i);
 currentBrokerLogDirs[i] = logDirs[i] == null ? createLogDir() : 
currentBrokerLogDirs[i];
-brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), 
currentBrokerLogDirs[i]);
-brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), brokerPorts[i]);
+brokerConfig.put(KafkaConfig.LogDirProp(), 
currentBrokerLogDirs[i]);
+putIfAbsent(brokerConfig, KafkaConfig.ListenersProp(), 
"PLAINTEXT://localhost:" + brokerPorts[i]);

Review comment:
   @ijuma , I found why the test `testBrokerCoordinator` failed. It's 
because we use `putIfAbsent` here, but in `testBrokerCoordinator` we want 
to bind the previously allocated port [line 
114](https://github.com/apache/kafka/pull/10872/files#diff-234389ba52863064119a9fbb8d1649d6a039a28790b7c186357e60570b0af049L114).
 If we use `putIfAbsent` here, it'll fall back to the default port `0`, which yields a 
randomly assigned port, as in [line 
120](https://github.com/apache/kafka/pull/10872/files#diff-234389ba52863064119a9fbb8d1649d6a039a28790b7c186357e60570b0af049R120).
 So, we should directly `put` the property here to fix it. FYI
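   
   A minimal sketch of the `put` vs `putIfAbsent` difference being described (plain `java.util.Map`, placeholder port values; not the EmbeddedKafkaCluster code):
   ```
   import java.util.HashMap;
   import java.util.Map;
   
   public class PutVsPutIfAbsent {
       public static void main(String[] args) {
           Map<String, Object> brokerConfig = new HashMap<>();
           brokerConfig.put("listeners", "PLAINTEXT://localhost:0");             // existing value: random port
   
           brokerConfig.putIfAbsent("listeners", "PLAINTEXT://localhost:51234"); // no effect, key already present
           System.out.println(brokerConfig.get("listeners"));                    // PLAINTEXT://localhost:0
   
           brokerConfig.put("listeners", "PLAINTEXT://localhost:51234");         // overrides, re-binds the previous port
           System.out.println(brokerConfig.get("listeners"));                    // PLAINTEXT://localhost:51234
       }
   }
   ```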




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10588: KAFKA-12662: add unit test for ProducerPerformance

2021-06-15 Thread GitBox


chia7712 commented on a change in pull request #10588:
URL: https://github.com/apache/kafka/pull/10588#discussion_r652315860



##
File path: 
tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
##
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+public class ProducerPerformanceTest {
+
+@Mock
+KafkaProducer producerMock;
+
+@Spy
+ProducerPerformance producerPerformanceSpy;
+
+private File createTempFile(String contents) throws IOException {
+File file = File.createTempFile("ProducerPerformanceTest", ".tmp");
+file.deleteOnExit();
+Files.write(file.toPath(), contents.getBytes());
+return file;
+}
+
+@Test
+public void testReadPayloadFile() throws Exception {
+File payloadFile = createTempFile("Hello\nKafka");
+String payloadFilePath = payloadFile.getAbsolutePath();
+String payloadDelimiter = "\n";
+
+List payloadByteList = 
ProducerPerformance.readPayloadFile(payloadFilePath, payloadDelimiter);
+
+assertEquals(2, payloadByteList.size());
+assertEquals("Hello", new String(payloadByteList.get(0)));
+assertEquals("Kafka", new String(payloadByteList.get(1)));
+}
+
+@Test
+public void testReadProps() throws Exception {
+
+List producerProps = 
Collections.singletonList("bootstrap.servers=localhost:9000");
+String producerConfig = createTempFile("acks=1").getAbsolutePath();
+String transactionalId = "1234";
+boolean transactionsEnabled = true;
+
+Properties prop = ProducerPerformance.readProps(producerProps, 
producerConfig, transactionalId, transactionsEnabled);
+
+assertNotNull(prop);
+assertEquals(5, prop.size());
+}
+
+@Test
+public void testNumberOfCallsForSendAndClose() throws IOException {
+
+doReturn(null).when(producerMock).send(any(), any());
+
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
+
+String[] args = new String[] {"--topic", "Hello-Kafka", 
"--num-records", "5", "--throughput", "100", "--record-size", "100", 
"--producer-props", "bootstrap.servers=localhost:9000"};
+producerPerformanceSpy.start(args);
+verify(producerMock, times(5)).send(any(), any());
+verify(producerMock, times(1)).close();
+}
+
+@Test
+public void testUnexpectedArg() {
+
+String[] args = new String[] {"--test", "test", "--topic", 
"Hello-Kafka", "--num-records", "5", "--throughput", "100", "--record-size", 
"100", "--producer-props", "bootstrap.servers=localhost:9000"};
+ArgumentParser parser = ProducerPerformance.argParser();
+ArgumentParserException thrown = 
assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args));
+assertEquals("unrecognized arguments: '--test'", thrown.getMessage());
+}
+
+@Test
+public void testGenerateRandomPayloadByPayloadFile() {
+
+Integer recordSize = null;
+String inputString = 

[GitHub] [kafka] jsancio closed pull request #10812: KAFKA-12863: Configure controller snapshot generation

2021-06-15 Thread GitBox


jsancio closed pull request #10812:
URL: https://github.com/apache/kafka/pull/10812


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10868: MINOR: make sure alterAclsPurgatory is closed when controller server …

2021-06-15 Thread GitBox


chia7712 commented on a change in pull request #10868:
URL: https://github.com/apache/kafka/pull/10868#discussion_r652311129



##
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##
@@ -200,6 +200,8 @@ class ControllerServer(
 CoreUtils.swallow(controllerApisHandlerPool.shutdown(), this)
   if (quotaManagers != null)
 CoreUtils.swallow(quotaManagers.shutdown(), this)
+  if (controllerApis != null)
+CoreUtils.swallow(controllerApis.close(), this)

Review comment:
   copy that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10898) Support snakeCaseName/camelCaseName JSON field name in JsonConverterGenerator

2021-06-15 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming resolved KAFKA-10898.

Resolution: Won't Fix

> Support snakeCaseName/camelCaseName JSON field name in JsonConverterGenerator
> -
>
> Key: KAFKA-10898
> URL: https://issues.apache.org/jira/browse/KAFKA-10898
> Project: Kafka
>  Issue Type: Improvement
>  Components: protocol
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>
> There are many JSON-format command line params, for example 
> `kafka-reassign-partitions.sh --reassignment-json-file my.json`, which we could 
> read and write using JsonConverter.
> However, currently we use camelCaseName when converting protocol data to 
> JSON, but most JSON-format command line params use snakeCaseName, so we 
> should support snakeCaseName in JsonConverterGenerator.
> In the post-KIP-500 world, we will move all configs in ZooKeeper to 
> kafka-raft, and the data in ZooKeeper is also stored as snakeCaseName JSON, 
> so it's useful to support snakeCaseName in JsonConverterGenerator in advance.
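
A minimal sketch of the two field-name styles in question (plain string conversion; the helper below is made up and is not part of JsonConverterGenerator):
{code:java}
public class FieldNameStyles {
    // Converts a camelCaseName such as "partitionEpoch" to its snake_case form "partition_epoch".
    static String toSnakeCase(String camelCase) {
        return camelCase.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase();
    }

    public static void main(String[] args) {
        System.out.println(toSnakeCase("partitionEpoch"));       // partition_epoch
        System.out.println(toSnakeCase("reassignmentJsonFile")); // reassignment_json_file
    }
}
{code}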



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on pull request #9809: KAFKA-10898: Support snakeCaseName in JsonConverterGenerator

2021-06-15 Thread GitBox


dengziming commented on pull request #9809:
URL: https://github.com/apache/kafka/pull/9809#issuecomment-861995774


   see comment https://issues.apache.org/jira/browse/KAFKA-10898


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming closed pull request #9809: KAFKA-10898: Support snakeCaseName in JsonConverterGenerator

2021-06-15 Thread GitBox


dengziming closed pull request #9809:
URL: https://github.com/apache/kafka/pull/9809


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10898) Support snakeCaseName/camelCaseName JSON field name in JsonConverterGenerator

2021-06-15 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364026#comment-17364026
 ] 

dengziming commented on KAFKA-10898:


Thank you [~cmccabe]. I initially wanted this in order to optimize some JSON-related code, 
such as the parameter file of kafka-reassign-partitions.sh, but I think it is 
unnecessary, so I will close this issue and the PR. If we want it in the future, 
we can reopen it then.

> Support snakeCaseName/camelCaseName JSON field name in JsonConverterGenerator
> -
>
> Key: KAFKA-10898
> URL: https://issues.apache.org/jira/browse/KAFKA-10898
> Project: Kafka
>  Issue Type: Improvement
>  Components: protocol
>Reporter: dengziming
>Assignee: dengziming
>Priority: Minor
>
> There are many JSON-format command line params, for example 
> `kafka-reassign-partitions.sh --reassignment-json-file my.json`, which we could 
> read and write using JsonConverter.
> However, currently we use camelCaseName when converting protocol data to 
> JSON, but most JSON-format command line params use snakeCaseName, so we 
> should support snakeCaseName in JsonConverterGenerator.
> In the post-KIP-500 world, we will move all configs in ZooKeeper to 
> kafka-raft, and the data in ZooKeeper is also stored as snakeCaseName JSON, 
> so it's useful to support snakeCaseName in JsonConverterGenerator in advance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on a change in pull request #10701: KAFKA-10437; Fix omitted TODO of KIP-478

2021-06-15 Thread GitBox


dengziming commented on a change in pull request #10701:
URL: https://github.com/apache/kafka/pull/10701#discussion_r652304456



##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
##
@@ -45,11 +50,26 @@ public void test() {
 .withLoggingDisabled() // Changelog is not supported by 
MockProcessorContext.
 // Caching is disabled by default, but FYI: caching is also 
not supported by MockProcessorContext.
 .build();
-store.init(context, store);
-context.register(store, null);
+store.init(context.getStateStoreContext(), store);
+context.getStateStoreContext().register(store, null);
 }
 final Transformer> 
transformer = supplier.get();
-transformer.init(context);
+transformer.init(new 
org.apache.kafka.streams.processor.MockProcessorContext() {
+@Override
+public  S getStateStore(final String name) {
+return context.getStateStore(name);
+}
+
+@Override
+public  void forward(final K key, final V value) {
+context.forward(new Record<>(key.toString(), value.toString(), 
0L));
+}
+
+@Override
+public Cancellable schedule(final Duration interval, final 
PunctuationType type, final Punctuator callback) throws 
IllegalArgumentException {

Review comment:
   Done!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10701: KAFKA-10437; Fix omitted TODO of KIP-478

2021-06-15 Thread GitBox


dengziming commented on a change in pull request #10701:
URL: https://github.com/apache/kafka/pull/10701#discussion_r652297960



##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
##
@@ -45,11 +50,26 @@ public void test() {
 .withLoggingDisabled() // Changelog is not supported by 
MockProcessorContext.
 // Caching is disabled by default, but FYI: caching is also 
not supported by MockProcessorContext.
 .build();
-store.init(context, store);
-context.register(store, null);
+store.init(context.getStateStoreContext(), store);
+context.getStateStoreContext().register(store, null);
 }
 final Transformer> 
transformer = supplier.get();
-transformer.init(context);
+transformer.init(new 
org.apache.kafka.streams.processor.MockProcessorContext() {
+@Override
+public  S getStateStore(final String name) {
+return context.getStateStore(name);
+}
+
+@Override
+public  void forward(final K key, final V value) {
+context.forward(new Record<>(key.toString(), value.toString(), 
0L));

Review comment:
   Yes, both K and V are of type String here; we should use a cast here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] XT3RM1N8R commented on pull request #5089: KAFKA-6420: Adding Msys support

2021-06-15 Thread GitBox


XT3RM1N8R commented on pull request #5089:
URL: https://github.com/apache/kafka/pull/5089#issuecomment-861960213


   It has been almost 3 years.
   Would it be bad form if I were to submit my own very similar PR to revive 
this effort?
   I would like to help.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] iamgd67 commented on pull request #10818: KAFKA-12889: log clean relative index range check of group consider empty log segment to avoid too many empty log segment left

2021-06-15 Thread GitBox


iamgd67 commented on pull request #10818:
URL: https://github.com/apache/kafka/pull/10818#issuecomment-861945165


   @ijuma could you please review this PR? It has been several days since it 
was created, and I don't know how to proceed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

2021-06-15 Thread GitBox


mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r65557



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -114,7 +133,7 @@ public static JoinWindows of(final Duration timeDifference) 
throws IllegalArgume
 public JoinWindows before(final Duration timeDifference) throws 
IllegalArgumentException {
 final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
 final long timeDifferenceMs = 
validateMillisecondDuration(timeDifference, msgPrefix);
-return new JoinWindows(timeDifferenceMs, afterMs, 
DEFAULT_GRACE_PERIOD_MS);
+return new JoinWindows(timeDifferenceMs, afterMs, graceMs, 
enableSpuriousResultFix);

Review comment:
   Sure. I don't see the connection to this change though? Note that 
`before()` and `after()` are non-static methods, and thus they should not 
change/set the grace period. Only the static `of(size)` and the non-static 
`grace()` should change the grace period.
   ```
   JoinWindows.of(5000).before(30); // of() will set the default of 24h, so no need 
for before() to reset it to 24h, it can just inherit it
   JoinWindows.of(5000).grace(40).before(30); // this should leave grace at 
`40`; however, without this fix before() would re-set grace to the default of 
24h, which is incorrect
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #10887: MINOR: Refactor the MetadataCache interface

2021-06-15 Thread GitBox


cmccabe opened a new pull request #10887:
URL: https://github.com/apache/kafka/pull/10887


   Remove getNonExistingTopics, which was not necessary. MetadataCache
   already lets callers check for the existence of topics by calling
   MetadataCache#contains.
   
   Add MetadataCache#getAliveBrokerNode and getAliveBrokerNodes.  This
   simplifies the calling code, which always wants a Node.
   
   Add ScalaDoc for MetadataCache#numPartitions.
   
   MetadataCache#numPartitions should return a simple int rather than an
   Option[Int]. A topic which exists can never have 0 partitions, so a
   return of 0 is not ambiguous.
   
   Fix a case where we were calling getAliveBrokers and filtering by id,
   rather than simply calling getAliveBroker(id) and making use of the hash
   map.
   
   Put ZkMetadataCache into its own file, like the other MetadataCache
   subclass.
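   
   A minimal sketch of the numPartitions argument above (Java with a hypothetical backing map; the real MetadataCache is Scala and not structured like this):
   ```
   import java.util.HashMap;
   import java.util.Map;
   
   public class PartitionCountSketch {
       private final Map<String, Integer> partitionCounts = new HashMap<>();
   
       // 0 is unambiguous: a topic that exists always has at least one partition.
       int numPartitions(String topic) {
           return partitionCounts.getOrDefault(topic, 0);
       }
   
       public static void main(String[] args) {
           PartitionCountSketch cache = new PartitionCountSketch();
           cache.partitionCounts.put("my-topic", 3);
           System.out.println(cache.numPartitions("my-topic")); // 3
           System.out.println(cache.numPartitions("missing"));  // 0 -> topic does not exist
       }
   }
   ```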


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-15 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r652250666



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -71,6 +76,12 @@ public FetchResponseData data() {
 return data;
 }
 
+/**
+ * From version 3 or later, the authorized and existing entries in 
`FetchRequest.fetchData` should be in the same order in `responseData`.
+ * Version 13 introduces topic IDs which mean there may be unresolved 
partitions. If there is any unknown topic ID in the request, the
+ * response will contain a top-level UNKNOWN_TOPIC_ID error and 
UNKNOWN_TOPIC_ID errors on all the partitions.
+ * We may also return UNKNOWN_TOPIC_ID for a given partition when that 
partition in the session has a topic ID inconsistent with the broker.

Review comment:
   I was just thinking about this and realized we may send new error types 
to clients that may not be able to handle them. I need to review this code 
again.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-15 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652249976



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -110,6 +132,12 @@ class ConsumerGroupServiceTest {
 AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)
   }
 
+  private def listGroupNegativeOffsetsResult: ListConsumerGroupOffsetsResult = 
{
+// Half of the partitions of the testing topics are set to have a negative 
integer offset (null value [KAFKA-9507 for reference])
+val offsets = topicPartitions.zipWithIndex.map{ case (tp, i) => tp -> ( 
if(i % 2 == 0) null else new OffsetAndMetadata(100) ) }.toMap.asJava

Review comment:
   Yes. I am explicitly generating the test cases in the function, 
something like this (work in progress):
   
   
   ```
 def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
   val args = Array("--bootstrap-server", "localhost:9092", "--group", 
group, "--describe", "--offsets")
   val groupService = consumerGroupService(args)
   
   val testTopicPartition0 = new TopicPartition("testTopic", 0);
   val testTopicPartition1 = new TopicPartition("testTopic", 1);
   val testTopicPartition2 = new TopicPartition("testTopic", 2);
   val testTopicPartition3 = new TopicPartition("testTopic", 3);
   val testTopicPartition4 = new TopicPartition("testTopic", 4);
   val testTopicPartition5 = new TopicPartition("testTopic", 5);
   val testTopicPartition6 = new TopicPartition("testTopic", 6);
   val testTopicPartition7 = new TopicPartition("testTopic", 7);
   val testTopicPartition8 = new TopicPartition("testTopic", 8);
   val testTopicPartition9 = new TopicPartition("testTopic", 9);
   
   val offsets = Map(
 testTopicPartition0 -> new OffsetAndMetadata(100),
 testTopicPartition1 -> new OffsetAndMetadata(100),
 testTopicPartition2 -> new OffsetAndMetadata(100),
 testTopicPartition3 -> new OffsetAndMetadata(100),
 testTopicPartition4 -> new OffsetAndMetadata(100),
 testTopicPartition5 -> new OffsetAndMetadata(100),
 testTopicPartition6 -> new OffsetAndMetadata(100),
 testTopicPartition7 -> new OffsetAndMetadata(100),
 testTopicPartition8 -> new OffsetAndMetadata(100),
 testTopicPartition9 -> null,
   ).asJava
   ```
   Does this make sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-15 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652242501



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +62,28 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+  .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+  .thenReturn(listGroupNegativeOffsetsResult)
+when(admin.listOffsets(offsetsArgMatcher, any()))
+  .thenReturn(listOffsetsResult)
+
+val (state, assignments) = groupService.collectGroupOffsets(group)
+assertEquals(Some("Stable"), state)
+assertTrue(assignments.nonEmpty)
+assertEquals(topicPartitions.size, assignments.get.size)

Review comment:
   Yes, I could add that kind of assertion. 
   Regarding the second question, I encountered something interesting that we may 
need to improve in the original test. This is the original function (as-is) for 
describing the ConsumerGroup:
   
   ```
   private def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {
   val member1 = new MemberDescription("member1", Optional.of("instance1"), 
"client1", "host1", null)
   val description = new ConsumerGroupDescription(group,
 true,
 Collections.singleton(member1),
 classOf[RangeAssignor].getName,
 groupState,
 new Node(1, "localhost", 9092))
   new DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(description)))
 }
 ```
 
As you can see, the last parameter of the member1 declaration is _null_. That is 
the assignment of partitions to the consumer. It is always set to null, so no 
partition is assigned to the consumer group. For example, this is the printout of 
the consumerGroups from _testAdminRequestsForDescribeOffsets_ (with no 
changes so far):

(groupId=testGroup, isSimpleConsumerGroup=true, members=(memberId=member1, 
groupInstanceId=instance1, clientId=client1, host=host1, 
assignment=(topicPartitions=)), 
partitionAssignor=org.apache.kafka.clients.consumer.RangeAssignor, 
state=Stable, coordinator=localhost:9092 (id: 1 rack: null), 
authorizedOperations=[])
   
   That is the result of `val consumerGroups = 
describeConsumerGroups(groupIds)` in ConsumerGroupCommand's 
collectGroupsOffsets function in the regular test. As you can see, there are no 
partitions assigned to the consumer: `assignment=(topicPartitions=)`
   
   So the regular test skips the _assignedTopicPartitions_ part and 
goes directly to _unassignedPartitions_, so the test is **only covering 
the unassignedPartitions case.** 
   
   Do you think this is something worth improving?
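   
   A minimal sketch (hypothetical values) of giving `member1` a real assignment instead of `null`, so that the _assignedTopicPartitions_ branch would also be exercised:
   ```
   import java.util.Collections;
   import java.util.Optional;
   
   import org.apache.kafka.clients.admin.MemberAssignment;
   import org.apache.kafka.clients.admin.MemberDescription;
   import org.apache.kafka.common.TopicPartition;
   
   public class AssignedMemberSketch {
       public static void main(String[] args) {
           MemberDescription member1 = new MemberDescription(
               "member1", Optional.of("instance1"), "client1", "host1",
               new MemberAssignment(Collections.singleton(new TopicPartition("testTopic", 0))));
           System.out.println(member1.assignment().topicPartitions()); // [testTopic-0]
       }
   }
   ```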




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-15 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652242501



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +62,28 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+  .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+  .thenReturn(listGroupNegativeOffsetsResult)
+when(admin.listOffsets(offsetsArgMatcher, any()))
+  .thenReturn(listOffsetsResult)
+
+val (state, assignments) = groupService.collectGroupOffsets(group)
+assertEquals(Some("Stable"), state)
+assertTrue(assignments.nonEmpty)
+assertEquals(topicPartitions.size, assignments.get.size)

Review comment:
   Yes, I could add that kind of assertion. 
   Regarding the second question, I encountered something interesting that we may 
need to improve in the original test. This is the original function (as-is) for 
describing the ConsumerGroup:
   
   ```
   private def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {
   val member1 = new MemberDescription("member1", Optional.of("instance1"), 
"client1", "host1", null)
   val description = new ConsumerGroupDescription(group,
 true,
 Collections.singleton(member1),
 classOf[RangeAssignor].getName,
 groupState,
 new Node(1, "localhost", 9092))
   new DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(description)))
 }
 ```
 
As you can see, the last parameter of the member1 declaration is _null_. That is 
the assignment of partitions to the consumer. It is always set to null, so no 
partition is assigned to the consumer group. For example, this is the printout of 
the consumerGroups from _testAdminRequestsForDescribeOffsets_ (with no 
changes so far):

(groupId=testGroup, isSimpleConsumerGroup=true, members=(memberId=member1, 
groupInstanceId=instance1, clientId=client1, host=host1, 
assignment=(topicPartitions=)), 
partitionAssignor=org.apache.kafka.clients.consumer.RangeAssignor, 
state=Stable, coordinator=localhost:9092 (id: 1 rack: null), 
authorizedOperations=[])
   
   That is the result of `val consumerGroups = 
describeConsumerGroups(groupIds)` in ConsumerGroupCommand's 
collectGroupsOffsets function in the regular test. As you can see, there are no 
partitions assigned to the consumer: `assignment=(topicPartitions=)`
   
   So the regular test skips the _assignedTopicPartitions_ part and 
goes directly to _unassignedPartitions_, so the test is only testing 
unassignedPartitions. 
   
   Do you think this is something worth improving?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-15 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652242501



##
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##
@@ -62,6 +62,28 @@ class ConsumerGroupServiceTest {
 verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+val args = Array("--bootstrap-server", "localhost:9092", "--group", group, 
"--describe", "--offsets")
+val groupService = consumerGroupService(args)
+
+
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
 any()))
+  .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+  .thenReturn(listGroupNegativeOffsetsResult)
+when(admin.listOffsets(offsetsArgMatcher, any()))
+  .thenReturn(listOffsetsResult)
+
+val (state, assignments) = groupService.collectGroupOffsets(group)
+assertEquals(Some("Stable"), state)
+assertTrue(assignments.nonEmpty)
+assertEquals(topicPartitions.size, assignments.get.size)

Review comment:
   Yes, I could add that kind of assertion. 
   I encountered something interesting that we may need to improve in the original 
test. This is the original function (as-is) for describing the ConsumerGroup:
   
   ```
   private def describeGroupsResult(groupState: ConsumerGroupState): 
DescribeConsumerGroupsResult = {
   val member1 = new MemberDescription("member1", Optional.of("instance1"), 
"client1", "host1", null)
   val description = new ConsumerGroupDescription(group,
 true,
 Collections.singleton(member1),
 classOf[RangeAssignor].getName,
 groupState,
 new Node(1, "localhost", 9092))
   new DescribeConsumerGroupsResult(Collections.singletonMap(group, 
KafkaFuture.completedFuture(description)))
 }
 ```
 
As you can see, the last parameter of the member1 declaration is _null_. That is 
the assignment of partitions to the consumer. It is always set to null, so no 
partition is assigned to the consumer group. For example, this is the printout of 
the consumerGroups from _testAdminRequestsForDescribeOffsets_ (with no 
changes so far):

(groupId=testGroup, isSimpleConsumerGroup=true, members=(memberId=member1, 
groupInstanceId=instance1, clientId=client1, host=host1, 
assignment=(topicPartitions=)), 
partitionAssignor=org.apache.kafka.clients.consumer.RangeAssignor, 
state=Stable, coordinator=localhost:9092 (id: 1 rack: null), 
authorizedOperations=[])
   
   That is the result of `val consumerGroups = 
describeConsumerGroups(groupIds)` in ConsumerGroupCommand's 
collectGroupsOffsets function in the regular test. As you can see, there are no 
partitions assigned to the consumer: `assignment=(topicPartitions=)`
   
   So the regular test skips the _assignedTopicPartitions_ part and 
goes directly to _unassignedPartitions_, so the test is only testing 
unassignedPartitions. 
   
   Do you think this is something worth improving?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe closed pull request #10857: MINOR: Create SnapshotWriter and SnapshotReader interfaces

2021-06-15 Thread GitBox


cmccabe closed pull request #10857:
URL: https://github.com/apache/kafka/pull/10857


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10857: MINOR: Create SnapshotWriter and SnapshotReader interfaces

2021-06-15 Thread GitBox


cmccabe commented on pull request #10857:
URL: https://github.com/apache/kafka/pull/10857#issuecomment-861903000


   This was superseded by #10786


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10701: KAFKA-10437; Fix omitted TODO of KIP-478

2021-06-15 Thread GitBox


mjsax commented on pull request #10701:
URL: https://github.com/apache/kafka/pull/10701#issuecomment-861902968


   Thanks for clarifying. Left a few minor comments. Overall LGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10867: KAFKA-12931: KIP-746: Revise KRaft Metadata Records

2021-06-15 Thread GitBox


cmccabe merged pull request #10867:
URL: https://github.com/apache/kafka/pull/10867


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10701: KAFKA-10437; Fix omitted TODO of KIP-478

2021-06-15 Thread GitBox


mjsax commented on a change in pull request #10701:
URL: https://github.com/apache/kafka/pull/10701#discussion_r652238876



##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
##
@@ -45,11 +50,26 @@ public void test() {
 .withLoggingDisabled() // Changelog is not supported by 
MockProcessorContext.
 // Caching is disabled by default, but FYI: caching is also 
not supported by MockProcessorContext.
 .build();
-store.init(context, store);
-context.register(store, null);
+store.init(context.getStateStoreContext(), store);
+context.getStateStoreContext().register(store, null);
 }
 final Transformer> 
transformer = supplier.get();
-transformer.init(context);
+transformer.init(new 
org.apache.kafka.streams.processor.MockProcessorContext() {
+@Override
+public  S getStateStore(final String name) {
+return context.getStateStore(name);
+}
+
+@Override
+public  void forward(final K key, final V value) {
+context.forward(new Record<>(key.toString(), value.toString(), 
0L));
+}
+
+@Override
+public Cancellable schedule(final Duration interval, final 
PunctuationType type, final Punctuator callback) throws 
IllegalArgumentException {

Review comment:
   nit: remove `throws` declaration (not needed).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10701: KAFKA-10437; Fix omitted TODO of KIP-478

2021-06-15 Thread GitBox


mjsax commented on a change in pull request #10701:
URL: https://github.com/apache/kafka/pull/10701#discussion_r652238393



##
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerTest.java
##
@@ -45,11 +50,26 @@ public void test() {
 .withLoggingDisabled() // Changelog is not supported by 
MockProcessorContext.
 // Caching is disabled by default, but FYI: caching is also 
not supported by MockProcessorContext.
 .build();
-store.init(context, store);
-context.register(store, null);
+store.init(context.getStateStoreContext(), store);
+context.getStateStoreContext().register(store, null);
 }
 final Transformer> 
transformer = supplier.get();
-transformer.init(context);
+transformer.init(new 
org.apache.kafka.streams.processor.MockProcessorContext() {
+@Override
+public  S getStateStore(final String name) {
+return context.getStateStore(name);
+}
+
+@Override
+public  void forward(final K key, final V value) {
+context.forward(new Record<>(key.toString(), value.toString(), 
0L));

Review comment:
   Why `toString()` -- should we not assume that both `K` and `V` are of 
type `String`? I guess we cannot declare the input parameters as `String`, but 
wondering if we should rather cast here?
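   For illustration, a minimal editorial sketch (not part of the PR) of what the 
cast-based alternative could look like inside the anonymous 
`MockProcessorContext` above, assuming the runtime key and value really are 
`String`s:
   ```
   @Override
   public <K, V> void forward(final K key, final V value) {
       // hypothetical alternative to toString(): cast instead, so that a
       // non-String key or value fails fast with a ClassCastException
       context.forward(new Record<>((String) key, (String) value, 0L));
   }
   ```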




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-15 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r652228425



##
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##
@@ -294,8 +296,9 @@ class ReplicaFetcherThread(name: String,
 val fetchRequestOpt = if (fetchData.sessionPartitions.isEmpty && 
fetchData.toForget.isEmpty) {
   None
 } else {
+  val version: Short = if (fetchRequestVersion >= 13 && 
!fetchData.canUseTopicIds) 12 else fetchRequestVersion

Review comment:
   This one is slightly different as we are checking the IBP to get 
fetchRequestVersion. We could have an IBP where the version is lower than 12.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

2021-06-15 Thread GitBox


mjsax commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r65557



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -114,7 +133,7 @@ public static JoinWindows of(final Duration timeDifference) 
throws IllegalArgume
 public JoinWindows before(final Duration timeDifference) throws 
IllegalArgumentException {
 final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
 final long timeDifferenceMs = 
validateMillisecondDuration(timeDifference, msgPrefix);
-return new JoinWindows(timeDifferenceMs, afterMs, 
DEFAULT_GRACE_PERIOD_MS);
+return new JoinWindows(timeDifferenceMs, afterMs, graceMs, 
enableSpuriousResultFix);

Review comment:
   Sure. I don't see the connection to this change though? Note that 
`before()` and `after()` are non-static methods, and thus, they should not 
change/set the grace period. Only the static `of(size)` and the non-static 
`grace()` should change the grace period.
   ```
   JoinWindows.of(5000).before(30); // of() will set the default of 24h, so no need 
for before() to reset it to 24h; it can just inherit it
   JoinWindows.of(5000).grace(40).before(30); // this should leave grace at 
`40`; however, without this fix `before()` would re-set grace to the default of 
24h, which is incorrect
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10804: KAFKA-12877: Make flexibleVersions mandatory

2021-06-15 Thread GitBox


cmccabe merged pull request #10804:
URL: https://github.com/apache/kafka/pull/10804


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #10814: KAFKA-12888; Add transaction tool from KIP-664

2021-06-15 Thread GitBox


hachikuji commented on pull request #10814:
URL: https://github.com/apache/kafka/pull/10814#issuecomment-861883437


   @abbccdda Thanks for reviewing. I pushed an update and responded to a few 
comments. I think I remember why I wanted to upgrade argparse4j. Using 0.7.1, 
the --help argument displays the following:
   ```
   This tool is used to analyze the transactional state  of  producers in the 
cluster. It can be used to
   detect and recover from hanging transactions.
   
   optional arguments:
 -h, --help show this help message and exit
 -v, --version  show the version of this Kafka distribution and exit
 --command-config FILE  property file containing configs to be passed to 
admin client
 --bootstrap-server host:port
hostname and port for the  broker  to  connect  to, 
 in the form `host:port`
(multiple comma-separated entries can be given)
   
   commands:
 COMMAND
   list list transactions
   describe describe the state of an active transactional-id
   describe-producers   describe the states of active producers for a topic 
partition
   abortabort a hanging transaction (requires 
administrative privileges)
   ```
   
   Note that --bootstrap-server is listed as an optional argument even though 
it is actually required. In 0.8.1, the help output calls these "named 
arguments" instead, which is less likely to confuse. Anyway, we can still do 
this upgrade separately since it is a minor issue.
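   For reference, a minimal, self-contained editorial sketch (hypothetical 
`HelpDemo` class, not from this PR) of how a required named argument such as 
`--bootstrap-server` is declared with argparse4j; with 0.8.1 on the classpath 
the generated help lists it under "named arguments" rather than "optional 
arguments":
   ```
   import net.sourceforge.argparse4j.ArgumentParsers;
   import net.sourceforge.argparse4j.inf.ArgumentParser;

   public class HelpDemo {
       public static void main(String[] args) {
           // a parser with a single required named argument
           ArgumentParser parser = ArgumentParsers.newArgumentParser("kafka-transactions")
               .description("demo of a required named argument");
           parser.addArgument("--bootstrap-server")
               .metavar("host:port")
               .required(true)
               .help("hostname and port for the broker to connect to");
           // printing the help text shows which section --bootstrap-server lands in
           parser.printHelp();
       }
   }
   ```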


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

2021-06-15 Thread GitBox


guozhangwang commented on pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#issuecomment-861886242


   Re-triggered unit tests. We can merge after it turns green.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10814: KAFKA-12888; Add transaction tool from KIP-664

2021-06-15 Thread GitBox


hachikuji commented on a change in pull request #10814:
URL: https://github.com/apache/kafka/pull/10814#discussion_r652199342



##
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentGroup;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeProducersOptions;
+import org.apache.kafka.clients.admin.DescribeProducersResult;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public abstract class TransactionsCommand {
+private static final Logger log = 
LoggerFactory.getLogger(TransactionsCommand.class);
+
+protected final Time time;
+
+protected TransactionsCommand(Time time) {
+this.time = time;
+}
+
+/**
+ * Get the name of this command (e.g. `describe-producers`).
+ */
+abstract String name();
+
+/**
+ * Specify the arguments needed for this command.
+ */
+abstract void addSubparser(Subparsers subparsers);
+
+/**
+ * Execute the command logic.
+ */
+abstract void execute(Admin admin, Namespace ns, PrintStream out) throws 
Exception;
+
+
+static class AbortTransactionCommand extends TransactionsCommand {
+
+AbortTransactionCommand(Time time) {
+super(time);
+}
+
+@Override
+String name() {
+return "abort";
+}
+
+@Override
+void addSubparser(Subparsers subparsers) {
+Subparser subparser = subparsers.addParser(name())
+.help("abort a hanging transaction (requires administrative 
privileges)");
+
+subparser.addArgument("--topic")
+.help("topic name")
+.action(store())
+.type(String.class)
+.required(true);
+
+subparser.addArgument("--partition")
+.help("partition number")
+.action(store())
+.type(Integer.class)
+.required(true);
+
+ArgumentGroup newBrokerArgumentGroup = 
subparser.addArgumentGroup("new brokers")
+.description("For newer brokers, you must provide the start 
offset of the transaction " +
+"to be aborted");
+
+newBrokerArgumentGroup.addArgument("--start-offset")
+.help("start offset of the transaction to abort")
+.action(store())
+.type(Long.class);
+
+ArgumentGroup olderBrokerArgumentGroup = 
subparser.addArgumentGroup("older brokers")
+.description("For older brokers, you must provide all of these 
arguments");
+
+olderBrokerArgumentGroup.addArgument("--producer-id")
+

[GitHub] [kafka] hachikuji commented on a change in pull request #10814: KAFKA-12888; Add transaction tool from KIP-664

2021-06-15 Thread GitBox


hachikuji commented on a change in pull request #10814:
URL: https://github.com/apache/kafka/pull/10814#discussion_r652197234



##
File path: gradle/dependencies.gradle
##
@@ -57,7 +57,7 @@ versions += [
   activation: "1.1.1",
   apacheda: "1.0.2",
   apacheds: "2.0.0-M24",
-  argparse4j: "0.7.0",
+  argparse4j: "0.8.1",

Review comment:
   I couldn't come up with a reason this had to be done here, so I reverted 
this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10814: KAFKA-12888; Add transaction tool from KIP-664

2021-06-15 Thread GitBox


hachikuji commented on a change in pull request #10814:
URL: https://github.com/apache/kafka/pull/10814#discussion_r652195208



##
File path: 
tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.AbortTransactionResult;
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeProducersOptions;
+import org.apache.kafka.clients.admin.DescribeProducersResult;
+import 
org.apache.kafka.clients.admin.DescribeProducersResult.PartitionProducerState;
+import org.apache.kafka.clients.admin.DescribeTransactionsResult;
+import org.apache.kafka.clients.admin.ListTransactionsResult;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TransactionsCommandTest {
+
+private final MockExitProcedure exitProcedure = new MockExitProcedure();
+private final ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream();
+private final PrintStream out = new PrintStream(outputStream);
+private final MockTime time = new MockTime();
+private final Admin admin = Mockito.mock(Admin.class);
+
+@BeforeEach
+public void setupExitProcedure() {
+Exit.setExitProcedure(exitProcedure);
+}
+
+@AfterEach
+public void resetExitProcedure() {
+Exit.resetExitProcedure();
+}
+
+@Test
+public void testDescribeProducersTopicRequired() throws Exception {
+assertCommandFailure(new String[]{
+"--bootstrap-server",
+"localhost:9092",
+"describe-producers",
+"--partition",
+"0"
+});
+}
+
+@Test
+public void testDescribeProducersPartitionRequired() throws Exception {
+assertCommandFailure(new String[]{
+"--bootstrap-server",
+"localhost:9092",
+"describe-producers",
+"--topic",
+"foo"
+});
+}
+
+@Test
+public void testDescribeProducersLeader() throws Exception {
+TopicPartition topicPartition = new TopicPartition("foo", 5);
+String[] args = new String[] {
+"--bootstrap-server",
+"localhost:9092",
+"describe-producers",
+"--topic",
+topicPartition.topic(),
+"--partition",
+String.valueOf(topicPartition.partition())
+};
+
+testDescribeProducers(topicPartition, args, new 
DescribeProducersOptions());
+}
+
+@Test
+public void testDescribeProducersSpecificReplica() throws Exception {
+TopicPartition topicPartition = new TopicPartition("foo", 5);
+int brokerId = 5;
+
+String[] args = new String[] {
+

[GitHub] [kafka] hachikuji commented on a change in pull request #10814: KAFKA-12888; Add transaction tool from KIP-664

2021-06-15 Thread GitBox


hachikuji commented on a change in pull request #10814:
URL: https://github.com/apache/kafka/pull/10814#discussion_r652193627



##
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentGroup;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeProducersOptions;
+import org.apache.kafka.clients.admin.DescribeProducersResult;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public abstract class TransactionsCommand {
+private static final Logger log = 
LoggerFactory.getLogger(TransactionsCommand.class);
+
+protected final Time time;
+
+protected TransactionsCommand(Time time) {
+this.time = time;
+}
+
+/**
+ * Get the name of this command (e.g. `describe-producers`).
+ */
+abstract String name();
+
+/**
+ * Specify the arguments needed for this command.
+ */
+abstract void addSubparser(Subparsers subparsers);
+
+/**
+ * Execute the command logic.
+ */
+abstract void execute(Admin admin, Namespace ns, PrintStream out) throws 
Exception;
+
+
+static class AbortTransactionCommand extends TransactionsCommand {
+
+AbortTransactionCommand(Time time) {
+super(time);
+}
+
+@Override
+String name() {
+return "abort";
+}
+
+@Override
+void addSubparser(Subparsers subparsers) {
+Subparser subparser = subparsers.addParser(name())
+.help("abort a hanging transaction (requires administrative 
privileges)");
+
+subparser.addArgument("--topic")
+.help("topic name")
+.action(store())
+.type(String.class)
+.required(true);
+
+subparser.addArgument("--partition")
+.help("partition number")
+.action(store())
+.type(Integer.class)
+.required(true);
+
+ArgumentGroup newBrokerArgumentGroup = 
subparser.addArgumentGroup("new brokers")
+.description("For newer brokers, you must provide the start 
offset of the transaction " +
+"to be aborted");
+
+newBrokerArgumentGroup.addArgument("--start-offset")
+.help("start offset of the transaction to abort")
+.action(store())
+.type(Long.class);
+
+ArgumentGroup olderBrokerArgumentGroup = 
subparser.addArgumentGroup("older brokers")
+.description("For older brokers, you must provide all of these 
arguments");
+
+olderBrokerArgumentGroup.addArgument("--producer-id")
+

[GitHub] [kafka] dajac commented on a change in pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`

2021-06-15 Thread GitBox


dajac commented on a change in pull request #10863:
URL: https://github.com/apache/kafka/pull/10863#discussion_r652190661



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1450,7 +1457,95 @@ class GroupCoordinator(val brokerId: Int,
 group.maybeInvokeJoinCallback(member, joinResult)
 completeAndScheduleNextHeartbeatExpiration(group, member)
 member.isNew = false
+
+group.addPendingSyncMember(member.memberId)
   }
+
+  schedulePendingSync(group)
+}
+  }
+}
+  }
+
+  private def removePendingSyncMember(
+group: GroupMetadata,
+memberId: String
+  ): Unit = {
+group.removePendingSyncMember(memberId)
+maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(
+group: GroupMetadata
+  ): Unit = {
+group.clearPendingSyncMembers()
+maybeCompleteSyncExpiration(group)
+  }
+
+  private def maybeCompleteSyncExpiration(
+group: GroupMetadata
+  ): Unit = {
+val groupKey = GroupKey(group.groupId)
+syncPurgatory.checkAndComplete(groupKey)
+  }
+
+  private def schedulePendingSync(
+group: GroupMetadata
+  ): Unit = {
+val delayedSync = new DelayedSync(this, group, group.generationId, 
group.rebalanceTimeoutMs)
+val groupKey = GroupKey(group.groupId)
+syncPurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey))
+  }
+
+  def tryCompletePendingSync(
+group: GroupMetadata,
+generationId: Int,
+forceComplete: () => Boolean
+  ): Boolean = {
+group.inLock {
+  if (generationId != group.generationId) {
+forceComplete()
+  } else {
+group.currentState match {
+  case Dead | Empty | PreparingRebalance =>
+forceComplete()
+  case CompletingRebalance | Stable =>
+if (group.hasReceivedSyncFromAllMembers())
+  forceComplete()
+else false
+}
+  }
+}
+  }
+
+  def onExpirePendingSync(
+group: GroupMetadata,
+generationId: Int
+  ): Unit = {
+group.inLock {
+  if (generationId != group.generationId) {
+debug(s"Received unexpected notification of sync expiration for 
${group.groupId} " +
+  s" with an old generation $generationId.")
+  } else {
+group.currentState match {
+  case Dead | Empty | PreparingRebalance =>
+debug(s"Received unexpected notification of sync expiration after 
group ${group.groupId} " +
+  s"already transitioned to the ${group.currentState} state.")
+
+  case CompletingRebalance | Stable =>
+if (!group.hasAllMembersJoined) {

Review comment:
   Oops... It is definitely not. I introduced a predicate method in the 
group for this but used the wrong one here. My bad...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10814: KAFKA-12888; Add transaction tool from KIP-664

2021-06-15 Thread GitBox


hachikuji commented on a change in pull request #10814:
URL: https://github.com/apache/kafka/pull/10814#discussion_r652190611



##
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentGroup;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeProducersOptions;
+import org.apache.kafka.clients.admin.DescribeProducersResult;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public abstract class TransactionsCommand {
+private static final Logger log = 
LoggerFactory.getLogger(TransactionsCommand.class);
+
+protected final Time time;
+
+protected TransactionsCommand(Time time) {
+this.time = time;
+}
+
+/**
+ * Get the name of this command (e.g. `describe-producers`).
+ */
+abstract String name();
+
+/**
+ * Specify the arguments needed for this command.
+ */
+abstract void addSubparser(Subparsers subparsers);
+
+/**
+ * Execute the command logic.
+ */
+abstract void execute(Admin admin, Namespace ns, PrintStream out) throws 
Exception;
+
+
+static class AbortTransactionCommand extends TransactionsCommand {
+
+AbortTransactionCommand(Time time) {
+super(time);
+}
+
+@Override
+String name() {
+return "abort";
+}
+
+@Override
+void addSubparser(Subparsers subparsers) {
+Subparser subparser = subparsers.addParser(name())
+.help("abort a hanging transaction (requires administrative 
privileges)");
+
+subparser.addArgument("--topic")
+.help("topic name")
+.action(store())
+.type(String.class)
+.required(true);
+
+subparser.addArgument("--partition")
+.help("partition number")
+.action(store())
+.type(Integer.class)
+.required(true);
+
+ArgumentGroup newBrokerArgumentGroup = 
subparser.addArgumentGroup("new brokers")
+.description("For newer brokers, you must provide the start 
offset of the transaction " +
+"to be aborted");
+
+newBrokerArgumentGroup.addArgument("--start-offset")
+.help("start offset of the transaction to abort")
+.action(store())
+.type(Long.class);
+
+ArgumentGroup olderBrokerArgumentGroup = 
subparser.addArgumentGroup("older brokers")
+.description("For older brokers, you must provide all of these 
arguments");
+
+olderBrokerArgumentGroup.addArgument("--producer-id")
+

[GitHub] [kafka] dajac commented on a change in pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`

2021-06-15 Thread GitBox


dajac commented on a change in pull request #10863:
URL: https://github.com/apache/kafka/pull/10863#discussion_r652190153



##
File path: core/src/main/scala/kafka/coordinator/group/DelayedSync.scala
##
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed rebalance operation that is added to the purgatory when the group 
is completing the rebalance.
+ *
+ * Whenever a SyncGroup is received, checks that we have received the SyncGroup 
request from
+ * each member of the group; if yes, complete this operation.
+ *
+ * When the operation has expired, any known members that have not sent a 
SyncGroup request
+ * are removed from the group. If any member is removed, the group is 
rebalanced.
+ */
+private[group] class DelayedSync(

Review comment:
   Ah, I was not aware of this. This means that I cannot change the 
purgatory name like I did.
   
   I think that it is useful to have some metrics. The issue is that we would 
need a KIP, right?
   
   As an alternative, we could perhaps share the purgatory between the delayed 
join and the delayed sync. We could use different keys, for instance. What do 
you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`

2021-06-15 Thread GitBox


dajac commented on a change in pull request #10863:
URL: https://github.com/apache/kafka/pull/10863#discussion_r652188355



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1450,7 +1457,95 @@ class GroupCoordinator(val brokerId: Int,
 group.maybeInvokeJoinCallback(member, joinResult)
 completeAndScheduleNextHeartbeatExpiration(group, member)
 member.isNew = false
+
+group.addPendingSyncMember(member.memberId)
   }
+
+  schedulePendingSync(group)
+}
+  }
+}
+  }
+
+  private def removePendingSyncMember(
+group: GroupMetadata,
+memberId: String
+  ): Unit = {
+group.removePendingSyncMember(memberId)
+maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(
+group: GroupMetadata
+  ): Unit = {
+group.clearPendingSyncMembers()
+maybeCompleteSyncExpiration(group)
+  }
+
+  private def maybeCompleteSyncExpiration(
+group: GroupMetadata
+  ): Unit = {
+val groupKey = GroupKey(group.groupId)
+syncPurgatory.checkAndComplete(groupKey)
+  }
+
+  private def schedulePendingSync(
+group: GroupMetadata
+  ): Unit = {
+val delayedSync = new DelayedSync(this, group, group.generationId, 
group.rebalanceTimeoutMs)
+val groupKey = GroupKey(group.groupId)
+syncPurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey))
+  }
+
+  def tryCompletePendingSync(
+group: GroupMetadata,
+generationId: Int,
+forceComplete: () => Boolean
+  ): Boolean = {
+group.inLock {
+  if (generationId != group.generationId) {
+forceComplete()
+  } else {
+group.currentState match {
+  case Dead | Empty | PreparingRebalance =>
+forceComplete()
+  case CompletingRebalance | Stable =>
+if (group.hasReceivedSyncFromAllMembers())
+  forceComplete()
+else false
+}
+  }
+}
+  }
+
+  def onExpirePendingSync(
+group: GroupMetadata,
+generationId: Int
+  ): Unit = {
+group.inLock {
+  if (generationId != group.generationId) {
+debug(s"Received unexpected notification of sync expiration for 
${group.groupId} " +
+  s" with an old generation $generationId.")

Review comment:
   Will do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`

2021-06-15 Thread GitBox


dajac commented on a change in pull request #10863:
URL: https://github.com/apache/kafka/pull/10863#discussion_r652188256



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1450,7 +1457,95 @@ class GroupCoordinator(val brokerId: Int,
 group.maybeInvokeJoinCallback(member, joinResult)
 completeAndScheduleNextHeartbeatExpiration(group, member)
 member.isNew = false
+
+group.addPendingSyncMember(member.memberId)
   }
+
+  schedulePendingSync(group)
+}
+  }
+}
+  }
+
+  private def removePendingSyncMember(
+group: GroupMetadata,
+memberId: String
+  ): Unit = {
+group.removePendingSyncMember(memberId)
+maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(

Review comment:
   Definitely. I will add that.
   
   I cannot really think of any other doable alternatives. At least, we know 
that the leader has sent the SyncGroup since the group transitioned to Stable.
   
   Throwing out a few ideas:
   * Could we store the pending set in the log? That does not seem really 
doable, as the set could be updated before the write is acknowledged. Therefore 
we could think that a member is pending after a failover when it actually is not.
   * We could transition to Stable only when all pending members have joined. 
We already discussed this and agreed that this is not ideal as it would delay 
processing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-15 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652187497



##
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##
@@ -580,7 +582,7 @@ object ConsumerGroupCommand extends Logging {
 groupId,
 Option(consumerGroup.coordinator),
 unassignedPartitions.keySet.toSeq,
-unassignedPartitions.map { case (tp, offset) => tp -> 
Some(offset.offset) },
+unassignedPartitions.map { case (tp, offset) => tp -> 
getPartitionOffset(tp) },

Review comment:
   Same as with the previous approach, it could be changed:
   **From**:
   ```
   val rowsWithoutConsumer = if (unassignedPartitions.nonEmpty) {
 collectConsumerAssignment(
   groupId,
   Option(consumerGroup.coordinator),
   unassignedPartitions.keySet.toSeq,
   unassignedPartitions.map { case (tp, offset) => tp -> 
getPartitionOffset(tp) },
   Some(MISSING_COLUMN_VALUE),
   Some(MISSING_COLUMN_VALUE),
   Some(MISSING_COLUMN_VALUE)).toSeq
   } else
 Seq.empty
   ```
   **To**:
   ```
   val rowsWithoutConsumer = if (unassignedPartitions.nonEmpty) {
 collectConsumerAssignment(
   groupId,
   Option(consumerGroup.coordinator),
   unassignedPartitions.keySet.toSeq,
   getPartitionOffsets(unassignedPartitions.keySet.toSeq),
   Some(MISSING_COLUMN_VALUE),
   Some(MISSING_COLUMN_VALUE),
   Some(MISSING_COLUMN_VALUE)).toSeq
   } else
 Seq.empty
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ncliang commented on pull request #10475: KAFKA-12610: Implement PluginClassLoader::getResource

2021-06-15 Thread GitBox


ncliang commented on pull request #10475:
URL: https://github.com/apache/kafka/pull/10475#issuecomment-861856520


   Thanks for the review @gharris1727. @kkonstantine, @rhauch, could one of you 
take a look when you get a chance?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10863: KAFKA-12890; Consumer group stuck in `CompletingRebalance`

2021-06-15 Thread GitBox


hachikuji commented on a change in pull request #10863:
URL: https://github.com/apache/kafka/pull/10863#discussion_r652141670



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1450,7 +1457,95 @@ class GroupCoordinator(val brokerId: Int,
 group.maybeInvokeJoinCallback(member, joinResult)
 completeAndScheduleNextHeartbeatExpiration(group, member)
 member.isNew = false
+
+group.addPendingSyncMember(member.memberId)
   }
+
+  schedulePendingSync(group)
+}
+  }
+}
+  }
+
+  private def removePendingSyncMember(
+group: GroupMetadata,
+memberId: String
+  ): Unit = {
+group.removePendingSyncMember(memberId)
+maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(

Review comment:
   One thing we seem to be missing is a call to `removeSyncExpiration` when 
the group is unloaded in `onGroupUnloaded`. That brings up an interesting 
limitation. When a group is loaded, there is no way to tell which followers 
have or have not sent the SyncGroup. The only reasonable thing to do is 
probably assume that they all did indeed send it since we could not rely on the 
request being resent if it had already been successfully received.

##
File path: core/src/main/scala/kafka/coordinator/group/DelayedSync.scala
##
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.server.DelayedOperation
+
+/**
+ * Delayed rebalance operation that is added to the purgatory when the group 
is completing the rebalance.
+ *
+ * Whenever a SyncGroup is received, checks that we have received the SyncGroup 
request from
+ * each member of the group; if yes, complete this operation.
+ *
+ * When the operation has expired, any known members that have not sent a 
SyncGroup request
+ * are removed from the group. If any member is removed, the group is 
rebalanced.
+ */
+private[group] class DelayedSync(

Review comment:
   There are a set of metrics that get exposed automatically for each 
purgatory we add. I'm considering if these will be useful or if we should add a 
flag to let us disable them.

##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1450,7 +1457,95 @@ class GroupCoordinator(val brokerId: Int,
 group.maybeInvokeJoinCallback(member, joinResult)
 completeAndScheduleNextHeartbeatExpiration(group, member)
 member.isNew = false
+
+group.addPendingSyncMember(member.memberId)
   }
+
+  schedulePendingSync(group)
+}
+  }
+}
+  }
+
+  private def removePendingSyncMember(
+group: GroupMetadata,
+memberId: String
+  ): Unit = {
+group.removePendingSyncMember(memberId)
+maybeCompleteSyncExpiration(group)
+  }
+
+  private def removeSyncExpiration(
+group: GroupMetadata
+  ): Unit = {
+group.clearPendingSyncMembers()
+maybeCompleteSyncExpiration(group)
+  }
+
+  private def maybeCompleteSyncExpiration(
+group: GroupMetadata
+  ): Unit = {
+val groupKey = GroupKey(group.groupId)
+syncPurgatory.checkAndComplete(groupKey)
+  }
+
+  private def schedulePendingSync(
+group: GroupMetadata
+  ): Unit = {
+val delayedSync = new DelayedSync(this, group, group.generationId, 
group.rebalanceTimeoutMs)
+val groupKey = GroupKey(group.groupId)
+syncPurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey))
+  }
+
+  def tryCompletePendingSync(
+group: GroupMetadata,
+generationId: Int,
+forceComplete: () => Boolean
+  ): Boolean = {
+group.inLock {
+  if (generationId != group.generationId) {
+forceComplete()
+  } else {
+group.currentState match {
+  case Dead | Empty | PreparingRebalance =>
+forceComplete()
+  case CompletingRebalance | Stable =>
+if (group.hasReceivedSyncFromAllMembers())
+  forceComplete()
+else false
+}
+  }
+}
+  }
+
+  def onExpirePendingSync(
+group: GroupMetadata,
+generationId: Int
+  ): Unit = {
+group.inLock {
+   

[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-15 Thread GitBox


dhruvilshah3 commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r652157061



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -368,12 +368,50 @@ object LogLoader extends Logging {
 for (swapFile <- swapFiles) {
   val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, 
Log.SwapFileSuffix, ""))
   val baseOffset = Log.offsetFromFile(logFile)
+  // Check whether swap index files exist: if not, the cleaned files must 
exist due to the
+  // existence of swap log file. Therefore, we rename the cleaned files to 
swap files and continue.
+  var recoverable = true
+  val swapOffsetIndexFile = Log.offsetIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)

Review comment:
   You could perhaps define a method like this:
   
   ```
  def maybeCompleteInterruptedSwap(fn: (File, Long, String) => File): Boolean = {
    val swapIndexFile = fn(swapFile.getParentFile, baseOffset, Log.SwapFileSuffix)
    if (!swapIndexFile.exists()) {
      val cleanedIndexFile = fn(swapFile.getParentFile, baseOffset, Log.CleanedFileSuffix)
      if (cleanedIndexFile.exists()) {
        cleanedIndexFile.renameTo(swapIndexFile)
        true
      } else {
        false
      }
    } else {
      // the swap index file already exists, so there is nothing to rename
      true
    }
  }
   ```
   
   and then invoke it as
   
   ```
 var recoverable = true
 recoverable = maybeCompleteInterruptedSwap(Log.offsetIndexFile)
 if (recoverable)
   recoverable = maybeCompleteInterruptedSwap(Log.timeIndexFile)
 if (recoverable)
   recoverable = maybeCompleteInterruptedSwap(Log.transactionIndexFile)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-15 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652153471



##
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##
@@ -560,14 +560,16 @@ object ConsumerGroupCommand extends Logging {
   val groupOffsets = TreeMap[String, (Option[String], 
Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- 
consumerGroups) yield {
 val state = consumerGroup.state
 val committedOffsets = getCommittedOffsets(groupId)
+// The admin client returns `null` as a value to indicate that there 
is no committed offset for a partition. The following getPartitionOffset 
function seeks to avoid a NullPointerException by filtering out those null values.
+def getPartitionOffset(tp: TopicPartition): Option[Long] = 
committedOffsets.get(tp).filter(_ != null).map(_.offset)
 var assignedTopicPartitions = ListBuffer[TopicPartition]()
 val rowsWithConsumer = 
consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
   .sortWith(_.assignment.topicPartitions.size > 
_.assignment.topicPartitions.size).flatMap { consumerSummary =>
   val topicPartitions = 
consumerSummary.assignment.topicPartitions.asScala
   assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
   val partitionOffsets = 
consumerSummary.assignment.topicPartitions.asScala
 .map { topicPartition =>
-  topicPartition -> 
committedOffsets.get(topicPartition).map(_.offset)
+  topicPartition -> getPartitionOffset(topicPartition)

Review comment:
   Not directly. `collectConsumerAssignment` expects the getPartitionOffset 
parameter to be a `TopicPartition => Option[Long]`, and as defined 
getPartitionOffset returns an Option[Long] for a single specified 
TopicPartition.
   It can be modified to accept a sequence of TopicPartitions though and return 
a map, something like this:
   
   ```
   def getPartitionOffsets(tp: Seq[TopicPartition]): TopicPartition => 
Option[Long] = tp.map { topicPartition => topicPartition -> 
committedOffsets.get(topicPartition).filter(_ != null).map(_.offset)}.toMap
   ```
   
   That way it can be passed directly to collectConsumerAssignment, by changing:
   
   ```
   val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
   .map { topicPartition =>
 topicPartition -> getPartitionOffset(topicPartition)
   }.toMap 
   
   collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), 
topicPartitions.toList,
   partitionOffsets, Some(s"${consumerSummary.consumerId}"), 
Some(s"${consumerSummary.host}"),
   Some(s"${consumerSummary.clientId}"))
   ```
   
   To:
   
   ```
   collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), 
topicPartitions.toList,
   
getPartitionOffsets(consumerSummary.assignment.topicPartitions.asScala.toSeq), 
Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
   Some(s"${consumerSummary.clientId}"))
   ```
   
   @dajac Do you think is a good approach?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12953) Bump Zookeeper version to 3.6.3

2021-06-15 Thread Ryan Dielhenn (Jira)
Ryan Dielhenn created KAFKA-12953:
-

 Summary: Bump Zookeeper version to 3.6.3
 Key: KAFKA-12953
 URL: https://issues.apache.org/jira/browse/KAFKA-12953
 Project: Kafka
  Issue Type: Task
Reporter: Ryan Dielhenn
Assignee: Ryan Dielhenn


Bump the Zookeeper version used by Kafka to the latest stable release (3.6.3).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.

2021-06-15 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652153471



##
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##
@@ -560,14 +560,16 @@ object ConsumerGroupCommand extends Logging {
   val groupOffsets = TreeMap[String, (Option[String], 
Option[Seq[PartitionAssignmentState]])]() ++ (for ((groupId, consumerGroup) <- 
consumerGroups) yield {
 val state = consumerGroup.state
 val committedOffsets = getCommittedOffsets(groupId)
+// The admin client returns `null` as a value to indicate that there 
is no committed offset for a partition. The following getPartitionOffset 
function seeks to avoid a NullPointerException by filtering out those null values.
+def getPartitionOffset(tp: TopicPartition): Option[Long] = 
committedOffsets.get(tp).filter(_ != null).map(_.offset)
 var assignedTopicPartitions = ListBuffer[TopicPartition]()
 val rowsWithConsumer = 
consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
   .sortWith(_.assignment.topicPartitions.size > 
_.assignment.topicPartitions.size).flatMap { consumerSummary =>
   val topicPartitions = 
consumerSummary.assignment.topicPartitions.asScala
   assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
   val partitionOffsets = 
consumerSummary.assignment.topicPartitions.asScala
 .map { topicPartition =>
-  topicPartition -> 
committedOffsets.get(topicPartition).map(_.offset)
+  topicPartition -> getPartitionOffset(topicPartition)

Review comment:
   Not directly. `collectConsumerAssignment` expects the getPartitionOffset 
parameter to be a `TopicPartition => Option[Long]`, and as defined 
getPartitionOffset returns an Option[Long] for a single specified 
TopicPartition.
   It can be modified to accept a sequence of TopicPartitions though, to 
something like this:
   
   ```
   def getPartitionOffsets(tp: Seq[TopicPartition]): TopicPartition => 
Option[Long] = tp.map { topicPartition => topicPartition -> 
committedOffsets.get(topicPartition).filter(_ != null).map(_.offset)}.toMap
   ```
   
   That way it can be passed directly to collectConsumerAssignment, by changing:
   
   ```
   val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala
   .map { topicPartition =>
 topicPartition -> getPartitionOffset(topicPartition)
   }.toMap 
   
   collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), 
topicPartitions.toList,
   partitionOffsets, Some(s"${consumerSummary.consumerId}"), 
Some(s"${consumerSummary.host}"),
   Some(s"${consumerSummary.clientId}"))
   ```
   
   To:
   
   ```
   collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), 
topicPartitions.toList,
   
getPartitionOffsets(consumerSummary.assignment.topicPartitions.asScala.toSeq), 
Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
   Some(s"${consumerSummary.clientId}"))
   ```
   
   @dajac Do you think is a good approach?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10885: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-15 Thread GitBox


ijuma merged pull request #10885:
URL: https://github.com/apache/kafka/pull/10885


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10885: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-15 Thread GitBox


ijuma commented on pull request #10885:
URL: https://github.com/apache/kafka/pull/10885#issuecomment-861832652


   Unrelated failures:
   
   > Build / JDK 15 and Scala 2.13 / 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()
   > Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testOneWayReplicationWithAutoOffsetSync()
   > Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining
   > Build / JDK 11 and Scala 2.13 / 
kafka.api.TransactionsBounceTest.testWithGroupId()
   > Build / JDK 11 and Scala 2.13 / 
kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-15 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r652151007



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -368,12 +368,50 @@ object LogLoader extends Logging {
 for (swapFile <- swapFiles) {
   val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, 
Log.SwapFileSuffix, ""))
   val baseOffset = Log.offsetFromFile(logFile)
+  // Check whether swap index files exist: if not, the cleaned files must 
exist due to the
+  // existence of swap log file. Therefore, we rename the cleaned files to 
swap files and continue.
+  var recoverable = true
+  val swapOffsetIndexFile = Log.offsetIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)
+  if (!swapOffsetIndexFile.exists()) {
+val cleanedOffsetIndexFile = 
Log.offsetIndexFile(swapFile.getParentFile, baseOffset, Log.CleanedFileSuffix)
+if (cleanedOffsetIndexFile.exists())
+  cleanedOffsetIndexFile.renameTo(swapOffsetIndexFile)
+else
+  recoverable = false
+  }
+  val swapTimeIndexFile = Log.timeIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)
+  if (!swapTimeIndexFile.exists()) {
+val cleanedTimeIndexFile = Log.timeIndexFile(swapFile.getParentFile, 
baseOffset, Log.CleanedFileSuffix)
+if (cleanedTimeIndexFile.exists())
+  cleanedTimeIndexFile.renameTo(swapTimeIndexFile)
+else
+  recoverable = false
+  }
+  val swapTxnIndexFile = Log.transactionIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)
+  if (!swapTxnIndexFile.exists()) {
+val cleanedTxnIndexFile = 
Log.transactionIndexFile(swapFile.getParentFile, baseOffset, 
Log.CleanedFileSuffix)
+if (cleanedTxnIndexFile.exists())
+  cleanedTxnIndexFile.renameTo(swapTxnIndexFile)
+else
+  recoverable = false
+  }
   val swapSegment = LogSegment.open(swapFile.getParentFile,
 baseOffset = baseOffset,
 params.config,
 time = params.time,
 fileSuffix = Log.SwapFileSuffix)
-  info(s"${params.logIdentifier}Found log file ${swapFile.getPath} from 
interrupted swap operation, repairing.")
+  if (recoverable) {

Review comment:
   The whole logic is that, if the segment.swap file exists, then all index 
files should exist as .cleaned or .swap. We find them and rename them to .swap 
[before this block of code]. Then do a sanity check and rename all the .swap 
files to non-suffix log files [within this block of code].
   
   This could fix the issue caused by the compaction as we discussed before.
   
   For all other cases, I think it is in an inconsistent state and we will have 
to do the original recovery.
   
   Does this make sense to you?
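   (Editorial illustration, not part of the PR: a simplified, self-contained Java 
sketch of the rename step described above, using hypothetical file names rather 
than the actual LogLoader helpers.)
   ```
   import java.io.File;

   public class SwapRecoverySketch {
       // If the .swap index is missing but a .cleaned one exists, finish the
       // interrupted swap by renaming .cleaned -> .swap; otherwise report
       // whether the swap segment is still recoverable.
       static boolean maybeCompleteInterruptedSwap(File dir, String baseName) {
           File swapIndex = new File(dir, baseName + ".index.swap");
           if (swapIndex.exists())
               return true;                     // already in place, nothing to do
           File cleanedIndex = new File(dir, baseName + ".index.cleaned");
           return cleanedIndex.exists() && cleanedIndex.renameTo(swapIndex);
       }

       public static void main(String[] args) {
           File dir = new File("/tmp/kafka-logs/topic-0"); // hypothetical log dir
           System.out.println(maybeCompleteInterruptedSwap(dir, "00000000000000000382"));
       }
   }
   ```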




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-15 Thread GitBox


ccding commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r652151007



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -368,12 +368,50 @@ object LogLoader extends Logging {
 for (swapFile <- swapFiles) {
   val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, 
Log.SwapFileSuffix, ""))
   val baseOffset = Log.offsetFromFile(logFile)
+  // Check whether swap index files exist: if not, the cleaned files must 
exist due to the
+  // existence of swap log file. Therefore, we rename the cleaned files to 
swap files and continue.
+  var recoverable = true
+  val swapOffsetIndexFile = Log.offsetIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)
+  if (!swapOffsetIndexFile.exists()) {
+val cleanedOffsetIndexFile = 
Log.offsetIndexFile(swapFile.getParentFile, baseOffset, Log.CleanedFileSuffix)
+if (cleanedOffsetIndexFile.exists())
+  cleanedOffsetIndexFile.renameTo(swapOffsetIndexFile)
+else
+  recoverable = false
+  }
+  val swapTimeIndexFile = Log.timeIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)
+  if (!swapTimeIndexFile.exists()) {
+val cleanedTimeIndexFile = Log.timeIndexFile(swapFile.getParentFile, 
baseOffset, Log.CleanedFileSuffix)
+if (cleanedTimeIndexFile.exists())
+  cleanedTimeIndexFile.renameTo(swapTimeIndexFile)
+else
+  recoverable = false
+  }
+  val swapTxnIndexFile = Log.transactionIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)
+  if (!swapTxnIndexFile.exists()) {
+val cleanedTxnIndexFile = 
Log.transactionIndexFile(swapFile.getParentFile, baseOffset, 
Log.CleanedFileSuffix)
+if (cleanedTxnIndexFile.exists())
+  cleanedTxnIndexFile.renameTo(swapTxnIndexFile)
+else
+  recoverable = false
+  }
   val swapSegment = LogSegment.open(swapFile.getParentFile,
 baseOffset = baseOffset,
 params.config,
 time = params.time,
 fileSuffix = Log.SwapFileSuffix)
-  info(s"${params.logIdentifier}Found log file ${swapFile.getPath} from 
interrupted swap operation, repairing.")
+  if (recoverable) {

Review comment:
   The whole logic is that, if the segment.swap file exists, then all index 
files should exist as .cleaned or .swap. We find them and rename them to 
.swap [before this block of code]. Then do a sanity check and rename all the 
.swap files to non-suffix log files [within this block of code].
   
   This could fix the issue caused by the compaction as we discussed before.
   
   For all other cases, I think it is in an inconsistent state and we will have 
to do the original recovery.
   
   Does this make sense to you?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao merged pull request #10848: MINOR Updated transaction index as optional in LogSegmentData.

2021-06-15 Thread GitBox


junrao merged pull request #10848:
URL: https://github.com/apache/kafka/pull/10848


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #10763: KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-06-15 Thread GitBox


dhruvilshah3 commented on a change in pull request #10763:
URL: https://github.com/apache/kafka/pull/10763#discussion_r652142906



##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -368,12 +368,50 @@ object LogLoader extends Logging {
 for (swapFile <- swapFiles) {
   val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, 
Log.SwapFileSuffix, ""))
   val baseOffset = Log.offsetFromFile(logFile)
+  // Check whether swap index files exist: if not, the cleaned files must 
exist due to the
+  // existence of swap log file. Therefore, we rename the cleaned files to 
swap files and continue.
+  var recoverable = true
+  val swapOffsetIndexFile = Log.offsetIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)
+  if (!swapOffsetIndexFile.exists()) {
+val cleanedOffsetIndexFile = 
Log.offsetIndexFile(swapFile.getParentFile, baseOffset, Log.CleanedFileSuffix)
+if (cleanedOffsetIndexFile.exists())
+  cleanedOffsetIndexFile.renameTo(swapOffsetIndexFile)
+else
+  recoverable = false
+  }
+  val swapTimeIndexFile = Log.timeIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)
+  if (!swapTimeIndexFile.exists()) {
+val cleanedTimeIndexFile = Log.timeIndexFile(swapFile.getParentFile, 
baseOffset, Log.CleanedFileSuffix)
+if (cleanedTimeIndexFile.exists())
+  cleanedTimeIndexFile.renameTo(swapTimeIndexFile)
+else
+  recoverable = false
+  }
+  val swapTxnIndexFile = Log.transactionIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)
+  if (!swapTxnIndexFile.exists()) {
+val cleanedTxnIndexFile = 
Log.transactionIndexFile(swapFile.getParentFile, baseOffset, 
Log.CleanedFileSuffix)
+if (cleanedTxnIndexFile.exists())
+  cleanedTxnIndexFile.renameTo(swapTxnIndexFile)
+else
+  recoverable = false
+  }
   val swapSegment = LogSegment.open(swapFile.getParentFile,
 baseOffset = baseOffset,
 params.config,
 time = params.time,
 fileSuffix = Log.SwapFileSuffix)
-  info(s"${params.logIdentifier}Found log file ${swapFile.getPath} from 
interrupted swap operation, repairing.")
+  if (recoverable) {
+try {
+  swapSegment.sanityCheck(true)
+  info(s"Found log file ${swapFile.getPath} from interrupted swap 
operation, which is recoverable from ${Log.CleanedFileSuffix} files.")
+  swapSegment.changeFileSuffixes(Log.SwapFileSuffix, "")
+  return
+} catch {
+  case _: NoSuchFileException => {}
+  // do nothing and fall back to the recover index logic
+}
+  }
+  info(s"${params.logIdentifier}Found log file ${swapFile.getPath} from 
interrupted swap operation, which is not recoverable from 
${Log.CleanedFileSuffix} files, repairing.")
   recoverSegment(swapSegment, params)

Review comment:
   The main thing we want to avoid is running this recovery logic for 
scenarios where the `rename` operation was interrupted, as it rebuilds the 
producer state from scratch. Could we make this recovery conditional on whether 
we have all the relevant log files and indices?

##
File path: core/src/main/scala/kafka/log/LogLoader.scala
##
@@ -368,12 +368,50 @@ object LogLoader extends Logging {
 for (swapFile <- swapFiles) {
   val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, 
Log.SwapFileSuffix, ""))
   val baseOffset = Log.offsetFromFile(logFile)
+  // Check whether swap index files exist: if not, the cleaned files must 
exist due to the
+  // existence of swap log file. Therefore, we rename the cleaned files to 
swap files and continue.
+  var recoverable = true
+  val swapOffsetIndexFile = Log.offsetIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)
+  if (!swapOffsetIndexFile.exists()) {
+val cleanedOffsetIndexFile = 
Log.offsetIndexFile(swapFile.getParentFile, baseOffset, Log.CleanedFileSuffix)
+if (cleanedOffsetIndexFile.exists())
+  cleanedOffsetIndexFile.renameTo(swapOffsetIndexFile)
+else
+  recoverable = false
+  }
+  val swapTimeIndexFile = Log.timeIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)
+  if (!swapTimeIndexFile.exists()) {
+val cleanedTimeIndexFile = Log.timeIndexFile(swapFile.getParentFile, 
baseOffset, Log.CleanedFileSuffix)
+if (cleanedTimeIndexFile.exists())
+  cleanedTimeIndexFile.renameTo(swapTimeIndexFile)
+else
+  recoverable = false
+  }
+  val swapTxnIndexFile = Log.transactionIndexFile(swapFile.getParentFile, 
baseOffset, Log.SwapFileSuffix)
+  if (!swapTxnIndexFile.exists()) {
+val cleanedTxnIndexFile = 
Log.transactionIndexFile(swapFile.getParentFile, baseOffset, 
Log.CleanedFileSuffix)
+if (cleanedTxnIndexFile.exists())

[GitHub] [kafka] junrao commented on a change in pull request #10733: KAFKA-12816 Added tiered storage related configs including remote log manager configs.

2021-06-15 Thread GitBox


junrao commented on a change in pull request #10733:
URL: https://github.com/apache/kafka/pull/10733#discussion_r652137135



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
##
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
+import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
+import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
+import static org.apache.kafka.common.config.ConfigDef.Type.DOUBLE;
+import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
+
+public final class RemoteLogManagerConfig {
+
+/**
+ * Prefix used for properties to be passed to {@link RemoteStorageManager} 
implementation. Remote log subsystem collects all the properties having
+ * this prefix and passes to {@code RemoteStorageManager} using {@link 
RemoteStorageManager#configure(Map)}.
+ */
+public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = 
"remote.log.storage.manager.impl.prefix";
+public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteStorageManager " +
+"implementation. For example this value can be `rsm.s3.`.";
+
+/**
+ * Prefix used for properties to be passed to {@link 
RemoteLogMetadataManager} implementation. Remote log subsystem collects all the 
properties having
+ * this prefix and passed to {@code RemoteLogMetadataManager} using {@link 
RemoteLogMetadataManager#configure(Map)}.
+ */
+public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP 
= "remote.log.metadata.manager.impl.prefix";
+public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteLogMetadataManager " +
+"implementation. For example this value can be `rlmm.s3.`.";
+
+public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = 
"remote.log.storage.system.enable";
+public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether 
to enable tier storage functionality in a broker or not. Valid values " +
+"are `true` or `false` and the default value is false. When it is 
true broker starts all the services required for tiered storage functionality.";
+public static final boolean DEFAULT_REMOTE_LOG_STORAGE_SYSTEM_ENABLE = 
false;
+
+public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP = 
"remote.log.storage.manager.class.name";
+public static final String REMOTE_STORAGE_MANAGER_CLASS_NAME_DOC = "Fully 
qualified class name of `RemoteLogStorageManager` implementation.";
+
+public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_PROP = 
"remote.log.storage.manager.class.path";
+public static final String REMOTE_STORAGE_MANAGER_CLASS_PATH_DOC = "Class 
path of the `RemoteLogStorageManager` implementation." +
+"If specified, the RemoteLogStorageManager implementation and its 
dependent libraries will be loaded by a dedicated" +
+"classloader which searches this class path before the Kafka 
broker class path. The syntax of this parameter is same" +
+"with the standard Java class path string.";
+
+public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP = 
"remote.log.metadata.manager.class.name";
+public static final String REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_DOC = 
"Fully qualified class name of `RemoteLogMetadataManager` implementation.";
+//todo add the default topic based RLMM class 
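
   To illustrate how the prefix-based configs above are meant to compose, a broker configuration could look roughly like the snippet below. The class names and the `rsm.s3.*` / `rlmm.s3.*` keys are placeholders, not implementations shipped with Kafka:

   ```properties
   # Illustrative only; plugin class names and prefixed keys are hypothetical.
   remote.log.storage.system.enable=true

   remote.log.storage.manager.class.name=com.example.S3RemoteStorageManager
   remote.log.storage.manager.impl.prefix=rsm.s3.
   # Properties carrying the prefix are collected by the remote log subsystem and
   # handed to RemoteStorageManager.configure(Map).
   rsm.s3.bucket=my-tiered-storage-bucket
   rsm.s3.region=us-west-2

   remote.log.metadata.manager.class.name=com.example.MyRemoteLogMetadataManager
   remote.log.metadata.manager.impl.prefix=rlmm.s3.
   rlmm.s3.topic.replication.factor=3
   ```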

[GitHub] [kafka] rondagostino commented on pull request #10886: MINOR: TestSecurityRollingUpgrade system test fixes

2021-06-15 Thread GitBox


rondagostino commented on pull request #10886:
URL: https://github.com/apache/kafka/pull/10886#issuecomment-861793780


   
https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-06-15--001.1623785695--rondagostino--minor_more_fix_security_rolling_upgrade--f1a86adb9/report.html
   
   ```
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.8.1
   session_id:   2021-06-15--001
   run time: 27 minutes 27.648 seconds
   tests run:16
   passed:   16
   failed:   0
   ignored:  0
   

   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] izzyacademy commented on pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

2021-06-15 Thread GitBox


izzyacademy commented on pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#issuecomment-861778238


   @mjsax the changes look good to me. I will rebase my branch after your 
changes get merged into trunk. Thanks for looping me in.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] izzyacademy commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

2021-06-15 Thread GitBox


izzyacademy commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r652088281



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -150,12 +150,11 @@ public long get() {
 // Time shared between joins to keep track of the maximum stream time
 final MaxObservedStreamTime maxObservedStreamTime = new 
MaxObservedStreamTime();
 
+final JoinWindowsInternal internalWindows = new 
JoinWindowsInternal(windows);
 final KStreamKStreamJoin joinThis = new 
KStreamKStreamJoin<>(
 true,
 otherWindowStore.name(),
-windows.beforeMs,
-windows.afterMs,
-windows.gracePeriodMs(),
+internalWindows,

Review comment:
   I like this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] izzyacademy commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

2021-06-15 Thread GitBox


izzyacademy commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r652087933



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -114,7 +133,7 @@ public static JoinWindows of(final Duration timeDifference) 
throws IllegalArgume
 public JoinWindows before(final Duration timeDifference) throws 
IllegalArgumentException {
 final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
 final long timeDifferenceMs = 
validateMillisecondDuration(timeDifference, msgPrefix);
-return new JoinWindows(timeDifferenceMs, afterMs, 
DEFAULT_GRACE_PERIOD_MS);
+return new JoinWindows(timeDifferenceMs, afterMs, graceMs, 
enableSpuriousResultFix);

Review comment:
   As per the feedback from @ableegoldman, the grace period in the old methods needs to stay at 24h. It is only in the new methods that we get to specify it as zero.
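
   For context, the difference being discussed looks roughly like this in application code. This is a sketch assuming the KIP-633 style factory methods `ofTimeDifferenceWithNoGrace` / `ofTimeDifferenceAndGrace`; the object name is made up:

   ```scala
   import java.time.Duration
   import org.apache.kafka.streams.kstream.JoinWindows

   object JoinWindowsGraceSketch {
     // Old API: grace defaults to 24 hours and, per this PR, keeps the old join
     // semantics (spurious left/outer results) for backward compatibility.
     val legacyWindows: JoinWindows = JoinWindows.of(Duration.ofMinutes(5))

     // New API: grace is explicit and may be zero; only these windows enable the
     // spurious-result fix.
     val noGraceWindows: JoinWindows =
       JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5))
     val gracefulWindows: JoinWindows =
       JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
   }
   ```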




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] izzyacademy commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

2021-06-15 Thread GitBox


izzyacademy commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r652087049



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
##
@@ -76,18 +76,37 @@
 
 private final long graceMs;
 
+protected final boolean enableSpuriousResultFix;

Review comment:
   Thanks for adding this note. This summary is very helpful to me in my 
implementation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10814: KAFKA-12888; Add transaction tool from KIP-664

2021-06-15 Thread GitBox


hachikuji commented on a change in pull request #10814:
URL: https://github.com/apache/kafka/pull/10814#discussion_r652076206



##
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentGroup;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeProducersOptions;
+import org.apache.kafka.clients.admin.DescribeProducersResult;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public abstract class TransactionsCommand {
+private static final Logger log = 
LoggerFactory.getLogger(TransactionsCommand.class);
+
+protected final Time time;
+
+protected TransactionsCommand(Time time) {
+this.time = time;
+}
+
+/**
+ * Get the name of this command (e.g. `describe-producers`).
+ */
+abstract String name();
+
+/**
+ * Specify the arguments needed for this command.
+ */
+abstract void addSubparser(Subparsers subparsers);
+
+/**
+ * Execute the command logic.
+ */
+abstract void execute(Admin admin, Namespace ns, PrintStream out) throws 
Exception;
+
+
+static class AbortTransactionCommand extends TransactionsCommand {
+
+AbortTransactionCommand(Time time) {
+super(time);
+}
+
+@Override
+String name() {
+return "abort";
+}
+
+@Override
+void addSubparser(Subparsers subparsers) {
+Subparser subparser = subparsers.addParser(name())
+.help("abort a hanging transaction (requires administrative 
privileges)");
+
+subparser.addArgument("--topic")
+.help("topic name")
+.action(store())
+.type(String.class)
+.required(true);
+
+subparser.addArgument("--partition")
+.help("partition number")
+.action(store())
+.type(Integer.class)
+.required(true);
+
+ArgumentGroup newBrokerArgumentGroup = 
subparser.addArgumentGroup("new brokers")
+.description("For newer brokers, you must provide the start 
offset of the transaction " +
+"to be aborted");
+
+newBrokerArgumentGroup.addArgument("--start-offset")
+.help("start offset of the transaction to abort")
+.action(store())
+.type(Long.class);
+
+ArgumentGroup olderBrokerArgumentGroup = 
subparser.addArgumentGroup("older brokers")
+.description("For older brokers, you must provide all of these 
arguments");
+
+olderBrokerArgumentGroup.addArgument("--producer-id")
+

[GitHub] [kafka] hachikuji commented on a change in pull request #10814: KAFKA-12888; Add transaction tool from KIP-664

2021-06-15 Thread GitBox


hachikuji commented on a change in pull request #10814:
URL: https://github.com/apache/kafka/pull/10814#discussion_r652069011



##
File path: gradle/dependencies.gradle
##
@@ -57,7 +57,7 @@ versions += [
   activation: "1.1.1",
   apacheda: "1.0.2",
   apacheds: "2.0.0-M24",
-  argparse4j: "0.7.0",
+  argparse4j: "0.8.1",

Review comment:
   To be honest, I forgot why I bumped this. We can do it in a separate PR 
if you prefer. Dependency updates do not typically call for any user 
notification unless there was some kind of compatibility impact.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12952) Metadata Snapshot File Delimiters

2021-06-15 Thread Niket Goel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niket Goel updated KAFKA-12952:
---
Description: 
Create new Control Records that will serve as the header and footer for a 
Metadata Snapshot File. These records will be contained at the beginning and 
end of each Snapshot File, and can be checked to verify completeness of a 
snapshot file.

The following fields are proposed for the Header:
 # *Version :* Schema version for the snapshot header
 # *End Offset* : End offset of the snapshot from the snapshot ID
 # *Epoch :* Epoch of the snapshot from the Snapshot ID
 # *Creator ID* : (Optional) ID of the broker/Controller that created the 
snapshot
 # *Cluster ID :* (Optional) ID of the cluster that created the snapshot
 # *Create Time :* Timestamp of the snapshot creation (might not be needed as 
each record batch has a timestamp already).

The following fields are proposed for the footer:
 # *Version* : Schema version of the snapshot footer (same as header)
 # *Record Type* : A type field indicating this is the end record for the 
snapshot file.

 

  was:
Create new Control Records that will serve as the header and footer for a 
Metadata Snapshot File. These records will be contained at the beginning and 
end of each Snapshot File, and can be checked to verify completeness of a 
snapshot file.

The following fields are proposed for the Header:
 # *Version :* Schema version for the snapshot header
 # *End Offset* : End offset of the snapshot from the snapshot ID
 # *Epoch :* Epoch of the snapshot ** from the Snapshot ID**
 # *Creator ID* : (Optional) ID of the broker/Controller that created the 
snapshot
 # *Cluster ID :* (Optional) ID of the cluster that created the snapshot
 # *Create Time :* Timestamp of the snapshot creation (might not be needed as 
each record batch has a timestamp already.

The following fields are proposed for the footer:
 # *Version* : Schema version of the snapshot footer (same as header)
 # *Record Type* : A type fields indicating this this the end record for the 
snapshot file.

 


> Metadata Snapshot File Delimiters
> -
>
> Key: KAFKA-12952
> URL: https://issues.apache.org/jira/browse/KAFKA-12952
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, kraft
>Reporter: Niket Goel
>Priority: Minor
>
> Create new Control Records that will serve as the header and footer for a 
> Metadata Snapshot File. These records will be contained at the beginning and 
> end of each Snapshot File, and can be checked to verify completeness of a 
> snapshot file.
> The following fields are proposed for the Header:
>  # *Version :* Schema version for the snapshot header
>  # *End Offset* : End offset of the snapshot from the snapshot ID
>  # *Epoch :* Epoch of the snapshot ** from the Snapshot ID**
>  # *Creator ID* : (Optional) ID of the broker/Controller that created the 
> snapshot
>  # *Cluster ID :* (Optional) ID of the cluster that created the snapshot
>  # *Create Time :* Timestamp of the snapshot creation (might not be needed as 
> each record batch has a timestamp already.
> The following fields are proposed for the footer:
>  # *Version* : Schema version of the snapshot footer (same as header)
>  # *Record Type* : A type fields indicating this is the end record for the 
> snapshot file.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12952) Metadata Snapshot File Delimiters

2021-06-15 Thread Niket Goel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niket Goel updated KAFKA-12952:
---
Description: 
Create new Control Records that will serve as the header and footer for a 
Metadata Snapshot File. These records will be contained at the beginning and 
end of each Snapshot File, and can be checked to verify completeness of a 
snapshot file.

The following fields are proposed for the Header:
 # *Version :* Schema version for the snapshot header
 # *End Offset* : End offset of the snapshot from the snapshot ID
 # *Epoch :* Epoch of the snapshot ** from the Snapshot ID**
 # *Creator ID* : (Optional) ID of the broker/Controller that created the 
snapshot
 # *Cluster ID :* (Optional) ID of the cluster that created the snapshot
 # *Create Time :* Timestamp of the snapshot creation (might not be needed as 
each record batch has a timestamp already.

The following fields are proposed for the footer:
 # *Version* : Schema version of the snapshot footer (same as header)
 # *Record Type* : A type fields indicating this this the end record for the 
snapshot file.

 

  was:
Create a new Control Record that will serve as the header for a Metadata 
Snapshot File. This header will be contained at the beginning of each Snapshot 
File.

Add the following fields to the header record:
 # *Version :* Schema version for the snapshot header
 # *End Offset* : End offset of the snapshot from the snapshot ID
 # *Epoch :* Epoch of the snapshot ** from the Snapshot ID
 # *Snapshot Size* : Size in bytes of the Snapshot File.
 # *Creator ID* : (Optional) ID of the broker/Controller that created the 
snapshot
 # *Cluster ID :* (Optional) ID of the cluster that created the snapshot
 # *Create Time :* Timestamp of the snapshot creation (might not be needed as 
each record batch has a timestamp already.
 # *HasFooter :* Boolean indicating if a control record/batch will be included 
at the end of the file

 


> Metadata Snapshot File Delimiters
> -
>
> Key: KAFKA-12952
> URL: https://issues.apache.org/jira/browse/KAFKA-12952
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, kraft
>Reporter: Niket Goel
>Priority: Minor
>
> Create new Control Records that will serve as the header and footer for a 
> Metadata Snapshot File. These records will be contained at the beginning and 
> end of each Snapshot File, and can be checked to verify completeness of a 
> snapshot file.
> The following fields are proposed for the Header:
>  # *Version :* Schema version for the snapshot header
>  # *End Offset* : End offset of the snapshot from the snapshot ID
>  # *Epoch :* Epoch of the snapshot ** from the Snapshot ID**
>  # *Creator ID* : (Optional) ID of the broker/Controller that created the 
> snapshot
>  # *Cluster ID :* (Optional) ID of the cluster that created the snapshot
>  # *Create Time :* Timestamp of the snapshot creation (might not be needed as 
> each record batch has a timestamp already.
> The following fields are proposed for the footer:
>  # *Version* : Schema version of the snapshot footer (same as header)
>  # *Record Type* : A type fields indicating this this the end record for the 
> snapshot file.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12952) Metadata Snapshot File Delimiters

2021-06-15 Thread Niket Goel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niket Goel updated KAFKA-12952:
---
Summary: Metadata Snapshot File Delimiters  (was: Metadata Snapshot File 
Header)

> Metadata Snapshot File Delimiters
> -
>
> Key: KAFKA-12952
> URL: https://issues.apache.org/jira/browse/KAFKA-12952
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, kraft
>Reporter: Niket Goel
>Priority: Minor
>
> Create a new Control Record that will serve as the header for a Metadata 
> Snapshot File. This header will be contained at the beginning of each 
> Snapshot File.
> Add the following fields to the header record:
>  # *Version :* Schema version for the snapshot header
>  # *End Offset* : End offset of the snapshot from the snapshot ID
>  # *Epoch :* Epoch of the snapshot ** from the Snapshot ID
>  # *Snapshot Size* : Size in bytes of the Snapshot File.
>  # *Creator ID* : (Optional) ID of the broker/Controller that created the 
> snapshot
>  # *Cluster ID :* (Optional) ID of the cluster that created the snapshot
>  # *Create Time :* Timestamp of the snapshot creation (might not be needed as 
> each record batch has a timestamp already.
>  # *HasFooter :* Boolean indicating if a control record/batch will be 
> included at the end of the file
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lbradstreet commented on a change in pull request #10754: KAFKA-12835: Topic IDs can mismatch on brokers (after interbroker protocol version update)

2021-06-15 Thread GitBox


lbradstreet commented on a change in pull request #10754:
URL: https://github.com/apache/kafka/pull/10754#discussion_r652042564



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -337,6 +337,9 @@ class Log(@volatile private var _dir: File,
 }
 } else if (keepPartitionMetadataFile) {
   _topicId.foreach(partitionMetadataFile.write)
+} else {

Review comment:
   I think extracting it into a method named initializeTopicId or something 
like that would be good.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12950) Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest

2021-06-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12950:

Component/s: unit tests
 streams

> Replace EasyMock and PowerMock with Mockito for KafkaStreamsTest
> 
>
> Key: KAFKA-12950
> URL: https://issues.apache.org/jira/browse/KAFKA-12950
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, unit tests
>Reporter: Josep Prat
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning

2021-06-15 Thread GitBox


cmccabe commented on a change in pull request #10864:
URL: https://github.com/apache/kafka/pull/10864#discussion_r652028042



##
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -253,7 +253,8 @@ class KafkaRaftManager[T](
   time,
   scheduler,
   maxBatchSizeInBytes = KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
-  maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+  maxFetchSizeInBytes = KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+  config = MetadataLogConfig(config)

Review comment:
   should maxBatchSizeInBytes be part of MetadataLogConfig?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #10754: KAFKA-12835: Topic IDs can mismatch on brokers (after interbroker protocol version update)

2021-06-15 Thread GitBox


jolshan commented on a change in pull request #10754:
URL: https://github.com/apache/kafka/pull/10754#discussion_r652022729



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -337,6 +337,9 @@ class Log(@volatile private var _dir: File,
 }
 } else if (keepPartitionMetadataFile) {
   _topicId.foreach(partitionMetadataFile.write)
+} else {

Review comment:
   Yeah. Unfortunately there is a lot going on here. I've edited the 
comment and spacing to make it a little clearer. If we still think it needs 
adjustment, I can try making a new method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #10754: KAFKA-12835: Topic IDs can mismatch on brokers (after interbroker protocol version update)

2021-06-15 Thread GitBox


jolshan commented on pull request #10754:
URL: https://github.com/apache/kafka/pull/10754#issuecomment-861710587


   I thought I cherry-picked this to 2.8, but I guess I didn't. I will need to 
do that as well. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12946) __consumer_offsets topic with very big partitions

2021-06-15 Thread Ron Dagostino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17363805#comment-17363805
 ] 

Ron Dagostino commented on KAFKA-12946:
---

The only one I am familiar with and would recommend is the upgrade.

> __consumer_offsets topic with very big partitions
> -
>
> Key: KAFKA-12946
> URL: https://issues.apache.org/jira/browse/KAFKA-12946
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.0.0
>Reporter: Emi
>Priority: Critical
>
> I am using Kafka 2.0.0 with java 8u191
>  There is a partitions of the __consumer_offsets topic that is 600 GB with 
> 6000 segments older than 4 months. Other partitions of that topic are small: 
> 20-30 MB.
> There are 60 consumer groups, 90 topics and 100 partitions per topic.
> There aren't errors in the logs. From the log of the logcleaner, I can see 
> that partition is never touched from the logcleaner thread for the 
> compaction, but it only add new segments.
>  How is this possible?
> There was another partition with the same problem, but after some months it 
> has been compacted. Now there is only one partition with this problem, but 
> this is bigger and keep growing
> I have used the kafka-dump-log tool to check these old segments and I can see 
> many duplicates. So I would assume that is not compacted.
> My settings:
>  {{offsets.commit.required.acks = -1}}
>  {{offsets.commit.timeout.ms}} = 5000
>  {{offsets.load.buffer.size = 5242880}}
>  {{offsets.retention.check.interval.ms}} = 60
>  {{offsets.retention.minutes = 10080}}
>  {{offsets.topic.compression.codec = 0}}
>  {{offsets.topic.num.partitions = 50}}
>  {{offsets.topic.replication.factor = 3}}
>  {{offsets.topic.segment.bytes = 104857600}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] IgnacioAcunaF edited a comment on pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-15 Thread GitBox


IgnacioAcunaF edited a comment on pull request #10883:
URL: https://github.com/apache/kafka/pull/10883#issuecomment-861696913


   @ijuma @jsancio  Thanks for your comments and review. Updated the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-15 Thread GitBox


IgnacioAcunaF commented on pull request #10883:
URL: https://github.com/apache/kafka/pull/10883#issuecomment-861696913


   Thanks for your comments and review. Updated the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12787) Integrate controller snapshot with the RaftClient

2021-06-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12787.
-
Resolution: Fixed

> Integrate controller snapshot with the RaftClient
> -
>
> Key: KAFKA-12787
> URL: https://issues.apache.org/jira/browse/KAFKA-12787
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: kip-500
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client

2021-06-15 Thread GitBox


hachikuji merged pull request #10786:
URL: https://github.com/apache/kafka/pull/10786


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10867: KAFKA-12931: KIP-746: Revise KRaft Metadata Records

2021-06-15 Thread GitBox


cmccabe commented on a change in pull request #10867:
URL: https://github.com/apache/kafka/pull/10867#discussion_r652002022



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##
@@ -236,20 +239,11 @@ public void replay(RegisterBrokerRecord record) {
 features.put(feature.name(), new VersionRange(
 feature.minSupportedVersion(), feature.maxSupportedVersion()));
 }
-// Normally, all newly registered brokers start off in the fenced 
state.
-// If this registration record is for a broker incarnation that was 
already
-// registered, though, we preserve the existing fencing state.
-boolean fenced = true;
-BrokerRegistration prevRegistration = 
brokerRegistrations.get(brokerId);
-if (prevRegistration != null &&
-
prevRegistration.incarnationId().equals(record.incarnationId())) {
-fenced = prevRegistration.fenced();
-}
 // Update broker registrations.
 brokerRegistrations.put(brokerId, new BrokerRegistration(brokerId,
 record.brokerEpoch(), record.incarnationId(), listeners, features,
-Optional.ofNullable(record.rack()), fenced));
-
+Optional.ofNullable(record.rack()), record.fenced()));
+BrokerRegistration prevRegistration = 
brokerRegistrations.get(brokerId);

Review comment:
   Only the controller decides when brokers can be unfenced.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12773) Use UncheckedIOException when wrapping IOException

2021-06-15 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12773.
-
Resolution: Fixed

> Use UncheckedIOException when wrapping IOException
> --
>
> Key: KAFKA-12773
> URL: https://issues.apache.org/jira/browse/KAFKA-12773
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: loboxu
>Priority: Major
>  Labels: kip-500, newbie
>
> The {{raft}} module may not be fully consistent on this but in general in 
> that module we have decided to not throw the checked {{IOException}}. We have 
> been avoiding checked {{IOException}} exceptions by wrapping them in 
> {{RuntimeException}}. The {{raft}} module should instead wrap {{IOException}} 
> in {{UncheckedIOException}}. This change should be limited to the {{raft}} 
> module.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-15 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10883:
URL: https://github.com/apache/kafka/pull/10883#discussion_r651999235



##
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##
@@ -238,6 +238,8 @@ class TestRaftServer(
   reader.close()
 
 case Shutdown => // Ignore shutdown command
+
+case _ => // Ignore other events (such as null)

Review comment:
   Will change on the PR, thanks for the comment!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-06-15 Thread GitBox


hachikuji merged pull request #10749:
URL: https://github.com/apache/kafka/pull/10749


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-15 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10883:
URL: https://github.com/apache/kafka/pull/10883#discussion_r651993936



##
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##
@@ -238,6 +238,8 @@ class TestRaftServer(
   reader.close()
 
 case Shutdown => // Ignore shutdown command
+
+case _ => // Ignore other events (such as null)

Review comment:
   Exactly. When the eventQueue hits the eventTimeoutMs, the `null` value arises (as there is nothing to poll from). While debugging I found that `eventTimeoutMs` could be 0 (if throttleTimeMs is equal to 0), so if there are no immediate events on the eventQueue, the `null` arises immediately, and as it is not caught, the thread stops.
   
   And yes, at the beginning I approached it with a `case null => // Ignore null event`, which made the class run just fine, but decided to leave it generic to catch some events that may be being overlooked. Thinking of it now, it seems better to just keep the `case null`, because otherwise it would be more difficult to debug further situations (non-mapped events, for example).
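
   A minimal sketch of the behaviour being described (the type and object names are illustrative, not the actual TestRaftServer types): `poll()` on a blocking queue returns `null` when the timeout expires with nothing queued, so the match needs an explicit `null` or catch-all case to avoid a `scala.MatchError`.

   ```scala
   import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}

   object NullEventSketch {
     sealed trait Event
     case object Shutdown extends Event

     // poll() returns null when the timeout (possibly 0 ms) expires with no event.
     def drainOnce(queue: LinkedBlockingDeque[Event], timeoutMs: Long): Unit =
       queue.poll(timeoutMs, TimeUnit.MILLISECONDS) match {
         case Shutdown => // handle shutdown
         case null     => // timed out with nothing to process; keep looping
       }
   }
   ```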




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-15 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10883:
URL: https://github.com/apache/kafka/pull/10883#discussion_r651993936



##
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##
@@ -238,6 +238,8 @@ class TestRaftServer(
   reader.close()
 
 case Shutdown => // Ignore shutdown command
+
+case _ => // Ignore other events (such as null)

Review comment:
   Exactly. When the eventQueue hits the eventTimeoutMs, the `null` value arises (as there is nothing to poll from). While debugging I found that `eventTimeoutMs` could be 0 (if throttleTimeMs is equal to 0), so if there are no immediate events on the eventQueue, the `null` arises immediately, and as it is not caught, the thread stops.
   
   And yes, at the beginning I approached it with a `case null => // Ignore null event`, which made the class run just fine, but decided to leave it generic to catch some events that may be being overlooked. 
   What do you think about that?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-15 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10883:
URL: https://github.com/apache/kafka/pull/10883#discussion_r651993936



##
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##
@@ -238,6 +238,8 @@ class TestRaftServer(
   reader.close()
 
 case Shutdown => // Ignore shutdown command
+
+case _ => // Ignore other events (such as null)

Review comment:
   Exactly. When the eventQueue hits the eventTimeoutMs, the `null` value arises (as there is nothing to poll from). While debugging I found that `eventTimeoutMs` could be 0 (if throttleTimeMs is equal to 0), so if there are no immediate events on the eventQueue, the `null` arises immediately, and as it is not caught, the thread stops.
   
   And yes, at the beginning I approached it with a `case null => // Ignore null event`, which made the class run just fine, but left it generic to catch some events that may be being overlooked. 
   What do you think about the change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10867: KAFKA-12931: KIP-746: Revise KRaft Metadata Records

2021-06-15 Thread GitBox


mumrah commented on a change in pull request #10867:
URL: https://github.com/apache/kafka/pull/10867#discussion_r651991734



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##
@@ -236,20 +239,11 @@ public void replay(RegisterBrokerRecord record) {
 features.put(feature.name(), new VersionRange(
 feature.minSupportedVersion(), feature.maxSupportedVersion()));
 }
-// Normally, all newly registered brokers start off in the fenced 
state.
-// If this registration record is for a broker incarnation that was 
already
-// registered, though, we preserve the existing fencing state.
-boolean fenced = true;
-BrokerRegistration prevRegistration = 
brokerRegistrations.get(brokerId);
-if (prevRegistration != null &&
-
prevRegistration.incarnationId().equals(record.incarnationId())) {
-fenced = prevRegistration.fenced();
-}
 // Update broker registrations.
 brokerRegistrations.put(brokerId, new BrokerRegistration(brokerId,
 record.brokerEpoch(), record.incarnationId(), listeners, features,
-Optional.ofNullable(record.rack()), fenced));
-
+Optional.ofNullable(record.rack()), record.fenced()));
+BrokerRegistration prevRegistration = 
brokerRegistrations.get(brokerId);

Review comment:
   Since we are adding `fenced` to the RegisterBrokerRecord, do we also 
need to add a `fenced` field to the BrokerRegistrationRequest RPC? Or is it the 
case that only the controller will set the fenced state of this record?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10886: MINOR: TestSecurityRollingUpgrade system test fixes

2021-06-15 Thread GitBox


rondagostino commented on pull request #10886:
URL: https://github.com/apache/kafka/pull/10886#issuecomment-861682744


   I've kicked off system tests for the affected file and will post results 
here when it completes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] IgnacioAcunaF commented on a change in pull request #10883: KAFKA-12949: TestRaftServer's scala.MatchError: null on test-kraft-server-start.sh

2021-06-15 Thread GitBox


IgnacioAcunaF commented on a change in pull request #10883:
URL: https://github.com/apache/kafka/pull/10883#discussion_r651993936



##
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##
@@ -238,6 +238,8 @@ class TestRaftServer(
   reader.close()
 
 case Shutdown => // Ignore shutdown command
+
+case _ => // Ignore other events (such as null)

Review comment:
   Exactly. When the eventQueue hits the eventTimeoutMs, the `null` value arises (as there is nothing to poll from). While debugging I found that `eventTimeoutMs` could be 0 (if throttleTimeMs is equal to 0), so if there are no immediate events on the eventQueue, the `null` arises immediately.
   
   And yes, at the beginning I approached it with a `case null => // Ignore null event`, which made the class run just fine, but left it generic to catch some events that may be being overlooked.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino opened a new pull request #10886: MINOR: TestSecurityRollingUpgrade system test fixes

2021-06-15 Thread GitBox


rondagostino opened a new pull request #10886:
URL: https://github.com/apache/kafka/pull/10886


   The `TestSecurityRollingUpgrade.test_disable_separate_interbroker_listener()` system test had a design flaw: it 
was migrating inter-broker communication from a SASL_SSL listener to an SSL 
listener in one roll while immediately removing the SASL_SSL listener in that 
roll.  This requires two rolls because the existing SASL_SSL listener must 
remain available throughout the first roll so that unrolled brokers can 
continue to communicate with rolled brokers throughout.  This patch adds the 
second roll to this test and removes the original SASL_SSL listener on that 
second roll instead of the first one.  The test was not failing all the time -- 
it was flaky.
   
   The `TestSecurityRollingUpgrade.test_rolling_upgrade_phase_two()` system 
test was not explicitly identifying the SASL mechanism to enable on a third 
port when that port was using SASL but the client security protocol was not 
SASL-based.  This was resulting in an empty `sasl.enabled.mechanisms` config, 
which applied to that third port, and then when the cluster was rolled to take 
advantage of this third port for inter-broker communication the potential for 
an inability to communicate with other, unrolled brokers existed (similar to 
above, this resulted in a flaky test).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12952) Metadata Snapshot File Header

2021-06-15 Thread Niket Goel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Niket Goel updated KAFKA-12952:
---
Description: 
Create a new Control Record that will serve as the header for a Metadata 
Snapshot File. This header will be contained at the beginning of each Snapshot 
File.

Add the following fields to the header record:
 # *Version :* Schema version for the snapshot header
 # *End Offset* : End offset of the snapshot from the snapshot ID
 # *Epoch :* Epoch of the snapshot ** from the Snapshot ID
 # *Snapshot Size* : Size in bytes of the Snapshot File.
 # *Creator ID* : (Optional) ID of the broker/Controller that created the 
snapshot
 # *Cluster ID :* (Optional) ID of the cluster that created the snapshot
 # *Create Time :* Timestamp of the snapshot creation (might not be needed as 
each record batch has a timestamp already.
 # *HasFooter :* Boolean indicating if a control record/batch will be included 
at the end of the file

 

  was:
Create a new Control Record that will serve as the header for a Metadata 
Snapshot File. This header will be contained at the beginning of each Snapshot 
File.

Add the following fields to the header record:
 # *Version :* Schema version for the snapshot header
 # *End Offset* : End offset of the snapshot from the snapshot ID
 # *Epoch :* Epoch of the snapshot ** from the Snapshot ID
 # *Creator ID* : (Optional) ID of the broker/Controller that created the 
snapshot
 # *Cluster ID :* (Optional) ID of the cluster that created the snapshot
 # *Create Time :* Timestamp of the snapshot creation (might not be needed as 
each record batch has a timestamp already)
 # *HasFooter :* Boolean indicating if a control record/batch will be included 
at the end of the file

 


> Metadata Snapshot File Header
> -
>
> Key: KAFKA-12952
> URL: https://issues.apache.org/jira/browse/KAFKA-12952
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, kraft
>Reporter: Niket Goel
>Priority: Minor
>
> Create a new Control Record that will serve as the header for a Metadata 
> Snapshot File. This header will be contained at the beginning of each 
> Snapshot File.
> Add the following fields to the header record:
>  # *Version :* Schema version for the snapshot header
>  # *End Offset* : End offset of the snapshot from the snapshot ID
>  # *Epoch :* Epoch of the snapshot from the Snapshot ID
>  # *Snapshot Size* : Size in bytes of the Snapshot File.
>  # *Creator ID* : (Optional) ID of the broker/Controller that created the 
> snapshot
>  # *Cluster ID :* (Optional) ID of the cluster that created the snapshot
>  # *Create Time :* Timestamp of the snapshot creation (might not be needed as 
> each record batch has a timestamp already).
>  # *HasFooter :* Boolean indicating if a control record/batch will be 
> included at the end of the file
>  
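
A purely illustrative sketch of the proposed header as a record type (field 
names are taken from the list above; the actual control record schema, field 
types, and optionality are still to be decided):
{code:scala}
// Hypothetical shape of the snapshot header control record -- names and types
// are illustrative only, not the final schema.
case class SnapshotHeaderRecord(
  version: Short,             // schema version of the snapshot header
  endOffset: Long,            // end offset from the snapshot ID
  epoch: Int,                 // epoch from the snapshot ID
  snapshotSize: Long,         // size in bytes of the snapshot file
  creatorId: Option[Int],     // optional ID of the broker/controller that created the snapshot
  clusterId: Option[String],  // optional ID of the cluster that created the snapshot
  createTime: Option[Long],   // optional creation timestamp (batches already carry timestamps)
  hasFooter: Boolean          // whether a control record/batch is appended at the end of the file
)
{code}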



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-15 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651985270



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -70,6 +71,7 @@ object FetchSession {
   * localLogStartOffset is the log start offset of the partition on this 
broker.
   */
 class CachedPartition(val topic: String,
+  val topicId: Uuid,

Review comment:
   I thought about this. Originally we compared the topic IDs in the session by 
grabbing the cached partitions and comparing them to the IDs in the request. 
Since we now have a new mechanism (the topic ID map), we may no longer need to 
do this, and I can add the ID to the hashCode and equals methods.
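   
   As a rough sketch (not the actual patch) of what folding the topic ID into 
the partition's identity could look like, assuming a simplified `CachedPartition`:
   
   ```scala
   import org.apache.kafka.common.Uuid
   
   // Illustrative only: topic name, topic ID, and partition all participate in
   // equality, so entries with the same name but different IDs are not conflated.
   class CachedPartition(val topic: String, val topicId: Uuid, val partition: Int) {
     override def hashCode: Int =
       31 * (31 * partition + topic.hashCode) + topicId.hashCode
   
     override def equals(that: Any): Boolean = that match {
       case p: CachedPartition =>
         topic == p.topic && topicId == p.topicId && partition == p.partition
       case _ => false
     }
   }
   ```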




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10861: KAFKA-12909: disable spurious left/outer stream-stream join fix for old JoinWindows API

2021-06-15 Thread GitBox


guozhangwang commented on a change in pull request #10861:
URL: https://github.com/apache/kafka/pull/10861#discussion_r651983648



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##
@@ -82,20 +87,23 @@
 
 private class KStreamKStreamJoinProcessor extends AbstractProcessor 
{
 private WindowStore otherWindowStore;
-private StreamsMetricsImpl metrics;
 private Sensor droppedRecordsSensor;
 private Optional, LeftOrRightValue>> 
outerJoinWindowStore = Optional.empty();
 
-@SuppressWarnings("unchecked")
 @Override
 public void init(final ProcessorContext context) {
 super.init(context);
-metrics = (StreamsMetricsImpl) context.metrics();
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
 droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
 otherWindowStore = context.getStateStore(otherWindowName);
 
-if 
(StreamsConfig.InternalConfig.getBoolean(context().appConfigs(), 
ENABLE_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX, true)) {
-outerJoinWindowStore = outerJoinWindowName.map(name -> 
context.getStateStore(name));
+if (enableSpuriousResultFix

Review comment:
   I'm thinking exactly the opposite :) If we have a bug that causes us to 
create a state store unnecessarily, checking the config twice may actually mask 
it: we would create the state store, then not retrieve it on the second check, 
so the observable behavior would still be correct and it would be hard to 
discover that we are creating state stores we don't need.
   
   If we have a bug and do not create the state store when it is needed, we 
would simply behave the old way, without the fix. The key point is that there 
is only one decision point; whether that decision is correct or buggy, the 
outcome surfaces quickly.
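   
   A small generic sketch of the "single decision point" idea (this is not the 
Streams code itself, just an illustration):
   
   ```scala
   // Illustrative only: the config is consulted exactly once, when deciding whether
   // to create the optional store; everything downstream keys off the store's presence.
   object SingleDecisionPointExample {
     final class Store { def put(k: String, v: String): Unit = println(s"$k -> $v") }
   
     def build(enableFix: Boolean): Option[Store] =
       if (enableFix) Some(new Store) else None      // the one and only decision
   
     def process(store: Option[Store], k: String, v: String): Unit =
       store.foreach(_.put(k, v))                    // no second config check here
   
     def main(args: Array[String]): Unit =
       process(build(enableFix = true), "key", "value")
   }
   ```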




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-15 Thread GitBox


jolshan commented on pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#issuecomment-861667736


   I didn't see a branch or PR with the 2.8 version, so I opened my own here: 
https://github.com/apache/kafka/pull/10885
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan opened a new pull request #10885: KAFKA-12701: NPE in MetadataRequest when using topic IDs

2021-06-15 Thread GitBox


jolshan opened a new pull request #10885:
URL: https://github.com/apache/kafka/pull/10885


   cherry-pick of c16711cb8e0d1c03f123e3e9d7e3d810796bf315
   Updated KafkaApisTest to use 2.8 code.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10872: KAFKA-12945: Remove port, host.name and related configs in 3.0

2021-06-15 Thread GitBox


ijuma commented on pull request #10872:
URL: https://github.com/apache/kafka/pull/10872#issuecomment-861667531


   @dajac Most of them pass locally; I am re-running and will investigate 
further the ones that are not flaky.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-15 Thread GitBox


junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651977071



##
File path: core/src/main/scala/kafka/server/FetchSession.scala
##
@@ -239,12 +245,22 @@ class FetchSession(val id: Int,
   // Update the cached partition data based on the request.
   def update(fetchData: FetchSession.REQ_MAP,
  toForget: util.List[TopicPartition],
- reqMetadata: JFetchMetadata): (TL, TL, TL) = synchronized {
+ reqMetadata: JFetchMetadata,
+ topicIds: util.Map[String, Uuid]): (TL, TL, TL, TL) = 
synchronized {
 val added = new TL
 val updated = new TL
 val removed = new TL
+val inconsistentTopicIds = new TL
 fetchData.forEach { (topicPart, reqData) =>
-  val newCachedPart = new CachedPartition(topicPart, reqData)
+  // Get the topic ID on the broker, if it is valid and the topic is new 
to the session, add its ID.
+  // If the topic already existed, check that its ID is consistent.
+  val id = topicIds.getOrDefault(topicPart.topic, Uuid.ZERO_UUID)
+  val newCachedPart = new CachedPartition(topicPart, id, reqData)
+  if (id != Uuid.ZERO_UUID) {
+val prevSessionTopicId = sessionTopicIds.put(topicPart.topic, id)
+if (prevSessionTopicId != null && prevSessionTopicId != id)
+  inconsistentTopicIds.add(topicPart)

Review comment:
   Got it. Makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-06-15 Thread GitBox


junrao commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r651976677



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -73,6 +77,22 @@ public FetchSessionHandler(LogContext logContext, int node) {
 private LinkedHashMap sessionPartitions =
 new LinkedHashMap<>(0);
 
+/**
+ * All of the topic ids mapped to topic names for topics which exist in 
the fetch request session.
+ */
+private Map sessionTopicIds = new HashMap<>(0);
+
+/**
+ * All of the topic names mapped to topic ids for topics which exist in 
the fetch request session.
+ */
+private Map sessionTopicNames = new HashMap<>(0);
+
+public Map sessionTopicNames() {
+return sessionTopicNames;
+}
+
+private boolean canUseTopicIds = false;

Review comment:
   Yes, if that makes the client code simpler and more consistent. For 
example, in https://github.com/apache/kafka/pull/9944#discussion_r651933919, 
the client also chooses to let the server handle the closing of the session.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



