[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-01-17 Thread GitBox


fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1073197282


##
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   Yes, I'll open a dedicated PR.






[GitHub] [kafka] satishd commented on pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-01-17 Thread GitBox


satishd commented on PR #13040:
URL: https://github.com/apache/kafka/pull/13040#issuecomment-1386596236

   @ijuma It looks like the conversion changes were accidentally dropped in my 
local repo while doing a few rebases on my trunk. I should have checked that 
before pushing to the PR and pinging for review. Sorry about that. 





[GitHub] [kafka] vcrfxia opened a new pull request, #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

2023-01-17 Thread GitBox


vcrfxia opened a new pull request, #13126:
URL: https://github.com/apache/kafka/pull/13126

   
[KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores)
 proposed the introduction of versioned key-value stores, as well as a 
RocksDB-based implementation. The RocksDB implementation will consist of a 
"latest value store" for storing the latest record version associated with each 
key, in addition to multiple "segment stores" to store older record versions. 
Within a segment store, multiple record versions for the same key will be 
combined into a single bytes array "value" associated with the key and stored 
to RocksDB. 
   
   This PR introduces the utility class that will be used to manage the value 
format of these segment stores, i.e., how multiple record versions for the same 
key will be combined into a single bytes array "value." Follow-up PRs will 
introduce the versioned store implementation itself (which calls heavily upon 
this utility class).
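   
   To make the packing concrete, here is a hedged sketch (not the PR's actual 
format; the class name and byte layout below are invented) of combining 
multiple (validFrom timestamp, value) pairs for one key into a single bytes 
array, using a simple [timestamp][valueLength][value] repetition:
   
       import java.nio.ByteBuffer;
       import java.util.List;
   
       class SegmentValueSketch {
           // Packs (timestamp, value) pairs back-to-back: an 8-byte timestamp,
           // a 4-byte value length, then the value bytes themselves.
           static byte[] pack(List<Long> timestamps, List<byte[]> values) {
               int size = 0;
               for (byte[] v : values) {
                   size += 8 + 4 + v.length;
               }
               ByteBuffer buf = ByteBuffer.allocate(size);
               for (int i = 0; i < values.size(); i++) {
                   buf.putLong(timestamps.get(i));
                   buf.putInt(values.get(i).length);
                   buf.put(values.get(i));
               }
               return buf.array();
           }
       }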
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] ijuma commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-01-17 Thread GitBox


ijuma commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1071365497


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -2104,7 +2105,7 @@ object UnifiedLog extends Logging {
 // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
 // from the first segment.
 if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 ||
-  (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {
+  (producerStateManager.latestSnapshotOffset.asScala.isEmpty && reloadFromCleanShutdown)) {
   // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the

Review Comment:
   Can we call !isPresent instead of asScala.isEmpty?



##
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##
@@ -557,7 +558,7 @@ class LogLoaderTest {
   _topicId = None,
   keepPartitionMetadataFile = true)
 
-verify(stateManager).removeStraySnapshots(any[Seq[Long]])
+verify(stateManager).removeStraySnapshots((any[java.util.List[java.lang.Long]]))

Review Comment:
   Are the extra parentheses around `any` needed? We have a few similar 
examples in this file.



##
core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala:
##
@@ -340,8 +342,7 @@ class ProducerStateManagerTest {
 // After reloading from the snapshot, the transaction should still be considered late
 val reloadedStateManager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs,
   producerStateManagerConfig, time)
-reloadedStateManager.truncateAndReload(logStartOffset = 0L,
-  logEndOffset = stateManager.mapEndOffset, currentTimeMs = time.milliseconds())
+reloadedStateManager.truncateAndReload(0L,stateManager.mapEndOffset, time.milliseconds())

Review Comment:
   Nit: space missing after `,`.



##
storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManagerConfig.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class ProducerStateManagerConfig {
+    public static final Set<String> RECONFIGURABLE_CONFIGS = Collections.singleton("producer.id.expiration.ms");
+    private volatile int producerIdExpirationMs;
+
+    public ProducerStateManagerConfig(int producerIdExpirationMs) {
+        this.producerIdExpirationMs = producerIdExpirationMs;
+    }
+
+    public void updateProducerIdExpirationMs(int producerIdExpirationMs) {

Review Comment:
   Nit: use `set` instead of `update`?



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -680,20 +680,20 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private[log] def activeProducersWithLastSequence: Map[Long, Int] = lock synchronized {
-producerStateManager.activeProducers.map { case (producerId, producerIdEntry) =>
-  (producerId, producerIdEntry.lastSeq)
+producerStateManager.activeProducers.asScala.map { case (producerId, producerIdEntry) =>
+  (producerId.toLong, producerIdEntry.lastSeq)
 }
-  }
+  }.toMap

Review Comment:
   Can we avoid this `toMap` copy? Same for the case a few lines below.



##
core/src/test/scala/integration/kafka/api/TransactionsTest.scala:
##
@@ -652,7 +652,7 @@ class TransactionsTest extends IntegrationTestHarness {
   producer.commitTransaction()
 
   var producerStateEntry =
-brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.head._2
+brokers(partitionLeader).logManager.getLog(new TopicPartition(testTopic, 0)).get.producerStateManager.activeProducers.asScala.head._2

Review Comment:
   Can we use `.get(0)` instead of `asScala.head`? There's one other similar 
example in this file.



##
core/src/test/scala/unit/kafka/log/LogTestUtils.scala:
##
@@ -247,7 +246,7 @@ object LogTestUtils {
   }
 
   def listProducerSnapshotOffsets(logDir: File): Seq[Long] =
-

[GitHub] [kafka] ijuma commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-17 Thread GitBox


ijuma commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1386277046

   @jolshan From what you're saying, there are some thread safety bugs, but 
their impact is likely minor and hence why we haven't noticed them. From my 
perspective, we really need to document the concurrency model for this class 
more clearly and the class would be more robust if we made it thread safe.





[jira] [Updated] (KAFKA-14302) Infinite probing rebalance if a changelog topic got emptied

2023-01-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14302:

Priority: Critical  (was: Major)

> Infinite probing rebalance if a changelog topic got emptied
> ---
>
> Key: KAFKA-14302
> URL: https://issues.apache.org/jira/browse/KAFKA-14302
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Damien Gasparina
>Priority: Critical
> Fix For: 3.5.0
>
> Attachments: image-2022-10-14-12-04-01-190.png, logs.tar.gz2
>
>
> If a store with a changelog topic has been fully emptied, it can generate 
> infinite probing rebalances.
>  
> The scenario is the following:
>  * A Kafka Streams application, deployed on many instances, has a store with a changelog
>  * Many entries are pushed into the changelog, so the log end offset is high, let's say 20,000
>  * Then the store gets emptied, either due to data retention (windowing) or tombstones
>  * Then an instance of the application is restarted, and its local disk is deleted (e.g. Kubernetes without a Persistent Volume)
>  * After the restart, the application restores the store from the changelog, but does not write a checkpoint file as there is no data
>  * As there are no checkpoint entries, this instance specifies a taskOffsetSums with the offset set to 0 in the subscriptionUserData
>  * The group leader, during the assignment, then computes a lag of 20,000 (end offset - task offset), which is greater than the default acceptable lag, and thus decides to schedule a probing rebalance
>  * In the next probing rebalance, nothing has changed, so... another probing rebalance
>  
> I was able to reproduce locally with a simple topology:
>  
> {code:java}
> var table = streamsBuilder.stream("table");
> streamsBuilder
> .stream("stream")
> .join(table, (eSt, eTb) -> eSt.toString() + eTb.toString(), JoinWindows.of(Duration.ofSeconds(5)))
> .to("output");{code}
>  
>  
>  
> Due to this issue, applications having an empty changelog experience 
> frequent rebalances:
> !image-2022-10-14-12-04-01-190.png!
>  
> With assignments similar to:
> {code:java}
> [hello-world-8323d214-4c56-470f-bace-e4291cdf10eb-StreamThread-3] INFO 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - 
> stream-thread 
> [hello-world-8323d214-4c56-470f-bace-e4291cdf10eb-StreamThread-3-consumer] 
> Assigned tasks [0_5, 0_4, 0_3, 0_2, 0_1, 0_0] including stateful [0_5, 0_4, 
> 0_3, 0_2, 0_1, 0_0] to clients as: 
> d0e2d556-2587-48e8-b9ab-43a4e8207be6=[activeTasks: ([]) standbyTasks: ([0_0, 
> 0_1, 0_2, 0_3, 0_4, 0_5])]
> 8323d214-4c56-470f-bace-e4291cdf10eb=[activeTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 
> 0_5]) standbyTasks: ([])].{code}





[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

2023-01-17 Thread GitBox


jolshan commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1072903070


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -390,7 +390,7 @@ class ReplicaManager(val config: KafkaConfig,
   // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation.
   if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
-  requestLeaderEpoch > currentLeaderEpoch) {
+  requestLeaderEpoch >= currentLeaderEpoch) {

Review Comment:
   Seems like the concern there was reassignment. I think equal to the leader 
epoch is ok though because if we are a replica for the current leader epoch 
then we can't send a stop replica unless the reassignment was cancelled?






[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

2023-01-17 Thread GitBox


jolshan commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1072900020


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -390,7 +390,7 @@ class ReplicaManager(val config: KafkaConfig,
   // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation.
   if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
-  requestLeaderEpoch > currentLeaderEpoch) {
+  requestLeaderEpoch >= currentLeaderEpoch) {

Review Comment:
   Looks like this is part of KIP-570. I will take a look.






[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

2023-01-17 Thread GitBox


jolshan commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1072895900


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -390,7 +390,7 @@ class ReplicaManager(val config: KafkaConfig,
   // epoch, a sentinel value (NoEpoch) is used and bypass the epoch validation.
   if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
   requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
-  requestLeaderEpoch > currentLeaderEpoch) {
+  requestLeaderEpoch >= currentLeaderEpoch) {

Review Comment:
   Just curious -- was the epoch check put in place because we were concerned 
about stale stop replicas? Just trying to figure out why we need it and the 
implications for adding the current epoch.






[GitHub] [kafka] jolshan commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

2023-01-17 Thread GitBox


jolshan commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1072893785


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1971,16 +1971,22 @@ object TestUtils extends Logging {
 )
   }
 
+  def currentIsr(admin: Admin, partition: TopicPartition): Set[Int] = {
+    val description = admin.describeTopics(Set(partition.topic).asJava)
+      .allTopicNames
+      .get
+      .asScala

Review Comment:
   nit: can we put a new line here to distinguish the two a bit more?






[GitHub] [kafka] jolshan commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-17 Thread GitBox


jolshan commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1386151839

   @divijvaidya I'm also a bit confused by this comment:
   > In cases when we have just one (or two) producers, this metric would be highly unreliable (not just stale) as it provides an "approximation" of size(). It is not un-common to produce data from limited set of producers (with a large number of consumers) and hence, I would incline towards sticking to current approach of keeping this metric accurate.
   
   We will keep the IDs of any producer used within the last day or so. If 
there are two producers running in a steady state, the metric will stay steady.
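   
   As an illustrative sketch of that behavior (not broker code; every name 
here is invented), an ID is retained until it has been idle longer than the 
expiration window, so a steady set of producers yields a steady count:
   
       import java.util.Map;
       import java.util.concurrent.ConcurrentHashMap;
   
       class ProducerIdTracker {
           private final long expirationMs;
           private final Map<Long, Long> lastUsedMs = new ConcurrentHashMap<>();
   
           ProducerIdTracker(long expirationMs) {
               this.expirationMs = expirationMs;
           }
   
           // Remember when this producer ID was last seen.
           void recordUse(long producerId, long nowMs) {
               lastUsedMs.put(producerId, nowMs);
           }
   
           // Drop IDs idle longer than the window, then count the rest.
           int producerCount(long nowMs) {
               lastUsedMs.values().removeIf(lastUsed -> nowMs - lastUsed > expirationMs);
               return lastUsedMs.size();
           }
       }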





[GitHub] [kafka] jolshan commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-17 Thread GitBox


jolshan commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1386149959

   > Has anyone checked that we always acquire a lock when we call methods like 
activeProducers and isEmpty? I wonder if this class has thread safety bugs.
   
   @ijuma I actually did take a peek when working on KIP-890 and other producer 
ID fun. There is a lock on every usage of active producers (except its usage in 
a log message). 
   
   As for isEmpty, there are two usages (one in UnifiedLog -- to rebuild the 
log and one in LogLoader) and neither is protected via lock.
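   
   For illustration only (this is not the actual ProducerStateManager code), 
one way to make both accessors consistent is to guard them behind the same 
lock and hand out a snapshot copy rather than the live map:
   
       import java.util.Collections;
       import java.util.HashMap;
       import java.util.Map;
   
       class ProducerRegistry {
           private final Object lock = new Object();
           private final Map<Long, Integer> producers = new HashMap<>();
   
           // Returns an immutable snapshot so callers never see concurrent edits.
           Map<Long, Integer> activeProducers() {
               synchronized (lock) {
                   return Collections.unmodifiableMap(new HashMap<>(producers));
               }
           }
   
           // Guarded by the same lock as activeProducers() for a consistent view.
           boolean isEmpty() {
               synchronized (lock) {
                   return producers.isEmpty();
               }
           }
       }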





[GitHub] [kafka] jolshan commented on pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-01-17 Thread GitBox


jolshan commented on PR #12972:
URL: https://github.com/apache/kafka/pull/12972#issuecomment-1386129325

   I noticed that we don't consider the stability of responses. (I assume 
though that if the request is unstable, the response is too.) 
   
   Just curious if there are any potential gaps with this approach, and the 
reasoning behind considering only request stability.





[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-01-17 Thread GitBox


jolshan commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1072868862


##
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##
@@ -86,14 +100,7 @@ class DefaultApiVersionManager(
 finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
 finalizedFeatures.epoch,
 controllerApiVersions.orNull,
-listenerType)
-  }
-
-  override def enabledApis: collection.Set[ApiKeys] = {

Review Comment:
   Do we no longer need these methods? Or are they just handled elsewhere 
(i.e., ApiKeys)?






[GitHub] [kafka] cmccabe commented on pull request #13117: KAFKA-14621 Disallow authorizers during ZK migration

2023-01-17 Thread GitBox


cmccabe commented on PR #13117:
URL: https://github.com/apache/kafka/pull/13117#issuecomment-1386112450

   merged to 3.4





[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-01-17 Thread GitBox


jolshan commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1072862469


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "response",
+  "name": "ConsumerGroupHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED
+  // - NOT_COORDINATOR
+  // - COORDINATOR_NOT_AVAILABLE
+  // - COORDINATOR_LOAD_IN_PROGRESS
+  // - INVALID_REQUEST
+  // - UNKNOWN_MEMBER_ID
+  // - FENCED_MEMBER_EPOCH
+  // - UNSUPPORTED_ASSIGNOR
+  // - UNRELEASED_INSTANCE_ID
+  // - GROUP_MAX_SIZE_REACHED

Review Comment:
   You are referring to GROUP_MAX_SIZE_REACHED?






[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-01-17 Thread GitBox


jolshan commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1072861634


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "response",
+  "name": "ConsumerGroupHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED
+  // - NOT_COORDINATOR
+  // - COORDINATOR_NOT_AVAILABLE
+  // - COORDINATOR_LOAD_IN_PROGRESS
+  // - INVALID_REQUEST
+  // - UNKNOWN_MEMBER_ID
+  // - FENCED_MEMBER_EPOCH
+  // - UNSUPPORTED_ASSIGNOR
+  // - UNRELEASED_INSTANCE_ID
+  // - GROUP_MAX_SIZE_REACHED
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top-level error code, or 0 if there was no error" },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "The top-level error message, or null if there was no error." },
+    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
+    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
+      "about": "The member epoch." },
+    { "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+",
+      "about": "True if the member should compute the assignment for the group." },
+    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
+      "about": "The heartbeat interval in milliseconds." },
+    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "null if not provided; the assignment otherwise.", "fields": [
+      { "name": "Error", "type": "int8", "versions": "0+",
+        "about": "The assigned error." },
+      { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
+        "about": "The partitions assigned to the member that can be used immediately." },
+      { "name": "PendingTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
+        "about": "The partitions assigned to the member that cannot be used because they are not released by their former owners yet." },
+      { "name": "MetadataVersion", "type": "int16", "versions": "0+",
+        "about": "The version of the metadata." },
+      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
+        "about": "The assigned metadata." }
+    ]}
+  ],
+  "commonStructs": [

Review Comment:
   See TopicPartitions used in lines 52 and 54. If we didn't have the common 
struct, we'd have to define the name and the nested fields each time.






[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-01-17 Thread GitBox


jolshan commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1072860670


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "response",
+  "name": "ConsumerGroupHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED
+  // - NOT_COORDINATOR
+  // - COORDINATOR_NOT_AVAILABLE
+  // - COORDINATOR_LOAD_IN_PROGRESS
+  // - INVALID_REQUEST
+  // - UNKNOWN_MEMBER_ID
+  // - FENCED_MEMBER_EPOCH
+  // - UNSUPPORTED_ASSIGNOR
+  // - UNRELEASED_INSTANCE_ID
+  // - GROUP_MAX_SIZE_REACHED
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top-level error code, or 0 if there was no error" },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "The top-level error message, or null if there was no error." },
+    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
+    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
+      "about": "The member epoch." },
+    { "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+",
+      "about": "True if the member should compute the assignment for the group." },
+    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
+      "about": "The heartbeat interval in milliseconds." },
+    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "null if not provided; the assignment otherwise.", "fields": [
+      { "name": "Error", "type": "int8", "versions": "0+",
+        "about": "The assigned error." },
+      { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
+        "about": "The partitions assigned to the member that can be used immediately." },
+      { "name": "PendingTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
+        "about": "The partitions assigned to the member that cannot be used because they are not released by their former owners yet." },
+      { "name": "MetadataVersion", "type": "int16", "versions": "0+",
+        "about": "The version of the metadata." },
+      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
+        "about": "The assigned metadata." }
+    ]}
+  ],
+  "commonStructs": [

Review Comment:
   Common structs can be used multiple times when writing the json. It prevents 
repeated text in the fields.






[GitHub] [kafka] jolshan commented on pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-17 Thread GitBox


jolshan commented on PR #13112:
URL: https://github.com/apache/kafka/pull/13112#issuecomment-1386095725

   Took a first pass. I think the one thing that is tricky is the conversions 
between Java and Scala. It may not be avoidable though. I'll take another pass 
soon.





[GitHub] [kafka] gharris1727 commented on pull request #13084: KAFKA-14598: Fix flaky ConnectRestApiTest

2023-01-17 Thread GitBox


gharris1727 commented on PR #13084:
URL: https://github.com/apache/kafka/pull/13084#issuecomment-1386092163

   Thanks @ashwinpankaj for following up, I think that this is good after one 
last nit comment.
   
   > To test this theory, in my latest revision I have set retry_on_exc to True in start_and_wait_to_start_listening().
   > I also added a 40 second sleep in [RestServer.initializeResources()](https://github.com/ashwinpankaj/kafka/blob/568c443a3ec31fd682133620cb38fdefcfe0b82f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L294).
   > ConnectRestApiTests passed in spite of the delay.
   
   I was going to suggest something like this, thanks for verifying that the 
fix actually stabilizes the test!





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13084: KAFKA-14598: Fix flaky ConnectRestApiTest

2023-01-17 Thread GitBox


gharris1727 commented on code in PR #13084:
URL: https://github.com/apache/kafka/pull/13084#discussion_r1072856520


##
tests/kafkatest/tests/connect/connect_rest_test.py:
##
@@ -90,7 +90,8 @@ def test_rest_api(self, connect_protocol, metadata_quorum):
 self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
 self.cc.set_external_configs(lambda node: self.render("connect-file-external.properties", node=node))
 
-self.cc.start()
+self.logger.info("Waiting till Connect REST server is listening")
+self.cc.start(mode=ConnectServiceBase.STARTUP_MODE_LISTEN)

Review Comment:
   I think this is unnecessary for the stabilization fix, and actually weakens 
the test. Because this is actually creating the connectors in distributed mode, 
I think it would be smart to wait for the workers to actually join the cluster.
   
   So we can revert these two lines and leave it as it was.






[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-17 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1072855094


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -310,9 +307,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
   && partitionState.deletePartition) {
 val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-  Some(partitionState.leaderEpoch)

Review Comment:
   Ah I see the interface is Java. Are we planning to move everything to Java 
though? It's a bit tricky since the GroupCoordinatorAdapter was done in Scala.






[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-17 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1072842347


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -357,8 +354,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 new UpdateMetadataResponse(new UpdateMetadataResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
 } else {
   val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
-  if (deletedPartitions.nonEmpty)
-    groupCoordinator.handleDeletedPartitions(deletedPartitions, requestLocal)
+  if (deletedPartitions.nonEmpty) {
+    groupCoordinator.onPartitionsDeleted(deletedPartitions.asJava, requestLocal.bufferSupplier)

Review Comment:
   I see in this method we convert back to Scala. Is there a way to avoid it?






[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-17 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1072840736


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -310,9 +307,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
   && partitionState.deletePartition) {
 val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-  Some(partitionState.leaderEpoch)

Review Comment:
   What was the reasoning for this change? Are we trying to move over to java?






[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-17 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1072839205


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##
@@ -511,4 +532,57 @@ class GroupCoordinatorAdapter(
 
 future
   }
+
+  override def partitionFor(groupId: String): Int = {

Review Comment:
   Curious about these overrides -- did the defaults not work and it was ok 
before because we were only using the adapter and not using it as the group 
coordinator?






[GitHub] [kafka] jolshan commented on a diff in pull request #13112: KAFKA-14367; Add internal APIs to the new `GroupCoordinator` interface

2023-01-17 Thread GitBox


jolshan commented on code in PR #13112:
URL: https://github.com/apache/kafka/pull/13112#discussion_r1072836630


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -282,8 +283,12 @@ class BrokerServer(
  tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...
 
   // Create group coordinator, but don't start it until we've started replica manager.
-  // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue

Review Comment:
   Did we decide to remove the Streams tests comment?






[GitHub] [kafka] mumrah merged pull request #13117: KAFKA-14621 Disallow authorizers during ZK migration

2023-01-17 Thread GitBox


mumrah merged PR #13117:
URL: https://github.com/apache/kafka/pull/13117





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-01-17 Thread GitBox


jeffkbkim commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1072528536


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -619,6 +619,9 @@ object KafkaConfig {
   val PasswordEncoderKeyLengthProp =  "password.encoder.key.length"
   val PasswordEncoderIterationsProp =  "password.encoder.iterations"
 
+  /** Internal Configurations **/
+  val UnreleasedApisEnableProd = "unreleased.apis.enable"

Review Comment:
   should this be Prop?



##
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java:
##
@@ -372,7 +375,10 @@ public enum Errors {
 FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new),
 INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new),
 NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new),
-OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new);
+OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new),
+FENCED_MEMBER_EPOCH(110, "The member epoch is fenced by the group coordinator. The member must abandon all its partitions and rejoins.", FencedMemberEpochException::new),
+UNRELEASED_INSTANCE_ID(111, "The instance ID is still used by another member in the consumer group. That member must leave first.", UnreleasedInstanceIdException::new),
+UNSUPPORTED_ASSIGNOR(112, "The assignor used by the member or its version range are not supported by the consumer group.", UnsupportedAssignorException::new);

Review Comment:
   nit: is not. How about "The selected assignor or its version range is not 
supported by the consumer group."?



##
core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala:
##
@@ -68,14 +68,18 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
 } finally socket.close()
   }
 
-  def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = cluster.clientListener()): Unit = {
+  def validateApiVersionsResponse(
+    apiVersionsResponse: ApiVersionsResponse,
+    listenerName: ListenerName = cluster.clientListener(),
+    shouldIncludeUnreleasedApi: Boolean = false

Review Comment:
   nit: Apis



##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "response",
+  "name": "ConsumerGroupHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED
+  // - NOT_COORDINATOR
+  // - COORDINATOR_NOT_AVAILABLE
+  // - COORDINATOR_LOAD_IN_PROGRESS
+  // - INVALID_REQUEST
+  // - UNKNOWN_MEMBER_ID
+  // - FENCED_MEMBER_EPOCH
+  // - UNSUPPORTED_ASSIGNOR
+  // - UNRELEASED_INSTANCE_ID
+  // - GROUP_MAX_SIZE_REACHED

Review Comment:
   I noticed this is missing in the KIP. Should we include it?



##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  

[jira] [Comment Edited] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR

2023-01-17 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17676546#comment-17676546
 ] 

Alexandre Dupriez edited comment on KAFKA-14139 at 1/17/23 8:25 PM:


Hi, [~hachikuji] , thank you for reporting this scenario and the very clear 
description of the issue. Is this something which is still prioritized and are 
you welcoming additional contributors on it?


was (Author: hangleton):
Hi, Jason, thank you for reporting this scenario and the very clear description 
of the issue. Is this something which is still prioritized and are you 
welcoming additional contributors on it?

> Replaced disk can lead to loss of committed data even with non-empty ISR
> 
>
> Key: KAFKA-14139
> URL: https://issues.apache.org/jira/browse/KAFKA-14139
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
> Fix For: 3.5.0
>
>
> We have been thinking about disk failure cases recently. Suppose that a disk 
> has failed and the user needs to restart the disk from an empty state. The 
> concern is whether this can lead to the unnecessary loss of committed data.
> For normal topic partitions, removal from the ISR during controlled shutdown 
> buys us some protection. After the replica is restarted, it must prove its 
> state to the leader before it can be added back to the ISR. And it cannot 
> become a leader until it does so.
> An obvious exception to this is when the replica is the last member in the 
> ISR. In this case, the disk failure itself has compromised the committed 
> data, so some amount of loss must be expected.
> We have been considering other scenarios in which the loss of one disk can 
> lead to data loss even when there are replicas remaining which have all of 
> the committed entries. One such scenario is this:
> Suppose we have a partition with two replicas: A and B. Initially A is the 
> leader and it is the only member of the ISR.
>  # Broker B catches up to A, so A attempts to send an AlterPartition request 
> to the controller to add B into the ISR.
>  # Before the AlterPartition request is received, replica B has a hard 
> failure.
>  # The current controller successfully fences broker B. It takes no action on 
> this partition since B is already out of the ISR.
>  # Before the controller receives the AlterPartition request to add B, it 
> also fails.
>  # While the new controller is initializing, suppose that replica B finishes 
> startup, but the disk has been replaced (all of the previous state has been 
> lost).
>  # The new controller sees the registration from broker B first.
>  # Finally, the AlterPartition from A arrives which adds B back into the ISR 
> even though it has an empty log.
> (Credit for coming up with this scenario goes to [~junrao] .)
> I tested this in KRaft and confirmed that this sequence is possible (even if 
> perhaps unlikely). There are a few ways we could have potentially detected 
> the issue. First, perhaps the leader should have bumped the leader epoch on 
> all partitions when B was fenced. Then the inflight AlterPartition would be 
> doomed no matter when it arrived.
> Alternatively, we could have relied on the broker epoch to distinguish the 
> dead broker's state from that of the restarted broker. This could be done by 
> including the broker epoch in both the `Fetch` request and in 
> `AlterPartition`.
> Finally, perhaps even normal kafka replication should be using a unique 
> identifier for each disk so that we can reliably detect when it has changed. 
> For example, something like what was proposed for the metadata quorum here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes.]
>  





[jira] [Created] (KAFKA-14627) Modernize Connect plugin discovery

2023-01-17 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14627:
---

 Summary: Modernize Connect plugin discovery
 Key: KAFKA-14627
 URL: https://issues.apache.org/jira/browse/KAFKA-14627
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Reporter: Greg Harris
Assignee: Greg Harris


https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery





[GitHub] [kafka] dajac merged pull request #12902: KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface

2023-01-17 Thread GitBox


dajac merged PR #12902:
URL: https://github.com/apache/kafka/pull/12902





[GitHub] [kafka] hachikuji commented on a diff in pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

2023-01-17 Thread GitBox


hachikuji commented on code in PR #13107:
URL: https://github.com/apache/kafka/pull/13107#discussion_r1072693482


##
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##
@@ -436,17 +436,22 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int],
   topicPartition: TopicPartition,
   deletePartition: Boolean): Unit = {
-// A sentinel (-2) is used as an epoch if the topic is queued for deletion. It overrides
-// any existing epoch.
-val leaderEpoch = metadataInstance.leaderEpoch(topicPartition)
-
 brokerIds.filter(_ >= 0).foreach { brokerId =>
   val result = stopReplicaRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
-  val alreadyDelete = result.get(topicPartition).exists(_.deletePartition)
+  val updatedDeletePartition = deletePartition || result.get(topicPartition).exists(_.deletePartition)
+
+  // A sentinel (-2) is used as an epoch if the replica is to be deleted.
+  // It overrides any existing epoch.
+  val leaderEpoch = if (updatedDeletePartition) {

Review Comment:
   Yeah, `DeletePartition` is set whenever the replica should be deleted, which 
would be the case after cancellation for all adding replicas. It does not 
necessarily imply topic deletion.






[GitHub] [kafka] philipnee commented on a diff in pull request #12149: KAFKA-13668: Retry upon missing initProducerId due to authorization error

2023-01-17 Thread GitBox


philipnee commented on code in PR #12149:
URL: https://github.com/apache/kafka/pull/12149#discussion_r1072654169


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##
@@ -300,9 +301,13 @@ void runOnce() {
 try {
 transactionManager.maybeResolveSequences();
 
+RuntimeException lastError = transactionManager.lastError();
+if (transactionManager.hasAbortableError() && 
shouldHandleAuthorizationError(lastError)) {
+return;

Review Comment:
   I'm thinking not, because we aren't adding a new producer.  @jolshan 
thoughts?



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -155,7 +155,7 @@ private enum State {
 private boolean isTransitionValid(State source, State target) {
 switch (target) {
 case UNINITIALIZED:
-return source == READY;
+return source == READY || source == ABORTABLE_ERROR;

Review Comment:
   hmm good point, I guess upon re-initializing (transition from UNINITIALIZED 
to INITIALIZING state), should we check the previous error to ensure a valid 
transition? Maybe in `initializeTransactions` we examine the previous error and 
make the next transition?
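   
   A hypothetical sketch of that idea (the class and method below are 
invented, not the actual TransactionManager code): record the error that put 
the manager into ABORTABLE_ERROR and consult it before allowing 
re-initialization:
   
       import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
   
       class TransitionGuard {
           enum State { UNINITIALIZED, INITIALIZING, READY, ABORTABLE_ERROR }
   
           private RuntimeException lastError;
   
           // Only permit the transition back to UNINITIALIZED from READY, or from
           // ABORTABLE_ERROR when the recorded error is the retriable
           // authorization failure this PR targets.
           boolean isTransitionToUninitializedValid(State source) {
               if (source == State.READY) {
                   return true;
               }
               return source == State.ABORTABLE_ERROR
                   && lastError instanceof TransactionalIdAuthorizationException;
           }
       }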






[GitHub] [kafka] hachikuji commented on pull request #13107: KAFKA-13972; Ensure replica state deleted after reassignment cancellation

2023-01-17 Thread GitBox


hachikuji commented on PR #13107:
URL: https://github.com/apache/kafka/pull/13107#issuecomment-1385840547

   @jolshan @dajac This patch has been updated to loosen the epoch check on the 
broker side. The original approach seemed a little risky in the case a 
reassignment is cancelled and resubmitted. It might be possible for a 
`StopReplica` request corresponding to the cancellation to get ordered after 
the `LeaderAndIsr` for the resubmitted reassignment. With the loosened check, 
that would not be possible since a new reassignment would have a leader epoch 
bump: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L747.
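
   To illustrate (not the actual broker code; names are invented here), the
loosened acceptance check behaves roughly like the sketch below; since a
resubmitted reassignment bumps the leader epoch, a late `StopReplica` from the
cancelled attempt carries a stale epoch and is dropped:
   ```java
   // Illustrative sketch of broker-side StopReplica epoch validation.
   final class StopReplicaCheckSketch {
       static final int EPOCH_DURING_DELETE = -2;

       static boolean shouldProcess(int requestLeaderEpoch, int currentLeaderEpoch) {
           // The deletion sentinel is always honored; otherwise only requests at
           // least as new as the broker's current epoch are processed.
           return requestLeaderEpoch == EPOCH_DURING_DELETE
                   || requestLeaderEpoch >= currentLeaderEpoch;
       }
   }
   ```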


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14625) CheckpointFile read and write API consistency

2023-01-17 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677880#comment-17677880
 ] 

Ismael Juma commented on KAFKA-14625:
-

Personally I'm not convinced this needs to be changed.

> CheckpointFile read and write API consistency 
> --
>
> Key: KAFKA-14625
> URL: https://issues.apache.org/jira/browse/KAFKA-14625
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
>
> `CheckpointFile` has the below read and write APIs: write expects a 
> Collection of items, but read returns a List of elements. It is better to 
> look into these APIs and their usages and see whether consistency can be 
> brought without introducing any extra collection conversions. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14190) Corruption of Topic IDs with pre-2.8.0 ZK admin clients

2023-01-17 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677877#comment-17677877
 ] 

Justine Olshan commented on KAFKA-14190:


^ +1 to the points above. 
I think this was covered in point 1 for recovery, but the ZNode can be written 
directly to give it the correct (original) topic ID. 
This is of course risky, as a poorly formatted ZNode will block any topic 
operations.

You can get the old ZNode via: 
{code:java}
get /brokers/topics/{topic}{code}
and copy the entire output (call it `output`).
In the output, the incorrect topic ID should be replaced with the old one (the 
one in the partition.metadata file) to get `modified_output`.

Then the ZNode can be set via:
{code:java}
set /brokers/topics/{topic} modified_output{code}
Then the controller needs to be bounced via {{deleteall /controller}}, and the 
affected brokers (the ones logging the error messages) also need a restart.


Alternatively, shutting down the broker and deleting the partition.metadata 
file will recreate the file with the new ID and allow processing to continue.
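
Condensed into one sequence (placeholders throughout; the lines starting with # 
are annotations, not shell input):
{noformat}
# inside bin/zookeeper-shell.sh <zk-host>:2181
get /brokers/topics/myTopic
# edit the printed JSON: restore the topic ID found in partition.metadata
set /brokers/topics/myTopic <modified_output>
deleteall /controller
# finally, restart the affected brokers
{noformat}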

> Corruption of Topic IDs with pre-2.8.0 ZK admin clients
> ---
>
> Key: KAFKA-14190
> URL: https://issues.apache.org/jira/browse/KAFKA-14190
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core, zkclient
>Affects Versions: 2.8.0, 3.1.0, 2.8.1, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.2.1
>Reporter: Alexandre Dupriez
>Assignee: Divij Vaidya
>Priority: Major
>
> h3. Scope
> The problem reported below has been verified to occur in Zookeeper mode. It 
> has not been attempted with Kraft controllers, although it is unlikely to be 
> reproduced in Kraft mode given the nature of the issue and clients involved.
> h3. Problem Description
> The ID of a topic is lost when an AdminClient of version < 2.8.0 is used to 
> increase the number of partitions of that topic for a cluster with version >= 
> 2.8.0. This results in the controller re-creating the topic IDs upon restart, 
> eventually conflicting with the topic ID of broker’s {{partition.metadata}} 
> files in the partition directories of the impacted topic, leading to an 
> availability loss of the partitions which do not accept leadership / 
> follower-ship when the topic ID indicated by a {{LeaderAndIsr}} request 
> differ from their own locally cached ID.
> One mitigation post-corruption is to substitute the stale topic ID in the 
> {{partition.metadata}} files with the new topic ID referenced by the 
> controller, or alternatively, delete the {{partition.metadata}} file 
> altogether. This requires a restart of the brokers which are assigned the 
> partitions of the impacted topic.
> h3. Steps to reproduce
> 1. Set-up and launch a two-nodes Kafka cluster in Zookeeper mode.
> 2. Create a topic e.g. via {{kafka-topics.sh}}
> {noformat}
> ./bin/kafka-topics.sh --bootstrap-server :9092 --create --topic myTopic 
> --partitions 2 --replication-factor 2{noformat}
> 3. Capture the topic ID using a 2.8.0+ client.
> {noformat}
> ./kafka/bin/kafka-topics.sh --bootstrap-server :9092 --topic myTopic 
> --describe
> Topic: myTopic TopicId: jKTRaM_TSNqocJeQI2aYOQ PartitionCount: 2 
> ReplicationFactor: 2 Configs: segment.bytes=1073741824
> Topic: myTopic Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1
> Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1{noformat}
> 4. Restart one of the broker. This will make each broker create the 
> {{partition.metadata}} files in the partition directories since it will 
> already have loaded the {{Log}} instance in memory.
> 5. Using a *pre-2.8.0* client library, run the following command.
> {noformat}
> ./kafka/bin/kafka-topics.sh --zookeeper :2181 --alter --topic myTopic 
> --partitions 3{noformat}
> 6. Using a 2.8.0+ client library, describe the topic via Zookeeper and notice 
> the absence of topic ID from the output, where it is otherwise expected.
> {noformat}
> ./kafka/bin/kafka-topics.sh --zookeeper :2181 --describe --topic myTopic
> Topic: myTopic PartitionCount: 3 ReplicationFactor: 2 Configs: 
> Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 0,1
> Topic: myTopic Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
> Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0{noformat}
> 7. Using a 2.8.0+ client library, describe the topic via a broker endpoint 
> and notice the topic ID changed.
> {noformat}
> ./kafka/bin/kafka-topics.sh --bootstrap-server :9093 --describe --topic myTopic
> Topic: myTopic TopicId: nI-JQtPwQwGiylMfm8k13w PartitionCount: 3 
> ReplicationFactor: 2 Configs: segment.bytes=1073741824
> Topic: myTopic Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
> Topic: myTopic Partition: 1 Leader: 1 Replicas: 0,1 Isr: 1,0
> Topic: myTopic Partition: 2 Leader: 1 Replicas: 1,0 Isr: 1,0{noformat}
> 8. Restart the controller.
> 9. 

[jira] [Commented] (KAFKA-14625) CheckpointFile read and write API consistency

2023-01-17 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677875#comment-17677875
 ] 

Sagar Rao commented on KAFKA-14625:
---

hey [~satish.duggana], I am assuming you plan to work on this? Nonetheless, I 
took a look at the 2 APIs and the places they are called from. The main 
difference that I see is that `CommittedOffsetsFile` works with a map, while 
the other 2 Scala classes, `OffsetCheckpointFile` and 
`LeaderEpochCheckpointFile`, interoperate with Java lists and Scala sequences. 
So the main difference is in `CommittedOffsetsFile`, which operates on the 
entrySet of `partitionToConsumedOffsets`. I am thinking we could maintain 
another list of TopicPartitionOffsets which stores the topic/partitions, while 
the map `partitionToConsumedOffsets` stores each TopicPartitionOffsets object 
keyed by partition (same key as today).

We can keep updating the list as and when entries are added/removed in the map, 
and when we want to sync, we can pass the List as is. A very crude idea in this 
Java program:

 
{code:java}
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class TestEquality {

static class TopicPartitionOffsets {
Integer tp;
Long offset;

public TopicPartitionOffsets(Integer tp, Long offset) {
this.tp = tp;
this.offset = offset;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TopicPartitionOffsets that = (TopicPartitionOffsets) o;
return Objects.equals(tp, that.tp) && Objects.equals(offset, 
that.offset);
}

@Override
public int hashCode() {
return Objects.hash(tp, offset);
}

@Override
public String toString() {
return "TopicPartitionOffsets{" +
"tp=" + tp +
", offset=" + offset +
'}';
}
}

public static void main(String[] args) {
Map<Integer, TopicPartitionOffsets> partitionToConsumedOffsets = new ConcurrentHashMap<>();
LinkedList<TopicPartitionOffsets> topicPartitionOffsets = new LinkedList<>();

TopicPartitionOffsets tp1 = new TopicPartitionOffsets(1, 100L);
partitionToConsumedOffsets.put(1, tp1);

topicPartitionOffsets.add(tp1);

System.out.println("partitionToConsumedOffsets:" + 
partitionToConsumedOffsets + ", topicPartitionOffsets:" + 
topicPartitionOffsets);

tp1.offset = 200L;

System.out.println("partitionToConsumedOffsets:" + 
partitionToConsumedOffsets + ", topicPartitionOffsets:" + 
topicPartitionOffsets);

TopicPartitionOffsets tp2 = partitionToConsumedOffsets.get(1);
tp2.offset = 300L;

System.out.println("partitionToConsumedOffsets:" + 
partitionToConsumedOffsets + ", topicPartitionOffsets:" + 
topicPartitionOffsets);

topicPartitionOffsets.remove(tp2);
partitionToConsumedOffsets.remove(1);

System.out.println("partitionToConsumedOffsets:" + 
partitionToConsumedOffsets + ", topicPartitionOffsets:" + 
topicPartitionOffsets);

}

} {code}
I am using a LinkedList here so that removing from the List becomes easier (it 
adds extra time complexity though). Also, the 2 updates on the data structures 
should happen in an atomic fashion, i.e. if one of them fails, the other one 
should not be applied.
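
For the atomicity part, a small sketch of what I mean (my own fragment, reusing 
the names above): apply both updates under one lock so the map and the list 
cannot diverge.
{code:java}
import java.util.LinkedList;
import java.util.Map;

// Sketch only: a writer that keeps the map and the list in step by applying
// both changes inside one synchronized block.
class SyncedUpdate<V> {
    private final Object lock = new Object();

    void put(Map<Integer, V> map, LinkedList<V> list, int partition, V value) {
        synchronized (lock) {
            V previous = map.put(partition, value);
            if (previous != null) {
                list.remove(previous); // drop the stale entry first
            }
            list.add(value);
        }
    }
}
{code}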

> CheckpointFile read and write API consistency 
> --
>
> Key: KAFKA-14625
> URL: https://issues.apache.org/jira/browse/KAFKA-14625
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Priority: Major
>
> `CheckpointFile` has the below read and write APIs: write expects a 
> Collection of items, but read returns a List of elements. It is better to 
> look into these APIs and their usages and see whether consistency can be 
> brought without introducing any extra collection conversions. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on pull request #13111: KAFKA-14190: Update Zk TopicId from locally stored cache in controller

2023-01-17 Thread GitBox


jolshan commented on PR #13111:
URL: https://github.com/apache/kafka/pull/13111#issuecomment-1385825635

   What you say makes sense Colin. I do think it's a bit tricky to make such a 
big code change to support folks using older and deprecated tools. 
   
   I also understand the point of view of the pain this causes though. (It's 
caused me quite a bit of pain!) I am interested to see if there are any other 
options here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang closed pull request #11367: MINOR: Do not copy on range for in-memory shared store in stream stream left/out joins

2023-01-17 Thread GitBox


guozhangwang closed pull request #11367: MINOR: Do not copy on range for 
in-memory shared store in stream stream left/out joins
URL: https://github.com/apache/kafka/pull/11367


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-17 Thread GitBox


vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1072496029


##
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to 
travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and 
the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 1 1 20]
+ */
+public class EndToEndLatency {
+
+private final static long POLL_TIMEOUT_MS = 60000;
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+return 1;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+// Visible for testing
+static void execute(String... args) throws Exception {
+if (args.length != 5 && args.length != 6) {
+throw new TerseException("USAGE: java " + 
EndToEndLatency.class.getName()
++ " broker_list topic num_messages producer_acks 
message_size_bytes [optional] properties_file");
+}
+
+String brokers = args[0];
+String topic = args[1];
+int numMessages = Integer.parseInt(args[2]);
+String acks = args[3];
+int messageSizeBytes = Integer.parseInt(args[4]);
+Optional<String> propertiesFile = args.length > 5 ? 
(Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : 
Optional.empty();
+
+if (!Arrays.asList("1", "all").contains(acks)) {
+throw new IllegalArgumentException("Latency testing requires 
synchronous acknowledgement. Please use 1 or all");
+}
+
+try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
+ KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
+
+if (!consumer.listTopics().containsKey(topic)) {
+createTopic(propertiesFile, brokers, topic);
+}
+setupConsumer(topic, consumer);
+double totalTime = 0.0;
+long[] latencies = new long[numMessages];
+Random random = new Random(0);
+
+for (int i = 0; i < numMessages; i++) {
+byte[] message = randomBytesOfLen(random, messageSizeBytes);
+long begin = System.nanoTime();
+//Send message (of random bytes) synchronously then 
immediately poll for it
+producer.send(new ProducerRecord<>(topic, message)).get();
+ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+long elapsed = System.nanoTime() - begin;
+
+

[jira] [Commented] (KAFKA-13709) Document exactly-once support for source connectors

2023-01-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677833#comment-17677833
 ] 

ASF GitHub Bot commented on KAFKA-13709:


C0urante merged PR #478:
URL: https://github.com/apache/kafka-site/pull/478




> Document exactly-once support for source connectors
> ---
>
> Key: KAFKA-13709
> URL: https://issues.apache.org/jira/browse/KAFKA-13709
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Add documentation for the support for exactly-once source connectors 
> introduced in 
> [KIP-618|https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors].
>  This includes but is not limited to:
>  * How to safely perform a rolling upgrade to enable exactly-once source 
> support for an existing cluster
>  * Any new APIs that connector authors can/should leverage for their source 
> connectors that need clarification beyond what can be included in a Javadoc 
> (for example, how to know what to return from 
> {{{}SourceConnector::exactlyOnceSupport{}}}, and an example on how to define 
> custom transaction boundaries for a connector)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] fvaleri commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-17 Thread GitBox


fvaleri commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1072418814


##
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to 
travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and 
the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 1 1 20]
+ */
+public class EndToEndLatency {
+
+private final static long POLL_TIMEOUT_MS = 60000;
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+return 1;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+// Visible for testing
+static void execute(String... args) throws Exception {
+if (args.length != 5 && args.length != 6) {
+throw new TerseException("USAGE: java " + 
EndToEndLatency.class.getName()
++ " broker_list topic num_messages producer_acks 
message_size_bytes [optional] properties_file");
+}
+
+String brokers = args[0];
+String topic = args[1];
+int numMessages = Integer.parseInt(args[2]);
+String acks = args[3];
+int messageSizeBytes = Integer.parseInt(args[4]);
+Optional<String> propertiesFile = args.length > 5 ? 
(Utils.isBlank(args[5]) ? Optional.empty() : Optional.of(args[5])) : 
Optional.empty();
+
+if (!Arrays.asList("1", "all").contains(acks)) {
+throw new IllegalArgumentException("Latency testing requires 
synchronous acknowledgement. Please use 1 or all");
+}
+
+try (KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(propertiesFile, brokers);
+ KafkaProducer<byte[], byte[]> producer = createKafkaProducer(propertiesFile, brokers, acks)) {
+
+if (!consumer.listTopics().containsKey(topic)) {
+createTopic(propertiesFile, brokers, topic);
+}
+setupConsumer(topic, consumer);
+double totalTime = 0.0;
+long[] latencies = new long[numMessages];
+Random random = new Random(0);
+
+for (int i = 0; i < numMessages; i++) {
+byte[] message = randomBytesOfLen(random, messageSizeBytes);
+long begin = System.nanoTime();
+//Send message (of random bytes) synchronously then 
immediately poll for it
+producer.send(new ProducerRecord<>(topic, message)).get();
+ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+long elapsed = System.nanoTime() - begin;
+
+

[GitHub] [kafka] vamossagar12 commented on pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks

2023-01-17 Thread GitBox


vamossagar12 commented on PR #12802:
URL: https://github.com/apache/kafka/pull/12802#issuecomment-1385618036

   hey @C0urante, bumping this one. Please review whenever you get the chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-17 Thread GitBox


vamossagar12 commented on code in PR #13095:
URL: https://github.com/apache/kafka/pull/13095#discussion_r1072366317


##
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * This class records the average end to end latency for a single message to 
travel through Kafka
+ *
+ * broker_list = location of the bootstrap broker for both the producer and 
the consumer
+ * num_messages = # messages to send
+ * producer_acks = See ProducerConfig.ACKS_DOC
+ * message_size_bytes = size of each message in bytes
+ *
+ * e.g. [localhost:9092 test 1 1 20]
+ */
+public class EndToEndLatency {
+
+private final static long POLL_TIMEOUT_MS = 60000;
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+if (args.length != 5 && args.length != 6) {
+System.err.println("USAGE: java " + 
EndToEndLatency.class.getName()
++ " broker_list topic num_messages producer_acks 
message_size_bytes [optional] properties_file");
+return 1;
+}
+execute(args);
+return 0;
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+return 1;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+// Visible for testing
+static void execute(String... args) throws Exception {
+String brokers = args[0];
+String topic = args[1];
+int numMessages = Integer.parseInt(args[2]);
+String acks = args[3];
+int messageSizeBytes = Integer.parseInt(args[4]);
+String propertiesFile = args.length > 5 ? args[5] : null;

Review Comment:
   Thanks @ijuma , @fvaleri I made the changes. Let me know how's it looking 
now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-17 Thread GitBox


ijuma commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1385597771

   Has anyone checked that we always acquire a lock when we call methods like 
`activeProducers` and `isEmpty`? I wonder if this class has thread safety bugs.
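
   For context, the pattern at issue looks roughly like this (a generic sketch,
not the actual `ProducerStateManager` code): a registered gauge reads shared
state from the metrics-reporter thread, so those reads need the same
synchronization as the writers.
   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Generic sketch of the concern: if writers mutate the map under `this`
   // lock, reads like producerCount()/isEmpty() must take the same lock
   // (or the map must be a concurrent structure).
   class ProducerStateSketch {
       private final Map<Long, Object> producers = new HashMap<>();

       synchronized void addProducer(long producerId, Object state) {
           producers.put(producerId, state);
       }

       synchronized int producerCount() { // safe to call from a metrics thread
           return producers.size();
       }

       synchronized boolean isEmpty() {
           return producers.isEmpty();
       }
   }
   ```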


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-17 Thread GitBox


ijuma commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1385589558

   @divijvaidya Producer ids don't change often, so I am not sure what you mean 
by:
   
   > In cases when we have just one (or two) producers, this metric would be 
highly unreliable (not just stale) as it provides an "approximation" of size(). 
It is not un-common to produce data from limited set of producers (with a large 
number of consumers) and hence, I would incline towards sticking to current 
approach of keeping this metric accurate.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-01-17 Thread GitBox


clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072326336


##
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+static void execute(String[] args) throws ExecutionException, 
InterruptedException, IOException {
+ArgumentParser parser = ArgumentParsers

Review Comment:
   If it isn't too big of a problem, may I have the 2 classes and see how far I 
can go with them, at least in this pull request?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-01-17 Thread GitBox


fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072305087


##
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+static void execute(String[] args) throws ExecutionException, 
InterruptedException, IOException {
+ArgumentParser parser = ArgumentParsers

Review Comment:
   > If we are completely moving to Java does this mean that CommandLineUtils 
needs to be rewritten first?
   
   Yes, I already created `CommandLineUtils` as part of the `DumpLogSegments` 
migration, which also includes `CommandDefaultOptions`, but I still have some 
work to do before opening the PR. Let me know if you want a separate PR with 
just these 2 shared classes.
   
   That said, I think we will need a temporary tools->core dependency to 
avoid duplicating critical code like 
`GroupMetadataManager.formatRecordKeyAndValue(record)` and 
`TransactionLog.formatRecordKeyAndValue(record)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] 51n15t9r commented on pull request #12620: KAFKA-14206: upgrade zookeeper version to 3.7.1

2023-01-17 Thread GitBox


51n15t9r commented on PR #12620:
URL: https://github.com/apache/kafka/pull/12620#issuecomment-1385546015

   @ijuma, @showuon - Sorry to bring this up in a closed thread. 
   
   Other than the vulnerabilities, ZooKeeper 3.6 has also been EOL since 
December 2022. 
   Would it be useful to spend the effort on the ZooKeeper 3.7.1 upgrade in the 
meantime? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-01-17 Thread GitBox


clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072271288


##
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+static void execute(String[] args) throws ExecutionException, 
InterruptedException, IOException {
+ArgumentParser parser = ArgumentParsers

Review Comment:
   Sorry, then may I get a bit of clarification on how you envision this to 
work? Do you envision that a Java-compatible copy of `CommandDefaultOptions` is 
written? I am asking this because I was battling with moving ConsoleConsumer 
and there I ran into the following problem:
   ```
   abstract class CommandDefaultOptions(val args: Array[String], 
allowCommandOptionAbbreviation: Boolean = false) {
 val parser = new OptionParser(allowCommandOptionAbbreviation)
   ...
   ```
   ```
   private static class ConsumerConfig extends CommandDefaultOptions {
   ConsumerConfig(String... args) {
   super(args, false);
   
   ... = this.parser(); <--- Cannot access joptsimple.OptionParser
   ```
   Is it in general that we are only trying to move the commands from Scala to 
Java, or do we want a complete break from Scala classes? If we are completely 
moving to Java, does this mean that `CommandLineUtils` needs to be rewritten 
first?
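
   For what it's worth, the Java counterpart could be as small as the sketch
below (assuming joptsimple stays, so existing option syntax is untouched; this
is not the class from the pending PR):
   ```java
   import joptsimple.OptionParser;
   import joptsimple.OptionSet;

   // Sketch of a Java CommandDefaultOptions equivalent; subclasses declare
   // their options on `parser` and assign `options` after parsing.
   public abstract class CommandDefaultOptions {
       public final String[] args;
       public final OptionParser parser;
       public OptionSet options;

       public CommandDefaultOptions(String[] args) {
           this(args, false);
       }

       public CommandDefaultOptions(String[] args, boolean allowCommandOptionAbbreviation) {
           this.args = args;
           this.parser = new OptionParser(allowCommandOptionAbbreviation);
       }
   }
   ```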



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [kafka] mimaison commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-01-17 Thread GitBox


mimaison commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072229304


##
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+static void execute(String[] args) throws ExecutionException, 
InterruptedException, IOException {
+ArgumentParser parser = ArgumentParsers

Review Comment:
   We should not change the argument parsing library in this PR. This is likely 
to slightly change the usage and could potentially break existing 
commands/scripts used by users. Yes ideally we should use a single library but 
we can do that after (and it's likely to require a KIP).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ivanyu commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-01-17 Thread GitBox


ivanyu commented on code in PR #13067:
URL: https://github.com/apache/kafka/pull/13067#discussion_r1072224151


##
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+private final Class<?> klass;
+
+public KafkaMetricsGroup(final Class<?> klass) {
+this.klass = klass;
+}
+
+/**
+ * Creates a new MetricName object for gauges, meters, etc. created for 
this
+ * metrics group.
+ * @param name Descriptive name of the metric.
+ * @param tags Additional attributes which mBean will have.
+ * @return Sanitized metric name object.
+ */
+public MetricName metricName(final String name, final Map 
tags) {
+final String pkg;
+if (klass.getPackage() == null) {
+pkg = "";
+} else {
+pkg = klass.getPackage().getName();
+}
+final String simpleName = klass.getSimpleName().replaceAll("\\$$", "");
+return explicitMetricName(pkg, simpleName, name, tags);
+}
+
+public final MetricName explicitMetricName(final String group, final 
String typeName,

Review Comment:
   Can be static, yes. Made it so
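   
   For readers of the archive, a small sketch of what a call site could look like once the method is static (a hypothetical example; the group, type, and name values below are made up, and the final signature may differ):
   
   ```java
   import java.util.Collections;
   
   import com.yammer.metrics.core.MetricName;
   import org.apache.kafka.server.metrics.KafkaMetricsGroup;
   
   // Hypothetical call site: no KafkaMetricsGroup instance is needed
   // to build a metric name once explicitMetricName is static.
   public class MetricNameExample {
       public static void main(String[] args) {
           MetricName name = KafkaMetricsGroup.explicitMetricName(
               "kafka.server", "ReplicaManager", "LeaderCount", Collections.emptyMap());
           // Prints "kafka.server:type=ReplicaManager,name=LeaderCount"
           System.out.println(name.getMBeanName());
       }
   }
   ```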



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ivanyu commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-01-17 Thread GitBox


ivanyu commented on code in PR #13067:
URL: https://github.com/apache/kafka/pull/13067#discussion_r1072223766


##
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+    private final Class<?> klass;
+
+    public KafkaMetricsGroup(final Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * Creates a new MetricName object for gauges, meters, etc. created for this
+     * metrics group.
+     * @param name Descriptive name of the metric.
+     * @param tags Additional attributes which mBean will have.
+     * @return Sanitized metric name object.
+     */
+    public MetricName metricName(final String name, final Map<String, String> tags) {
+        final String pkg;
+        if (klass.getPackage() == null) {
+            pkg = "";
+        } else {
+            pkg = klass.getPackage().getName();
+        }
+        final String simpleName = klass.getSimpleName().replaceAll("\\$$", "");
+        return explicitMetricName(pkg, simpleName, name, tags);
+    }
+
+    public final MetricName explicitMetricName(final String group, final String typeName,
+                                               final String name, final Map<String, String> tags) {
+        final StringBuilder nameBuilder = new StringBuilder();
+        nameBuilder.append(group);
+        nameBuilder.append(":type=");
+        nameBuilder.append(typeName);
+
+        if (!name.isEmpty()) {
+            nameBuilder.append(",name=");
+            nameBuilder.append(name);
+        }
+
+        final String scope = toScope(tags).orElse(null);
+        final Optional<String> tagsName = toMBeanName(tags);
+        tagsName.ifPresent(s -> nameBuilder.append(",").append(s));
+
+        return new MetricName(group, typeName, name, scope, nameBuilder.toString());
+    }
+
+    public final <T> Gauge<T> newGauge(final String name, final Gauge<T> metric, final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newGauge(metricName(name, tags), metric);
+    }
+
+    public final <T> Gauge<T> newGauge(final String name, final Gauge<T> metric) {
+        return newGauge(name, metric, Collections.emptyMap());
+    }
+
+    public final Meter newMeter(final String name, final String eventType,
+                                final TimeUnit timeUnit, final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit);
+    }
+
+    public final Meter newMeter(final String name, final String eventType,
+                                final TimeUnit timeUnit) {
+        return newMeter(name, eventType, timeUnit, Collections.emptyMap());
+    }
+
+    public final Meter newMeter(final MetricName metricName, final String eventType, final TimeUnit timeUnit) {
+        return KafkaYammerMetrics.defaultRegistry().newMeter(metricName, eventType, timeUnit);
+    }
+
+    public final Histogram newHistogram(final String name, final boolean biased, final Map<String, String> tags) {
+        return KafkaYammerMetrics.defaultRegistry().newHistogram(metricName(name, tags), biased);
+    }
+
+    public final Histogram newHistogram(final String name) {
+        return newHistogram(name, true, Collections.emptyMap());
+    }
+
+    public final Histogram newHistogram(final String name, final boolean biased) {
+        return newHistogram(name, biased, Collections.emptyMap());
+    }
+
+    public final Histogram newHistogram(final String name, final Map<String, String> tags) {
+        return newHistogram(name, true, tags);
+    }
+
+    public final Timer newTimer(final String name, final TimeUnit durationUnit, final TimeUnit rateUnit,
+                                final Map<String, String>

[GitHub] [kafka] ivanyu commented on a diff in pull request #13067: KAFKA-14524: Rewrite KafkaMetricsGroup in Java

2023-01-17 Thread GitBox


ivanyu commented on code in PR #13067:
URL: https://github.com/apache/kafka/pull/13067#discussion_r1072223447


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -293,14 +295,14 @@ class Partition(val topicPartition: TopicPartition,
   private var controllerEpoch: Int = KafkaController.InitialControllerEpoch
   this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
 
-  private val tags = Map("topic" -> topic, "partition" -> partitionId.toString)
+  private val tags = Map("topic" -> topic, "partition" -> partitionId.toString).asJava
 
-  newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 0, tags)
-  newGauge("InSyncReplicasCount", () => if (isLeader) partitionState.isr.size else 0, tags)
-  newGauge("UnderMinIsr", () => if (isUnderMinIsr) 1 else 0, tags)
-  newGauge("AtMinIsr", () => if (isAtMinIsr) 1 else 0, tags)
-  newGauge("ReplicasCount", () => if (isLeader) assignmentState.replicationFactor else 0, tags)
-  newGauge("LastStableOffsetLag", () => log.map(_.lastStableOffsetLag).getOrElse(0), tags)
+  Partition.metricsGroup.newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 0, tags)

Review Comment:
   Removed (by adding `import Partition.metricsGroup`).



##
server-common/src/main/java/org/apache/kafka/server/metrics/KafkaMetricsGroup.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+import org.apache.kafka.common.utils.Sanitizer;
+
+public class KafkaMetricsGroup {
+    private final Class<?> klass;
+
+    public KafkaMetricsGroup(final Class<?> klass) {
+        this.klass = klass;
+    }
+
+    /**
+     * Creates a new MetricName object for gauges, meters, etc. created for this
+     * metrics group.
+     * @param name Descriptive name of the metric.
+     * @param tags Additional attributes which mBean will have.
+     * @return Sanitized metric name object.
+     */
+    public MetricName metricName(final String name, final Map<String, String> tags) {
+        final String pkg;

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14576) Move ConsoleConsumer to tools

2023-01-17 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov reassigned KAFKA-14576:
-

Assignee: Christo Lolov

> Move ConsoleConsumer to tools
> -
>
> Key: KAFKA-14576
> URL: https://issues.apache.org/jira/browse/KAFKA-14576
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Christo Lolov
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-01-17 Thread GitBox


satishd commented on PR #13046:
URL: https://github.com/apache/kafka/pull/13046#issuecomment-1385392475

   Thanks @ijuma for the updated comments. Addressed them inline and updated the PR with the latest commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-01-17 Thread GitBox


satishd commented on PR #13040:
URL: https://github.com/apache/kafka/pull/13040#issuecomment-1385391553

   Thanks @ijuma for the review comments. Addressed them inline and updated the 
PR with the latest commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.

2023-01-17 Thread GitBox


satishd commented on code in PR #13040:
URL: https://github.com/apache/kafka/pull/13040#discussion_r1072167191


##
core/src/main/scala/kafka/log/LogLoader.scala:
##
@@ -191,7 +192,7 @@ class LogLoader(
     // Reload all snapshots into the ProducerStateManager cache, the intermediate ProducerStateManager used
     // during log recovery may have deleted some files without the LogLoader.producerStateManager instance witnessing the
     // deletion.
-    producerStateManager.removeStraySnapshots(segments.baseOffsets.toSeq)
+    producerStateManager.removeStraySnapshots(segments.baseOffsets.toSeq.map(x => Long.box(x)).asJava)

Review Comment:
   Good catch! Avoided the `Seq` conversion by using `Collection` as the argument type. We will not need these conversions once we move/rewrite the LogSegment code, which is planned in a few weeks.
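   
   To illustrate the shape of that change, a minimal sketch (class and field names below are assumptions, not the merged code) of a `Collection`-based signature:
   
   ```java
   import java.util.Collection;
   import java.util.HashSet;
   import java.util.Set;
   
   // Sketch only: accepting Collection<Long> means Scala callers can pass
   // segments.baseOffsets via .map(Long.box).asJava without first
   // materializing a specific Seq/List type.
   public class SnapshotFiles {
       private final Set<Long> snapshotOffsets = new HashSet<>();
   
       public void removeStraySnapshots(Collection<Long> segmentBaseOffsets) {
           // Keep only snapshots whose offsets match a live segment base offset.
           snapshotOffsets.retainAll(new HashSet<>(segmentBaseOffsets));
       }
   }
   ```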



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-17 Thread GitBox


divijvaidya commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1385362966

   **Proposal: Using concurrent map for ProducerStateManager.producers 
(currently a mutable.Map)**
   
   **Pros**
   - Simplifies the code for implementing KIP-847 (prevents future bugs from accidentally updating the map without updating the metrics)
   - Does not have any locking concerns, since calculating size() on a concurrent map doesn't require locks. [From the Java docs for `ConcurrentHashMap`](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ConcurrentHashMap.html):
   > Bear in mind that the results of aggregate status methods including size, 
isEmpty, and containsValue are typically useful only when a map is not 
undergoing concurrent updates in other threads. Otherwise the results of these 
methods reflect transient states that may be adequate for monitoring or 
estimation purposes, but not for program control.
   
   **Cons**
   - The metric for the number of producers may be inaccurate when multiple threads are modifying the map concurrently. The `producers` map is modified during truncation (when the replica is catching up), when a producer is expired due to timeout, or on every completed transaction. That means it is modified fairly frequently. Hence, the possibility of the `producerId` metric displaying inaccurate information (usually off-by-one, since at most two threads modify this map) is fairly high.
   
   **My opinion**
   In cases where we have just one (or two) producers, this metric would be highly unreliable (not just stale), as it provides only an "approximation" of size(). It is not uncommon to produce data from a limited set of producers (with a large number of consumers), and hence I would lean towards sticking to the current approach of keeping this metric accurate.
   
   Having said that, I don't have a strong opinion here. We can always come 
back and make this metric accurate if we find that inaccuracy is causing 
problems while troubleshooting. 
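   
   As a rough sketch of what the concurrent-map alternative could look like (class and method names below are hypothetical, not from the PR):
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   
   // Hypothetical sketch: the map itself is safe for concurrent mutation, and
   // the gauge can read size() directly, at the cost of size() being only an
   // estimate under concurrent updates (see the ConcurrentHashMap javadoc
   // quoted above).
   public class ProducerStateRegistry<S> {
       private final ConcurrentMap<Long, S> producers = new ConcurrentHashMap<>();
   
       public void update(long producerId, S state) {
           producers.put(producerId, state);   // no external lock needed
       }
   
       public void expire(long producerId) {
           producers.remove(producerId);
       }
   
       // Value backing a ProducerCount-style gauge.
       public int producerCount() {
           return producers.size();
       }
   }
   ```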
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-01-17 Thread GitBox


fvaleri commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1071392892


##
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   I think that we should keep using `joptsimple` with the 
`CommandDefaultOptions` abstract class, rather than migrating all commands to 
`argparse4j`. The vast majority of commands do not use `argparse4j`.
   
   Then, the `joptsimple` (current) no-args output is very different:
   
   ```sh
   This tool helps to query log directory usage on the specified brokers.
   Option                                  Description
   ------                                  -----------
   --bootstrap-server <String: The         REQUIRED: the server(s) to use for
     server(s) to use for bootstrapping>     bootstrapping
   --broker-list <String: Broker list>     The list of brokers to be queried in
                                             the form "0,1,2". All brokers in the
                                             cluster will be queried if no broker
                                             list is specified
   --command-config <String: Admin         Property file containing configs to be
     client property file>                   passed to Admin Client.
   --describe                              Describe the specified log directories
                                             on the specified brokers.
   --help                                  Print usage information.
   --topic-list <String: Topic list>       The list of topics to be queried in
                                             the form "topic1,topic2,topic3". All
                                             topics will be queried if no topic
                                             list is specified (default: )
   --version                               Display Kafka version.
   ```
   
   This is the new no-args output with `argparse4j`:
   
   ```sh
   usage: kafka-log-dirs [-h] --bootstrap-server BOOTSTRAP-SERVER 
[--command-config COMMAND-CONFIG] [--topic-list TOPIC-LIST]
 [--broker-list BROKER-LIST]
   kafka-log-dirs: error: argument --bootstrap-server is required
   ```
   
   @ijuma @mimaison WDYT?
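   
   For comparison, a minimal standalone `joptsimple` sketch of two of these options (an illustration only; the real command would extend `CommandDefaultOptions` and register all of the options above):
   
   ```java
   import joptsimple.OptionParser;
   import joptsimple.OptionSet;
   import joptsimple.OptionSpec;
   
   // Standalone illustration of the joptsimple style used by most commands.
   public class LogDirsOptionsSketch {
       public static void main(String[] args) {
           OptionParser parser = new OptionParser();
           OptionSpec<String> bootstrapServer = parser
               .accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping")
               .withRequiredArg()
               .describedAs("The server(s) to use for bootstrapping")
               .ofType(String.class);
           OptionSpec<String> topicList = parser
               .accepts("topic-list", "The list of topics to be queried in the form \"topic1,topic2,topic3\"")
               .withRequiredArg()
               .describedAs("Topic list")
               .ofType(String.class)
               .defaultsTo("");
           OptionSet options = parser.parse(args);
           System.out.println("bootstrap-server=" + options.valueOf(bootstrapServer)
               + ", topic-list=" + options.valueOf(topicList));
       }
   }
   ```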



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yufeiyan1220 opened a new pull request, #13125: KAFKA-14626 Kafka Consumer Coordinator does not cleanup all metrics

2023-01-17 Thread GitBox


yufeiyan1220 opened a new pull request, #13125:
URL: https://github.com/apache/kafka/pull/13125

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14626) Kafka Consumer Coordinator does not cleanup all metrics

2023-01-17 Thread Feiyan Yu (Jira)
Feiyan Yu created KAFKA-14626:
-

 Summary: Kafka Consumer Coordinator does not cleanup all metrics
 Key: KAFKA-14626
 URL: https://issues.apache.org/jira/browse/KAFKA-14626
 Project: Kafka
  Issue Type: Bug
Reporter: Feiyan Yu


Inspired by [KAFKA-9306|https://issues.apache.org/jira/browse/KAFKA-9306], I found that there is no logic to remove the metrics registered in `ConsumerCoordinatorMetrics` when `ConsumerCoordinator` is closed, which might leave those metrics registered after the consumer has been closed.
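
A rough sketch of the kind of cleanup this implies (an assumption, not the actual fix in the linked PR; the sensor names below match the ones `ConsumerCoordinatorMetrics` registers today, but should be treated as illustrative):

```java
import org.apache.kafka.common.metrics.Metrics;

// Illustrative-only cleanup helper; removing a sensor also removes the
// metrics that sensor registered with the Metrics registry.
final class CoordinatorMetricsCleanup {
    static void removeSensors(Metrics metrics) {
        metrics.removeSensor("commit-latency");
        metrics.removeSensor("partition-revoked-latency");
        metrics.removeSensor("partition-assigned-latency");
        metrics.removeSensor("partition-lost-latency");
    }
}
```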



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-01-17 Thread GitBox


clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072024066


##
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {
+        ArgumentParser parser = ArgumentParsers

Review Comment:
   To be honest, I used `argparse4j` because I looked at 
https://github.com/apache/kafka/pull/13080/files as a reference. I don't have a 
strong opinion either way, but the first output you presented seems nicer to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-01-17 Thread GitBox


clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072022574


##
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String[] args) throws ExecutionException, InterruptedException, IOException {

Review Comment:
   Okay, I will do this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-01-17 Thread GitBox


clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1072022248


##
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+

Review Comment:
   Yup, this is my bad, I will add it in subsequent commits.



##
tools/src/main/java/org/apache/kafka/tools/LogDirsCommand.java:
##
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.LogDirDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class LogDirsCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (Throwable e) {

Review Comment:
   Sure



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on a diff in pull request #13082: MINOR: Clarify docs for Streams config max.warmup.replicas.

2023-01-17 Thread GitBox


lucasbru commented on code in PR #13082:
URL: https://github.com/apache/kafka/pull/13082#discussion_r1072016340


##
docs/streams/developer-guide/config-streams.html:
##
@@ -778,10 +778,21 @@ rack.aware.assignment.tagsmax.warmup.replicas
   
 
-  The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping
-  the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker
-  traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time
-  for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1.
+  
+    The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping
+    the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker
+    traffic and cluster state can be used for high availability. Increasing this will allow Streams to warm up more tasks at once, speeding up the time
+    for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks. Must be at least 1.
+  
+  
+    Note that one warmup replica corresponds to one Stream Task. Furthermore, note that each warmup replica can only be promoted to an active Task during

Review Comment:
   @littlehorse-eng You need the Apache HTTP Server with the include_module and rewrite_module loaded. Maybe this can help: https://github.com/lucasbru/kafka-site-docker



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] keashem commented on pull request #13124: MINOR: Rename testDeadToDeadIllegalTransition to testDeadToDeadTransition in GroupMetadataTest

2023-01-17 Thread GitBox


keashem commented on PR #13124:
URL: https://github.com/apache/kafka/pull/13124#issuecomment-1385052326

   @hachikuji please take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] keashem opened a new pull request, #13124: MINOR: Rename testDeadToDeadIllegalTransition to testDeadToDeadTransition in GroupMetadataTest

2023-01-17 Thread GitBox


keashem opened a new pull request, #13124:
URL: https://github.com/apache/kafka/pull/13124

   The Dead GroupState includes Dead among its valid previous states, so a transition from Dead to Dead won't throw an IllegalStateException. The unit test testDeadToDeadIllegalTransition should therefore be renamed to testDeadToDeadTransition.
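   
   For intuition, a tiny sketch (made-up names; the real GroupMetadata is Scala and has more states) of the valid-previous-states check being discussed, as shown below:
   
   ```java
   import java.util.EnumSet;
   import java.util.Map;
   
   // Made-up illustration of a state machine where Dead lists itself as a
   // valid predecessor, so a Dead -> Dead transition does not throw.
   enum GroupState { EMPTY, STABLE, DEAD }
   
   final class Transitions {
       private static final Map<GroupState, EnumSet<GroupState>> VALID_PREVIOUS = Map.of(
           GroupState.DEAD, EnumSet.of(GroupState.EMPTY, GroupState.STABLE, GroupState.DEAD)
       );
   
       static void transitionTo(GroupState current, GroupState target) {
           EnumSet<GroupState> allowed =
               VALID_PREVIOUS.getOrDefault(target, EnumSet.noneOf(GroupState.class));
           if (!allowed.contains(current))
               throw new IllegalStateException("Illegal transition " + current + " -> " + target);
       }
   }
   ```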
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org