Re: [PR] MINOR: Expose earliest local timestamp via the GetOffsetShell [kafka]

2023-11-19 Thread via GitHub


kamalcph commented on code in PR #14788:
URL: https://github.com/apache/kafka/pull/14788#discussion_r1398724150


##
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##
@@ -26,6 +26,7 @@ public class OffsetSpec {
 public static class EarliestSpec extends OffsetSpec { }
 public static class LatestSpec extends OffsetSpec { }
 public static class MaxTimestampSpec extends OffsetSpec { }
+public static class EarliestLocalTimestampSpec extends OffsetSpec { }

Review Comment:
   Can we rename `EarliestLocalTimestampSpec` to `EarliestLocalSpec` similar to 
earliest and latest?



##
clients/src/main/java/org/apache/kafka/clients/admin/OffsetSpec.java:
##
@@ -70,4 +71,8 @@ public static OffsetSpec maxTimestamp() {
 return new MaxTimestampSpec();
 }
 
+public static OffsetSpec earliestLocalTimestamp() {

Review Comment:
   ditto:
   
   `earliestLocalTimestamp` -> `earliestLocal`
   



##
tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java:
##
@@ -281,14 +281,16 @@ private OffsetSpec parseOffsetSpec(String 
listOffsetsTimestamp) throws TerseExce
 return OffsetSpec.latest();
 case "max-timestamp":
 return OffsetSpec.maxTimestamp();
+case "earliest-local-timestamp":

Review Comment:
   ditto:
   
   `earliest-local-timestamp` -> `earliest-local`
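
   For context, a rough sketch of how the renamed spec would read from a 
   caller's point of view, assuming the suggestions above are adopted (the 
   `earliestLocal()` factory and the `earliest-local` shell value are the names 
   proposed in this review, not necessarily the merged API):

   ```java
   import java.util.Map;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.ListOffsetsResult;
   import org.apache.kafka.clients.admin.OffsetSpec;
   import org.apache.kafka.common.TopicPartition;

   public class EarliestLocalOffsetExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               TopicPartition tp = new TopicPartition("my-topic", 0);
               // OffsetSpec.earliestLocal() is the factory name proposed above;
               // the PR as posted exposes it as earliestLocalTimestamp().
               ListOffsetsResult result = admin.listOffsets(Map.of(tp, OffsetSpec.earliestLocal()));
               System.out.println(result.partitionResult(tp).get().offset());
           }
           // Proposed shell equivalent:
           //   kafka-get-offsets.sh --bootstrap-server localhost:9092 \
           //     --topic my-topic --time earliest-local
       }
   }
   ```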



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-19 Thread via GitHub


philipnee commented on PR #14710:
URL: https://github.com/apache/kafka/pull/14710#issuecomment-1818294807

   Hi @lucasbru - Thank you so much for the time spent reviewing this PR. 
Apologies for the lack of clarity; I've updated the PR description and added 
some comments. Does it explain the intention of the PR better?





Re: [PR] KAFKA-15776: Use the FETCH request timeout as the delay timeout for DelayedRemoteFetch [kafka]

2023-11-19 Thread via GitHub


kamalcph commented on PR #14778:
URL: https://github.com/apache/kafka/pull/14778#issuecomment-1818291232

   > Could you please help me understand how this change works with 
fetch.max.wait.ms from a user perspective i.e. what happens when we are 
retrieving data from both local & remote in a single fetch call?
   
   `fetch.max.wait.ms` applies only when there is not enough data 
(`fetch.min.bytes`) to respond to the client. This is a special case: when we 
are reading data from both local and remote storage, the FETCH request has to 
wait for the tail latency, which is the combined latency of reading from both 
local and remote storage. 
   
   Note that we always read from only one remote partition, up to 
`max.partition.fetch.bytes`, even though there is available bandwidth in the 
FETCH response (`fetch.max.bytes`); the client rotates the partition order in 
the next FETCH request so that the next partitions are served.
   
   > Also, wouldn't this change user clients? Asking because prior to this 
change users were expecting a guaranteed response within fetch.max.wait.ms = 
500ms but now they might not receive a response until 40s request.timeout.ms. 
If the user has configured their application timeouts to according to 
fetch.max.wait.ms, this change will break my application.
   
   `fetch.max.wait.ms` doesn't guarantee a response within this timeout. The 
client expires the request only when it exceeds `request.timeout.ms`, which 
defaults to 30 seconds. The time taken to serve the FETCH request can exceed 
`fetch.max.wait.ms` due to a slow hard disk, disk sector errors, and so on.
   
   The 
[FetchRequest.json](https://sourcegraph.com/github.com/apache/kafka/-/blob/clients/src/main/resources/common/message/FetchRequest.json)
 doesn't expose the client-configured request timeout, so we are using the 
default server request timeout of 30 seconds. Alternatively, we can introduce 
one more config, `fetch.remote.max.wait.ms`, to define the delay timeout for 
DelayedRemoteFetch requests. We need to decide whether to keep this config on 
the client or the server side, since the server operator may need to tune it if 
the remote storage degrades and the latency to serve FETCH requests is high.
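
   To make the interplay of these settings concrete, here is a minimal consumer 
configuration sketch covering the configs discussed above (the values are 
illustrative, not recommendations):

   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.serialization.ByteArrayDeserializer;

   public class FetchTimeoutConfigExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "fetch-timeout-demo");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
           // fetch.max.wait.ms only bounds how long the broker waits for fetch.min.bytes
           // to accumulate; it is not an upper bound on overall fetch latency.
           props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
           props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
           // Per-partition and per-response size limits that shape local/remote reads.
           props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);
           props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
           // The client only expires an in-flight FETCH after request.timeout.ms (30s default).
           props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
           try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
               // subscribe/poll loop elided
           }
       }
   }
   ```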





Re: [PR] MINOR: Fix unstable sorting in AssignmentsManagerTest [kafka]

2023-11-19 Thread via GitHub


ijuma commented on PR #14794:
URL: https://github.com/apache/kafka/pull/14794#issuecomment-1818288709

   This needs to be rebased. Note that `AssignmentsManagerTest` has been moved 
to the `server` module (from `core`).





Re: [PR] MINOR: Check the help and version options firstly [kafka]

2023-11-19 Thread via GitHub


runom closed pull request #11735: MINOR: Check the help and version options 
firstly
URL: https://github.com/apache/kafka/pull/11735





[jira] [Resolved] (KAFKA-15854) Move Java classes from kafka.server to the server module

2023-11-19 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-15854.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Move Java classes from kafka.server to the server module
> 
>
> Key: KAFKA-15854
> URL: https://issues.apache.org/jira/browse/KAFKA-15854
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-19 Thread via GitHub


ijuma merged PR #14796:
URL: https://github.com/apache/kafka/pull/14796





Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-19 Thread via GitHub


ijuma commented on PR #14796:
URL: https://github.com/apache/kafka/pull/14796#issuecomment-1818284953

   I ran the tests a few times and the JDK 8 build passed; the failures in the 
other builds are unrelated:
   
   > Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 21 and Scala 2.13 / 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
   Build / JDK 21 and Scala 2.13 / 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testProduceConsumeViaAssign(String).quorum=kraft
   Build / JDK 21 and Scala 2.13 / 
org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testTimeoutMetrics()
   Build / JDK 21 and Scala 2.13 / 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest.testBumpTransactionalEpoch(String).quorum=kraft
   Build / JDK 11 and Scala 2.13 / 
kafka.api.SaslPlainPlaintextConsumerTest.testCoordinatorFailover()





Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-19 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1398696969


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -257,10 +255,62 @@ private void closeInternal(final Duration timeout) {
 void cleanup() {
 log.trace("Closing the consumer network thread");
 Timer timer = time.timer(closeTimeout);
+coordinatorOnClose(timer);
 runAtClose(requestManagers.entries(), networkClientDelegate, timer);
 closeQuietly(requestManagers, "request managers");
 closeQuietly(networkClientDelegate, "network client delegate");
 closeQuietly(applicationEventProcessor, "application event processor");
 log.debug("Closed the consumer network thread");
 }
+
+void coordinatorOnClose(final Timer timer) {
+if (!requestManagers.coordinatorRequestManager.isPresent())
+return;
+
+connectCoordinator(timer);
+
+List tasks = closingTasks();
+do {
+long currentTimeMs = timer.currentTimeMs();
+connectCoordinator(timer);
+networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
+} while (timer.notExpired() && !tasks.stream().allMatch(v -> 
v.future().isDone()));
+}
+
+private void connectCoordinator(final Timer timer) {
+while (!coordinatorReady()) {
+findCoordinatorSync(timer);
+}
+}
+
+private boolean coordinatorReady() {
+CoordinatorRequestManager coordinatorRequestManager = 
requestManagers.coordinatorRequestManager.get();
+Optional coordinator = coordinatorRequestManager.coordinator();
+return coordinator.isPresent() && 
!networkClientDelegate.isUnavailable(coordinator.get());
+}
+
+private void findCoordinatorSync(final Timer timer) {
+CoordinatorRequestManager coordinatorRequestManager = 
requestManagers.coordinatorRequestManager.get();
+long currentTimeMs = timer.currentTimeMs();
+NetworkClientDelegate.PollResult request = 
coordinatorRequestManager.pollOnClose();
+networkClientDelegate.addAll(request);
+CompletableFuture findCoordinatorRequest = 
request.unsentRequests.get(0).future();
+while (timer.notExpired() && !findCoordinatorRequest.isDone()) {
+networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
+timer.update();
+}
+}
+
+private List maybeAutoCommitOnClose() 
{

Review Comment:
   I kept it as a list because I think it eliminates the need to check ifPresent 
in closingTasks().  What it does is gather all of the requests that need to 
be sent during shutdown from the different request managers.  Currently we only 
have one, i.e. the commitRequestManager.
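
   A rough sketch of the pattern being described, with hypothetical stand-in 
types (the real request managers are internal to the consumer):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;

   // Hypothetical stand-in for a request manager with shutdown work to do.
   interface ClosableRequestManager {
       List<Runnable> requestsToSendOnClose();
   }

   class ClosingTasksSketch {
       private final Optional<ClosableRequestManager> commitRequestManager;

       ClosingTasksSketch(Optional<ClosableRequestManager> commitRequestManager) {
           this.commitRequestManager = commitRequestManager;
       }

       // Returning a (possibly empty) list lets the caller iterate without
       // any isPresent/ifPresent checks of its own.
       List<Runnable> closingTasks() {
           List<Runnable> tasks = new ArrayList<>();
           commitRequestManager.ifPresent(m -> tasks.addAll(m.requestsToSendOnClose()));
           // Future request managers with shutdown requests can be appended here.
           return tasks;
       }
   }
   ```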






Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-19 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1398695558


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -192,10 +191,11 @@ static void runAtClose(final Collection> requ
 
 // Poll to ensure that request has been written to the socket. Wait 
until either the timer has expired or until
 // all requests have received a response.
-while (timer.notExpired() && 
!requestFutures.stream().allMatch(Future::isDone)) {
-networkClientDelegate.poll(timer.remainingMs(), 
timer.currentTimeMs());
+do {

Review Comment:
   We need to make sure we poll the network client at least once, so I switched 
to a do {} while loop.
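
   A self-contained toy illustration of why the `do {} while` form matters when 
the timer may already be expired (this is not the consumer code, just the 
control-flow difference):

   ```java
   public class AtLeastOncePollExample {
       public static void main(String[] args) {
           long deadlineMs = System.currentTimeMillis(); // timer already expired

           int whilePolls = 0;
           while (System.currentTimeMillis() < deadlineMs) {
               whilePolls++; // never executes once the deadline has passed
           }

           int doWhilePolls = 0;
           do {
               doWhilePolls++; // always executes at least once
           } while (System.currentTimeMillis() < deadlineMs);

           System.out.printf("while: %d, do/while: %d%n", whilePolls, doWhilePolls);
       }
   }
   ```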






Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-19 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1398693518


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -446,9 +485,11 @@ private void onFailure(final long currentTimeMs,
 handleCoordinatorDisconnect(responseError.exception(), 
currentTimeMs);
 log.debug("Offset fetch failed: {}", responseError.message());
 // TODO: should we retry on COORDINATOR_NOT_AVAILABLE as well ?
-if (responseError == COORDINATOR_LOAD_IN_PROGRESS ||
-responseError == Errors.NOT_COORDINATOR) {
+if (responseError == COORDINATOR_LOAD_IN_PROGRESS) {
+retry(currentTimeMs);
+} else if (responseError == Errors.NOT_COORDINATOR) {
 // re-discover the coordinator and retry
+coordinatorRequestManager.markCoordinatorUnknown("error 
response " + responseError.name(), currentTimeMs);

Review Comment:
   This change fixes a bug: we should call markCoordinatorUnknown on a 
`NOT_COORDINATOR` error.



Re: [PR] KAFKA-14509 [WIP] [2/2] Implement server side logic for ConsumerGroupDescribe API [kafka]

2023-11-19 Thread via GitHub


riedelmax commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1398686956


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -6465,12 +6465,42 @@ class KafkaApisTest {
 assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribe(): Unit = {
+val groupId = "group0"
+val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+consumerGroupDescribeRequestData.groupIds.add(groupId)
+val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
+
+val future = new 
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+when(groupCoordinator.consumerGroupDescribe(
+  requestChannelRequest.context,
+  consumerGroupDescribeRequestData.groupIds
+//  any[RequestContext],
+//  any[util.List[String]]
+)).thenReturn(future)
+
+createKafkaApis(
+  overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> 
"true")
+).handle(requestChannelRequest, RequestLocal.NoCaching)
+

Review Comment:
   This works. Thanks!



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -6206,12 +6206,40 @@ class KafkaApisTest {
 assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribe(): Unit = {
+val groupId = "group0"
+val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+consumerGroupDescribeRequestData.groupIds.add(groupId)
+val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
+
+val future = new 
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+when(groupCoordinator.consumerGroupDescribe(
+  any[RequestContext],
+  any[util.List[String]]
+)).thenReturn(future)

Review Comment:
   That works. Thanks!






Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on PR #14632:
URL: https://github.com/apache/kafka/pull/14632#issuecomment-1818256808

   
   10 failing flaky tests in the current run; some have existing JIRAs (mostly open):
   
   Build / JDK 11 and Scala 2.13 / 
testRackAwareRangeAssignor(String).quorum=kraft – 
integration.kafka.server.FetchFromFollowerIntegrationTest
   5s - https://issues.apache.org/jira/browse/KAFKA-15020
   
   Build / JDK 11 and Scala 2.13 / testFailureToFenceEpoch(String).quorum=kraft 
– org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   38s - https://issues.apache.org/jira/browse/KAFKA-14989
   
   Build / JDK 8 and Scala 2.12 / 
testDynamicProducerIdExpirationMs(String).quorum=kraft – 
kafka.api.ProducerIdExpirationTest
   32s
   Build / JDK 8 and Scala 2.12 / 
testThrottledProducerConsumer(String).quorum=zk – 
kafka.api.UserClientIdQuotaTest
   41s
   Build / JDK 8 and Scala 2.12 / 
testThrottledProducerConsumer(String).quorum=kraft – 
kafka.api.UserClientIdQuotaTest
   43s
   Build / JDK 8 and Scala 2.12 / testQuotaOverrideDelete(String).quorum=zk – 
kafka.api.UserClientIdQuotaTest
   1m 6s
   Build / JDK 8 and Scala 2.12 / testAssignmentAggregation() – 
kafka.server.AssignmentsManagerTest
   <1s
   Build / JDK 8 and Scala 2.12 / testAssignmentAggregation() – 
kafka.server.AssignmentsManagerTest
   2s
   Build / JDK 8 and Scala 2.12 / 
shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi – 
org.apache.kafka.streams.integration.StoreUpgradeIntegrationTest
   1m 7s - https://issues.apache.org/jira/browse/KAFKA-10151
   Build / JDK 8 and Scala 2.12 / [6] Type=Raft-Isolated, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.7-IV1, 
Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
   1m 7s - https://issues.apache.org/jira/browse/KAFKA-15104
   Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   2m 0s - https://issues.apache.org/jira/browse/KAFKA-15760
   Build / JDK 21 and Scala 2.13 / 
shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions()
 – 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest
   1m 5s - https://issues.apache.org/jira/browse/KAFKA-14454
   Build / JDK 21 and Scala 2.13 / [1] Type=Raft-Combined, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.7-IV1, 
Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
   1m 7s - https://issues.apache.org/jira/browse/KAFKA-15104
   Build / JDK 21 and Scala 2.13 / [5] Type=Raft-Combined, 
Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.7-IV1, 
Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest
   https://issues.apache.org/jira/browse/KAFKA-15104





Re: [PR] KAFKA-15038: Add metadatacache into RemoteLogManager, and refactor all relevant codes [kafka]

2023-11-19 Thread via GitHub


kamalcph commented on PR #14136:
URL: https://github.com/apache/kafka/pull/14136#issuecomment-1818250640

   > RLM#onLeadershipChange, we did use metadataCache, so we should make sure 
metadataCache comes first before LISR. Had another look, we can still pass the 
topicIDs into RLM#onLeadershipChange to bypass it. So it looks fine.
   
   We are still using the metadata cache in RLM#onLeadershipChange, so we have 
to ensure that the `MetadataCache` is updated before handling the 
LeaderAndIsrRequest (LISR). Otherwise, we can follow the suggestion posted by 
@showuon. Also, there are existing test failures related to remote storage; can 
you please take a look?
   
   
   ```
   Build / JDK 11 and Scala 2.13 / 
executeTieredStorageTest(String).quorum=kraft – 
org.apache.kafka.tiered.storage.integration.ReassignReplicaShrinkTest
   Build / JDK 11 and Scala 2.13 / 
testSendOffsetsWithGroupMetadata(String).quorum=kraft – 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   Build / JDK 11 and Scala 2.13 / testBasicTransactions(String).quorum=kraft – 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   Build / JDK 11 and Scala 2.13 / 
testSendOffsetsWithGroupId(String).quorum=kraft – 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   Build / JDK 11 and Scala 2.13 / 
testDelayedFetchIncludesAbortedTransaction(String).quorum=kraft – 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   Build / JDK 21 and Scala 2.13 / 
testAbortTransactionTimeout(String).quorum=kraft – 
org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest
   
   ```
   





Re: [PR] KAFKA-15445: Add JVM Docker image [kafka]

2023-11-19 Thread via GitHub


VedarthConfluent commented on code in PR #14552:
URL: https://github.com/apache/kafka/pull/14552#discussion_r1398654526


##
docker/jvm/Dockerfile:
##
@@ -0,0 +1,97 @@
+###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+###
+
+FROM golang:latest AS build-ub
+WORKDIR /build
+RUN useradd --no-log-init --create-home --shell /bin/bash appuser
+COPY --chown=appuser:appuser resources/ub/ ./
+RUN go build -ldflags="-w -s" ./ub.go
+USER appuser
+RUN go test ./...
+
+
+FROM eclipse-temurin:21-jre-alpine AS build-jsa
+
+USER root
+
+# Get kafka from https://archive.apache.org/dist/kafka and pass the url 
through build arguments
+ARG kafka_url
+
+COPY jsa_launch /etc/kafka/docker/jsa_launch
+
+RUN set -eux ; \
+apk update ; \
+apk upgrade ; \
+apk add --no-cache wget gcompat procps netcat-openbsd; \
+mkdir opt/kafka; \
+wget -nv -O kafka.tgz "$kafka_url"; \
+tar xfz kafka.tgz -C /opt/kafka --strip-components 1;
+
+RUN /etc/kafka/docker/jsa_launch
+
+
+FROM eclipse-temurin:21-jre-alpine
+
+# exposed ports
+EXPOSE 9092
+
+USER root
+
+# Get kafka from https://archive.apache.org/dist/kafka and pass the url 
through build arguments
+ARG kafka_url
+ARG build_date
+
+
+LABEL org.label-schema.name="kafka" \
+  org.label-schema.description="Apache Kafka" \
+  org.label-schema.build-date="${build_date}" \
+  org.label-schema.vcs-url="https://github.com/apache/kafka" \
+  org.label-schema.schema-version="1.0" \
+  maintainer="apache"
+
+RUN set -eux ; \
+apk update ; \
+apk upgrade ; \
+apk add --no-cache curl wget gpg dirmngr gpg-agent gcompat; \
+mkdir opt/kafka; \
+wget -nv -O kafka.tgz "$kafka_url"; \
+wget -nv -O kafka.tgz.asc "$kafka_url.asc"; \
+tar xfz kafka.tgz -C /opt/kafka --strip-components 1; \
+wget -nv -O KEYS https://downloads.apache.org/kafka/KEYS; \
+gpg --import KEYS; \
+gpg --batch --verify kafka.tgz.asc kafka.tgz; \
+mkdir -p /var/lib/kafka/data /etc/kafka/secrets /var/log/kafka; \
+mkdir -p /etc/kafka/docker /usr/logs /mnt/shared/config; \
+adduser -h /home/appuser -D --shell /bin/bash appuser; \
+chown appuser:appuser -R /usr/logs /opt/kafka /mnt/shared/config; \
+chown appuser:root -R /var/lib/kafka /etc/kafka/secrets /var/lib/kafka 
/etc/kafka /var/log/kafka; \

Review Comment:
   Thanks for catching this. It has been fixed






Re: [PR] KAFKA-14467:Fixed an issue where local incorrect snapshot files might occur due to first pulling the snapshot file and then truncate [kafka]

2023-11-19 Thread via GitHub


kamalcph commented on code in PR #14652:
URL: https://github.com/apache/kafka/pull/14652#discussion_r1398649893


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -238,12 +238,8 @@ private Long buildRemoteLogAuxState(TopicPartition 
topicPartition,
 
 log.debug("Updated the epoch cache from remote tier till 
offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), 
partition);
 
-// Restore producer snapshot
-File snapshotFile = 
LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
-buildProducerSnapshotFile(snapshotFile, 
remoteLogSegmentMetadata, rlm);
-
 // Reload producer snapshots.
-
unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots();
+truncateFullyAndReloadRestoredSnapshots(unifiedLog, 
nextOffset, remoteLogSegmentMetadata, rlm);

Review Comment:
   @hudeqi 
   
   > In producerStateManager, first call truncateFullyAndStartAt to clean up 
the snapshot files, and then pull the snapshot file from RemoteLogManager. If 
the order of calls is reversed in original logic, it may cause the newly built 
snapshot file to be cleaned up again by truncateFullyAndStartAt.
   
   The `truncateFullyAndReloadSnapshots` only removes the snapshot files that 
were already loaded into the ProducerStateManager, so it doesn't remove the 
`snapshotFile` that was downloaded/built from the remote storage.
   
   AFAIR, we don't want to expose reloading the snapshots from 
ProducerStateManager without clearing its internal state 
(`ProducerStateManager#reloadSnapshots` method). We can add a comment in this 
method to capture this information for future readers.



Re: [PR] KAFKA-14467:Fixed an issue where local incorrect snapshot files might occur due to first pulling the snapshot file and then truncate [kafka]

2023-11-19 Thread via GitHub


kamalcph commented on code in PR #14652:
URL: https://github.com/apache/kafka/pull/14652#discussion_r1398646778


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -238,12 +238,8 @@ private Long buildRemoteLogAuxState(TopicPartition 
topicPartition,
 
 log.debug("Updated the epoch cache from remote tier till 
offset: {} with size: {} for {}", leaderLocalLogStartOffset, epochs.size(), 
partition);
 
-// Restore producer snapshot
-File snapshotFile = 
LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
-buildProducerSnapshotFile(snapshotFile, 
remoteLogSegmentMetadata, rlm);
-
 // Reload producer snapshots.
-
unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots();
+truncateFullyAndReloadRestoredSnapshots(unifiedLog, 
nextOffset, remoteLogSegmentMetadata, rlm);

Review Comment:
   Nice catch @hudeqi! This is the expected behaviour; not sure why this was 
changed while porting it to trunk:
   
   
https://sourcegraph.com/github.com/satishd/kafka@2.8.x-tiered-storage/-/blob/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?L422-437



[jira] [Commented] (KAFKA-15802) Trying to access uncopied segments metadata on listOffsets

2023-11-19 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787799#comment-17787799
 ] 

Satish Duggana commented on KAFKA-15802:


[~mimaison] Sorry for updating the JIRA. This can be resolved as fixed; I 
closed it now.

> Trying to access uncopied segments metadata on listOffsets
> --
>
> Key: KAFKA-15802
> URL: https://issues.apache.org/jira/browse/KAFKA-15802
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Francois Visconte
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
> Fix For: 3.7.0, 3.6.1
>
>
> We have a tiered storage cluster running with Aiven s3 plugin. 
> On our cluster, we have a process doing regular listOffsets requests. 
> This triggers the following exception:
> {code:java}
> org.apache.kafka.common.KafkaException: 
> org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: 
> Requested remote resource was not found
> at 
> org.apache.kafka.storage.internals.log.RemoteIndexCache.lambda$createCacheEntry$6(RemoteIndexCache.java:355)
> at 
> org.apache.kafka.storage.internals.log.RemoteIndexCache.loadIndexFile(RemoteIndexCache.java:318)
> Nov 09, 2023 1:42:01 PM com.github.benmanes.caffeine.cache.LocalAsyncCache 
> lambda$handleCompletion$7
> WARNING: Exception thrown during asynchronous load
> java.util.concurrent.CompletionException: 
> io.aiven.kafka.tieredstorage.storage.KeyNotFoundException: Key 
> cluster/topic-0A_3phS5QWu9eU28KG0Lxg/24/00149691-Rdf4cUR_S4OYAGImco6Lbg.rsm-manifest
>  does not exists in storage S3Storage{bucketName='bucket', partSize=16777216}
> at 
> com.github.benmanes.caffeine.cache.CacheLoader.lambda$asyncLoad$0(CacheLoader.java:107)
> at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
> at 
> java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
> at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Caused by: io.aiven.kafka.tieredstorage.storage.KeyNotFoundException: Key 
> cluster/topic-0A_3phS5QWu9eU28KG0Lxg/24/00149691-Rdf4cUR_S4OYAGImco6Lbg.rsm-manifest
>  does not exists in storage S3Storage{bucketName='bucket', partSize=16777216}
> at io.aiven.kafka.tieredstorage.storage.s3.S3Storage.fetch(S3Storage.java:80)
> at 
> io.aiven.kafka.tieredstorage.manifest.SegmentManifestProvider.lambda$new$1(SegmentManifestProvider.java:59)
> at 
> com.github.benmanes.caffeine.cache.CacheLoader.lambda$asyncLoad$0(CacheLoader.java:103)
> ... 7 more
> Caused by: software.amazon.awssdk.services.s3.model.NoSuchKeyException: The 
> specified key does not exist. (Service: S3, Status Code: 404, Request ID: 
> CFMP27PVC9V2NNEM, Extended Request ID: 
> F5qqlV06qQJ5qCuWl91oueBaha0QLMBURJudnOnFDQk+YbgFcAg70JBATcARDxN44DGo+PpfZHAsum+ioYMoOw==)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60)
> at 
> software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
> at 
> software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
> at 
> 

[jira] [Commented] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2023-11-19 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787790#comment-17787790
 ] 

Phuc Hong Tran commented on KAFKA-15341:


Hi [~ckamal], I have a question about the controller->broker config propagation 
process that [~divijvaidya] mentioned. Does the LogManager validate the broker 
config to see if it is compatible with the new topic config before applying the 
new topic config? Thanks

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> When we are in a rolling restart to enable TS at system level, some brokers 
> have TS enabled on them and some don't. We send an alter config call to 
> enable TS for a topic, it hits a broker which has TS enabled, this broker 
> forwards it to the controller and controller will send the config update to 
> all brokers. When another broker which doesn't have TS enabled (because it 
> hasn't undergone the restart yet) gets this config change, it "should" fail 
> to apply it. But failing now is too late since alterConfig has already 
> succeeded since controller->broker config propagation is done async.
> With this JIRA, we want to have controller check if TS is enabled on all 
> brokers before applying alter config to turn on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129





Re: [PR] KAFKA-15856: Add integration tests for JoinGroup API and SyncGroup API [kafka]

2023-11-19 Thread via GitHub


dongnuo123 commented on code in PR #14800:
URL: https://github.com/apache/kafka/pull/14800#discussion_r1398581376


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1616,6 +1616,7 @@ public CoordinatorResult genericGroupJoin(
 responseFuture.complete(new JoinGroupResponseData()
 .setMemberId(memberId)
 .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+.setProtocolName(null)

Review Comment:
   Changed the default protocolName to comply with the old group coordinator.






[PR] KAFKA-15856: Add integration tests for JoinGroup API and SyncGroup API [kafka]

2023-11-19 Thread via GitHub


dongnuo123 opened a new pull request, #14800:
URL: https://github.com/apache/kafka/pull/14800

   This PR is based on https://github.com/apache/kafka/pull/14656, adding 
integration tests for the JoinGroup API and SyncGroup API.
   
   ### JIRA
   https://issues.apache.org/jira/browse/KAFKA-15856
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Assigned] (KAFKA-15856) Add integration tests for JoinGroup API and SyncGroup API

2023-11-19 Thread Dongnuo Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongnuo Lyu reassigned KAFKA-15856:
---

Assignee: Dongnuo Lyu

> Add integration tests for JoinGroup API and SyncGroup API
> -
>
> Key: KAFKA-15856
> URL: https://issues.apache.org/jira/browse/KAFKA-15856
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
>






[jira] [Created] (KAFKA-15856) Add integration tests for JoinGroup API and SyncGroup API

2023-11-19 Thread Dongnuo Lyu (Jira)
Dongnuo Lyu created KAFKA-15856:
---

 Summary: Add integration tests for JoinGroup API and SyncGroup API
 Key: KAFKA-15856
 URL: https://issues.apache.org/jira/browse/KAFKA-15856
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dongnuo Lyu








Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-19 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1398541169


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -231,6 +243,17 @@ private void handleCoordinatorDisconnect(Throwable 
exception, long currentTimeMs
 }
 }
 
+@Override
+public NetworkClientDelegate.PollResult pollOnClose() {
+if (!pendingRequests.hasUnsentRequests() || 
!coordinatorRequestManager.coordinator().isPresent())
+return EMPTY;
+
+sendAutoCommit(subscriptions.allConsumed());

Review Comment:
   We should take away the runAtClose one. Sorry.






Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]

2023-11-19 Thread via GitHub


philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1398540947


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##
@@ -257,10 +255,62 @@ private void closeInternal(final Duration timeout) {
 void cleanup() {
 log.trace("Closing the consumer network thread");
 Timer timer = time.timer(closeTimeout);
+coordinatorOnClose(timer);
 runAtClose(requestManagers.entries(), networkClientDelegate, timer);
 closeQuietly(requestManagers, "request managers");
 closeQuietly(networkClientDelegate, "network client delegate");
 closeQuietly(applicationEventProcessor, "application event processor");
 log.debug("Closed the consumer network thread");
 }
+
+void coordinatorOnClose(final Timer timer) {
+if (!requestManagers.coordinatorRequestManager.isPresent())
+return;
+
+connectCoordinator(timer);

Review Comment:
   In case the coordinator is disconnected, we need to first connect to the 
coordinator in order to send the commits (and other tasks in the future).  The 
connectCoordinator() call in the do { } while loop tries to reconnect in case 
the node is disconnected.  This is similar to the code in `ConsumerCoordinator` 
here:
   
   ```
   try {
   maybeAutoCommitOffsetsSync(timer);
   while (pendingAsyncCommits.get() > 0 && timer.notExpired()) {
   ensureCoordinatorReady(timer);
   client.poll(timer);
   invokeCompletedOffsetCommitCallbacks();
   }
   } finally {
   super.close(timer);
   
   ```






Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]

2023-11-19 Thread via GitHub


ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1398539065


##
streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * {@code DslWindowParams} is a wrapper class for all parameters that function
+ * as inputs to {@link DslStoreSuppliers#windowStore(DslWindowParams)}.
+ */
+public class DslWindowParams {
+
+private final String name;
+private final Duration retentionPeriod;
+private final Duration windowSize;
+private final boolean retainDuplicates;
+private final EmitStrategy emitStrategy;
+private final boolean isSlidingWindow;
+
+/**
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod  length of time to retain data in the store 
(cannot be negative)
+ * (note that the retention period must be at 
least long enough to contain the
+ * windowed data's entire life cycle, from 
window-start through window-end,
+ * and for the entire grace period)

Review Comment:
   Ah, that's ok then. I had assumed you wrote this and that it was just for 
this class



Re: [PR] MINOR: Improve printing topic name when created topic in TopicCommand [kafka]

2023-11-19 Thread via GitHub


ijuma merged PR #14661:
URL: https://github.com/apache/kafka/pull/14661





Re: [PR] MINOR: Improve printing topic name when created topic in TopicCommand [kafka]

2023-11-19 Thread via GitHub


ijuma commented on PR #14661:
URL: https://github.com/apache/kafka/pull/14661#issuecomment-1818023719

   Test failures are unrelated to this PR.





Re: [PR] MINOR: Improve printing topic name when created topic in TopicCommand [kafka]

2023-11-19 Thread via GitHub


ijuma commented on code in PR #14661:
URL: https://github.com/apache/kafka/pull/14661#discussion_r1398332721


##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -257,7 +257,7 @@ static class CommandTopicPartition {
 
 public CommandTopicPartition(TopicCommandOptions options) {
 opts = options;
-name = options.topic();
+name = options.topic().get();

Review Comment:
   It's a bit unclear why it's safe to call `.get` here. This class could, in 
theory, be used outside of `create` and `alter` in the future.
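
   One way to make that precondition explicit instead of relying on a bare 
`.get()`, assuming `options.topic()` returns an `Optional<String>` (a sketch, 
not the agreed fix):

   ```java
   import java.util.Optional;

   final class TopicNameResolver {
       // Sketch: fail with a descriptive message rather than a bare
       // NoSuchElementException if no --topic option was supplied.
       static String requireTopicName(Optional<String> topic) {
           return topic.orElseThrow(() ->
               new IllegalArgumentException("--topic is required for create/alter operations"));
       }
   }
   ```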






Re: [PR] KAFKA-15816: Fix leaked sockets in streams tests [kafka]

2023-11-19 Thread via GitHub


mjsax commented on code in PR #14769:
URL: https://github.com/apache/kafka/pull/14769#discussion_r1398520372


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java:
##
@@ -418,6 +418,8 @@ public String metricsScope() {
 })
 );
 
+// do not use the harness streams

Review Comment:
   Not sure what this comment means?



##
streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java:
##
@@ -125,6 +126,8 @@ public void 
shouldBeAbleToProvideInitialMetricValueToMetricsReporter() {
 final Object initialMetricValue = 
METRIC_NAME_TO_INITIAL_VALUE.get(metricName.name());
 assertThat(initialMetricValue, notNullValue());
 });
+
+kafkaStreams.close(Duration.ofSeconds(30));

Review Comment:
   Should we use `try-with-resources`? Do we need the timeout parameter? (If 
yes, it might still be better to use a try-catch with a `finally` block.)
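
   For reference, a hedged sketch of the two shapes being discussed; 
`KafkaStreams` implements `AutoCloseable`, so try-with-resources calls 
`close()` with its default (unbounded) wait, while a bounded close needs an 
explicit `finally` block (broker at localhost:9092 assumed):

   ```java
   import java.time.Duration;
   import java.util.Properties;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.Topology;

   public class StreamsCloseExample {
       public static void main(String[] args) {
           StreamsBuilder builder = new StreamsBuilder();
           builder.stream("input-topic").to("output-topic");
           Topology topology = builder.build();

           Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "close-example");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

           // Option 1: try-with-resources; close() blocks until shutdown completes.
           try (KafkaStreams streams = new KafkaStreams(topology, props)) {
               streams.start();
           }

           // Option 2: bounded close via try/finally.
           KafkaStreams streams = new KafkaStreams(topology, props);
           try {
               streams.start();
           } finally {
               streams.close(Duration.ofSeconds(30));
           }
       }
   }
   ```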






[jira] [Commented] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions

2023-11-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787681#comment-17787681
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15843:


Hey [~lianetm] I worked on the old ConsumerRebalanceListener a lot and can 
provide some context here. The reason #onPartitionsAssigned is still called on 
an empty set of partitions is largely historical, and the tl;dr is that it's 
probably ok to change this behavior in the new consumer if it won't impact the 
older one. 

For some background, in the old days of eager rebalancing (which is still the 
default protocol in the original consumer client), we would always invoke both 
#onPartitionsRevoked and #onPartitionsAssigned at the start and end of a 
rebalance, respectively. And since all partitions are revoked and re-assigned 
with eager rebalancing, there (usually) was a non-empty set of partitions 
passed into each of these.

Then came incremental cooperative rebalancing: we no longer revoked & 
reassigned all the partitions and instead acted only on the incremental change 
in partition assignment. So #onPartitionsRevoked only gets the subset of 
partitions that are being migrated to a different consumer, and 
#onPartitionsAssigned only gets newly-added partitions. Also, with the 
cooperative protocol, #onPartitionsRevoked would be invoked at the _end_ of a 
rebalance, rather than at the beginning.

However we still had to maintain compatibility across the two protocols for 
those implementing ConsumerRebalanceListener. And it was common to use the 
rebalance listener not just to listen in on the partition assignment, but to 
notify about the start and end of a rebalance. Therefore we decided to 
guarantee that #onPartitionsAssigned would still be invoked at the end of every 
rebalance, in case of users relying on this callback to detect the end of a 
rebalance. However, since #onPartitionsRevoked is no longer even invoked at the 
start of a cooperative rebalance, it can't be used to detect the start of one 
anymore and there was no reason to continue invoking it on every rebalance 
unless there were actually some partitions that were revoked. You'll notice 
that if the eager protocol is still enabled, the #onPartitionsRevoked callback  
actually is still invoked regardless of whether there's a non-empty set of 
partitions passed into it or not.

#onPartitionsLost is a bit of a special case, since (a) it was only added 
around the time cooperative rebalancing was implemented, so there was no old 
behavior for us to maintain compatibility with, and (b) it isn't invoked 
during a regular rebalance but only to notify the rebalance listener of 
a special case, i.e. that it has lost ownership of these partitions (but for that 
exact reason cannot commit offsets for them, as would normally occur in 
#onPartitionsRevoked). If there aren't any lost partitions, there's no reason 
to invoke this callback (and it would be misleading to do so).

My understanding is that there is no "eager" or "cooperative" protocol in the 
new consumer; it's an entirely new protocol, so I would assume you're not 
obligated to maintain compatibility for existing ConsumerRebalanceListener 
implementations. In that case, it probably does not make sense to guarantee 
that #onPartitionsAssigned is invoked on every rebalance regardless, even if no 
new partitions are added. I'm not super familiar with the KIP-848 
implementation details, but I would assume that users can still use the 
ConsumerPartitionAssignor callbacks to effectively detect the start and end of 
a rebalance (via #subscriptionUserdata and #onAssignment)

Of course, if you intend to change the behavior in a way that would affect the 
old consumer as well, then you'll need to give Kafka Streams time to adopt a 
new approach since we currently still rely on #onPartitionsAssigned to notify 
us when a rebalance ends. I'm pretty sure we don't plan on using the new 
consumer right away though, since we'll need to make a number of changes like 
this one before we can do so.
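
To make the contract above concrete, here is a small illustrative listener (a sketch, not code from any PR) that leans on the documented guarantee that #onPartitionsAssigned fires at the end of every rebalance, even with an empty collection, while the other two callbacks only fire with non-empty sets under the cooperative protocol:

{code:java}
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: uses onPartitionsAssigned as the "rebalance finished" hook.
class RebalanceEndTracker implements ConsumerRebalanceListener {
    private volatile long lastRebalanceEndMs = -1L;

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        // Invoked at the end of every rebalance, even when 'partitions' is empty.
        lastRebalanceEndMs = System.currentTimeMillis();
    }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        // Under the cooperative protocol this only fires when some partitions are
        // actually migrating away, so it cannot mark the start of every rebalance.
    }

    @Override
    public void onPartitionsLost(final Collection<TopicPartition> partitions) {
        // Only fires when ownership was lost (e.g. after a missed session timeout);
        // offsets for these partitions must not be committed here.
    }

    long lastRebalanceEndMs() {
        return lastRebalanceEndMs;
    }
}
{code}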

> Review consumer onPartitionsAssigned called with empty partitions
> -
>
> Key: KAFKA-15843
> URL: https://issues.apache.org/jira/browse/KAFKA-15843
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> Legacy coordinator triggers onPartitionsAssigned with empty assignment (which 
> is not the case when triggering onPartitionsRevoked or Lost). This is the 
> behaviour of the legacy coordinator, and the new consumer implementation 
> maintains the same principle. We should review this to fully understand if it 
> is really needed to call 

[jira] [Commented] (KAFKA-15834) Subscribing to non-existent topic blocks StreamThread from stopping

2023-11-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787679#comment-17787679
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15834:


Found the ticket: https://issues.apache.org/jira/browse/KAFKA-9398

And yes, it's still unresolved. 

Given all the above, I think we can honestly just disable/remove the test, as 
the named topologies feature was never made into a real public API. I do know 
of a few people who are using it anyway but they're aware it was only an 
experimental feature and not fully supported by Streams. So imo we don't need 
to go out of our way to fix any flaky tests, provided we can demonstrate that 
the issue is specific to named topologies and not potentially an issue with 
Streams itself. Of course in this case it's actually the latter, but we've 
recognized the root cause as a known issue, so I don't think there's anything 
more this test can do for us besides be flaky and annoy everyone.

Thanks for digging into this! 

> Subscribing to non-existent topic blocks StreamThread from stopping
> ---
>
> Key: KAFKA-15834
> URL: https://issues.apache.org/jira/browse/KAFKA-15834
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Priority: Major
>
> In 
> NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics
>  a topology is created which references an input topic which does not exist. 
> The test as-written passes, but the KafkaStreams#close(Duration) at the end 
> times out, and leaves StreamsThreads running.
> From some cursory investigation it appears that this is happening:
> 1. The consumer calls the StreamsPartitionAssignor, which calls 
> TaskManager#handleRebalanceStart as a side-effect
> 2. handleRebalanceStart sets the rebalanceInProgress flag
> 3. This flag is checked by StreamThread.runLoop, and causes the loop to 
> remain running.
> 4. The consumer never calls StreamsRebalanceListener#onPartitionsAssigned, 
> because the topic does not exist
> 5. Because no partitions are ever assigned, the 
> TaskManager#handleRebalanceComplete never clears the rebalanceInProgress flag
>  
> This log message is printed in a tight loop while the close is ongoing and 
> the consumer is being polled with zero duration:
> {noformat}
> [2023-11-15 11:42:43,661] WARN [Consumer 
> clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer,
>  
> groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics]
>  Received unknown topic or partition error in fetch for partition 
> unique_topic_prefix-topology-1-store-repartition-0 
> (org.apache.kafka.clients.consumer.internals.FetchCollector:321)
> {noformat}
> Practically, this means that this test leaks two StreamsThreads and the 
> associated clients and sockets, and delays the completion of the test until 
> the KafkaStreams#close(Duration) call times out.
> Either we should change the rebalanceInProgress flag to avoid getting stuck 
> in this rebalance state, or figure out a way to shut down a StreamsThread 
> that is in an extended rebalance state during shutdown.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15834) Subscribing to non-existent topic blocks StreamThread from stopping

2023-11-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787678#comment-17787678
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15834:


I just checked the current code and it looks like we do still respect the 
guarantee of invoking #onPartitionsAssigned in all cases. So I don't think step 
#4 is correct. Did you happen to see anything in the logs that would suggest 
the StreamThread was continuing in its regular loop and never stopping due to 
the rebalanceInProgress flag? Or is it possible that it's hanging somewhere in 
the shutdown process (or even in the rebalance itself)?

I'm just wondering if it might be related to the Producer, not the Consumer. I 
know we had some issues with the Producer#close hanging in the past, and that 
it was related to users deleting topics from under the app, which would be a 
similar situation to what you found here. I'm not sure if we ever fixed that; 
maybe [~mjsax] will remember the ticket for the Producer issue?

> Subscribing to non-existent topic blocks StreamThread from stopping
> ---
>
> Key: KAFKA-15834
> URL: https://issues.apache.org/jira/browse/KAFKA-15834
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Priority: Major
>
> In 
> NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics
>  a topology is created which references an input topic which does not exist. 
> The test as-written passes, but the KafkaStreams#close(Duration) at the end 
> times out, and leaves StreamsThreads running.
> From some cursory investigation it appears that this is happening:
> 1. The consumer calls the StreamsPartitionAssignor, which calls 
> TaskManager#handleRebalanceStart as a side-effect
> 2. handleRebalanceStart sets the rebalanceInProgress flag
> 3. This flag is checked by StreamThread.runLoop, and causes the loop to 
> remain running.
> 4. The consumer never calls StreamsRebalanceListener#onPartitionsAssigned, 
> because the topic does not exist
> 5. Because no partitions are ever assigned, the 
> TaskManager#handleRebalanceComplete never clears the rebalanceInProgress flag
>  
> This log message is printed in a tight loop while the close is ongoing and 
> the consumer is being polled with zero duration:
> {noformat}
> [2023-11-15 11:42:43,661] WARN [Consumer 
> clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer,
>  
> groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics]
>  Received unknown topic or partition error in fetch for partition 
> unique_topic_prefix-topology-1-store-repartition-0 
> (org.apache.kafka.clients.consumer.internals.FetchCollector:321)
> {noformat}
> Practically, this means that this test leaks two StreamsThreads and the 
> associated clients and sockets, and delays the completion of the test until 
> the KafkaStreams#close(Duration) call times out.
> Either we should change the rebalanceInProgress flag to avoid getting stuck 
> in this rebalance state, or figure out a way to shut down a StreamsThread 
> that is in an extended rebalance state during shutdown.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15834) Subscribing to non-existent topic blocks StreamThread from stopping

2023-11-19 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787677#comment-17787677
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15834:


Yeah great analysis, thanks [~gharris1727] 

I'm a bit confused by point #4, however – is this a change in behavior 
(possibly related to KIP-848)? It's my understanding that the 
#onPartitionsAssigned callback is guaranteed to always be invoked regardless of 
whether the set of partitions being newly assigned is non-empty or not. This is 
in contrast with the #onPartitionsRevoked and #onPartitionsLost callbacks, 
which are only invoked when the set of partitions to act upon is non-empty.

I think one could argue that this inconsistency is not ideal, but the behavior 
of always invoking #onPartitionsAssigned is a stated guarantee in the public 
contract of ConsumerRebalanceListener. See [this 
paragraph|https://github.com/apache/kafka/blob/254335d24ab6b6d13142dcdb53fec3856c16de9e/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L67]
 of the javadocs. In other words, I don't think we can change this without a 
KIP, and if this behavior was modified recently then we need to revert that 
change until a KIP is accepted.

> Subscribing to non-existent topic blocks StreamThread from stopping
> ---
>
> Key: KAFKA-15834
> URL: https://issues.apache.org/jira/browse/KAFKA-15834
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Priority: Major
>
> In 
> NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics
>  a topology is created which references an input topic which does not exist. 
> The test as-written passes, but the KafkaStreams#close(Duration) at the end 
> times out, and leaves StreamsThreads running.
> From some cursory investigation it appears that this is happening:
> 1. The consumer calls the StreamsPartitionAssignor, which calls 
> TaskManager#handleRebalanceStart as a side-effect
> 2. handleRebalanceStart sets the rebalanceInProgress flag
> 3. This flag is checked by StreamThread.runLoop, and causes the loop to 
> remain running.
> 4. The consumer never calls StreamsRebalanceListener#onPartitionsAssigned, 
> because the topic does not exist
> 5. Because no partitions are ever assigned, the 
> TaskManager#handleRebalanceComplete never clears the rebalanceInProgress flag
>  
> This log message is printed in a tight loop while the close is ongoing and 
> the consumer is being polled with zero duration:
> {noformat}
> [2023-11-15 11:42:43,661] WARN [Consumer 
> clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer,
>  
> groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics]
>  Received unknown topic or partition error in fetch for partition 
> unique_topic_prefix-topology-1-store-repartition-0 
> (org.apache.kafka.clients.consumer.internals.FetchCollector:321)
> {noformat}
> Practically, this means that this test leaks two StreamsThreads and the 
> associated clients and sockets, and delays the completion of the test until 
> the KafkaStreams#close(Duration) call times out.
> Either we should change the rebalanceInProgress flag to avoid getting stuck 
> in this rebalance state, or figure out a way to shut down a StreamsThread 
> that is in an extended rebalance state during shutdown.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15853) Move KafkaConfig to server module

2023-11-19 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787674#comment-17787674
 ] 

Ismael Juma commented on KAFKA-15853:
-

Feel free to take this. KAFKA-15854 is a dependency and will be merged soon.

> Move KafkaConfig to server module
> -
>
> Key: KAFKA-15853
> URL: https://issues.apache.org/jira/browse/KAFKA-15853
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Priority: Major
>
> The server module is a Java-only module, so this also requires converting 
> from Scala to Java.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15853) Move KafkaConfig to server module

2023-11-19 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787672#comment-17787672
 ] 

Omnia Ibrahim commented on KAFKA-15853:
---

[~ijuma] any plans for when this will land, as KAFKA-14527 depends on it? If 
you are busy, I'm happy to take this ticket.

> Move KafkaConfig to server module
> -
>
> Key: KAFKA-15853
> URL: https://issues.apache.org/jira/browse/KAFKA-15853
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Priority: Major
>
> The server module is a Java-only module, so this also requires converting 
> from Scala to Java.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15853) Move KafkaConfig to server module

2023-11-19 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim reassigned KAFKA-15853:
-

Assignee: (was: Omnia Ibrahim)

> Move KafkaConfig to server module
> -
>
> Key: KAFKA-15853
> URL: https://issues.apache.org/jira/browse/KAFKA-15853
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Priority: Major
>
> The server module is a Java-only module, so this also requires converting 
> from Scala to Java.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15853) Move KafkaConfig to server module

2023-11-19 Thread Omnia Ibrahim (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Omnia Ibrahim reassigned KAFKA-15853:
-

Assignee: Omnia Ibrahim

> Move KafkaConfig to server module
> -
>
> Key: KAFKA-15853
> URL: https://issues.apache.org/jira/browse/KAFKA-15853
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Omnia Ibrahim
>Priority: Major
>
> The server module is a Java-only module, so this also requires converting 
> from Scala to Java.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15445: Add JVM Docker image [kafka]

2023-11-19 Thread via GitHub


nkonev commented on code in PR #14552:
URL: https://github.com/apache/kafka/pull/14552#discussion_r1398494752


##
docker/jvm/Dockerfile:
##
@@ -0,0 +1,97 @@
+###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+###
+
+FROM golang:latest AS build-ub
+WORKDIR /build
+RUN useradd --no-log-init --create-home --shell /bin/bash appuser
+COPY --chown=appuser:appuser resources/ub/ ./
+RUN go build -ldflags="-w -s" ./ub.go
+USER appuser
+RUN go test ./...
+
+
+FROM eclipse-temurin:21-jre-alpine AS build-jsa
+
+USER root
+
+# Get kafka from https://archive.apache.org/dist/kafka and pass the url 
through build arguments
+ARG kafka_url
+
+COPY jsa_launch /etc/kafka/docker/jsa_launch
+
+RUN set -eux ; \
+apk update ; \
+apk upgrade ; \
+apk add --no-cache wget gcompat procps netcat-openbsd; \
+mkdir opt/kafka; \
+wget -nv -O kafka.tgz "$kafka_url"; \
+tar xfz kafka.tgz -C /opt/kafka --strip-components 1;
+
+RUN /etc/kafka/docker/jsa_launch
+
+
+FROM eclipse-temurin:21-jre-alpine
+
+# exposed ports
+EXPOSE 9092
+
+USER root
+
+# Get kafka from https://archive.apache.org/dist/kafka and pass the url 
through build arguments
+ARG kafka_url
+ARG build_date
+
+
+LABEL org.label-schema.name="kafka" \
+  org.label-schema.description="Apache Kafka" \
+  org.label-schema.build-date="${build_date}" \
+  org.label-schema.vcs-url="https://github.com/apache/kafka; \
+  org.label-schema.schema-version="1.0" \
+  maintainer="apache"
+
+RUN set -eux ; \
+apk update ; \
+apk upgrade ; \
+apk add --no-cache curl wget gpg dirmngr gpg-agent gcompat; \
+mkdir opt/kafka; \
+wget -nv -O kafka.tgz "$kafka_url"; \
+wget -nv -O kafka.tgz.asc "$kafka_url.asc"; \
+tar xfz kafka.tgz -C /opt/kafka --strip-components 1; \
+wget -nv -O KEYS https://downloads.apache.org/kafka/KEYS; \
+gpg --import KEYS; \
+gpg --batch --verify kafka.tgz.asc kafka.tgz; \
+mkdir -p /var/lib/kafka/data /etc/kafka/secrets /var/log/kafka; \
+mkdir -p /etc/kafka/docker /usr/logs /mnt/shared/config; \
+adduser -h /home/appuser -D --shell /bin/bash appuser; \
+chown appuser:appuser -R /usr/logs /opt/kafka /mnt/shared/config; \
+chown appuser:root -R /var/lib/kafka /etc/kafka/secrets /var/lib/kafka 
/etc/kafka /var/log/kafka; \

Review Comment:
   Duplicated `/var/lib/kafka`
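
   A possible fix, sketched as a suggestion (same ownership semantics, just with the repeated path dropped):
```suggestion
chown appuser:root -R /var/lib/kafka /etc/kafka/secrets /etc/kafka /var/log/kafka; \
```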



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15833: Restrict Consumer API to be used from one thread [kafka]

2023-11-19 Thread via GitHub


lucasbru merged PR #14779:
URL: https://github.com/apache/kafka/pull/14779


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15363: Broker log directory failure changes [kafka]

2023-11-19 Thread via GitHub


soarez commented on code in PR #14790:
URL: https://github.com/apache/kafka/pull/14790#discussion_r1398483277


##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -146,6 +153,9 @@ public void run() throws Exception {
 log.debug("Received new assignment {}", this);
 }
 pending.put(partition, this);
+if (callback != null) {
+callback.accept(DirectoryEventRequestState.QUEUED);
+}

Review Comment:
   Is this necessary? Can't we simply assume it's queued as soon as 
`onAssignment` returns?



##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -336,6 +356,27 @@ private static boolean responseIsError(ClientResponse 
response) {
 return false;
 }
 
+private static void applyCallbackOnComplete(
+AssignReplicasToDirsResponseData data,
+Map sent) {
+for (AssignReplicasToDirsResponseData.DirectoryData directory : 
data.directories()) {
+for (AssignReplicasToDirsResponseData.TopicData topic : 
directory.topics()) {
+for (AssignReplicasToDirsResponseData.PartitionData partition 
: topic.partitions()) {
+TopicIdPartition topicPartition = new 
TopicIdPartition(topic.topicId(), partition.partitionIndex());
+AssignmentEvent event = sent.get(topicPartition);
+if (event == null) {
+log.error("AssignReplicasToDirsResponse contains 
unexpected partition {} into directory {}. No callback to apply.", partition, 
directory.id());
+} else {
+Errors error = Errors.forCode(partition.errorCode());
+if (error == Errors.NONE && event.callback != null) {
+
event.callback.accept(DirectoryEventRequestState.COMPLETED);
+}

Review Comment:
   Instead of repeating the processing of the response for errors, we can 
Set.diff the result of `filterFailures` against `inFlight` at the calling site 
for this function.
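
   A rough fragment of what that could look like at the call site (a sketch only; it assumes `filterFailures` returns the failed subset keyed by `TopicIdPartition`, which may not match the actual signature in this PR):

```java
// Sketch: treat "completed" as inFlight minus the failures, then fire the
// callbacks once, instead of walking the response a second time for errors.
Set<TopicIdPartition> completed = new HashSet<>(inFlight.keySet());
completed.removeAll(filterFailures(data, inFlight).keySet());
completed.forEach(partition -> {
    AssignmentEvent event = inFlight.get(partition);
    if (event.callback != null) {
        event.callback.accept(DirectoryEventRequestState.COMPLETED);
    }
});
```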



##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:
##
@@ -76,13 +82,49 @@ class ReplicaAlterLogDirsThread(name: String,
 futureLog.updateHighWatermark(partitionData.highWatermark)
 futureLog.maybeIncrementLogStartOffset(partitionData.logStartOffset, 
LogStartOffsetIncrementReason.LeaderOffsetIncremented)
 
-if (partition.maybeReplaceCurrentWithFutureReplica())
-  removePartitions(Set(topicPartition))
+directoryEventHandler match {
+  case DirectoryEventHandler.NOOP =>
+if (partition.maybeReplaceCurrentWithFutureReplica())
+  removePartitions(Set(topicPartition))
+  case _ =>
+maybePromoteFutureReplica(topicPartition, partition)
+}
 
 quota.record(records.sizeInBytes)
 logAppendInfo
   }
 
+  // Visible for testing
+  def updatedAssignmentRequestStat(topicPartition: TopicPartition)(state: 
DirectoryEventRequestState): Unit = {
+assignmentRequestStates.put(topicPartition, state)
+  }
+  private def maybePromoteFutureReplica(topicPartition: TopicPartition, 
partition: Partition) = {
+val partitionRequestState = 
Option(assignmentRequestStates.get(topicPartition))
+val topicId = partition.topicId
+if (topicId.isEmpty)
+  throw new IllegalStateException(s"Topic ${topicPartition.topic()} exists 
but its ID doesn't exist.")
+
+partitionRequestState match {
+  case None =>
+// Schedule assignment request and don't promote the future replica 
yet until the controller accepted the request.
+partition.maybeFutureReplicaCaughtUp(_ => {
+  partition.futureReplicaDirectoryId()
+.map {
+  directoryEventHandler.handleAssignment(new 
TopicIdPartition(topicId.get, topicPartition.partition()), _,
+updatedAssignmentRequestStat(topicPartition)(_))

Review Comment:
   ```suggestion
   updatedAssignmentRequestState(topicPartition)(_))
   ```



##
core/src/main/java/kafka/server/AssignmentsManager.java:
##
@@ -210,6 +220,9 @@ public void run() throws Exception {
 channelManager.sendRequest(new AssignReplicasToDirsRequest.Builder(
 buildRequestData(brokerId, brokerEpochSupplier.get(), 
assignment)),
 new AssignReplicasToDirsRequestCompletionHandler());
+inflight.values().stream()
+.filter(assignmentEvent -> assignmentEvent.callback != 
null)
+.forEach(assignmentEvent -> 
assignmentEvent.callback.accept(DirectoryEventRequestState.DISPATCHED));

Review Comment:
   I don't think we need the state in the callback. It makes sense to 
distinguish between the 3 states, but I don't see why the callback should fire 
for anything but `COMPLETED`. 
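
   If only the terminal state matters, one possible simplification (purely a sketch, not the PR's actual API) would be a completion hook with no state argument:

```java
// Hypothetical signature: invoked once the controller has accepted the assignment.
void handleAssignment(TopicIdPartition partition, Uuid directoryId, Runnable onComplete);
```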



##
core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala:

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on PR #14699:
URL: https://github.com/apache/kafka/pull/14699#issuecomment-1817955171

   Thanks for the review @junrao. I have updated the PR. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398480299


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -34,13 +70,348 @@ public class ClientMetricsManager implements Closeable {
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+
+// The last subscription updated time is used to determine if the next 
telemetry request needs
+// to re-evaluate the subscription id as per changes subscriptions.
+private long lastSubscriptionUpdateEpoch;
+
+// Visible for testing
+ClientMetricsManager() {
+subscriptionMap = new ConcurrentHashMap<>();
+clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+updateLastSubscriptionUpdateEpoch();
+}
+return;
+}
+
+ClientMetricsConfigs configs = new ClientMetricsConfigs(properties);
+updateClientSubscription(subscriptionName, configs);
+/*
+ Update last subscription updated time to current time to indicate 
that there is a change
+ in the subscription. This will be used to determine if the next 
telemetry request needs
+ to re-evaluate the subscription id as per changes subscriptions.
+*/
+updateLastSubscriptionUpdateEpoch();
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = System.currentTimeMillis();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issued get telemetry
+ request prior to push interval, then the client should get a throttle 
error but if the
+ subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+} finally {
+clientInstance.lastGetRequestEpoch(now);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || clientInstanceId == Uuid.ZERO_UUID) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = System.currentTimeMillis();
+ClientMetricsInstance clientInstance = 
getClientInstance(clientInstanceId, requestContext, now);
+
+try {
+// Validate the push request parameters for the client instance.
+validatePushRequest(request, telemetryMaxBytes, clientInstance);
+} catch (ApiException exception) {
+clientInstance.lastKnownError(Errors.forException(exception));

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398480068


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
 private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
+
+private ClientMetricsManager() {
+this(Time.SYSTEM);
+}
+
+// Visible for testing
+ClientMetricsManager(Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.time = time;
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = time.milliseconds();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issues another get
+ telemetry request prior to push interval, then the client should get 
a throttle error but if
+ the subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance, now);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || 
Uuid.RESERVED.contains(clientInstanceId)) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = time.milliseconds();
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479956


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
 private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
+
+private ClientMetricsManager() {
+this(Time.SYSTEM);
+}
+
+// Visible for testing
+ClientMetricsManager(Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.time = time;
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = time.milliseconds();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issues another get
+ telemetry request prior to push interval, then the client should get 
a throttle error but if
+ the subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance, now);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || 
Uuid.RESERVED.contains(clientInstanceId)) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = time.milliseconds();
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479915


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
 private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
 
 public static ClientMetricsManager instance() {
 return INSTANCE;
 }
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
+
+private ClientMetricsManager() {
+this(Time.SYSTEM);
+}
+
+// Visible for testing
+ClientMetricsManager(Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.time = time;
+}
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, 
RequestContext requestContext, int throttleMs) {
+
+long now = time.milliseconds();
+Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+.filter(id -> !id.equals(Uuid.ZERO_UUID))
+.orElse(generateNewClientId());
+
+/*
+ Get the client instance from the cache or create a new one. If 
subscription has changed
+ since the last request, then the client instance will be 
re-evaluated. Validation of the
+ request will be done after the client instance is created. If client 
issues another get
+ telemetry request prior to push interval, then the client should get 
a throttle error but if
+ the subscription has changed since the last request then the client 
should get the updated
+ subscription immediately.
+*/
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);
+
+try {
+// Validate the get request parameters for the client instance.
+validateGetRequest(request, clientInstance, now);
+} catch (ApiException exception) {
+return request.getErrorResponse(throttleMs, exception);
+}
+
+clientInstance.lastKnownError(Errors.NONE);
+return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
telemetryMaxBytes, throttleMs);
+}
+
+public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+int telemetryMaxBytes, RequestContext requestContext, int throttleMs) {
+
+Uuid clientInstanceId = request.data().clientInstanceId();
+if (clientInstanceId == null || 
Uuid.RESERVED.contains(clientInstanceId)) {
+String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+clientInstanceId);
+return request.getErrorResponse(throttleMs, new 
InvalidRequestException(msg));
+}
+
+long now = time.milliseconds();
+ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479399


##
core/src/main/java/kafka/metrics/ClientMetricsInstance.java:
##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Contains the metrics instance metadata and the state of the client instance.
+ */
+public class ClientMetricsInstance {
+
+private final Uuid clientInstanceId;
+private final ClientMetricsInstanceMetadata instanceMetadata;
+private final int subscriptionId;
+private final int subscriptionVersion;
+private final Set metrics;
+private final int pushIntervalMs;
+
+private long lastGetRequestEpoch;

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479369


##
core/src/test/java/kafka/server/ClientMetricsManagerTest.java:
##
@@ -0,0 +1,951 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server;
+
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsTestUtils;
+import kafka.server.ClientMetricsManager.SubscriptionInfo;
+import kafka.utils.TestUtils;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryRequest.Builder;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsManagerTest {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(ClientMetricsManagerTest.class);
+
+private Properties props;
+private KafkaConfig config;
+private MockTime time;
+private ClientMetricsManager clientMetricsManager;
+
+@BeforeEach
+public void setUp() {
+props = TestUtils.createDummyBrokerConfig();
+props.setProperty(KafkaConfig.ClientTelemetryMaxBytesProp(), "100");
+config = new KafkaConfig(props);
+time = new MockTime();
+clientMetricsManager = new ClientMetricsManager(config, time);
+}
+
+@Test
+public void testUpdateSubscription() {
+assertTrue(clientMetricsManager.subscriptions().isEmpty());
+
+assertEquals(0, clientMetricsManager.subscriptionUpdateVersion());
+clientMetricsManager.updateSubscription("sub-1", 
ClientMetricsTestUtils.defaultProperties());
+
+assertEquals(1, clientMetricsManager.subscriptions().size());
+assertNotNull(clientMetricsManager.subscriptionInfo("sub-1"));
+
+SubscriptionInfo subscriptionInfo = 
clientMetricsManager.subscriptionInfo("sub-1");
+Set metrics = subscriptionInfo.metrics();
+
+// Validate metrics.
+assertEquals(ClientMetricsTestUtils.DEFAULT_METRICS.split(",").length, 
metrics.size());
+
Arrays.stream(ClientMetricsTestUtils.DEFAULT_METRICS.split(",")).forEach(metric 
->
+assertTrue(metrics.contains(metric)));
+// Validate push interval.
+
assertEquals(ClientMetricsTestUtils.defaultProperties().getProperty(ClientMetricsConfigs.PUSH_INTERVAL_MS),
+String.valueOf(subscriptionInfo.intervalMs()));
+
+// Validate match patterns.
+
assertEquals(ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.size(),
+subscriptionInfo.matchPattern().size());
+ClientMetricsTestUtils.DEFAULT_CLIENT_MATCH_PATTERNS.forEach(pattern 
-> {
+String[] split = pattern.split("=");
+assertTrue(subscriptionInfo.matchPattern().containsKey(split[0]));
+assertEquals(split[1], 

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479353


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,31 +16,421 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 /**
  * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
  */
 public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
-private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final KafkaConfig config;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
 
-public static ClientMetricsManager instance() {
-return INSTANCE;
+public ClientMetricsManager(KafkaConfig config, Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.config = config;
+this.time = time;
 }
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398479319


##
core/src/main/java/kafka/server/ClientMetricsManager.java:
##
@@ -16,31 +16,421 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 /**
  * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
  */
 public class ClientMetricsManager implements Closeable {
 
 private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
-private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+private static final List SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+// Max cache size (16k active client connections per broker)
+private static final int CM_CACHE_MAX_SIZE = 16384;
+
+private final Cache clientInstanceCache;
+private final Map subscriptionMap;
+private final KafkaConfig config;
+private final Time time;
+
+// The latest subscription version is used to determine if subscription 
has changed and needs
+// to re-evaluate the client instance subscription id as per changed 
subscriptions.
+private final AtomicInteger subscriptionUpdateVersion;
 
-public static ClientMetricsManager instance() {
-return INSTANCE;
+public ClientMetricsManager(KafkaConfig config, Time time) {
+this.subscriptionMap = new ConcurrentHashMap<>();
+this.subscriptionUpdateVersion = new AtomicInteger(0);
+this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+this.config = config;
+this.time = time;
 }
 
 public void updateSubscription(String subscriptionName, Properties 
properties) {
-// TODO: Implement the update logic to manage subscriptions.
+// Validate the subscription properties.
+ClientMetricsConfigs.validate(subscriptionName, properties);
+// IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+// for respective subscription. In that case, we need to remove the 
subscription from the map.
+if (properties.isEmpty()) {
+// Remove the subscription from the map if it exists, else ignore 
the config update.
+if (subscriptionMap.containsKey(subscriptionName)) {
+log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+subscriptionMap.remove(subscriptionName);
+this.subscriptionUpdateVersion.incrementAndGet();
+}
+return;
+}
+
+updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+}
+
+public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+

Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1398477241


##
core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics;
+
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Plugin to register client telemetry receivers and export metrics. This 
class is used by the Kafka
+ * server to export client metrics to the registered receivers.
+ */
+public class ClientMetricsReceiverPlugin {
+
+private static final ClientMetricsReceiverPlugin INSTANCE = new 
ClientMetricsReceiverPlugin();

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15855) RFC 9266: Channel Bindings for TLS 1.3 support | SCRAM-SHA-*-PLUS variants

2023-11-19 Thread Neustradamus (Jira)
Neustradamus created KAFKA-15855:


 Summary: RFC 9266: Channel Bindings for TLS 1.3 support | 
SCRAM-SHA-*-PLUS variants
 Key: KAFKA-15855
 URL: https://issues.apache.org/jira/browse/KAFKA-15855
 Project: Kafka
  Issue Type: Bug
  Components: connect, core, security
Reporter: Neustradamus


Dear Apache and Kafka teams,

Can you add support for RFC 9266: Channel Bindings for TLS 1.3?
- [https://datatracker.ietf.org/doc/html/rfc9266]

A few details, for quick reference:
- tls-unique for TLS <= 1.2
- tls-server-end-point
- tls-exporter for TLS 1.3

Channel binding is needed for the SCRAM-SHA-*-PLUS variants.
Note: Some plain SCRAM-SHA mechanisms are already supported.
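
For illustration, a minimal sketch (not Kafka code, and assuming the 
tls-exporter binding of RFC 9266) of how the -PLUS variants surface channel 
binding in the SCRAM exchange; obtaining the exporter keying material from the 
TLS stack is out of scope here:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch of the RFC 5802 GS2 headers used by plain SCRAM vs. the SCRAM-*-PLUS
// variants. The cbindData bytes would come from the TLS layer (for TLS 1.3, the
// RFC 9266 "tls-exporter" keying material); how to obtain them is not shown here.
public class ScramChannelBindingSketch {

    // Plain SCRAM-SHA-256: the client does not use channel binding.
    static String gs2HeaderNoBinding() {
        return "n,,";
    }

    // SCRAM-SHA-256-PLUS over TLS 1.3: the client requires the tls-exporter binding.
    static String gs2HeaderTlsExporter() {
        return "p=tls-exporter,,";
    }

    // The "c=" attribute of the client-final-message carries the GS2 header plus
    // the raw channel-binding data, base64-encoded, so the server can verify that
    // both ends observe the same TLS channel.
    static String cAttribute(String gs2Header, byte[] cbindData) {
        byte[] header = gs2Header.getBytes(StandardCharsets.US_ASCII);
        byte[] combined = new byte[header.length + cbindData.length];
        System.arraycopy(header, 0, combined, 0, header.length);
        System.arraycopy(cbindData, 0, combined, header.length, cbindData.length);
        return "c=" + Base64.getEncoder().encodeToString(combined);
    }
}
{code}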

I think that you have seen the jabber.ru MITM incident; channel binding is the 
solution to this class of attack:
- [https://notes.valdikss.org.ru/jabber.ru-mitm/]
- [https://snikket.org/blog/on-the-jabber-ru-mitm/]
- [https://www.devever.net/~hl/xmpp-incident]
- [https://blog.jmp.chat/b/certwatch]

IETF links:

SCRAM-SHA-1(-PLUS):
- RFC5802: Salted Challenge Response Authentication Mechanism (SCRAM) SASL and 
GSS-API Mechanisms: [https://tools.ietf.org/html/rfc5802] // July 2010
- RFC6120: Extensible Messaging and Presence Protocol (XMPP): Core: 
[https://tools.ietf.org/html/rfc6120] // March 2011

SCRAM-SHA-256(-PLUS):
- RFC7677: SCRAM-SHA-256 and SCRAM-SHA-256-PLUS Simple Authentication and 
Security Layer (SASL) Mechanisms: [https://tools.ietf.org/html/rfc7677] // 
2015-11-02
- RFC8600: Using Extensible Messaging and Presence Protocol (XMPP) for Security 
Information Exchange: [https://tools.ietf.org/html/rfc8600] // 2019-06-21: 
[https://mailarchive.ietf.org/arch/msg/ietf-announce/suJMmeMhuAOmGn_PJYgX5Vm8lNA]

SCRAM-SHA-512(-PLUS):
- [https://tools.ietf.org/html/draft-melnikov-scram-sha-512]

SCRAM-SHA3-512(-PLUS):
- [https://tools.ietf.org/html/draft-melnikov-scram-sha3-512]

SCRAM BIS: Salted Challenge Response Authentication Mechanism (SCRAM) SASL and 
GSS-API Mechanisms:
- [https://tools.ietf.org/html/draft-melnikov-scram-bis]

-PLUS variants:
- RFC5056: On the Use of Channel Bindings to Secure Channels: 
[https://tools.ietf.org/html/rfc5056] // November 2007
- RFC5929: Channel Bindings for TLS: [https://tools.ietf.org/html/rfc5929] // 
July 2010
- Channel-Binding Types: 
[https://www.iana.org/assignments/channel-binding-types/channel-binding-types.xhtml]
- RFC9266: Channel Bindings for TLS 1.3: [https://tools.ietf.org/html/rfc9266] 
// July 2022

IMAP:
- RFC9051: Internet Message Access Protocol (IMAP) - Version 4rev2: 
[https://tools.ietf.org/html/rfc9051] // August 2021

LDAP:
- RFC5803: Lightweight Directory Access Protocol (LDAP) Schema for Storing 
Salted: Challenge Response Authentication Mechanism (SCRAM) Secrets: 
[https://tools.ietf.org/html/rfc5803] // July 2010

HTTP:
- RFC7804: Salted Challenge Response HTTP Authentication Mechanism: 
[https://tools.ietf.org/html/rfc7804] // March 2016

JMAP:
- RFC8621: The JSON Meta Application Protocol (JMAP) for Mail: 
[https://tools.ietf.org/html/rfc8621] // August 2019

2FA:
- Extensions to Salted Challenge Response (SCRAM) for 2 factor authentication: 
[https://tools.ietf.org/html/draft-ietf-kitten-scram-2fa]

Thanks in advance.

Linked to:
- [https://github.com/scram-sasl/info/issues/1]

Note: This ticket can be for other Apache projects too.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-19 Thread via GitHub


divijvaidya commented on PR #14796:
URL: https://github.com/apache/kafka/pull/14796#issuecomment-1817924155

   Please feel free to merge after CI is successful.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14819) Publish a single kafka (aka core) Maven artifact in Apache Kafka 4.0 (KIP-897)

2023-11-19 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-14819.
-
Fix Version/s: (was: 4.0.0)
   Resolution: Won't Fix

As described in the KIP, we're taking a different approach.

> Publish a single kafka (aka core) Maven artifact in Apache Kafka 4.0 (KIP-897)
> --
>
> Key: KAFKA-14819
> URL: https://issues.apache.org/jira/browse/KAFKA-14819
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Blocker
>
> Please see KIP for details:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-897%3A+Publish+a+single+kafka+%28aka+core%29+Maven+artifact+in+Apache+Kafka+4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-19 Thread via GitHub


ijuma commented on code in PR #14796:
URL: https://github.com/apache/kafka/pull/14796#discussion_r1398448178


##
server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java:
##
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.metrics;
+package org.apache.kafka.server.metrics;

Review Comment:
   Can you please share the PRs where this is being discussed now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-19 Thread via GitHub


ijuma commented on PR #14796:
URL: https://github.com/apache/kafka/pull/14796#issuecomment-1817912100

   @divijvaidya Thanks for the review. I added a basic `package-info` file. We 
can flesh it out as we move more code to this module.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] (KAFKA-15801) improve Kafka broker/NetworkClient logging for connectivity issues with hostname and port info and higher severity

2023-11-19 Thread johndoe (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15801 ]


johndoe deleted comment on KAFKA-15801:
-

was (Author: johndoe):
[KAFKA-15801|https://github.com/apache/kafka/pull/14799]

> improve Kafka broker/NetworkClient logging for connectivity issues with 
> hostname and port info and higher severity
> --
>
> Key: KAFKA-15801
> URL: https://issues.apache.org/jira/browse/KAFKA-15801
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Affects Versions: 3.6.0
>Reporter: Alexander Kilian
>Assignee: johndoe
>Priority: Trivial
>
> When a component of the Kafka broker tries to reach another broker within the 
> cluster the logging should be more elaborate and include the IP/hostname and 
> port it tries to connect to, and have a higher severity WARN rather than INFO.
> Current logging:
> {{[2023-11-09 07:33:36,106] INFO [TransactionCoordinator id=1] Disconnecting 
> from node 1 due to socket connection setup timeout. The timeout value is 
> 26590 ms. (org.apache.kafka.clients.NetworkClient)}}
> Suggested logging:
> {{[2023-11-09 07:33:36,106] WARN [TransactionCoordinator id=1] Disconnecting 
> from node 1 on kafka-headless.m2-mgex.svc.cluster.local:9093 due to socket 
> connection setup timeout. The timeout value is 26590 ms. 
> (org.apache.kafka.clients.NetworkClient)}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15801) improve Kafka broker/NetworkClient logging for connectivity issues with hostname and port info and higher severity

2023-11-19 Thread johndoe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787617#comment-17787617
 ] 

johndoe commented on KAFKA-15801:
-

[KAFKA-15801|https://github.com/apache/kafka/pull/14799]

> improve Kafka broker/NetworkClient logging for connectivity issues with 
> hostname and port info and higher severity
> --
>
> Key: KAFKA-15801
> URL: https://issues.apache.org/jira/browse/KAFKA-15801
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Affects Versions: 3.6.0
>Reporter: Alexander Kilian
>Assignee: johndoe
>Priority: Trivial
>
> When a component of the Kafka broker tries to reach another broker within the 
> cluster the logging should be more elaborate and include the IP/hostname and 
> port it tries to connect to, and have a higher severity WARN rather than INFO.
> Current logging:
> {{[2023-11-09 07:33:36,106] INFO [TransactionCoordinator id=1] Disconnecting 
> from node 1 due to socket connection setup timeout. The timeout value is 
> 26590 ms. (org.apache.kafka.clients.NetworkClient)}}
> Suggested logging:
> {{[2023-11-09 07:33:36,106] WARN [TransactionCoordinator id=1] Disconnecting 
> from node 1 on kafka-headless.m2-mgex.svc.cluster.local:9093 due to socket 
> connection setup timeout. The timeout value is 26590 ms. 
> (org.apache.kafka.clients.NetworkClient)}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15801: improve Kafka broker/NetworkClient logging for connectiv… [kafka]

2023-11-19 Thread via GitHub


Joker-5 opened a new pull request, #14799:
URL: https://github.com/apache/kafka/pull/14799

   When a component of the Kafka broker tries to reach another broker within 
the cluster, the logging should be more elaborate and include the IP/hostname 
and port it tries to connect to, and should use the higher severity WARN 
rather than INFO.
   
   Current logging:
   
   [2023-11-09 07:33:36,106] INFO [TransactionCoordinator id=1] Disconnecting 
from node 1 due to socket connection setup timeout. The timeout value is 26590 
ms. (org.apache.kafka.clients.NetworkClient)
   
   Suggested logging:
   
   [2023-11-09 07:33:36,106] WARN [TransactionCoordinator id=1] Disconnecting 
from node 1 on kafka-headless.m2-mgex.svc.cluster.local:9093 due to socket 
connection setup timeout. The timeout value is 26590 ms. 
(org.apache.kafka.clients.NetworkClient)
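
   A rough sketch of the intended change (illustrative only, not the actual 
`NetworkClient` code path; it assumes the `Node` for the id is available at the 
call site):

```java
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch: log the disconnect at WARN and include the host and port of the peer.
public class DisconnectLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(DisconnectLoggingSketch.class);

    void logConnectionSetupTimeout(Node node, long timeoutMs) {
        log.warn("Disconnecting from node {} on {}:{} due to socket connection setup timeout. "
                + "The timeout value is {} ms.", node.id(), node.host(), node.port(), timeoutMs);
    }
}
```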
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14517:Implement regex subscriptions [kafka]

2023-11-19 Thread via GitHub


JimmyWang6 commented on PR #14327:
URL: https://github.com/apache/kafka/pull/14327#issuecomment-1817894315

   @vamossagar12 Many thanks for your valuable comments! I apologize for the 
delay in working on this issue; I have been occupied with other commitments. I 
will fix the problems above as soon as possible. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-19 Thread via GitHub


ijuma commented on code in PR #14796:
URL: https://github.com/apache/kafka/pull/14796#discussion_r1398418681


##
build.gradle:
##
@@ -842,6 +842,62 @@ tasks.create(name: "jarConnect", dependsOn: 
connectPkgs.collect { it + ":jar" })
 
 tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + 
":test" }) {}
 
+project(':server') {
+  archivesBaseName = "kafka-server"
+
+  dependencies {
+implementation project(':clients')
+implementation project(':server-common')
+
+implementation libs.slf4jApi
+
+compileOnly libs.log4j
+
+testImplementation project(':clients').sourceSets.test.output
+
+testImplementation libs.mockitoCore
+testImplementation libs.junitJupiter
+testImplementation libs.slf4jlog4j
+  }
+
+  task createVersionFile() {
+def receiptFile = file("$buildDir/kafka/$buildVersionFileName")

Review Comment:
   FYI:
   
   "Note that, at this stage, Gradle will not print deprecation warnings if you 
still use Project.buildDir. We know this is a big change and want to give time 
for authors of major plugins to move away from its usage first."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix unstable sorting in AssignmentsManagerTest [kafka]

2023-11-19 Thread via GitHub


ijuma commented on code in PR #14794:
URL: https://github.com/apache/kafka/pull/14794#discussion_r1398417831


##
core/src/test/java/kafka/server/AssignmentsManagerTest.java:
##
@@ -72,6 +75,29 @@ void tearDown() throws InterruptedException {
 manager.close();
 }
 
+AssignReplicasToDirsRequestData normalize(AssignReplicasToDirsRequestData 
request) {

Review Comment:
   This approach results in more maintenance: any change to this schema will 
require changing this method. I don't agree it's better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-19 Thread via GitHub


apoorvmittal10 commented on code in PR #14796:
URL: https://github.com/apache/kafka/pull/14796#discussion_r1398408817


##
server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java:
##
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package kafka.metrics;
+package org.apache.kafka.server.metrics;

Review Comment:
   Thanks @ijuma. @junrao I'll move the classes in the other PRs to the 
server/src package itself once review is complete. Moving them before the 
active review is complete would lose the context of the comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15801) improve Kafka broker/NetworkClient logging for connectivity issues with hostname and port info and higher severity

2023-11-19 Thread johndoe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

johndoe reassigned KAFKA-15801:
---

Assignee: johndoe

> improve Kafka broker/NetworkClient logging for connectivity issues with 
> hostname and port info and higher severity
> --
>
> Key: KAFKA-15801
> URL: https://issues.apache.org/jira/browse/KAFKA-15801
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Affects Versions: 3.6.0
>Reporter: Alexander Kilian
>Assignee: johndoe
>Priority: Trivial
>
> When a component of the Kafka broker tries to reach another broker within the 
> cluster the logging should be more elaborate and include the IP/hostname and 
> port it tries to connect to, and have a higher severity WARN rather than INFO.
> Current logging:
> {{[2023-11-09 07:33:36,106] INFO [TransactionCoordinator id=1] Disconnecting 
> from node 1 due to socket connection setup timeout. The timeout value is 
> 26590 ms. (org.apache.kafka.clients.NetworkClient)}}
> Suggested logging:
> {{[2023-11-09 07:33:36,106] WARN [TransactionCoordinator id=1] Disconnecting 
> from node 1 on kafka-headless.m2-mgex.svc.cluster.local:9093 due to socket 
> connection setup timeout. The timeout value is 26590 ms. 
> (org.apache.kafka.clients.NetworkClient)}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15341) Enabling TS for a topic during rolling restart causes problems

2023-11-19 Thread Phuc Hong Tran (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787595#comment-17787595
 ] 

Phuc Hong Tran commented on KAFKA-15341:


[~showuon] [~satish.duggana], just to clarify, the situation that we are 
discussing is about when we need to enable TS system-wide, but not all brokers 
have TS enabled on them. This could mean that those brokers don't have 
"{_}remote.log.storage.system.enable{_}" set to true, or that they haven't 
undergone a restart yet, meaning their metadata version is lower than what is 
required for TS. That means we need to update MetadataVersion.java to check 
whether a given metadata version supports TS or not, and also send some data 
about whether a broker has "{_}remote.log.storage.system.enable{_}" set to true, 
since the default value of this property is false. That also means we will need 
to update the broker's metadata format, specifically the broker registration 
record, to carry the broker's TS enablement status. Do you guys think we need a 
KIP for this since we're changing the broker's metadata format?
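
To make the idea concrete, a purely hypothetical sketch of the controller-side 
validation described above; the BrokerView type and its accessors are invented 
for illustration and are not part of the current metadata format:

{code:java}
import java.util.Collection;

// Hypothetical sketch only: before applying a topic config change that enables
// tiered storage, the controller would verify that every registered broker both
// runs a metadata version that understands TS and has remote storage enabled.
public class TieredStorageEnablementCheck {

    // Stand-in for information the controller would need per broker; carrying the
    // remote-storage flag would require a (KIP-gated) addition to the broker
    // registration record.
    interface BrokerView {
        int id();
        boolean metadataVersionSupportsTieredStorage();
        boolean remoteLogStorageSystemEnabled();
    }

    static void validateEnableTieredStorage(Collection<BrokerView> brokers) {
        for (BrokerView broker : brokers) {
            if (!broker.metadataVersionSupportsTieredStorage()
                    || !broker.remoteLogStorageSystemEnabled()) {
                throw new IllegalStateException("Cannot enable remote storage: broker "
                        + broker.id() + " does not yet support or enable tiered storage.");
            }
        }
    }
}
{code}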

> Enabling TS for a topic during rolling restart causes problems
> --
>
> Key: KAFKA-15341
> URL: https://issues.apache.org/jira/browse/KAFKA-15341
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: KIP-405
> Fix For: 3.7.0
>
>
> When we are in a rolling restart to enable TS at system level, some brokers 
> have TS enabled on them and some don't. We send an alter config call to 
> enable TS for a topic, it hits a broker which has TS enabled, this broker 
> forwards it to the controller and controller will send the config update to 
> all brokers. When another broker which doesn't have TS enabled (because it 
> hasn't undergone the restart yet) gets this config change, it "should" fail 
> to apply it. But failing now is too late since alterConfig has already 
> succeeded since controller->broker config propagation is done async.
> With this JIRA, we want to have controller check if TS is enabled on all 
> brokers before applying alter config to turn on TS for a topic.
> Context: https://github.com/apache/kafka/pull/14176#discussion_r1291265129



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix unstable sorting in AssignmentsManagerTest [kafka]

2023-11-19 Thread via GitHub


divijvaidya commented on code in PR #14794:
URL: https://github.com/apache/kafka/pull/14794#discussion_r1398370287


##
core/src/test/java/kafka/server/AssignmentsManagerTest.java:
##
@@ -72,6 +75,29 @@ void tearDown() throws InterruptedException {
 manager.close();
 }
 
+AssignReplicasToDirsRequestData normalize(AssignReplicasToDirsRequestData 
request) {

Review Comment:
   Thanks for the response @soarez! 
   
   I agree with not modifying the objects for assertion. Copying should be ok 
for tests. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix unstable sorting in AssignmentsManagerTest [kafka]

2023-11-19 Thread via GitHub


soarez commented on code in PR #14794:
URL: https://github.com/apache/kafka/pull/14794#discussion_r1398369411


##
core/src/test/java/kafka/server/AssignmentsManagerTest.java:
##
@@ -72,6 +75,29 @@ void tearDown() throws InterruptedException {
 manager.close();
 }
 
+AssignReplicasToDirsRequestData normalize(AssignReplicasToDirsRequestData 
request) {

Review Comment:
   @divijvaidya that sounds like a good idea to me. This list sorting aspect 
probably applies to other schemas too. Maybe we could extend the message 
generator framework to allow specifying whether the order matters in 
`FieldType.ArrayType`. At the moment I'm focussed on making as much progress on 
JBOD as possible for 3.7, but I'm happy to come back and tackle this later if 
no one else picks it up in the meantime.
   
   @ijuma because this is a test, the reasons not to copy (extra time, extra 
memory footprint) don't really apply, but the reasons to copy do: it minimizes 
side effects from the caller's perspective, especially as this method is used 
exclusively as a utility for `assertRequestEquals`; it would be surprising to 
find that a method that verifies equality also modifies its arguments.
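
   For illustration, a minimal sketch of the copy-then-normalize pattern under 
discussion; the Request type and its fields below are stand-ins for the 
generated `AssignReplicasToDirsRequestData` classes (which expose a 
deep-copying `duplicate()`), not the actual test code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch: the assertion helper normalizes private copies, never its arguments.
class Request {
    final List<String> entries = new ArrayList<>();   // stand-in for the nested lists

    Request duplicate() {                              // deep copy, like generated messages
        Request copy = new Request();
        copy.entries.addAll(entries);
        return copy;
    }
}

class RequestAssertions {
    // Sort the copy so that list ordering no longer affects equality.
    static Request normalize(Request request) {
        Request copy = request.duplicate();
        copy.entries.sort(Comparator.naturalOrder());
        return copy;
    }

    static void assertRequestEquals(Request expected, Request actual) {
        if (!normalize(expected).entries.equals(normalize(actual).entries)) {
            throw new AssertionError("Requests differ after normalization");
        }
    }
}
```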





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15801) improve Kafka broker/NetworkClient logging for connectivity issues with hostname and port info and higher severity

2023-11-19 Thread johndoe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787578#comment-17787578
 ] 

johndoe commented on KAFKA-15801:
-

Hi, [~alexanderkilian]. Are you still following this issue? If you don't have 
time, I can take over. Thanks.

> improve Kafka broker/NetworkClient logging for connectivity issues with 
> hostname and port info and higher severity
> --
>
> Key: KAFKA-15801
> URL: https://issues.apache.org/jira/browse/KAFKA-15801
> Project: Kafka
>  Issue Type: Improvement
>  Components: logging
>Affects Versions: 3.6.0
>Reporter: Alexander Kilian
>Priority: Trivial
>
> When a component of the Kafka broker tries to reach another broker within the 
> cluster the logging should be more elaborate and include the IP/hostname and 
> port it tries to connect to, and have a higher severity WARN rather than INFO.
> Current logging:
> {{[2023-11-09 07:33:36,106] INFO [TransactionCoordinator id=1] Disconnecting 
> from node 1 due to socket connection setup timeout. The timeout value is 
> 26590 ms. (org.apache.kafka.clients.NetworkClient)}}
> Suggested logging:
> {{[2023-11-09 07:33:36,106] WARN [TransactionCoordinator id=1] Disconnecting 
> from node 1 on kafka-headless.m2-mgex.svc.cluster.local:9093 due to socket 
> connection setup timeout. The timeout value is 26590 ms. 
> (org.apache.kafka.clients.NetworkClient)}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-19 Thread via GitHub


divijvaidya commented on code in PR #14796:
URL: https://github.com/apache/kafka/pull/14796#discussion_r1398361611


##
build.gradle:
##
@@ -842,6 +842,62 @@ tasks.create(name: "jarConnect", dependsOn: 
connectPkgs.collect { it + ":jar" })
 
 tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + 
":test" }) {}
 
+project(':server') {
+  archivesBaseName = "kafka-server"
+
+  dependencies {
+implementation project(':clients')
+implementation project(':server-common')
+
+implementation libs.slf4jApi
+
+compileOnly libs.log4j
+
+testImplementation project(':clients').sourceSets.test.output
+
+testImplementation libs.mockitoCore
+testImplementation libs.junitJupiter
+testImplementation libs.slf4jlog4j
+  }
+
+  task createVersionFile() {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15854: Move Java classes from `kafka.server` to the `server` module [kafka]

2023-11-19 Thread via GitHub


divijvaidya commented on code in PR #14796:
URL: https://github.com/apache/kafka/pull/14796#discussion_r1398361570


##
build.gradle:
##
@@ -842,6 +842,62 @@ tasks.create(name: "jarConnect", dependsOn: 
connectPkgs.collect { it + ":jar" })
 
 tasks.create(name: "testConnect", dependsOn: connectPkgs.collect { it + 
":test" }) {}
 
+project(':server') {
+  archivesBaseName = "kafka-server"
+
+  dependencies {
+implementation project(':clients')
+implementation project(':server-common')
+
+implementation libs.slf4jApi
+
+compileOnly libs.log4j
+
+testImplementation project(':clients').sourceSets.test.output
+
+testImplementation libs.mockitoCore
+testImplementation libs.junitJupiter
+testImplementation libs.slf4jlog4j
+  }
+
+  task createVersionFile() {
+def receiptFile = file("$buildDir/kafka/$buildVersionFileName")

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org