[GitHub] [kafka] mjsax merged pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

2020-12-03 Thread GitBox


mjsax merged pull request #9614:
URL: https://github.com/apache/kafka/pull/9614


   







[GitHub] [kafka] chia7712 commented on a change in pull request #9667: MINOR: Do not print log4j for memberId required

2020-12-03 Thread GitBox


chia7712 commented on a change in pull request #9667:
URL: https://github.com/apache/kafka/pull/9667#discussion_r535900161



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -465,7 +465,11 @@ boolean joinGroupIfNeeded(final Timer timer) {
 }
 } else {
 final RuntimeException exception = future.exception();
-log.info("Rebalance failed.", exception);
+
+if (!(exception instanceof MemberIdRequiredException)) {

Review comment:
   For another, should we add a comment to explain why 
`MemberIdRequiredException` is excluded?
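   A minimal sketch (against the diff context above, with an assumed 
justification) of what the guarded log statement plus explanatory comment 
could look like:
   ```java
   final RuntimeException exception = future.exception();

   // Assumed rationale: MemberIdRequiredException is part of the normal
   // first-join handshake (the broker asks a new member to rejoin with its
   // assigned member id), so it should not be logged as a rebalance failure.
   if (!(exception instanceof MemberIdRequiredException)) {
       log.info("Rebalance failed.", exception);
   }
   ```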

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -465,7 +465,11 @@ boolean joinGroupIfNeeded(final Timer timer) {
 }
 } else {
 final RuntimeException exception = future.exception();
-log.info("Rebalance failed.", exception);
+
+if (!(exception instanceof MemberIdRequiredException)) {

Review comment:
   Is `JoinGroupResponseHandler` a better place to log the error? For 
example, the `UNKNOWN_MEMBER_ID` error is currently logged twice. 
   
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L605)
   









[jira] [Assigned] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-12-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-10017:
---

Assignee: Matthias J. Sax  (was: A. Sophie Blee-Goldman)

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: flaky-test, unit-test
> Fix For: 2.8.0
>
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)





[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2020-12-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243797#comment-17243797
 ] 

Matthias J. Sax commented on KAFKA-2967:


I don't know the details either. But I think it would be a huge improvement 
over HTML!

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>  Labels: documentation
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> It's just HTML, can't generate PDFs
> Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.





[GitHub] [kafka] ijuma commented on a change in pull request #7409: MINOR: Skip conversion to `Struct` when serializing generated requests/responses

2020-12-03 Thread GitBox


ijuma commented on a change in pull request #7409:
URL: https://github.com/apache/kafka/pull/7409#discussion_r535876326



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
##
@@ -84,148 +95,151 @@ protected void updateErrorCounts(Map<Errors, Integer> errorCounts, Errors error)
 errorCounts.put(error, count + 1);
 }
 
-protected abstract Struct toStruct(short version);
+protected abstract Message data();
 
 /**
  * Parse a response from the provided buffer. The buffer is expected to 
hold both
  * the {@link ResponseHeader} as well as the response payload.
  */
-public static AbstractResponse parseResponse(ByteBuffer byteBuffer, 
RequestHeader requestHeader) {
+public static AbstractResponse parseResponse(ByteBuffer buffer, 
RequestHeader requestHeader) {
 ApiKeys apiKey = requestHeader.apiKey();
 short apiVersion = requestHeader.apiVersion();
 
-ResponseHeader responseHeader = ResponseHeader.parse(byteBuffer, 
apiKey.responseHeaderVersion(apiVersion));
+ResponseHeader responseHeader = ResponseHeader.parse(buffer, 
apiKey.responseHeaderVersion(apiVersion));
+AbstractResponse response = AbstractResponse.parseResponse(apiKey, 
buffer, apiVersion);
+
+// We correlate after parsing the response to avoid spurious 
correlation errors when receiving malformed
+// responses

Review comment:
   TODO: Handle this better.









[GitHub] [kafka] g1geordie commented on pull request #9687: MINOR: Move lock method outside try block

2020-12-03 Thread GitBox


g1geordie commented on pull request #9687:
URL: https://github.com/apache/kafka/pull/9687#issuecomment-738601977


   @chia7712   can you help me to take a look?







[GitHub] [kafka] g1geordie opened a new pull request #9687: MINOR: Move lock method outside try block

2020-12-03 Thread GitBox


g1geordie opened a new pull request #9687:
URL: https://github.com/apache/kafka/pull/9687


   If `Lock.lock` is called inside the try block and throws, the `finally` 
block will call `Lock.unlock` on a lock that was never acquired, which throws 
a second exception.
   I think it's better to move the `lock()` call outside the try block, even 
though the `ReentrantLock.lock` implementation doesn't actually throw.
   
   ```
class X {
  private final ReentrantLock lock = new ReentrantLock();
  // ...
   
  public void m() {
lock.lock();  // block until condition holds
try {
  // ... method body
} finally {
  lock.unlock();
}
  }
}
   ```
   pattern recommended by 
   
https://docs.oracle.com/javase/10/docs/api/java/util/concurrent/locks/ReentrantLock.html
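   For contrast, a minimal sketch of the shape this PR removes: if `lock()` 
sat inside the `try` and threw, the `finally` would call `unlock()` on a lock 
the thread never acquired, and `ReentrantLock.unlock` then throws 
`IllegalMonitorStateException`:
   ```
import java.util.concurrent.locks.ReentrantLock;

class Y {
  private final ReentrantLock lock = new ReentrantLock();

  public void m() {
    try {
      lock.lock();    // if this threw, we would never hold the lock...
      // ... method body
    } finally {
      lock.unlock();  // ...and this would throw IllegalMonitorStateException
    }
  }
}
   ```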







[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-12-03 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r535856935



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.MirrorClient;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.IntegrationTest;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
+
+/**
+ * Tests MM2 replication and failover/failback logic.
+ *
+ * MM2 is configured with active/active replication between two Kafka 
clusters. Tests validate that
+ * records sent to either cluster arrive at the other cluster. Then, a 
consumer group is migrated from
+ * one cluster to the other and back. Tests validate that consumer offsets are 
translated and replicated
+ * between clusters during this failover and failback.
+ */
+@Category(IntegrationTest.class)
+public abstract class MirrorConnectorsIntegrationBaseTest {
+private static final Logger log = 
LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class);
+
+private static final int NUM_RECORDS_PER_PARTITION = 10;
+private static final int NUM_PARTITIONS = 10;
+private static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * 
NUM_RECORDS_PER_PARTITION;
+private static final int RECORD_TRANSFER_DURATION_MS = 30_000;
+private static final int CHECKPOINT_DURATION_MS = 20_000;
+private static final int RECORD_CONSUME_DURATION_MS = 20_000;
+private static final int OFFSET_SYNC_DURATION_MS = 30_000;
+private static final int NUM_WORKERS = 3;
+private static final int CONSUMER_POLL_TIMEOUT_MS = 500;
+private static final int BROKER_RESTART_TIMEOUT_MS = 10_000;
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final String PRIMARY_CLUSTER_ALIAS = "primary";
+private static final String BACKUP_CLUSTER_ALIAS = "backup";
+private static final List> 

[jira] [Commented] (KAFKA-10672) Restarting Kafka always takes a lot of time

2020-12-03 Thread Wenbing Shen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243730#comment-17243730
 ] 

Wenbing Shen commented on KAFKA-10672:
--

* We increased the number of batches read per IO operation

 * After many tests, startup speed improved by 50% on average

 * See the attached files for the detailed code (a simplified sketch follows 
below)
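
A simplified sketch of the idea (hypothetical names; it assumes v2 log 
framing where each batch is prefixed by an 8-byte offset and a 4-byte length; 
the real change is in the attached files): issue one large read per chunk and 
iterate many batches out of it, instead of one read per ~16 KB batch:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class ChunkedBatchReader {
    private static final int HEADER_SIZE = 12;          // assumed: offset (8) + length (4)
    private static final int READ_CHUNK_BYTES = 512 * 1024;

    /** Counts batches with one read per 512 KB chunk instead of one per batch. */
    static int countBatches(FileChannel channel) throws IOException {
        ByteBuffer chunk = ByteBuffer.allocate(READ_CHUNK_BYTES);
        long position = 0;
        int batches = 0;
        while (channel.read(chunk, position) > 0) {
            chunk.flip();
            int consumed = 0;
            while (chunk.remaining() >= HEADER_SIZE) {
                // The length field counts the bytes that follow the header.
                int batchSize = HEADER_SIZE + chunk.getInt(chunk.position() + 8);
                if (batchSize > chunk.remaining())
                    break;                              // partial batch: re-read next round
                chunk.position(chunk.position() + batchSize);
                consumed += batchSize;
                batches++;
            }
            if (consumed == 0)
                break;                                  // torn/oversized batch: stop (sketch only)
            position += consumed;
            chunk.clear();
        }
        return batches;
    }
}
```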

> Restarting Kafka always takes a lot of time
> ---
>
> Key: KAFKA-10672
> URL: https://issues.apache.org/jira/browse/KAFKA-10672
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.0.0
> Environment: A cluster of 21 Kafka nodes;
> Each node has 12 disks;
> Each node has about 1500 partitions;
> There are approximately 700 leader partitions per node;
> Slow-loading partitions have about 1000 log segments;
>Reporter: Wenbing Shen
>Priority: Major
> Attachments: AbstractIterator.java, AbstractIteratorOfRestart.java, 
> AbstractLegacyRecordBatch.java, ByteBufferLogInputStream.java, 
> DefaultRecordBatch.java, FileLogInputStream.java, FileRecords.java, 
> LazyDownConversionRecords.java, Log.scala, LogInputStream.java, 
> LogManager.scala, LogSegment.scala, MemoryRecords.java, 
> RecordBatchIterator.java, RecordBatchIteratorOfRestart.java, Records.java, 
> server.log
>
>
> When the producer snapshot file does not exist, or the latest snapshot file 
> predates the current active segment, restoring producer state traverses every 
> batch in the log. On broker nodes that host many partitions, and therefore 
> many logs, this generates a huge number of IO operations, because only one 
> batch is loaded per IO. A single log segment can easily contain tens of 
> thousands of batches, and I found that the code performs at least two IO 
> operations per batch. With the default batch size of 16 KB, a 1 GB log 
> segment holds 65,536 batches and therefore triggers at least 65,536 * 2 = 
> 131,072 IO operations, which makes the Kafka startup process very slow. We 
> configured 15 log recovery threads in the production environment, and it 
> still took more than 2 hours to load a partition. Could the community propose 
> some improvements for this situation? For detailed logs, see the section on 
> the test-perf-18 partitions in the attached logs.





[GitHub] [kafka] guozhangwang commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

2020-12-03 Thread GitBox


guozhangwang commented on pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#issuecomment-738579702


   Hey @tombentley Sorry for the late reply! I also checked the source code of 
`ScheduledThreadPoolExecutor` and I think what you've inferred is correct: 
although it does not keep a single thread alive all the time, but rather 
dynamically creates and stops the thread when no tasks have been scheduled for 
a long time (note we set num.thread == 1), the blocking queue should still 
guarantee FIFO ordering.
   
   So what's possibly happening is that the thread handling a LeaderAndIsr 
request with the old controller epoch grabs the lock and proceeds first. In 
that case, maybe we can simplify your proposed solution as follows:
   
   * Note that, as @hachikuji mentioned, `onLeadershipChange` is already 
protected by the `replicaStateChangeLock` inside `becomeLeaderOrFollower`, and 
hence we only need to pass the controller epoch into 
`groupCoordinator.onElection/onResignation`, just like what we did in 
`txnCoordinator`.
   
   * Inside the lock-protected `groupCoordinator.onElection/onResignation`, we 
just remember the latest controller epoch at the `GroupCoordinator`, and then 
do one check against that epoch right before 
`scheduleLoadGroupAndOffsets/removeGroupsForPartition`; if the check does not 
pass, we log and skip the loading / unloading call (see the sketch below).
   
   WDYT?
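   
   A rough Java-flavored sketch of that fencing idea (the real coordinator is 
Scala, and all names below are illustrative only):
   ```java
// Illustrative sketch only -- not the actual GroupCoordinator code.
class GroupCoordinatorSketch {
    private final Object lock = new Object();
    private int latestControllerEpoch = -1;

    void onElection(int offsetTopicPartition, int controllerEpoch) {
        synchronized (lock) {
            if (controllerEpoch < latestControllerEpoch) {
                return; // Stale request from an older controller: log and skip loading.
            }
            latestControllerEpoch = controllerEpoch;
            scheduleLoadGroupAndOffsets(offsetTopicPartition);
        }
    }

    void onResignation(int offsetTopicPartition, int controllerEpoch) {
        synchronized (lock) {
            if (controllerEpoch < latestControllerEpoch) {
                return; // Stale request: log and skip unloading.
            }
            latestControllerEpoch = controllerEpoch;
            removeGroupsForPartition(offsetTopicPartition);
        }
    }

    private void scheduleLoadGroupAndOffsets(int partition) { /* async load */ }
    private void removeGroupsForPartition(int partition) { /* unload */ }
}
   ```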







[jira] [Updated] (KAFKA-10672) Restarting Kafka always takes a lot of time

2020-12-03 Thread Wenbing Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbing Shen updated KAFKA-10672:
-
Attachment: Records.java
RecordBatchIteratorOfRestart.java
RecordBatchIterator.java
MemoryRecords.java
LogSegment.scala
LogManager.scala
LogInputStream.java
Log.scala
LazyDownConversionRecords.java
FileRecords.java
FileLogInputStream.java
DefaultRecordBatch.java
ByteBufferLogInputStream.java
AbstractLegacyRecordBatch.java
AbstractIteratorOfRestart.java
AbstractIterator.java

> Restarting Kafka always takes a lot of time
> ---
>
> Key: KAFKA-10672
> URL: https://issues.apache.org/jira/browse/KAFKA-10672
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.0.0
> Environment: A cluster of 21 Kafka nodes;
> Each node has 12 disks;
> Each node has about 1500 partitions;
> There are approximately 700 leader partitions per node;
> Slow-loading partitions have about 1000 log segments;
>Reporter: Wenbing Shen
>Priority: Major
> Attachments: AbstractIterator.java, AbstractIteratorOfRestart.java, 
> AbstractLegacyRecordBatch.java, ByteBufferLogInputStream.java, 
> DefaultRecordBatch.java, FileLogInputStream.java, FileRecords.java, 
> LazyDownConversionRecords.java, Log.scala, LogInputStream.java, 
> LogManager.scala, LogSegment.scala, MemoryRecords.java, 
> RecordBatchIterator.java, RecordBatchIteratorOfRestart.java, Records.java, 
> server.log
>
>
> When the producer snapshot file does not exist, or the latest snapshot file 
> predates the current active segment, restoring producer state traverses every 
> batch in the log. On broker nodes that host many partitions, and therefore 
> many logs, this generates a huge number of IO operations, because only one 
> batch is loaded per IO. A single log segment can easily contain tens of 
> thousands of batches, and I found that the code performs at least two IO 
> operations per batch. With the default batch size of 16 KB, a 1 GB log 
> segment holds 65,536 batches and therefore triggers at least 65,536 * 2 = 
> 131,072 IO operations, which makes the Kafka startup process very slow. We 
> configured 15 log recovery threads in the production environment, and it 
> still took more than 2 hours to load a partition. Could the community propose 
> some improvements for this situation? For detailed logs, see the section on 
> the test-perf-18 partitions in the attached logs.





[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-12-03 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r535846145



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.MirrorClient;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.IntegrationTest;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
+
+/**
+ * Tests MM2 replication and failover/failback logic.
+ *
+ * MM2 is configured with active/active replication between two Kafka 
clusters. Tests validate that
+ * records sent to either cluster arrive at the other cluster. Then, a 
consumer group is migrated from
+ * one cluster to the other and back. Tests validate that consumer offsets are 
translated and replicated
+ * between clusters during this failover and failback.
+ */
+@Category(IntegrationTest.class)
+public abstract class MirrorConnectorsIntegrationBaseTest {
+private static final Logger log = 
LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class);
+
+private static final int NUM_RECORDS_PER_PARTITION = 10;
+private static final int NUM_PARTITIONS = 10;
+private static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * 
NUM_RECORDS_PER_PARTITION;
+private static final int RECORD_TRANSFER_DURATION_MS = 30_000;
+private static final int CHECKPOINT_DURATION_MS = 20_000;
+private static final int RECORD_CONSUME_DURATION_MS = 20_000;
+private static final int OFFSET_SYNC_DURATION_MS = 30_000;
+private static final int NUM_WORKERS = 3;
+private static final int CONSUMER_POLL_TIMEOUT_MS = 500;
+private static final int BROKER_RESTART_TIMEOUT_MS = 10_000;
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final String PRIMARY_CLUSTER_ALIAS = "primary";
+private static final String BACKUP_CLUSTER_ALIAS = "backup";
+private static final List> 

[GitHub] [kafka] apovzner commented on a change in pull request #9628: KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas

2020-12-03 Thread GitBox


apovzner commented on a change in pull request #9628:
URL: https://github.com/apache/kafka/pull/9628#discussion_r535830690



##
File path: core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
##
@@ -472,6 +512,134 @@ class ConfigCommandTest extends ZooKeeperTestHarness with 
Logging {
 EasyMock.reset(alterResult, describeResult)
   }
 
+  @Test
+  def shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer(): Unit = {
+// when using --bootstrap-server, it should be illegal to alter anything 
that is not a connection quota
+// for ip entities
+val node = new Node(1, "localhost", 9092)
+val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node)
+
+def verifyCommand(entityType: String, alterOpts: String*): Unit = {
+  val opts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
+"--entity-type", entityType, "--entity-name", "admin",
+"--alter") ++ alterOpts)
+  val e = intercept[IllegalArgumentException] {
+ConfigCommand.alterConfig(mockAdminClient, opts)
+  }
+  assertTrue(s"Unexpected exception: $e", 
e.getMessage.contains("some_config"))
+}
+
+verifyCommand("ips", "--add-config", 
"connection_creation_rate=1,some_config=10")

Review comment:
   super tiny nit: the `verifyCommand` name makes the code a bit confusing, 
since the command is expected to fail. What about `verifyCommandFails` or 
`verifyFails`?









[GitHub] [kafka] apovzner commented on a change in pull request #9628: KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas

2020-12-03 Thread GitBox


apovzner commented on a change in pull request #9628:
URL: https://github.com/apache/kafka/pull/9628#discussion_r535829042



##
File path: 
core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##
@@ -259,13 +264,19 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
   }
 
   private def updateIpConnectionRate(ip: Option[String], updatedRate: Int): 
Unit = {
-adminZkClient.changeIpConfig(ip.getOrElse(ConfigEntityName.Default),
-  CoreUtils.propsWith(DynamicConfig.Ip.IpConnectionRateOverrideProp, 
updatedRate.toString))
+val initialConnectionCount = connectionCount
+val adminClient = createAdminClient()

Review comment:
   `TestUtils.alterClientQuotas`, which is called before `adminClient.close`, 
can throw an exception and leak the client. `Admin` extends `AutoCloseable`, 
so you can use try-with-resources.
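   
   A self-contained Java illustration of the suggested pattern (the test 
itself is Scala, where an equivalent try/finally applies; `Admin.create` is 
the real factory method, the rest is hypothetical):
   ```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

class TryWithResourcesExample {
    static void alterQuotas(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Admin extends AutoCloseable: even if the quota alteration below
        // throws, the client is closed when the try block exits.
        try (Admin admin = Admin.create(props)) {
            // ... issue the quota alteration here; a thrown exception can no
            // longer leak the client ...
        }
    }
}
   ```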









[GitHub] [kafka] chia7712 merged pull request #9682: KAFKA-10803: Fix improper removal of bad dynamic config

2020-12-03 Thread GitBox


chia7712 merged pull request #9682:
URL: https://github.com/apache/kafka/pull/9682


   







[jira] [Assigned] (KAFKA-10803) Skip improper dynamic configs while initialization and include the rest correct ones

2020-12-03 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-10803:
--

Assignee: Prateek Agarwal

> Skip improper dynamic configs while initialization and include the rest 
> correct ones
> 
>
> Key: KAFKA-10803
> URL: https://issues.apache.org/jira/browse/KAFKA-10803
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.1, 2.6.0, 2.5.1, 2.7.0, 2.8.0
>Reporter: Prateek Agarwal
>Assignee: Prateek Agarwal
>Priority: Major
>
> There is [a 
> bug|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L470]
>  in how incorrect dynamic config keys are removed from the original 
> Properties list, resulting in persisting the improper configs in the 
> properties list.
> This eventually results in exception being thrown while parsing the list by 
> [KafkaConfig 
> ctor|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L531],
>  resulting in skipping of the complete dynamic list (including the correct 
> ones).





[GitHub] [kafka] chia7712 commented on pull request #9682: KAFKA-10803: Fix improper removal of bad dynamic config

2020-12-03 Thread GitBox


chia7712 commented on pull request #9682:
URL: https://github.com/apache/kafka/pull/9682#issuecomment-738549721


   ```
   Build / JDK 11 / 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```
   
   known flaky. Will merge this to trunk







[GitHub] [kafka] chia7712 merged pull request #9679: MINOR: Make Histogram.clear more readable

2020-12-03 Thread GitBox


chia7712 merged pull request #9679:
URL: https://github.com/apache/kafka/pull/9679


   







[GitHub] [kafka] chia7712 opened a new pull request #9686: KAFKA-10804 add more subsets and exclude performance tests

2020-12-03 Thread GitBox


chia7712 opened a new pull request #9686:
URL: https://github.com/apache/kafka/pull/9686


   This PR is a part of https://issues.apache.org/jira/browse/KAFKA-10804
   
   The following tasks are included by this PR.
   
   1. add more subsets
   1. not to run all system tests - exclude performance
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Assigned] (KAFKA-10804) Tune travis system tests to avoid timeouts

2020-12-03 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-10804:
--

Assignee: Chia-Ping Tsai

> Tune travis system tests to avoid timeouts
> --
>
> Key: KAFKA-10804
> URL: https://issues.apache.org/jira/browse/KAFKA-10804
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> Thanks to https://github.com/apache/kafka/pull/9652, we are now running 
> system tests for PRs. However, it looks like we need some tuning because many 
> of the subsets are timing out. For example: 
> https://travis-ci.com/github/apache/kafka/jobs/453241933. This might just be 
> a matter of adding more subsets or changing the timeout, but we should 
> probably also consider whether we want to run all system tests or if there is 
> a more useful subset of them.





[GitHub] [kafka] dongjinleekr commented on a change in pull request #8404: KAFKA-10787: Introduce an import order in Java sources

2020-12-03 Thread GitBox


dongjinleekr commented on a change in pull request #8404:
URL: https://github.com/apache/kafka/pull/8404#discussion_r535791013



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -16,21 +16,21 @@
  */
 package org.apache.kafka.clients;
 
-import java.util.HashSet;
-import java.util.Set;
-
-import java.util.stream.Collectors;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.utils.ExponentialBackoff;
 import org.apache.kafka.common.utils.LogContext;
+
 import org.slf4j.Logger;

Review comment:
   @chia7712 Oh, the description I originally wrote on April 2nd is outdated; 
[after the 
discussion](https://lists.apache.org/thread.html/rf6f49c845a3d48efe8a91916c8fbaddb76da17742eef06798fc5b24d%40%3Cdev.kafka.apache.org%3E),
 we concluded that [the following three-group 
ordering](https://issues.apache.org/jira/browse/KAFKA-10787) would be better:
   
   - `kafka`, `org.apache.kafka`
   - `com`, `net`, `org`
   - `java`, `javax`
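   
   For illustration, the imports touched in this diff would read as follows 
under that three-group ordering (a sketch of the convention, not the final 
checkstyle rule):
   ```java
import org.apache.kafka.common.errors.AuthenticationException;  // group 1: kafka, org.apache.kafka
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;

import org.slf4j.Logger;                                        // group 2: com, net, org

import java.util.HashSet;                                       // group 3: java, javax
import java.util.Set;
import java.util.stream.Collectors;
   ```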









[jira] [Commented] (KAFKA-10636) Bypass log validation for writes to raft log

2020-12-03 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243646#comment-17243646
 ] 

feyman commented on KAFKA-10636:


[~hachikuji]  Cool, thanks for the help!

> Bypass log validation for writes to raft log
> 
>
> Key: KAFKA-10636
> URL: https://issues.apache.org/jira/browse/KAFKA-10636
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: feyman
>Priority: Major
>  Labels: raft-log-management
>
> The raft leader is responsible for creating the records written to the log 
> (including assigning offsets and the epoch), so we can consider bypassing the 
> validation done in `LogValidator`. This lets us skip potentially expensive 
> decompression and the unnecessary recomputation of the CRC.





[GitHub] [kafka] showuon commented on pull request #9627: KAFKA-10746: Change to Warn logs when necessary to notify users

2020-12-03 Thread GitBox


showuon commented on pull request #9627:
URL: https://github.com/apache/kafka/pull/9627#issuecomment-738505590


   @abbccdda  @guozhangwang  , could you please help review this PR? Thanks.







[jira] [Created] (KAFKA-10807) AlterConfig should be validated by the target broker

2020-12-03 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10807:
---

 Summary: AlterConfig should be validated by the target broker
 Key: KAFKA-10807
 URL: https://issues.apache.org/jira/browse/KAFKA-10807
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


After forwarding is enabled, AlterConfigs will no longer be sent to the target 
broker. This bypasses some important config change validations, such as path 
existence and static config conflicts; worse, when the target broker is 
offline, the propagated result does not reflect the truly applied result. We 
should gather the necessary cases and decide whether to handle the AlterConfig 
request on the target broker first and only then forward it, i.e. a 
validate-forward-apply path.





[jira] [Updated] (KAFKA-10345) Add file-watch based update for trust/key store paths

2020-12-03 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10345:

Description: With forwarding enabled, per-broker alter-config doesn't go to 
the target broker anymore, we need to have a mechanism to propagate in-place 
file update through file watch.  (was: With forwarding enabled, per-broker 
alter-config doesn't go to the target broker anymore, we need to have a 
mechanism to propagate that update through ZK without affecting other config 
changes.)

> Add file-watch based update for trust/key store paths
> -
>
> Key: KAFKA-10345
> URL: https://issues.apache.org/jira/browse/KAFKA-10345
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.8.0
>
>
> With forwarding enabled, per-broker alter-config doesn't go to the target 
> broker anymore, we need to have a mechanism to propagate in-place file update 
> through file watch.





[jira] [Updated] (KAFKA-10345) Add file-watch based update for trust/key store paths

2020-12-03 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10345:

Summary: Add file-watch based update for trust/key store paths  (was: Add 
ZK-notification based update for trust/key store paths)

> Add file-watch based update for trust/key store paths
> -
>
> Key: KAFKA-10345
> URL: https://issues.apache.org/jira/browse/KAFKA-10345
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.8.0
>
>
> With forwarding enabled, per-broker alter-config doesn't go to the target 
> broker anymore, we need to have a mechanism to propagate that update through 
> ZK without affecting other config changes.





[GitHub] [kafka] splett2 opened a new pull request #9685: KAFKA-10748: Add IP connection rate throttling metric

2020-12-03 Thread GitBox


splett2 opened a new pull request #9685:
URL: https://github.com/apache/kafka/pull/9685


   This PR adds the IP throttling metric as described in KIP-612.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2020-12-03 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r535743912



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1896,6 +1896,11 @@ class KafkaApis(val requestChannel: RequestChannel,
   .setTopicConfigErrorCode(Errors.NONE.code)
   }
 }
+val topicIds = 
zkClient.getTopicIdsForTopics(results.asScala.map(result => 
result.name()).toSet)

Review comment:
   If going to ZK here is too slow, another option is to provide a callback 
to adminManager.createTopics which can be called after 
createTopicWithAssignment. This callback would add the topic ID to the result. 
The idea is that createTopicWithAssignment (and writeTopicPartitionAssignment) 
would return the topic ID to avoid an extra call to ZK. I wasn't sure which 
option was better.
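   
   A rough sketch of the callback shape being proposed (all names below are 
hypothetical; only `Uuid` is a real Kafka type):
   ```java
import org.apache.kafka.common.Uuid;

// Hypothetical: adminManager.createTopics would invoke this after
// createTopicWithAssignment, so the topic id produced by the ZK write
// reaches KafkaApis without a second round trip to ZK.
interface TopicIdCallback {
    void onTopicCreated(String topicName, Uuid topicId);
}
   ```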









[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2020-12-03 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r535743912



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1896,6 +1896,11 @@ class KafkaApis(val requestChannel: RequestChannel,
   .setTopicConfigErrorCode(Errors.NONE.code)
   }
 }
+val topicIds = 
zkClient.getTopicIdsForTopics(results.asScala.map(result => 
result.name()).toSet)

Review comment:
   If going to ZK here is too slow, another option is to provide a callback 
to adminManager.createTopics which can be called after 
createTopicWithAssignment. The idea is that createTopicWithAssignment (and 
writeTopicPartitionAssignment) would return the topic ID to avoid an extra call 
to ZK. I wasn't sure which option was better.









[GitHub] [kafka] jolshan opened a new pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2020-12-03 Thread GitBox


jolshan opened a new pull request #9684:
URL: https://github.com/apache/kafka/pull/9684


   Updated CreateTopicResponse, DeleteTopicsRequest/Response and added some new 
AdminClient methods and classes. Now the newly created topic ID will be 
returned in CreateTopicsResult and found in TopicAndMetadataConfig, and topics 
can be deleted by supplying topic IDs through deleteTopicsWithIds which will 
return DeleteTopicsWithIdsResult. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ableegoldman commented on a change in pull request #9668: MINOR: add test for repartition/source-topic/changelog optimization

2020-12-03 Thread GitBox


ableegoldman commented on a change in pull request #9668:
URL: https://github.com/apache/kafka/pull/9668#discussion_r535724656



##
File path: 
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##
@@ -463,7 +463,36 @@ public void 
shouldReuseSourceTopicAsChangelogsWithOptimization20() {
 }
 
 @Test
-public void shouldNotReuseSourceTopicAsChangelogsByDefault() {
+public void shouldNotReuseRepartitionTopicAsChangelogs() {
+final String topic = "topic";
+builder.stream(topic).repartition().toTable(Materialized.as("store"));
+final Properties props = StreamsTestUtils.getStreamsConfig("appId");
+props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
+final Topology topology = builder.build(props);
+
+final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
+internalTopologyBuilder.rewriteTopology(new StreamsConfig(props));
+
+assertThat(
+internalTopologyBuilder.buildTopology().storeToChangelogTopic(),
+equalTo(Collections.singletonMap("store", "appId-store-changelog"))
+);
+assertThat(
+internalTopologyBuilder.stateStores().keySet(),
+equalTo(Collections.singleton("store"))
+);
+assertThat(
+
internalTopologyBuilder.stateStores().get("store").loggingEnabled(),
+equalTo(true)
+);
+assertThat(
+
internalTopologyBuilder.topicGroups().get(1).stateChangelogTopics.keySet(),
+equalTo(Collections.singleton("appId-store-changelog"))
+);
+}
+
+@Test
+public void shouldNotReuseRepartitionTopicAsChangelogsByDefault() {

Review comment:
   This should still be called 
`shouldNotReuseSourceTopicAsChangelogsByDefault`,  right?









[GitHub] [kafka] guozhangwang commented on a change in pull request #9676: KAFKA-10778: Fence appends after write failure

2020-12-03 Thread GitBox


guozhangwang commented on a change in pull request #9676:
URL: https://github.com/apache/kafka/pull/9676#discussion_r535691107



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1219,6 +1219,9 @@ class Log(@volatile private var _dir: File,
   appendInfo.logAppendTime = duplicate.timestamp
   appendInfo.logStartOffset = logStartOffset
 case None =>
+  if (logDirFailureChannel.logDirIsOffline(parentDir)) {
+throw new KafkaStorageException(s"The log dir $parentDir has 
failed.");

Review comment:
   Maybe we can leave a more informative error message here? Something like 
"... dir has failed due to a previous IO exception", to indicate that the 
failure did not come from the current call.









[GitHub] [kafka] guozhangwang merged pull request #9681: MINOR: Fix flaky test shouldQueryOnlyActivePartitionStoresByDefault

2020-12-03 Thread GitBox


guozhangwang merged pull request #9681:
URL: https://github.com/apache/kafka/pull/9681


   







[jira] [Updated] (KAFKA-10636) Bypass log validation for writes to raft log

2020-12-03 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10636:
--
Labels: raft-log-management  (was: )

> Bypass log validation for writes to raft log
> 
>
> Key: KAFKA-10636
> URL: https://issues.apache.org/jira/browse/KAFKA-10636
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: feyman
>Priority: Major
>  Labels: raft-log-management
>
> The raft leader is responsible for creating the records written to the log 
> (including assigning offsets and the epoch), so we can consider bypassing the 
> validation done in `LogValidator`. This lets us skip potentially expensive 
> decompression and the unnecessary recomputation of the CRC.





[jira] [Created] (KAFKA-10806) throwable from user callback on completeFutureAndFireCallbacks can lead to unhandled exceptions

2020-12-03 Thread xiongqi wu (Jira)
xiongqi wu created KAFKA-10806:
--

 Summary: throwable from user callback on 
completeFutureAndFireCallbacks can lead to unhandled exceptions
 Key: KAFKA-10806
 URL: https://issues.apache.org/jira/browse/KAFKA-10806
 Project: Kafka
  Issue Type: Bug
Reporter: xiongqi wu


 When the Kafka producer tries to complete or abort a batch, it invokes the 
user callback. However, "completeFutureAndFireCallbacks" only catches 
exceptions from the user callback, not all throwables. An uncaught throwable 
can prevent the batch from being freed.
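
A minimal sketch of the fix shape being described (a hypothetical standalone 
method, not the actual producer code): catch `Throwable` rather than 
`Exception` around each user callback, so the code that frees the batch is 
always reached:

```java
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CallbackSketch {
    private static final Logger log = LoggerFactory.getLogger(CallbackSketch.class);

    static void fireCallbacksAndFree(List<Runnable> callbacks, Runnable freeBatch) {
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (Throwable t) { // catching Exception alone would let an Error escape
                log.error("Error executing user-provided callback", t);
            }
        }
        freeBatch.run(); // always reached, even if a callback threw an Error
    }
}
```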





[jira] [Commented] (KAFKA-10636) Bypass log validation for writes to raft log

2020-12-03 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243525#comment-17243525
 ] 

Jason Gustafson commented on KAFKA-10636:
-

[~feyman] Sounds good, thanks for picking it up. It might just be a matter of 
taking advantage of the `AppendOrigin` which we added to `Log.appendAsLeader`.
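
A tiny illustration of that dispatch (Java sketch; `Log.appendAsLeader` is 
Scala, and a raft-leader origin value is hypothetical here):

```java
// Hypothetical sketch, not actual Kafka code.
enum AppendOrigin { REPLICATION, COORDINATOR, CLIENT, RAFT_LEADER }

class ValidationDispatch {
    static boolean shouldSkipValidation(AppendOrigin origin) {
        // The raft leader created these records itself, with offsets and the
        // epoch already assigned, so decompression and CRC recomputation in
        // LogValidator add no value.
        return origin == AppendOrigin.RAFT_LEADER;
    }
}
```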

> Bypass log validation for writes to raft log
> 
>
> Key: KAFKA-10636
> URL: https://issues.apache.org/jira/browse/KAFKA-10636
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: feyman
>Priority: Major
>
> The raft leader is responsible for creating the records written to the log 
> (including assigning offsets and the epoch), so we can consider bypassing the 
> validation done in `LogValidator`. This lets us skip potentially expensive 
> decompression and the unnecessary recomputation of the CRC.





[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2020-12-03 Thread James Galasyn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243518#comment-17243518
 ] 

James Galasyn commented on KAFKA-2967:
--

Thank you for the feedback, [~vvcephei]! I only suggested RTD because that's 
what we do for ksqlDB, but I'm very happy to use the existing infra. I think 
you're right about leveraging the existing job, but I don't know the deets. 
Maybe [~guozhang] or [~mjsax] can weigh in.

So it seems like it's not a huge lift to make this happen. If everybody thinks 
it's worthwhile for me to investigate, I'd like to dive in for a couple of days 
and see what I come up with.

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>  Labels: documentation
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> It's just HTML, can't generate PDFs
> Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.





[GitHub] [kafka] mumrah merged pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics

2020-12-03 Thread GitBox


mumrah merged pull request #9677:
URL: https://github.com/apache/kafka/pull/9677


   







[GitHub] [kafka] mumrah commented on pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics

2020-12-03 Thread GitBox


mumrah commented on pull request #9677:
URL: https://github.com/apache/kafka/pull/9677#issuecomment-738303699


   Failed tests are known flaky
   
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9677/8/
   
![image](https://user-images.githubusercontent.com/55116/101087722-9c578700-3580-11eb-8f88-604ee09eda8c.png)
   







[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2020-12-03 Thread James Galasyn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243502#comment-17243502
 ] 

James Galasyn commented on KAFKA-2967:
--

[~mjsax] Very cool, thank you for the tip!

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>  Labels: documentation
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> It's just HTML, can't generate PDFs
> Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.





[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2020-12-03 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243503#comment-17243503
 ] 

John Roesler commented on KAFKA-2967:
-

Hi [~JimGalasyn] ,

Thanks for that information!

It's very cool that you were able to convert the site to markdown. Thanks for 
actually proving that it can be done with a reasonable level of effort.

I didn't understand the rationale for using ReadTheDocs. The docs are currently 
hosted as a static site by the ASF for free. If we have markdown, we should be 
able to use Jekyll or Hugo or something to render the markdown into HTML as 
part of the job that currently copies the site over to the static host, right?

It seems like the ASF has some suggestions for how we could do this: 
https://infra.apache.org/project-site.html

Thanks again,

-John

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>  Labels: documentation
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> It's just HTML, can't generate PDFs
> Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.





[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2020-12-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243500#comment-17243500
 ] 

Matthias J. Sax commented on KAFKA-2967:


Btw: [https://infra.apache.org/project-site.html] 

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>  Labels: documentation
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bbejeck commented on pull request #9674: KAFKA-10665: close all kafkaStreams before purgeLocalStreamsState

2020-12-03 Thread GitBox


bbejeck commented on pull request #9674:
URL: https://github.com/apache/kafka/pull/9674#issuecomment-738292594


   Java 11 failed with 
   ```
   > Task :streams:streams-scala:integrationTest FAILED
   05:26:47  org.gradle.internal.remote.internal.ConnectException: Could not 
connect to server [704081bf-4ce4-4a82-ae43-69c7e855ca11 port:38063, 
addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1].
   ```
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-03 Thread GitBox


ijuma commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r535574184



##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+private final Path tempSnapshotPath;
+private final FileChannel channel;
+private final OffsetAndEpoch snapshotId;
+private boolean frozen = false;
+
+private FileRawSnapshotWriter(
+Path tempSnapshotPath,
+FileChannel channel,
+OffsetAndEpoch snapshotId
+) {
+this.tempSnapshotPath = tempSnapshotPath;
+this.channel = channel;
+this.snapshotId = snapshotId;
+}
+
+@Override
+public OffsetAndEpoch snapshotId() {
+return snapshotId;
+}
+
+@Override
+public long sizeInBytes() throws IOException {
+return channel.size();
+}
+
+@Override
+public void append(ByteBuffer buffer) throws IOException {
+if (frozen) {
+throw new IllegalStateException(
+String.format("Append not supported. Snapshot is already 
frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+);
+}
+
+Utils.writeFully(channel, buffer);
+}
+
+@Override
+public boolean isFrozen() {
+return frozen;
+}
+
+@Override
+public void freeze() throws IOException {
+channel.close();
+frozen = true;
+
+// Set readonly and ignore the result
+if (!tempSnapshotPath.toFile().setReadOnly()) {
+throw new IOException(String.format("Unable to set file (%s) as 
read-only", tempSnapshotPath));
+}
+
+Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+Files.move(tempSnapshotPath, destination, 
StandardCopyOption.ATOMIC_MOVE);

Review comment:
   Also, I would not rely on reading the code to assume one way or another. 
You'd want to test it too.
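
   For instance, a quick standalone check of the `ATOMIC_MOVE` overwrite behavior could look like the following — a minimal sketch, not the PR's test, using throwaway temp files:

   ```java
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;

   public class AtomicMoveCheck {
       public static void main(String[] args) throws Exception {
           Path dir = Files.createTempDirectory("snapshot-move-check");
           Path source = Files.createFile(dir.resolve("snapshot.part"));
           Path target = Files.createFile(dir.resolve("snapshot"));

           // The question under review: does ATOMIC_MOVE succeed when the
           // target already exists? POSIX rename() replaces it, but other
           // file systems may not, which is why reading the code alone is
           // not enough.
           Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
           System.out.println("moved, target exists: " + Files.exists(target));
       }
   }
   ```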





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-03 Thread GitBox


ijuma commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r535572033



##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+private final Path tempSnapshotPath;
+private final FileChannel channel;
+private final OffsetAndEpoch snapshotId;
+private boolean frozen = false;
+
+private FileRawSnapshotWriter(
+Path tempSnapshotPath,
+FileChannel channel,
+OffsetAndEpoch snapshotId
+) {
+this.tempSnapshotPath = tempSnapshotPath;
+this.channel = channel;
+this.snapshotId = snapshotId;
+}
+
+@Override
+public OffsetAndEpoch snapshotId() {
+return snapshotId;
+}
+
+@Override
+public long sizeInBytes() throws IOException {
+return channel.size();
+}
+
+@Override
+public void append(ByteBuffer buffer) throws IOException {
+if (frozen) {
+throw new IllegalStateException(
+String.format("Append not supported. Snapshot is already 
frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+);
+}
+
+Utils.writeFully(channel, buffer);
+}
+
+@Override
+public boolean isFrozen() {
+return frozen;
+}
+
+@Override
+public void freeze() throws IOException {
+channel.close();
+frozen = true;
+
+// Set readonly and ignore the result
+if (!tempSnapshotPath.toFile().setReadOnly()) {
+throw new IOException(String.format("Unable to set file (%s) as 
read-only", tempSnapshotPath));
+}
+
+Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+Files.move(tempSnapshotPath, destination, 
StandardCopyOption.ATOMIC_MOVE);

Review comment:
   It's not only Windows; NFS also has some restrictions. Why don't you 
want to use the utility method?
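
   For reference, the utility method in question (`Utils.atomicMoveWithFallback`) implements roughly the following pattern — this is a paraphrased sketch, not the exact Kafka source:

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;

   public final class MoveUtil {
       // Try an atomic move first; fall back to a plain replacing move on
       // file systems (e.g. some NFS mounts) that reject ATOMIC_MOVE.
       static void atomicMoveWithFallback(Path source, Path target) throws IOException {
           try {
               Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
           } catch (IOException outer) {
               try {
                   Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
               } catch (IOException inner) {
                   inner.addSuppressed(outer);
                   throw inner;
               }
           }
       }
   }
   ```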





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9683: MINOR: Fix KTable-KTable foreign-key join example

2020-12-03 Thread GitBox


mjsax commented on pull request #9683:
URL: https://github.com/apache/kafka/pull/9683#issuecomment-738284048


   Merged to `trunk` and cherry-picked to `2.7` and `2.6` branches.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #9683: MINOR: Fix KTable-KTable foreign-key join example

2020-12-03 Thread GitBox


mjsax merged pull request #9683:
URL: https://github.com/apache/kafka/pull/9683


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] JimGalasyn opened a new pull request #9683: DOCS-6115: Fix KTable-KTable foreign-key join example

2020-12-03 Thread GitBox


JimGalasyn opened a new pull request #9683:
URL: https://github.com/apache/kafka/pull/9683


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10555) Improve client state machine

2020-12-03 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson reassigned KAFKA-10555:
--

Assignee: Walker Carlson

> Improve client state machine
> 
>
> Key: KAFKA-10555
> URL: https://issues.apache.org/jira/browse/KAFKA-10555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Walker Carlson
>Priority: Major
>  Labels: needs-kip
>
> The KafkaStreams client exposes its state to the user for monitoring purposes 
> (ie, RUNNING, REBALANCING etc). The state of the client depends on the 
> state(s) of the internal StreamThreads that have their own states.
> Furthermore, the client state has an impact on what the user can do with the 
> client. For example, active tasks can only be queried in RUNNING state and 
> similar.
> With KIP-671 and KIP-663 we improved error handling capabilities and allow 
> adding/removing stream threads dynamically. We allow adding/removing threads only 
> in RUNNING and REBALANCING state. This puts us in a "weird" position, because 
> if we enter ERROR state (ie, if the last thread dies), we cannot add new 
> threads any longer. However, if we have multiple threads and one dies, we 
> don't enter ERROR state and do allow recovering the thread.
> Before the KIPs the definition of ERROR state was clear; however, with both 
> KIPs it seems that we should revisit its semantics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10805) More useful reporting from travis system tests

2020-12-03 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243461#comment-17243461
 ] 

Jason Gustafson commented on KAFKA-10805:
-

cc [~chia7712]

> More useful reporting from travis system tests
> --
>
> Key: KAFKA-10805
> URL: https://issues.apache.org/jira/browse/KAFKA-10805
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> Inspecting system test output from travis is a bit painful at the moment 
> because you have to check the build logs to find the tests that failed. 
> Additionally, there is no logging available from the workers which is often 
> essential to debug a failure. We should look into how we can improve the 
> build so that the output is more convenient and useful.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10805) More useful reporting from travis system tests

2020-12-03 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10805:
---

 Summary: More useful reporting from travis system tests
 Key: KAFKA-10805
 URL: https://issues.apache.org/jira/browse/KAFKA-10805
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Inspecting system test output from travis is a bit painful at the moment 
because you have to check the build logs to find the tests that failed. 
Additionally, there is no logging available from the workers which is often 
essential to debug a failure. We should look into how we can improve the build 
so that the output is more convenient and useful.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10804) Tune travis system tests to avoid timeouts

2020-12-03 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243458#comment-17243458
 ] 

Jason Gustafson commented on KAFKA-10804:
-

cc [~chia7712]

> Tune travis system tests to avoid timeouts
> --
>
> Key: KAFKA-10804
> URL: https://issues.apache.org/jira/browse/KAFKA-10804
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> Thanks to https://github.com/apache/kafka/pull/9652, we are now running 
> system tests for PRs. However, it looks like we need some tuning because many 
> of the subsets are timing out. For example: 
> https://travis-ci.com/github/apache/kafka/jobs/453241933. This might just be 
> a matter of adding more subsets or changing the timeout, but we should 
> probably also consider whether we want to run all system tests or if there is 
> a more useful subset of them.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10804) Tune travis system tests to avoid timeouts

2020-12-03 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10804:
---

 Summary: Tune travis system tests to avoid timeouts
 Key: KAFKA-10804
 URL: https://issues.apache.org/jira/browse/KAFKA-10804
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


Thanks to https://github.com/apache/kafka/pull/9652, we are now running system 
tests for PRs. However, it looks like we need some tuning because many of the 
subsets are timing out. For example: 
https://travis-ci.com/github/apache/kafka/jobs/453241933. This might just be a 
matter of adding more subsets or changing the timeout, but we should probably 
also consider whether we want to run all system tests or if there is a more 
useful subset of them.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang merged pull request #9276: KAFKA-10473: Add docs on partition size-on-disk, and other log-related metrics

2020-12-03 Thread GitBox


guozhangwang merged pull request #9276:
URL: https://github.com/apache/kafka/pull/9276


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9276: KAFKA-10473: Add docs on partition size-on-disk, and other log-related metrics

2020-12-03 Thread GitBox


guozhangwang commented on a change in pull request #9276:
URL: https://github.com/apache/kafka/pull/9276#discussion_r535511409



##
File path: docs/ops.html
##
@@ -1129,6 +1128,26 @@ Security 
Considerations for Remote Mon
 
kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesInPerSec
 
   
+  
+Size of a partition on disk (in bytes)
+
kafka.log:type=Log,name=Size,topic=([-.\w]+),partition=([0-9]+)
+The size of a partition on disk, measured in bytes.
+  
+  
+Number of log segments in a partition
+
kafka.log:type=Log,name=NumLogSegments,topic=([-.\w]+),partition=([0-9]+)
+The number of log segments in a partition.

Review comment:
   I think this is fine :)
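
   As a side note for readers of these docs: the metrics added here are plain gauges, so they can be read over JMX. A minimal sketch — the topic and partition values are placeholders, and it assumes it runs inside the broker JVM (or is adapted to use a JMXConnector against the broker's JMX port):

   ```java
   import java.lang.management.ManagementFactory;
   import javax.management.MBeanServer;
   import javax.management.ObjectName;

   public class LogSizeReader {
       public static void main(String[] args) throws Exception {
           MBeanServer server = ManagementFactory.getPlatformMBeanServer();
           // MBean name as documented in ops.html; "my-topic" / "0" stand in
           // for a real topic and partition.
           ObjectName size = new ObjectName(
               "kafka.log:type=Log,name=Size,topic=my-topic,partition=0");
           System.out.println("partition size on disk (bytes): "
               + server.getAttribute(size, "Value"));
       }
   }
   ```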

##
File path: docs/ops.html
##
@@ -1129,6 +1128,26 @@ Security 
Considerations for Remote Mon
 
kafka.server:type=BrokerTopicMetrics,name=ReassignmentBytesInPerSec
 
   
+  
+Size of a partition on disk (in bytes)
+
kafka.log:type=Log,name=Size,topic=([-.\w]+),partition=([0-9]+)
+The size of a partition on disk, measured in bytes.
+  
+  
+Number of log segments in a partition
+
kafka.log:type=Log,name=NumLogSegments,topic=([-.\w]+),partition=([0-9]+)
+The number of log segments in a partition.
+  
+  
+First offset in a partition
+
kafka.log:type=Log,name=LogStartOffset,topic=([-.\w]+),partition=([0-9]+)
+The first offset in a partition.
+  
+  
+Last offset in a partition
+
kafka.log:type=Log,name=LogEndOffset,topic=([-.\w]+),partition=([0-9]+)
+The last offset in a partition.

Review comment:
   This is the last uncommitted offset: after the local append it would be 
incremented, without waiting for other replicas to ack.
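
   To make the distinction concrete, here is a toy model of the two offsets — not Kafka code, names are illustrative:

   ```java
   // Toy illustration: the log end offset advances on local append, while
   // the high watermark only advances once the ISR replicas have caught up.
   class PartitionOffsets {
       long logEndOffset = 0L;   // next offset to assign; bumped on local append
       long highWatermark = 0L;  // last offset known to be fully replicated

       void appendLocal(int numRecords) {
           logEndOffset += numRecords;   // no replica acks needed
       }

       void onReplicasCaughtUpTo(long offset) {
           highWatermark = Math.max(highWatermark, Math.min(offset, logEndOffset));
       }
   }
   ```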





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10774) Support Describe topic using topic IDs

2020-12-03 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243454#comment-17243454
 ] 

Justine Olshan edited comment on KAFKA-10774 at 12/3/20, 7:16 PM:
--

I've updated the KIP to add another method like describeTopics that uses topic 
IDs instead [see here 
|https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient]


was (Author: jolshan):
I've updated the KIP to add another method like describeTopics that uses topic 
IDs instead [see here | 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient]

> Support Describe topic using topic IDs
> --
>
> Key: KAFKA-10774
> URL: https://issues.apache.org/jira/browse/KAFKA-10774
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>
> Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs 
> to MetadataReq and can get TopicDesc by topic IDs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10774) Support Describe topic using topic IDs

2020-12-03 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243454#comment-17243454
 ] 

Justine Olshan edited comment on KAFKA-10774 at 12/3/20, 7:15 PM:
--

I've updated the KIP to add another method like describeTopics that uses topic 
IDs instead [see here| 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient]


was (Author: jolshan):
I've updated the KIP to add another method like describeTopics that uses topic 
IDs instead [see here| 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient|http://example.com/]]

> Support Describe topic using topic IDs
> --
>
> Key: KAFKA-10774
> URL: https://issues.apache.org/jira/browse/KAFKA-10774
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>
> Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs 
> to MetadataReq and can get TopicDesc by topic IDs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10774) Support Describe topic using topic IDs

2020-12-03 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243454#comment-17243454
 ] 

Justine Olshan edited comment on KAFKA-10774 at 12/3/20, 7:15 PM:
--

I've updated the KIP to add another method like describeTopics that uses topic 
IDs instead [see here| 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient]


was (Author: jolshan):
I've updated the KIP to add another method like describeTopics that uses topic 
IDs instead [see here| 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient]

> Support Describe topic using topic IDs
> --
>
> Key: KAFKA-10774
> URL: https://issues.apache.org/jira/browse/KAFKA-10774
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>
> Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs 
> to MetadataReq and can get TopicDesc by topic IDs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10774) Support Describe topic using topic IDs

2020-12-03 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243454#comment-17243454
 ] 

Justine Olshan edited comment on KAFKA-10774 at 12/3/20, 7:15 PM:
--

I've updated the KIP to add another method like describeTopics that uses topic 
IDs instead [see here | 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient]


was (Author: jolshan):
I've updated the KIP to add another method like describeTopics that uses topic 
IDs instead [see here| 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient]

> Support Describe topic using topic IDs
> --
>
> Key: KAFKA-10774
> URL: https://issues.apache.org/jira/browse/KAFKA-10774
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>
> Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs 
> to MetadataReq and can get TopicDesc by topic IDs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10774) Support Describe topic using topic IDs

2020-12-03 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243454#comment-17243454
 ] 

Justine Olshan edited comment on KAFKA-10774 at 12/3/20, 7:14 PM:
--

I've updated the KIP to add another method like describeTopics that uses topic 
IDs instead [see here| 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient|http://example.com/]]


was (Author: jolshan):
I've updated the KIP to add another method like describeTopics that uses topic 
IDs instead 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient|http://example.com]

> Support Describe topic using topic IDs
> --
>
> Key: KAFKA-10774
> URL: https://issues.apache.org/jira/browse/KAFKA-10774
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>
> Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs 
> to MetadataReq and can get TopicDesc by topic IDs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10774) Support Describe topic using topic IDs

2020-12-03 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243454#comment-17243454
 ] 

Justine Olshan commented on KAFKA-10774:


I've updated the KIP to add another method like describeTopics that uses topic 
IDs instead 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers#KIP516:TopicIdentifiers-AdminandKafkaAdminClient|http://example.com]

> Support Describe topic using topic IDs
> --
>
> Key: KAFKA-10774
> URL: https://issues.apache.org/jira/browse/KAFKA-10774
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
>
> Similar to KAFKA-10547 which add topic IDs in MetadataResp, we add topic IDs 
> to MetadataReq and can get TopicDesc by topic IDs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

2020-12-03 Thread GitBox


mjsax commented on pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#issuecomment-738231066


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-03 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535503453



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler
+);
+streamThread.setStateListener(streamStateListener);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
 
-ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
-Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ * 
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * 
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (newThread) {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+synchronized (stateLock) {

Review comment:
   Expanding on this, the problem is in the shutdown thread: the join only 
waits for alive threads, and to be alive a thread needs to have been started.
   
   So if, in between the check and starting the thread, another thread transitions 
the state to NOT_RUNNING, the new thread will not be joined in the shutdown thread. Then, 
when it continues, it will start (since it passed the check) and we will have a 
thread running after the client is shut down.
   
   This would be an extremely tough race condition to find or reproduce, so it's best 
to just avoid it.
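
   For what it's worth, the shape of the fix is the classic double-check under the state lock. A minimal, self-contained sketch — generic names, not the KafkaStreams code:

   ```java
   import java.util.Optional;

   public class GuardedStart {
       private final Object stateLock = new Object();
       private boolean running = true;   // guarded by stateLock

       // Re-check the state under the same lock that shutdown() takes, so a
       // worker can never be started after shutdown has completed.
       Optional<Thread> startWorker(Runnable work) {
           synchronized (stateLock) {
               if (running) {
                   Thread t = new Thread(work);
                   t.start();
                   return Optional.of(t);
               }
           }
           return Optional.empty();   // lost the race; caller cleans up
       }

       void shutdown() {
           synchronized (stateLock) {
               running = false;
           }
           // join any previously started workers here ...
       }
   }
   ```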





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2020-12-03 Thread James Galasyn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241113#comment-17241113
 ] 

James Galasyn edited comment on KAFKA-2967 at 12/3/20, 6:53 PM:


Since I inherited the Streams docs, I've come to appreciate the points made by 
[~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate 
the HTML-based docs to markdown and host them on ReadTheDocs. We used this 
approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], 
and there's no reason it won't work for the AK docs.

Last week, I took a couple of hours and manually converted the entire AK doc 
set to markdown by using Pandoc: [ak-docs-proto 
repo|https://github.com/confluentinc/ak-docs-proto]. There's some more work to 
do, like fixing headings and updating links, but as a proof-of-concept, it 
works okay.

Once the markdown is cleaned up, we would move it to the docs directory in the 
public AK GitHub repo. There are numerous options for building and hosting, for 
example, ksqlDB uses a ReadTheDocs project with a Basic account ($50/month). 
The ksqlDB docs have used this model successfully since November 2019, so the 
execution risk is low.

I looked around at other ASF projects to see how they handle docs, and there's 
no prescribed solution. Each has its own approach.
 * *Pulsar*: [https://github.com/apache/pulsar/tree/master/site2] Markdown and 
[https://docusaurus.io/]

 * *Flink*: [https://github.com/apache/flink/tree/master/docs] Markdown and 
Jekyll

 * *Ant*: Raw HTML [https://github.com/apache/ant/tree/master/manual]

 * *Calcite*: Markdown and Jekyll 
[https://github.com/apache/calcite/tree/master/site/_docs]

 * *Cassandra*: RestructuredText and Sphinx 
[https://github.com/apache/cassandra/tree/trunk/doc]

 * *Hadoop*: 
[https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-auth/src/site/markdown/BuildingIt.md]

Also, we could use whatever hosting solution we use currently for 
[https://kafka.apache.org/documentation/].

The big win for the community is getting the docs into markdown, which greatly 
increases the ease of contribution and flexibility of presentation.

 


was (Author: jimgalasyn):
Since I inherited the Streams docs, I've come to appreciate the points made by 
[~gwenshap] and [~ewencp]. I'm proposing a somewhat different solution: migrate 
the HTML-based docs to markdown and host them on ReadTheDocs. We used this 
approach successfully for the [ksqlDB docs|https://docs.ksqldb.io/en/latest/], 
and there's no reason it won't work for the AK docs.

Last week, I took a couple of hours and manually converted the entire AK doc 
set to markdown by using Pandoc: [ak-docs-proto 
repo|https://github.com/confluentinc/ak-docs-proto]. There's some more work to 
do, like fixing headings and updating links, but as a proof-of-concept, it 
works okay.

Once the markdown is cleaned up, we would move it to the docs directory in the 
public AK GitHub repo and set up a ReadTheDocs project with a Basic account 
($50/month). RTD will build from our GH branches and host the site for us.

The ksqlDB docs have used this model successfully since May 2020, so the 
execution risk is low. [~mjsax], [~vvcephei], and [~mdrogalis], what do you 
think?

 

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>  Labels: documentation
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lmr3796 commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request

2020-12-03 Thread GitBox


lmr3796 commented on a change in pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#discussion_r535489887



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1243,19 +1244,30 @@ class KafkaApis(val requestChannel: RequestChannel,
   topicResponses
 } else {
   val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
-  val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
+  val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
 if (isInternal(topic)) {
   val topicMetadata = createInternalTopic(topic)
-  if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
-metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, 
true, util.Collections.emptyList())
-  else
-topicMetadata
+  Some(
+if (topicMetadata.errorCode == 
Errors.COORDINATOR_NOT_AVAILABLE.code)
+  metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, 
true, util.Collections.emptyList())
+else
+  topicMetadata
+  )
+} else if (isFetchAllMetadata) {
+  // KAFKA-10606: If this request is to get metadata for all topics, 
auto topic creation should not be allowed
+  // The special handling is necessary on broker side because 
allowAutoTopicCreation is hard coded to true
+  // for backward compatibility on client side.
+  //
+  // However, in previous versions, UNKNOWN_TOPIC_OR_PARTITION won't 
happen on fetch all metadata,
+  // so, for backward-compatibility, we need to skip these not founds 
during fetch all metadata here.

Review comment:
   Thanks @ijuma,
   
   To follow the convention in Kafka, I just updated the comments. Let me know 
if you have any other thoughts!
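
   For readers skimming the diff: the change relies on Scala's flatMap-over-Option to drop entries from the response. A toy Java analogue of the same idiom, with made-up names:

   ```java
   import java.util.List;
   import java.util.Optional;
   import java.util.stream.Collectors;

   public class MetadataFilter {
       // Mapping a topic to Optional.empty() drops it from the result,
       // mirroring the flatMap { ... None ... } pattern in the diff above.
       static List<String> unknownTopicErrors(List<String> missing, boolean fetchAll) {
           return missing.stream()
               .map(t -> fetchAll
                   ? Optional.<String>empty()                       // fetch-all: skip not-found
                   : Optional.of(t + " -> UNKNOWN_TOPIC_OR_PARTITION"))
               .filter(Optional::isPresent)
               .map(Optional::get)
               .collect(Collectors.toList());
       }
   }
   ```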





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-03 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r535482262



##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -74,18 +74,19 @@ public void freeze() throws IOException {
 frozen = true;
 
 // Set readonly and ignore the result
-if (!path.toFile().setReadOnly()) {
-throw new IOException(String.format("Unable to set file %s as 
read-only", path));
+if (!tempSnapshotPath.toFile().setReadOnly()) {
+throw new IOException(String.format("Unable to set file (%s) as 
read-only", tempSnapshotPath));
 }
 
-Path destination = Snapshots.moveRename(path, snapshotId);
-Files.move(path, destination, StandardCopyOption.ATOMIC_MOVE);
+Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+Files.move(tempSnapshotPath, destination, 
StandardCopyOption.ATOMIC_MOVE);
 }
 
 @Override
 public void close() throws IOException {
 channel.close();
-Files.deleteIfExists(path);
+// This is a noop if freeze was called before calling close
+Files.deleteIfExists(tempSnapshotPath);

Review comment:
   Yes!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10777) Add additional configuration to control MirrorMaker 2 internal topics naming convention

2020-12-03 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-10777:
--
Summary: Add additional configuration to control MirrorMaker 2 internal 
topics naming convention  (was: Add additional configuration to control 
MirrorMaker2 internal topics naming convention)

> Add additional configuration to control MirrorMaker 2 internal topics naming 
> convention
> ---
>
> Key: KAFKA-10777
> URL: https://issues.apache.org/jira/browse/KAFKA-10777
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Omnia Ibrahim
>Assignee: Omnia Ibrahim
>Priority: Minor
>  Labels: mirror, mirror-maker, mirrormaker
>
> MM2 internal topic names (heartbeats, checkpoints and offset-syncs) are 
> hardcoded in the source code, which makes it hard to run MM2 with any Kafka 
> cluster that has rules around topic naming conventions and doesn't allow 
> topic auto-creation.
> In this case developers will need to create these internal topics up-front 
> manually and make sure they follow the cluster rules and guidance for 
> topic creation, so MM2 should have the flexibility to let you override the names 
> of internal topics to follow the cluster's topic naming convention. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10777) Add additional configuration to control MirrorMaker2 internal topics naming convention

2020-12-03 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-10777:
--
Summary: Add additional configuration to control MirrorMaker2 internal 
topics naming convention  (was: Add additional configuration to control MM2 
internal topics naming convention)

> Add additional configuration to control MirrorMaker2 internal topics naming 
> convention
> --
>
> Key: KAFKA-10777
> URL: https://issues.apache.org/jira/browse/KAFKA-10777
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Omnia Ibrahim
>Assignee: Omnia Ibrahim
>Priority: Minor
>  Labels: mirror, mirror-maker, mirrormaker
>
> MM2 internal topic names (heartbeats, checkpoints and offset-syncs) are 
> hardcoded in the source code, which makes it hard to run MM2 with any Kafka 
> cluster that has rules around topic naming conventions and doesn't allow 
> topic auto-creation.
> In this case developers will need to create these internal topics up-front 
> manually and make sure they follow the cluster rules and guidance for 
> topic creation, so MM2 should have the flexibility to let you override the names 
> of internal topics to follow the cluster's topic naming convention. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10777) Add additional configuration to control MM2 internal topics naming convention

2020-12-03 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim reassigned KAFKA-10777:
-

Assignee: Omnia Ibrahim

> Add additional configuration to control MM2 internal topics naming convention
> -
>
> Key: KAFKA-10777
> URL: https://issues.apache.org/jira/browse/KAFKA-10777
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Omnia Ibrahim
>Assignee: Omnia Ibrahim
>Priority: Minor
>  Labels: mirror, mirror-maker, mirrormaker
>
> MM2 internal topic names (heartbeats, checkpoints and offset-syncs) are 
> hardcoded in the source code, which makes it hard to run MM2 with any Kafka 
> cluster that has rules around topic naming conventions and doesn't allow 
> topic auto-creation.
> In this case developers will need to create these internal topics up-front 
> manually and make sure they follow the cluster rules and guidance for 
> topic creation, so MM2 should have the flexibility to let you override the names 
> of internal topics to follow the cluster's topic naming convention. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-03 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535477229



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler
+);
+streamThread.setStateListener(streamStateListener);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
 
-ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
-Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ * 
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * 
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (newThread) {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();
+return Optional.of(streamThread.getName());
+} else {
+streamThread.shutdown();
+threads.remove(streamThread);
+
resizeThreadCache(getCacheSizePerThread(threads.size()));
+return Optional.empty();
+}
+}
+}
+}
+return Optional.empty();
+}
 
-final StreamStateListener streamStateListener = new 
StreamStateListener(threadState, globalThreadState);
-if (hasGlobalTopology) {
-globalStreamThread.setStateListener(streamStateListener);
+private int getNextThreadIndex() {
+final HashSet names = new HashSet<>();
+for (final StreamThread streamThread: threads) {
+names.add(streamThread.getName());
 }
-for (final StreamThread thread : threads) {
-thread.setStateListener(streamStateListener);
+final String baseName = clientId + "-StreamThread-";
+for (int i = 0; i < threads.size(); i++) {

Review comment:
   Sure, I missed that suggestion.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10777) Add additional configuration to control MM2 internal topics naming convention

2020-12-03 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim updated KAFKA-10777:
--
Labels: mirror mirror-maker mirrormaker  (was: )

> Add additional configuration to control MM2 internal topics naming convention
> -
>
> Key: KAFKA-10777
> URL: https://issues.apache.org/jira/browse/KAFKA-10777
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Omnia Ibrahim
>Priority: Minor
>  Labels: mirror, mirror-maker, mirrormaker
>
> MM2 internal topic names (heartbeats, checkpoints and offset-syncs) are 
> hardcoded in the source code, which makes it hard to run MM2 with any Kafka 
> cluster that has rules around topic naming conventions and doesn't allow 
> topic auto-creation.
> In this case developers will need to create these internal topics up-front 
> manually and make sure they follow the cluster rules and guidance for 
> topic creation, so MM2 should have the flexibility to let you override the names 
> of internal topics to follow the cluster's topic naming convention. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #9512: KAFKA-10394: generate snapshot

2020-12-03 Thread GitBox


jsancio commented on a change in pull request #9512:
URL: https://github.com/apache/kafka/pull/9512#discussion_r535474040



##
File path: 
raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.snapshot;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.OffsetAndEpoch;
+
+public final class FileRawSnapshotWriter implements RawSnapshotWriter {
+private final Path tempSnapshotPath;
+private final FileChannel channel;
+private final OffsetAndEpoch snapshotId;
+private boolean frozen = false;
+
+private FileRawSnapshotWriter(
+Path tempSnapshotPath,
+FileChannel channel,
+OffsetAndEpoch snapshotId
+) {
+this.tempSnapshotPath = tempSnapshotPath;
+this.channel = channel;
+this.snapshotId = snapshotId;
+}
+
+@Override
+public OffsetAndEpoch snapshotId() {
+return snapshotId;
+}
+
+@Override
+public long sizeInBytes() throws IOException {
+return channel.size();
+}
+
+@Override
+public void append(ByteBuffer buffer) throws IOException {
+if (frozen) {
+throw new IllegalStateException(
+String.format("Append not supported. Snapshot is already 
frozen: id = %s; temp path = %s", snapshotId, tempSnapshotPath)
+);
+}
+
+Utils.writeFully(channel, buffer);
+}
+
+@Override
+public boolean isFrozen() {
+return frozen;
+}
+
+@Override
+public void freeze() throws IOException {
+channel.close();
+frozen = true;
+
+// Set readonly and ignore the result
+if (!tempSnapshotPath.toFile().setReadOnly()) {
+throw new IOException(String.format("Unable to set file (%s) as 
read-only", tempSnapshotPath));
+}
+
+Path destination = Snapshots.moveRename(tempSnapshotPath, snapshotId);
+Files.move(tempSnapshotPath, destination, 
StandardCopyOption.ATOMIC_MOVE);

Review comment:
   I investigated the issue a bit. It looks like this utility was introduced 
to replace these lines in `OffsetCheckpoint.scala`:
   ```scala
   // swap new offset checkpoint file with previous one
   if(!temp.renameTo(file)) {
   // renameTo() fails on Windows if the destination file exists.
   file.delete()
   if(!temp.renameTo(file))
   throw new IOException(...)
   }
   ```
   
   Looking at the JDK implementation in Windows, `ATOMIC_MOVE` should work in 
Windows if the target file exists: 
https://github.com/openjdk/jdk/blob/master/src/java.base/windows/classes/sun/nio/fs/WindowsFileCopy.java#L298-L312
   
   In other words, I think we can keep the code as is in this PR.
   
   cc @ijuma 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-03 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535471146



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler

Review comment:
   Ah, good catch. The diff makes that hard to see, as it was actually moved 
to a new method.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-03 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535468574



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler
+);
+streamThread.setStateListener(streamStateListener);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
 
-ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
-Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ * 
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * 
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional addStreamThread() {
+synchronized (newThread) {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+synchronized (stateLock) {

Review comment:
   Well, newThread only syncs the `addStreamThread` method. There is still the race 
condition between the second `isRunningOrRebalancing` check and starting the thread. It 
seems like a bad idea to leave that open, as it could cause thread state changes 
when there shouldn't be. Starting the thread is relatively low cost, so this 
shouldn't have much impact perf-wise.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9677: KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics

2020-12-03 Thread GitBox


hachikuji commented on a change in pull request #9677:
URL: https://github.com/apache/kafka/pull/9677#discussion_r535452252



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1403,7 +1426,9 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   result match {
-case Left(error: Errors) => error match {
+case Left(error: Errors) =>
+  isrChangeListener.markFailed()
+  error match {

Review comment:
   nit: now the block below is misaligned





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10739) Replace EpochEndOffset with automated protocol

2020-12-03 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-10739.
-
Resolution: Done

> Replace EpochEndOffset with automated protocol
> --
>
> Key: KAFKA-10739
> URL: https://issues.apache.org/jira/browse/KAFKA-10739
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> Follow up of KAFKA-9630. We can avoid extra conversation by using the 
> auto-generated data structure instead of using `EpochEndOffset ` internally.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac merged pull request #9630: KAFKA-10739; Replace EpochEndOffset with automated protocol

2020-12-03 Thread GitBox


dajac merged pull request #9630:
URL: https://github.com/apache/kafka/pull/9630


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #9630: KAFKA-10739; Replace EpochEndOffset with automated protocol

2020-12-03 Thread GitBox


dajac commented on pull request #9630:
URL: https://github.com/apache/kafka/pull/9630#issuecomment-738174240


   Test failure is unrelated:
   * Build / JDK 8 / kafka.api.TransactionsTest.testBumpTransactionalEpoch



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #9682: KAFKA-10803: Fix improper removal of bad dynamic config

2020-12-03 Thread GitBox


prat0318 commented on a change in pull request #9682:
URL: https://github.com/apache/kafka/pull/9682#discussion_r535442782



##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -467,7 +467,9 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 case _: Exception => true
   }
 }
-invalidProps.foreach(props.remove)
+invalidProps.foreach {

Review comment:
   done.

##
File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
##
@@ -400,6 +400,33 @@ class DynamicBrokerConfigTest {
 newprops.put(KafkaConfig.BackgroundThreadsProp, "100")
 dynamicBrokerConfig.updateBrokerConfig(0, newprops)
   }
+
+  @Test
+  def testImproperConfigsAreRemoved(): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+val configs = KafkaConfig(props)
+
+assertEquals(Int.MaxValue, configs.maxConnections)
+assertEquals(1048588, configs.messageMaxBytes)
+
+var newProps = new Properties()
+newProps.put(KafkaConfig.MaxConnectionsProp, "")
+newProps.put(KafkaConfig.MessageMaxBytesProp, "")
+
+configs.dynamicConfig.updateDefaultConfig(newProps)
+assertEquals(, configs.maxConnections)
+assertEquals(, configs.messageMaxBytes)
+
+newProps = new Properties()
+newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT")
+newProps.put(KafkaConfig.MessageMaxBytesProp, "")
+
+configs.dynamicConfig.updateDefaultConfig(newProps)
+// Invalid value should be skipped and reassigned as Originals
+assertEquals(Int.MaxValue, configs.maxConnections)

Review comment:
   done.

##
File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
##
@@ -400,6 +400,33 @@ class DynamicBrokerConfigTest {
 newprops.put(KafkaConfig.BackgroundThreadsProp, "100")
 dynamicBrokerConfig.updateBrokerConfig(0, newprops)
   }
+
+  @Test
+  def testImproperConfigsAreRemoved(): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+val configs = KafkaConfig(props)
+
+assertEquals(Int.MaxValue, configs.maxConnections)
+assertEquals(1048588, configs.messageMaxBytes)
+
+var newProps = new Properties()
+newProps.put(KafkaConfig.MaxConnectionsProp, "")
+newProps.put(KafkaConfig.MessageMaxBytesProp, "")
+
+configs.dynamicConfig.updateDefaultConfig(newProps)
+assertEquals(, configs.maxConnections)
+assertEquals(, configs.messageMaxBytes)
+
+newProps = new Properties()
+newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT")
+newProps.put(KafkaConfig.MessageMaxBytesProp, "")
+
+configs.dynamicConfig.updateDefaultConfig(newProps)
+// Invalid value should be skipped and reassigned as Originals

Review comment:
   done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9682: KAFKA-10803: Fix improper removal of bad dynamic config

2020-12-03 Thread GitBox


chia7712 commented on a change in pull request #9682:
URL: https://github.com/apache/kafka/pull/9682#discussion_r535428840



##
File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
##
@@ -400,6 +400,33 @@ class DynamicBrokerConfigTest {
 newprops.put(KafkaConfig.BackgroundThreadsProp, "100")
 dynamicBrokerConfig.updateBrokerConfig(0, newprops)
   }
+
+  @Test
+  def testImproperConfigsAreRemoved(): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+val configs = KafkaConfig(props)
+
+assertEquals(Int.MaxValue, configs.maxConnections)
+assertEquals(1048588, configs.messageMaxBytes)
+
+var newProps = new Properties()
+newProps.put(KafkaConfig.MaxConnectionsProp, "")
+newProps.put(KafkaConfig.MessageMaxBytesProp, "")
+
+configs.dynamicConfig.updateDefaultConfig(newProps)
+assertEquals(, configs.maxConnections)
+assertEquals(, configs.messageMaxBytes)
+
+newProps = new Properties()
+newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT")
+newProps.put(KafkaConfig.MessageMaxBytesProp, "")
+
+configs.dynamicConfig.updateDefaultConfig(newProps)
+// Invalid value should be skipped and reassigned as Originals
+assertEquals(Int.MaxValue, configs.maxConnections)

Review comment:
   Could you replace ```Int.MaxValue``` with ```Defaults.MaxConnections```? 

##
File path: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
##
@@ -467,7 +467,9 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
 case _: Exception => true
   }
 }
-invalidProps.foreach(props.remove)
+invalidProps.foreach {

Review comment:
   How about ```invalidProps.keys.foreach(props.remove)```? It is still one 
line.
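
   A self-contained Scala illustration of the bug this one-liner fixes (toy 
keys, not the actual broker config code):

   ```scala
   import java.util.Properties

   val props = new Properties()
   props.put("max.connections", "INVALID_INT")

   val invalidProps: Map[String, String] = Map("max.connections" -> "INVALID_INT")

   // Bug: iterating a Scala Map yields (key, value) tuples, so props.remove
   // receives the whole tuple, matches no key, and removes nothing.
   invalidProps.foreach(props.remove)
   assert(props.containsKey("max.connections"))

   // Fix: remove by key, as the suggested one-liner does.
   invalidProps.keys.foreach(props.remove)
   assert(!props.containsKey("max.connections"))
   ```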

##
File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
##
@@ -400,6 +400,33 @@ class DynamicBrokerConfigTest {
 newprops.put(KafkaConfig.BackgroundThreadsProp, "100")
 dynamicBrokerConfig.updateBrokerConfig(0, newprops)
   }
+
+  @Test
+  def testImproperConfigsAreRemoved(): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+val configs = KafkaConfig(props)
+
+assertEquals(Int.MaxValue, configs.maxConnections)
+assertEquals(1048588, configs.messageMaxBytes)
+
+var newProps = new Properties()
+newProps.put(KafkaConfig.MaxConnectionsProp, "")
+newProps.put(KafkaConfig.MessageMaxBytesProp, "")
+
+configs.dynamicConfig.updateDefaultConfig(newProps)
+assertEquals(, configs.maxConnections)
+assertEquals(, configs.messageMaxBytes)
+
+newProps = new Properties()
+newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT")
+newProps.put(KafkaConfig.MessageMaxBytesProp, "")
+
+configs.dynamicConfig.updateDefaultConfig(newProps)
+// Invalid value should be skipped and reassigned as Originals

Review comment:
   "Originals" -> "default value"





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9675: KAFKA-10794 Replica leader election is too slow in the case of too many partitions

2020-12-03 Thread GitBox


chia7712 commented on pull request #9675:
URL: https://github.com/apache/kafka/pull/9675#issuecomment-738140790


   @lqjack good question!
   
   >  I find the only difference is that controllerContext.allPartitions can 
be invoked once or once per partition.
   
   ```controllerContext.allPartitions``` does not return a constant value. It 
creates a new collection, and the overhead can be high when there are a lot of 
partitions. This PR ensures ```controllerContext.allPartitions``` is called 
only once, reducing the cost of getting "all partitions".
   
   > does the patch resolve the issue?
   
   @Montyleo It seems to me the optimization in this PR is good enough. 
However, it would be better to demonstrate the improvement from this patch in 
your environment.
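
   A minimal Scala sketch of the hoisting pattern in question (names and 
shapes are illustrative assumptions, not the actual controller code):

   ```scala
   // Before: calling allPartitions() inside the loop rebuilds the collection
   // once per candidate. After: hoist it so it is materialized exactly once.
   def survivingCandidates(candidates: Seq[String], allPartitions: () => Set[String]): Seq[String] = {
     val partitions = allPartitions() // called once, not once per candidate
     candidates.filter(partitions.contains)
   }
   ```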
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-12-03 Thread GitBox


cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r535383909



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler

Review comment:
   nit: If you happen to push another commit, could you fix the indentation 
here? Sorry I didn't notice this before.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -870,43 +900,75 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 cacheSizePerThread,
 stateDirectory,
 delegatingStateRestoreListener,
-i + 1,
+threadIdx,
 KafkaStreams.this::closeToError,
-this::defaultStreamsUncaughtExceptionHandler
-);
-threads.add(streamThread);
-threadState.put(streamThread.getId(), streamThread.state());
-storeProviders.add(new 
StreamThreadStateStoreProvider(streamThread));
-}
+streamsUncaughtExceptionHandler
+);
+streamThread.setStateListener(streamStateListener);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+return streamThread;
+}
 
-ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, 
(metricsConfig, now) ->
-Math.toIntExact(threads.stream().filter(thread -> 
thread.state().isAlive()).count()));
+/**
+ * Adds and starts a stream thread in addition to the stream threads that 
are already running in this
+ * Kafka Streams client.
+ * 
+ * Since the number of stream threads increases, the sizes of the caches 
in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the 
cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ * 
+ * Stream threads can only be added if this Kafka Streams client is in 
state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread 
could not be added
+ */
+public Optional<String> addStreamThread() {
+synchronized (newThread) {
+if (isRunningOrRebalancing()) {
+final int threadIdx = getNextThreadIndex();
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size() + 1);
+resizeThreadCache(cacheSizePerThread);
+final StreamThread streamThread = 
createStreamThread(cacheSizePerThread, threadIdx);
+synchronized (stateLock) {
+if (isRunningOrRebalancing()) {
+streamThread.start();
+return Optional.of(streamThread.getName());
+} else {
+streamThread.shutdown();
+threads.remove(streamThread);
+
resizeThreadCache(getCacheSizePerThread(threads.size()));
+return Optional.empty();
+}
+}
+}
+}
+return Optional.empty();
+}
 
-final StreamStateListener streamStateListener = new 
StreamStateListener(threadState, globalThreadState);
-if (hasGlobalTopology) {
-globalStreamThread.setStateListener(streamStateListener);
+private int getNextThreadIndex() {
+final HashSet<String> names = new HashSet<>();
+for (final StreamThread streamThread: threads) {
+names.add(streamThread.getName());
 }
-for (final StreamThread thread : threads) {
-thread.setStateListener(streamStateListener);
+final String baseName = clientId + "-StreamThread-";
+for (int i = 0; i < threads.size(); i++) {

Review comment:
   Shouldn't that be `int i = 1; i <= threads.size(); i++`? Otherwise, we 
would look up `"*-StreamThread-0"` and we would not look up `"*-StreamThread-" + 
threads.size()`.
   
   Could you add some tests that check the correct naming as @mjsax suggested? 
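
   A hedged sketch of the suggested 1-based search, expressed here as a small 
Scala function (the real code is Java in KafkaStreams; the helper below is an 
assumption, not the actual implementation):

   ```scala
   def nextThreadIndex(existingNames: Set[String], clientId: String): Int = {
     val baseName = s"$clientId-StreamThread-"
     // Thread names are 1-based: scan 1..size and reuse the first gap left by
     // a dead thread; if there is no gap, the next index is size + 1.
     (1 to existingNames.size)
       .find(i => !existingNames.contains(baseName + i))
       .getOrElse(existingNames.size + 1)
   }
   ```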


[GitHub] [kafka] splett2 commented on a change in pull request #9628: KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas

2020-12-03 Thread GitBox


splett2 commented on a change in pull request #9628:
URL: https://github.com/apache/kafka/pull/9628#discussion_r535397962



##
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##
@@ -56,11 +56,12 @@ import scala.collection._
  *  --entity-type users --entity-name  
--entity-type clients --entity-name 
  *  broker: --broker  OR --entity-type brokers 
--entity-name 
  *  broker-logger: --broker-logger  OR --entity-type 
broker-loggers --entity-name 
+ *  ip: --IP  OR --entity-type IPs --entity-name 
  * 
- * --entity-type  --entity-default may be specified in 
place of --entity-type  --entity-name 
- * when describing or altering default configuration for users, clients, or 
brokers, respectively.
- * Alternatively, --user-defaults, --client-defaults, or --broker-defaults may 
be specified in place of
- * --entity-type  --entity-default, respectively.
+ * --entity-type  --entity-default may be specified 
in place of --entity-type  --entity-name 
+ * when describing or altering default configuration for users, clients, 
brokers, or IPs, respectively.
+ * Alternatively, --user-defaults, --client-defaults, --broker-defaults, or 
--IP-defaults may be specified in place of

Review comment:
   @dajac 
   it was originally lower-case in #9386
   after reading the KIP, I realized it was described as being upper case, e.g.
   ```
   Default quota for  will be stored in Zookeeper at /config/IPs/
   Quota for a specific IP address for which quota override is defined will be 
stored in Zookeeper at /config/IPs/
   ```
   and changed it to upper in this PR with the change in
   ```
   core/src/main/scala/kafka/server/DynamicConfigManager.scala
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10803) Skip improper dynamic configs while initialization and include the rest correct ones

2020-12-03 Thread Prateek Agarwal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243320#comment-17243320
 ] 

Prateek Agarwal commented on KAFKA-10803:
-

PR: https://github.com/apache/kafka/pull/9682

> Skip improper dynamic configs while initialization and include the rest 
> correct ones
> 
>
> Key: KAFKA-10803
> URL: https://issues.apache.org/jira/browse/KAFKA-10803
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.1, 2.6.0, 2.5.1, 2.7.0, 2.8.0
>Reporter: Prateek Agarwal
>Priority: Major
>
> There is [a 
> bug|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L470]
>  in how incorrect dynamic config keys are removed from the original 
> Properties list, resulting in persisting the improper configs in the 
> properties list.
> This eventually results in an exception being thrown while parsing the list by 
> [KafkaConfig 
> ctor|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L531],
>  resulting in skipping of the complete dynamic list (including the correct 
> ones).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #9628: KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas

2020-12-03 Thread GitBox


dajac commented on a change in pull request #9628:
URL: https://github.com/apache/kafka/pull/9628#discussion_r535388027



##
File path: core/src/main/scala/kafka/server/AdminManager.scala
##
@@ -920,32 +954,49 @@ class AdminManager(val config: KafkaConfig,
 !name.isDefined || !strict
 }
 
-def fromProps(props: Map[String, String]): Map[String, Double] = {
-  props.map { case (key, value) =>
-val doubleValue = try value.toDouble catch {
-  case _: NumberFormatException =>
-throw new IllegalStateException(s"Unexpected client quota 
configuration value: $key -> $value")
-}
-key -> doubleValue
-  }
-}
-
-(userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) =>
+(userEntries ++ clientIdEntries ++ bothEntries).flatMap { case ((u, c), p) 
=>
   val quotaProps = p.asScala.filter { case (key, _) => 
QuotaConfigs.isQuotaConfig(key) }
   if (quotaProps.nonEmpty && matches(userComponent, u) && 
matches(clientIdComponent, c))
 Some(userClientIdToEntity(u, c) -> fromProps(quotaProps))
   else
 None
-}.flatten.toMap
+}.toMap

Review comment:
   Ah. I missed the `flatMap`. That makes sense, thanks.
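
   The equivalence at play, as a tiny self-contained Scala example (toy data, 
not the real quota-matching code):

   ```scala
   val entries = Map("a" -> 1.0, "b" -> -1.0)
   // Building Options and flattening afterwards ...
   val viaMapFlatten = entries.map { case (k, v) => if (v > 0) Some(k -> v) else None }.flatten.toMap
   // ... collapses into a single flatMap.
   val viaFlatMap = entries.flatMap { case (k, v) => if (v > 0) Some(k -> v) else None }.toMap
   assert(viaMapFlatten == viaFlatMap)
   ```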





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9628: KAFKA-10747: Implement APIs for altering and describing IP connection rate quotas

2020-12-03 Thread GitBox


dajac commented on a change in pull request #9628:
URL: https://github.com/apache/kafka/pull/9628#discussion_r535387504



##
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##
@@ -56,11 +56,12 @@ import scala.collection._
  *  --entity-type users --entity-name  
--entity-type clients --entity-name 
  *  broker: --broker  OR --entity-type brokers 
--entity-name 
  *  broker-logger: --broker-logger  OR --entity-type 
broker-loggers --entity-name 
+ *  ip: --IP  OR --entity-type IPs --entity-name 
  * 
- * --entity-type  --entity-default may be specified in 
place of --entity-type  --entity-name 
- * when describing or altering default configuration for users, clients, or 
brokers, respectively.
- * Alternatively, --user-defaults, --client-defaults, or --broker-defaults may 
be specified in place of
- * --entity-type  --entity-default, respectively.
+ * --entity-type  --entity-default may be specified 
in place of --entity-type  --entity-name 
+ * when describing or altering default configuration for users, clients, 
brokers, or IPs, respectively.
+ * Alternatively, --user-defaults, --client-defaults, --broker-defaults, or 
--IP-defaults may be specified in place of

Review comment:
   @splett2 Isn't the ZK path already lower-case?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] prat0318 commented on a change in pull request #9682: KAFKA-10803: Fix improper removal of bad dynamic config

2020-12-03 Thread GitBox


prat0318 commented on a change in pull request #9682:
URL: https://github.com/apache/kafka/pull/9682#discussion_r535387403



##
File path: core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
##
@@ -400,6 +400,33 @@ class DynamicBrokerConfigTest {
 newprops.put(KafkaConfig.BackgroundThreadsProp, "100")
 dynamicBrokerConfig.updateBrokerConfig(0, newprops)
   }
+
+  @Test
+  def testImproperConfigsAreRemoved(): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+val configs = KafkaConfig(props)
+
+assertEquals(Int.MaxValue, configs.maxConnections)
+assertEquals(1048588, configs.messageMaxBytes)
+
+var newProps = new Properties()
+newProps.put(KafkaConfig.MaxConnectionsProp, "")
+newProps.put(KafkaConfig.MessageMaxBytesProp, "")
+
+configs.dynamicConfig.updateDefaultConfig(newProps)
+assertEquals(, configs.maxConnections)
+assertEquals(, configs.messageMaxBytes)
+
+newProps = new Properties()
+newProps.put(KafkaConfig.MaxConnectionsProp, "INVALID_INT")
+newProps.put(KafkaConfig.MessageMaxBytesProp, "")
+
+configs.dynamicConfig.updateDefaultConfig(newProps)
+// Invalid value should be skipped and reassigned as Originals
+assertEquals(Int.MaxValue, configs.maxConnections)
+// Even if One property is invalid, the below should get correctly updated.
+assertEquals(, configs.messageMaxBytes)

Review comment:
   Current trunk code returns the old value `` for this instead of the 
newly configured ``.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-12-03 Thread GitBox


ning2008wisc commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r535386444



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.MirrorClient;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.IntegrationTest;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
+
+/**
+ * Tests MM2 replication and failover/failback logic.
+ *
+ * MM2 is configured with active/active replication between two Kafka 
clusters. Tests validate that
+ * records sent to either cluster arrive at the other cluster. Then, a 
consumer group is migrated from
+ * one cluster to the other and back. Tests validate that consumer offsets are 
translated and replicated
+ * between clusters during this failover and failback.
+ */
+@Category(IntegrationTest.class)
+public abstract class MirrorConnectorsIntegrationBaseTest {
+private static final Logger log = 
LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class);
+
+private static final int NUM_RECORDS_PER_PARTITION = 10;
+private static final int NUM_PARTITIONS = 10;
+private static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * 
NUM_RECORDS_PER_PARTITION;
+private static final int RECORD_TRANSFER_DURATION_MS = 30_000;
+private static final int CHECKPOINT_DURATION_MS = 20_000;
+private static final int RECORD_CONSUME_DURATION_MS = 20_000;
+private static final int OFFSET_SYNC_DURATION_MS = 30_000;
+private static final int NUM_WORKERS = 3;
+private static final int CONSUMER_POLL_TIMEOUT_MS = 500;
+private static final int BROKER_RESTART_TIMEOUT_MS = 10_000;
+private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 
TimeUnit.SECONDS.toMillis(120); 
+private static final String PRIMARY_CLUSTER_ALIAS = "primary";
+private static final String BACKUP_CLUSTER_ALIAS = "backup";
+private static final List> 

[GitHub] [kafka] prat0318 opened a new pull request #9682: KAFKA-10803: Fix improper removal of bad dynamic config

2020-12-03 Thread GitBox


prat0318 opened a new pull request #9682:
URL: https://github.com/apache/kafka/pull/9682


   There is [a 
bug](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L470)
 in how incorrect dynamic config keys are removed from the original Properties 
list, resulting in persisting the improper configs in the properties list.
   
   This eventually results in an exception being thrown while parsing the list by 
[KafkaConfig 
ctor](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L531),
 resulting in skipping of the complete dynamic list (including the correct 
ones).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-12-03 Thread GitBox


junrao commented on pull request #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-738119273


   @wushujames : This PR is mostly ready. It's just waiting for another 
committer more familiar with the transactional logic to take another look.
   
   @ConcurrencyPractitioner : Would you be able to rebase this PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10803) Skip improper dynamic configs while initialization and include the rest correct ones

2020-12-03 Thread Prateek Agarwal (Jira)
Prateek Agarwal created KAFKA-10803:
---

 Summary: Skip improper dynamic configs while initialization and 
include the rest correct ones
 Key: KAFKA-10803
 URL: https://issues.apache.org/jira/browse/KAFKA-10803
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.5.1, 2.6.0, 1.1.1, 2.7.0, 2.8.0
Reporter: Prateek Agarwal


There is [a 
bug|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L470]
 in how incorrect dynamic config keys are removed from the original Properties 
list, resulting in persisting the improper configs in the properties list.

This eventually results in an exception being thrown while parsing the list by 
[KafkaConfig 
ctor|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L531],
 resulting in skipping of the complete dynamic list (including the correct 
ones).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-12-03 Thread GitBox


ning2008wisc commented on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-738115781


   https://github.com/apache/kafka/pull/9224#discussion_r535179359
   
   I think the `try catch` block is needed because of several checked 
exceptions:
   
   ```
   
/kafka/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java:41:
 error: unreported exception IOException; must be caught or declared to be 
thrown
   sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, 
TestUtils.tempFile(), "testCert");
   
   ```
   
   ```
   
/kafka/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java:42:
 error: unreported exception GeneralSecurityException; must be caught or 
declared to be thrown
   sslConfig = TestSslUtils.createSslConfig(false, true, 
Mode.SERVER, TestUtils.tempFile(), "testCert");
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10802) Spurious log message when starting consumers

2020-12-03 Thread Gary Russell (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243273#comment-17243273
 ] 

Gary Russell commented on KAFKA-10802:
--

Also

> I am not sure I understand why it's logged at INFO at all, since it's a normal 
> state during initial handshaking.

Consider reducing it to DEBUG (when the exception is MemberIdRequiredException)?
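
A minimal sketch of that suggestion (Scala for brevity; the actual 
AbstractCoordinator is Java, and the handler shape below is an assumption):

```scala
import org.apache.kafka.common.errors.MemberIdRequiredException
import org.slf4j.LoggerFactory

val log = LoggerFactory.getLogger("AbstractCoordinator")

def logRebalanceFailure(exception: RuntimeException): Unit = exception match {
  // Expected during the initial join handshake, so keep it quiet.
  case e: MemberIdRequiredException => log.debug("Rebalance failed.", e)
  // Anything else is still worth surfacing at INFO.
  case e => log.info("Rebalance failed.", e)
}
```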

> Spurious log message when starting consumers
> 
>
> Key: KAFKA-10802
> URL: https://issues.apache.org/jira/browse/KAFKA-10802
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.6.0
>Reporter: Mickael Maison
>Priority: Major
>
> Reported by Gary Russell in the [2.6.1 RC3 vote 
> thread|https://lists.apache.org/thread.html/r13d2c687b2fafbe9907fceb3d4f3cc6d5b34f9f36a7fcc985c38b506%40%3Cdev.kafka.apache.org%3E]
> I am seeing this on every consumer start:
> 2020-11-25 13:54:34.858  INFO 42250 --- [ntainer#0-0-C-1] 
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer 
> clientId=consumer-ktest26int-1, groupId=ktest26int] Rebalance failed.
> org.apache.kafka.common.errors.MemberIdRequiredException: The group member 
> needs to have a valid member id before actually entering a consumer group.
> Due to this change 
> https://github.com/apache/kafka/commit/16ec1793d53700623c9cb43e711f585aafd44dd4#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R468
> I understand what a MemberIdRequiredException is, but the previous (2.6.0) 
> log (with exception.getMessage()) didn't stand out like the new one does 
> because it was all on one line.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7588) Rationalize configurations passed to pluggable APIs

2020-12-03 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17243261#comment-17243261
 ] 

Ismael Juma commented on KAFKA-7588:


Seems like the PR was merged, so this Jira should be closed.

> Rationalize configurations passed to pluggable APIs
> ---
>
> Key: KAFKA-7588
> URL: https://issues.apache.org/jira/browse/KAFKA-7588
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>
> There are a lot of extensions points both on the client and server sides. 
> Most of these pluggable APIs are configurable but the configurations they 
> receive are not the same.
> For example, Serializers, Deserializers, Partitioners, Assignors, 
> QuotaCallbacks are passed config.originals(). On the other hand LoginModules, 
> PrincipalBuilders and AuthenticationCallbackHandlers are passed 
> config.values().
> In practice, having access to originals() is nice as it allows using custom 
> configurations by simply adding them to the client/server configuration.
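
A small Scala illustration of the originals()/values() distinction (standard 
AbstractConfig behavior; the key names are made up):

```scala
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import scala.jdk.CollectionConverters._

val configDef = new ConfigDef()
  .define("known.key", Type.STRING, "default", Importance.LOW, "a defined key")
val userProps = Map[String, AnyRef]("known.key" -> "x", "my.custom.key" -> "y").asJava
val config = new AbstractConfig(configDef, userProps)

assert(config.originals.containsKey("my.custom.key")) // custom keys survive in originals()
assert(!config.values.containsKey("my.custom.key"))   // but are dropped from values()
```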



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request

2020-12-03 Thread GitBox


ijuma commented on a change in pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#discussion_r535321971



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1243,19 +1244,30 @@ class KafkaApis(val requestChannel: RequestChannel,
   topicResponses
 } else {
   val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
-  val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
+  val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
 if (isInternal(topic)) {
   val topicMetadata = createInternalTopic(topic)
-  if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code)
-metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, 
true, util.Collections.emptyList())
-  else
-topicMetadata
+  Some(
+if (topicMetadata.errorCode == 
Errors.COORDINATOR_NOT_AVAILABLE.code)
+  metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, 
true, util.Collections.emptyList())
+else
+  topicMetadata
+  )
+} else if (isFetchAllMetadata) {
+  // KAFKA-10606: If this request is to get metadata for all topics, 
auto topic creation should not be allowed
+  // The special handling is necessary on broker side because 
allowAutoTopicCreation is hard coded to true
+  // for backward compatibility on client side.
+  //
+  // However, in previous versions, UNKNOWN_TOPIC_OR_PARTITION won't 
happen on fetch all metadata,
+  // so, for backward-compatibility, we need to skip these not founds 
during fetch all metadata here.

Review comment:
   We don't usually include JIRA references in the code unless it's a very 
complex issue. I think the comment here could be something like:
   
   "A metadata request for all topics should never result in topic auto 
creation. A topic may be deleted between the creation of `topics` and 
`topicResponses`, so we make sure to always return `None` for this case."
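
   A simplified Scala shape of that behavior (assumed names, not the actual 
KafkaApis code): `flatMap` plus `None` silently omits the topic instead of 
auto-creating it or reporting an error.

   ```scala
   def nonExistentTopicResponses(nonExistent: Set[String], isFetchAllMetadata: Boolean): Set[String] =
     nonExistent.flatMap { topic =>
       if (isFetchAllMetadata) None // deleted between `topics` and `topicResponses`: omit it
       else Some(s"$topic -> UNKNOWN_TOPIC_OR_PARTITION")
     }
   ```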





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9681: MINOR: Fix flaky test shouldQueryOnlyActivePartitionStoresByDefault

2020-12-03 Thread GitBox


cadonna commented on a change in pull request #9681:
URL: https://github.com/apache/kafka/pull/9681#discussion_r535310939



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##
@@ -136,13 +136,13 @@ public void 
shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 
 final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.activeHost().port() % 2) == 1;
 
-// Assert that only active is able to query for a key by default
-assertThat(kafkaStreams1IsActive ? store1.get(key) : 
store2.get(key), is(notNullValue()));

Review comment:
   Last time we forgot to move this line into the `try`-`catch` clause.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##
@@ -136,13 +136,13 @@ public void 
shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 
 final boolean kafkaStreams1IsActive = 
(keyQueryMetadata.activeHost().port() % 2) == 1;
 
-// Assert that only active is able to query for a key by default
-assertThat(kafkaStreams1IsActive ? store1.get(key) : 
store2.get(key), is(notNullValue()));
 try {
 if (kafkaStreams1IsActive) {
+assertThat(store1.get(key), is(notNullValue()));
 assertThat(store2.get(key), is(nullValue()));
 } else {
 assertThat(store1.get(key), is(nullValue()));
+assertThat(store2.get(key), is(notNullValue()));
 }

Review comment:
   I thought this is easier to read.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9681: MINOR: Fix flaky test shouldQueryOnlyActivePartitionStoresByDefault

2020-12-03 Thread GitBox


cadonna commented on pull request #9681:
URL: https://github.com/apache/kafka/pull/9681#issuecomment-738054912


   Call for review: @vvcephei @ableegoldman @guozhangwang 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna opened a new pull request #9681: MINOR: Fix flaky test shouldQueryOnlyActivePartitionStoresByDefault

2020-12-03 Thread GitBox


cadonna opened a new pull request #9681:
URL: https://github.com/apache/kafka/pull/9681


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10460) ReplicaListValidator format checking is incomplete

2020-12-03 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-10460.

Fix Version/s: 2.8.0
   Resolution: Fixed

> ReplicaListValidator format checking is incomplete
> --
>
> Key: KAFKA-10460
> URL: https://issues.apache.org/jira/browse/KAFKA-10460
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Robin Palotai
>Assignee: Ankit Kumar
>Priority: Minor
> Fix For: 2.8.0
>
>
> See 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220]
>  . The logic is supposed to accept only two cases:
>  * list of k:v pairs
>  * a single '*'
> But in practice, since the disjunction's second part only checks that the 
> head is '*', the case where a k:v list is headed by a star is also accepted 
> (and the broker then dies at startup, refusing the value).
> This practically happened due to a CruiseControl bug (see 
> [https://github.com/linkedin/cruise-control/issues/1322])
> Observed on 2.4, but seems to be present in HEAD's source as well.
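
A hedged Scala sketch of the described disjunction bug (simplified shapes, not 
the actual ConfigHandler code):

```scala
def looksLikeKvList(xs: Seq[String]): Boolean = xs.nonEmpty && xs.forall(_.contains(":"))

// Buggy: a k:v list headed by "*" slips through because only the head is checked.
def acceptBuggy(xs: Seq[String]): Boolean = looksLikeKvList(xs) || xs.headOption.contains("*")

// Fixed: "*" is only valid as the sole element.
def acceptFixed(xs: Seq[String]): Boolean = looksLikeKvList(xs) || xs == Seq("*")

assert(acceptBuggy(Seq("*", "0:1")))   // wrongly accepted; the broker later refuses it
assert(!acceptFixed(Seq("*", "0:1")))  // rejected up front after the fix
```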



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison merged pull request #9326: KAFKA-10460: ReplicaListValidator format checking is incomplete

2020-12-03 Thread GitBox


mimaison merged pull request #9326:
URL: https://github.com/apache/kafka/pull/9326


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



