[GitHub] [kafka] satishd commented on pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-02-06 Thread via GitHub


satishd commented on PR #13046:
URL: https://github.com/apache/kafka/pull/13046#issuecomment-1420265571

These test failures seem to be unrelated. I rebased the PR against trunk. I 
will wait for the test results.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rittikaadhikari commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-06 Thread via GitHub


rittikaadhikari commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1097985124


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only 
the start is needed.

Review Comment:
   nit: Currently, the tier state machine follows a synchronous execution, and 
we only need to start the machine.



##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String,
 case Errors.OFFSET_OUT_OF_RANGE =>
   if (!handleOutOfRangeError(topicPartition, 
currentFetchState, fetchPartitionData.currentLeaderEpoch))
 partitionsWithError += topicPartition
-case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
-  debug(s"Received error 
${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
-s"at fetch offset: ${currentFetchState.fetchOffset}, " + 
s"topic-partition: $topicPartition")
-  if (!handleOffsetsMovedToTieredStorage(topicPartition, 
currentFetchState,
-fetchPartitionData.currentLeaderEpoch, 
partitionData.logStartOffset()))
-partitionsWithError += topicPartition
+

Review Comment:
   nit: extra line



##
core/src/main/java/kafka/server/ReplicaAlterLogDirsTierStateMachine.java:
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest;
+
+import java.util.Optional;
+
+public class ReplicaAlterLogDirsTierStateMachine implements 

[GitHub] [kafka] yashmayya commented on pull request #13120: MINOR: Connect Javadocs improvements

2023-02-06 Thread via GitHub


yashmayya commented on PR #13120:
URL: https://github.com/apache/kafka/pull/13120#issuecomment-1420163844

   Thanks Mickael!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino commented on pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-06 Thread via GitHub


hgeraldino commented on PR #13191:
URL: https://github.com/apache/kafka/pull/13191#issuecomment-1420146327

   > Thanks @hgeraldino!
   > 
   > I'm a little confused by the amount of churn here--it seems like a lot of 
utility methods related to topic creation, record transformation, etc. have 
been removed and their contents inlined directly into test cases. If this isn't 
necessary for the migration, can we try to retain that approach in order to 
reduce duplication?
   > 
   > It's also worth noting that there are unused stubbings in some of these 
tests, which should be failing the build but are not at the moment due to 
[KAFKA-14682](https://issues.apache.org/jira/browse/KAFKA-14682). You can find 
these unused stubbings by running `./gradlew :connect:runtime:test --tests 
AbstractWorkerSourceTaskTest` in your command line, or possibly by running the 
entire `AbstractWorkerSourceTaskTest` test suite in IntelliJ (which is how I 
discovered them). These unused stubbings should be removed before we merge the 
PR.
   
   That's fair. Personally I prefer self-contained test methods hat can be read 
top-bottom (without having to jump around), even if it violates the DRY 
principle. But I'm ok on keeping the existing structure & style, so I put back 
the (revised) helper methods


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-06 Thread via GitHub


hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1098136639


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -639,93 +814,25 @@ public void 
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
 SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-expectPreliminaryCalls();
-
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-
-Capture newTopicCapture = EasyMock.newCapture();
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-expectSendRecord();
-expectSendRecord();
-
-PowerMock.replayAll();
+when(transformationChain.apply(any(SourceRecord.class)))
+.thenAnswer((Answer) invocation -> 
invocation.getArgument(0));
+when(headerConverter.fromConnectHeader(anyString(), anyString(), 
eq(Schema.STRING_SCHEMA),
+anyString()))
+.thenAnswer((Answer) invocation -> {
+String headerValue = invocation.getArgument(3, String.class);
+return headerValue.getBytes(StandardCharsets.UTF_8);
+});
+when(keyConverter.fromConnectData(eq(TOPIC), any(Headers.class), 
eq(KEY_SCHEMA), eq(KEY)))
+.thenReturn(SERIALIZED_KEY);
+when(valueConverter.fromConnectData(eq(TOPIC), any(Headers.class), 
eq(RECORD_SCHEMA),
+eq(RECORD)))
+.thenReturn(SERIALIZED_RECORD);
+when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
 workerTask.toSend = Arrays.asList(record1, record2);
 workerTask.sendRecords();
-}
-
-private Capture> expectSendRecord(
-String topic,
-boolean anyTimes,
-Headers headers
-) {
-if (headers != null)
-expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
-
-expectApplyTransformationChain(anyTimes);
-
-Capture> sent = EasyMock.newCapture();
-
-IExpectationSetters> expect = EasyMock.expect(
-producer.send(EasyMock.capture(sent), 
EasyMock.capture(producerCallbacks)));
-
-IAnswer> expectResponse = () -> {
-synchronized (producerCallbacks) {
-for (Callback cb : producerCallbacks.getValues()) {
-cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-}
-producerCallbacks.reset();
-}
-return null;
-};
-
-if (anyTimes)
-expect.andStubAnswer(expectResponse);
-else
-expect.andAnswer(expectResponse);
-
-expectTaskGetTopic(anyTimes);
-
-return sent;
-}
-
-private Capture> expectSendRecordAnyTimes() 
{
-return expectSendRecord(TOPIC, true, emptyHeaders());
-}
-
-private Capture> expectSendRecord() {
-return expectSendRecord(TOPIC, false, emptyHeaders());
-}
-
-private void expectTaskGetTopic(boolean anyTimes) {
-final Capture connectorCapture = EasyMock.newCapture();
-final Capture topicCapture = EasyMock.newCapture();
-IExpectationSetters expect = 
EasyMock.expect(statusBackingStore.getTopic(
-EasyMock.capture(connectorCapture),
-EasyMock.capture(topicCapture)));
-if (anyTimes) {
-expect.andStubAnswer(() -> new TopicStatus(
-topicCapture.getValue(),
-new ConnectorTaskId(connectorCapture.getValue(), 0),
-Time.SYSTEM.milliseconds()));
-} else {
-expect.andAnswer(() -> new TopicStatus(
-topicCapture.getValue(),
-new ConnectorTaskId(connectorCapture.getValue(), 0),
-Time.SYSTEM.milliseconds()));
-}
-if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-assertEquals("job", connectorCapture.getValue());
-assertEquals(TOPIC, topicCapture.getValue());
-}
-}
-
-private void expectTopicCreation(String topic) {
-if (config.topicCreationEnable()) {
-
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
-Capture newTopicCapture = EasyMock.newCapture();
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
-}

Review Comment:
   Fair enough, I restored some of these methods



-- 
This is an automated message from the Apache Git 

[GitHub] [kafka] hgeraldino commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-06 Thread via GitHub


hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1098135416


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -129,7 +131,7 @@ public class AbstractWorkerSourceTaskTest {
 @Mock private ConnectorOffsetBackingStore offsetStore;
 @Mock private StatusBackingStore statusBackingStore;
 @Mock private WorkerSourceTaskContext sourceTaskContext;
-@MockStrict private TaskStatus.Listener statusListener;

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax opened a new pull request, #13209: MINOR: remove unnecessary helper method

2023-02-06 Thread via GitHub


mjsax opened a new pull request, #13209:
URL: https://github.com/apache/kafka/pull/13209

   The helper method in question was updated via KIP-622 instead of removed. Cf 
https://github.com/apache/kafka/commit/029f5a136ae2c74f7f93e716bcc30ce90d8241ad


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

2023-02-06 Thread via GitHub


gharris1727 commented on PR #13208:
URL: https://github.com/apache/kafka/pull/13208#issuecomment-1420026975

   I verified that this fix corrects the flakey failures I was seeing in 
ConnectDistributedTest.test_bounce by manually running the tests with a flush 
interval of 1ms.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-02-06 Thread via GitHub


jolshan commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1098052600


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "response",
+  "name": "ConsumerGroupHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED

Review Comment:
   +1 to this idea



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-5756) Synchronization issue on flush

2023-02-06 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684977#comment-17684977
 ] 

Greg Harris commented on KAFKA-5756:


[~mimaison] I have opened a PR that I think may alleviate this failure mode.

> Synchronization issue on flush
> --
>
> Key: KAFKA-5756
> URL: https://issues.apache.org/jira/browse/KAFKA-5756
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Oleg Kuznetsov
>Priority: Major
> Fix For: 0.11.0.1, 1.0.0
>
>
> Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* 
> method, whereas this collection can be accessed from 2 different threads:
> - *WorkerSourceTask.execute()*, finally block
> - *SourceTaskOffsetCommitter*, from periodic flush task



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-5756) Synchronization issue on flush

2023-02-06 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-5756:
--

Assignee: Greg Harris

> Synchronization issue on flush
> --
>
> Key: KAFKA-5756
> URL: https://issues.apache.org/jira/browse/KAFKA-5756
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Oleg Kuznetsov
>Assignee: Greg Harris
>Priority: Major
> Fix For: 0.11.0.1, 1.0.0
>
>
> Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* 
> method, whereas this collection can be accessed from 2 different threads:
> - *WorkerSourceTask.execute()*, finally block
> - *SourceTaskOffsetCommitter*, from periodic flush task



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] gharris1727 opened a new pull request, #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

2023-02-06 Thread via GitHub


gharris1727 opened a new pull request, #13208:
URL: https://github.com/apache/kafka/pull/13208

   Both the SourceTaskOffsetCommitter and WorkerSourceTask trigger offset 
commits. Currently, when both threads attempt to start concurrent flushes, the 
second to call beginFlush receives an exception. The SourceTaskOffsetCommitter 
swallows this exception, while the WorkerSourceTask propagates the exception, 
preventing the final offset from completing cleanly and dropping final offsets.
   
   This change allows the second caller of waitForBeginFlush to wait for the 
first flush operation to complete, avoiding exceptions if offset flushes are 
prompt.
   
   Signed-off-by: Greg Harris 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14686) MockAdminClient.createTopics does not provide TopicMetadataAndConfig

2023-02-06 Thread Kirk True (Jira)
Kirk True created KAFKA-14686:
-

 Summary: MockAdminClient.createTopics does not provide 
TopicMetadataAndConfig
 Key: KAFKA-14686
 URL: https://issues.apache.org/jira/browse/KAFKA-14686
 Project: Kafka
  Issue Type: Bug
  Components: admin, clients, unit tests
Reporter: Kirk True
Assignee: Kirk True


[Line 386 of 
{{MockAdminClient}}|https://github.com/apache/kafka/blame/trunk/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java#L386]
 does this:
{quote}{{future.complete(null);}}
{quote}
It seems like we should be creating a {{TopicMetadataAndConfig}} instance and 
passing that in instead so that its available to the unit test caller.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-02-06 Thread via GitHub


hachikuji commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1098020205


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3509,6 +3510,13 @@ class KafkaApis(val requestChannel: RequestChannel,
   )
   }
 
+  def handleConsumerGroupHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+val consumerGroupHeartbeatRequest = 
request.body[ConsumerGroupHeartbeatRequest]
+// KIP-848 is not implemented yet so return UNSUPPORTED_VERSION.
+requestHelper.sendMaybeThrottle(request, 
consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))

Review Comment:
   Would it be overkill to have a unit test for this in `KafkaApisTest`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-02-06 Thread via GitHub


hachikuji commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1098014628


##
clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java:
##
@@ -212,6 +215,36 @@ public boolean isVersionSupported(short apiVersion) {
 return apiVersion >= oldestVersion() && apiVersion <= latestVersion();
 }
 
+public boolean isVersionEnabled(short apiVersion, boolean 
enableUnstableLastVersion) {
+// ApiVersions API is a particular case that we always accept any, even

Review Comment:
   It would be helpful to clarify the reason for this.



##
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##
@@ -45,55 +45,70 @@ object ApiVersionManager {
   listenerType,
   forwardingManager,
   supportedFeatures,
-  metadataCache
+  metadataCache,
+  config.unstableApiVersionsEnabled
 )
   }
 }
 
 class SimpleApiVersionManager(
   val listenerType: ListenerType,
   val enabledApis: collection.Set[ApiKeys],
-  brokerFeatures: Features[SupportedVersionRange]
+  brokerFeatures: Features[SupportedVersionRange],
+  enableUnstableLastVersion: Boolean
 ) extends ApiVersionManager {
 
-  def this(listenerType: ListenerType) = {
-this(listenerType, ApiKeys.apisForListener(listenerType).asScala, 
BrokerFeatures.defaultSupportedFeatures())
+  def this(
+listenerType: ListenerType,
+enableUnstableLastVersion: Boolean
+  ) = {
+this(
+  listenerType,
+  ApiKeys.apisForListener(listenerType).asScala,
+  BrokerFeatures.defaultSupportedFeatures(),
+  enableUnstableLastVersion
+)
   }
 
-  private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava)
+  private val apiVersions = 
ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
 
   override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse 
= {
 ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, 
apiVersions, brokerFeatures)
   }
+
+  override def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = {
+apiKey != null && apiKey.inScope(listenerType) && 
apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion)
+  }
 }
 
 class DefaultApiVersionManager(
   val listenerType: ListenerType,
   forwardingManager: Option[ForwardingManager],
   features: BrokerFeatures,
-  metadataCache: MetadataCache
+  metadataCache: MetadataCache,
+  enableUnstableLastVersion: Boolean
 ) extends ApiVersionManager {
 
+  val enabledApis = ApiKeys.apisForListener(listenerType).asScala
+
   override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
 val supportedFeatures = features.supportedFeatures
 val finalizedFeatures = metadataCache.features()
 val controllerApiVersions = 
forwardingManager.flatMap(_.controllerApiVersions)
 
 ApiVersionsResponse.createApiVersionsResponse(
-throttleTimeMs,
-metadataCache.metadataVersion().highestSupportedRecordVersion,
-supportedFeatures,
-finalizedFeatures.features.map(kv => (kv._1, 
kv._2.asInstanceOf[java.lang.Short])).asJava,
-finalizedFeatures.epoch,
-controllerApiVersions.orNull,
-listenerType)
-  }
-
-  override def enabledApis: collection.Set[ApiKeys] = {
-ApiKeys.apisForListener(listenerType).asScala
+  throttleTimeMs,
+  metadataCache.metadataVersion().highestSupportedRecordVersion,
+  supportedFeatures,
+  finalizedFeatures.features.map(kv => (kv._1, 
kv._2.asInstanceOf[java.lang.Short])).asJava,
+  finalizedFeatures.epoch,
+  controllerApiVersions.orNull,
+  listenerType,
+  enableUnstableLastVersion
+)
   }
 
-  override def isApiEnabled(apiKey: ApiKeys): Boolean = {
-apiKey.inScope(listenerType)
+  override def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = {
+apiKey != null && apiKey.inScope(listenerType) && 
apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion)

Review Comment:
   Could we pull this implementation up to the `trait`? The implementation 
looks the same for `SimpleApiVersionManager`.



##
generator/src/main/java/org/apache/kafka/message/RequestApiStabilityType.java:
##
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific 

[jira] [Commented] (KAFKA-14685) TierStateMachine interface for building remote aux log

2023-02-06 Thread Matthew Wong (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684968#comment-17684968
 ] 

Matthew Wong commented on KAFKA-14685:
--

https://github.com/apache/kafka/pull/13206

> TierStateMachine interface for building remote aux log
> --
>
> Key: KAFKA-14685
> URL: https://issues.apache.org/jira/browse/KAFKA-14685
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Matthew Wong
>Priority: Major
>
> To help with https://issues.apache.org/jira/browse/KAFKA-13560 , we can 
> introduce an interface to manage state transitions of building the remote aux 
> log asynchronously



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14685) TierStateMachine interface for building remote aux log

2023-02-06 Thread Matthew Wong (Jira)
Matthew Wong created KAFKA-14685:


 Summary: TierStateMachine interface for building remote aux log
 Key: KAFKA-14685
 URL: https://issues.apache.org/jira/browse/KAFKA-14685
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Matthew Wong


To help with https://issues.apache.org/jira/browse/KAFKA-13560 , we can 
introduce an interface to manage state transitions of building the remote aux 
log asynchronously



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji opened a new pull request, #13207: KAFKA-14664; Fix inaccurate raft idle ratio metric

2023-02-06 Thread via GitHub


hachikuji opened a new pull request, #13207:
URL: https://github.com/apache/kafka/pull/13207

   The raft idle ratio is currently computed as the average of all recorded 
poll durations. This tends to underestimate the actual idle ratio since it 
treats all measurements equally regardless how much time was spent. For 
example, say we poll twice with the following durations:
   
   Poll 1: 2s
   Poll 2: 0s
   
   Assume that the busy time is negligible, so 2s passes overall.
   
   In the first measurement, 2s is spent waiting, so we compute and record a 
ratio of 1.0. In the second measurement, no time passes, and we record 0.0. The 
idle ratio is then computed as the average of these two values (1.0 + 0.0 / 2 = 
0.5), which suggests that the process was busy for 1s, which overestimates the 
true busy time.
   
   In this patch, I've created a new `TimeRatio` class which tracks the total 
duration of a periodic event over a full interval of time measurement.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on pull request #13206: Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-06 Thread via GitHub


junrao commented on PR #13206:
URL: https://github.com/apache/kafka/pull/13206#issuecomment-1419883513

   cc @satishd 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mattwong949 opened a new pull request, #13206: Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-06 Thread via GitHub


mattwong949 opened a new pull request, #13206:
URL: https://github.com/apache/kafka/pull/13206

   This PR adds the TierStateMachine interface to handle all state transitions 
related to tiered storage and building the remote log aux state.
   
   The new interface supports a `start` and `maybeAdvanceState`. In the 
`ReplicaFetcherTierStateMachine`, the `maybeAdvanceState` is unused since the 
implementation is synchronous. Only the `start` is needed. This PR keeps the 
addition of the `maybeAdvanceState` because there is an existing task for 
building the remote log aux state in an asynchronous manner that will be able 
to use the full interface https://issues.apache.org/jira/browse/KAFKA-13560


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-02-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684958#comment-17684958
 ] 

Matthias J. Sax commented on KAFKA-14660:
-

Correct. 3.4.0 is already voted and should be released soon. I plan to 
cherry-pick for 3.4.1 after 3.4.0 is out.

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Assignee: Matthias J. Sax
>Priority: Minor
> Fix For: 3.5.0
>
>
> Looks like SonaType has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: 
> sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0)]
>  * [Original PR]([https://github.com/apache/kafka/pull/7414])
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that its now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ijuma commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-06 Thread via GitHub


ijuma commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1419776869

   @dejan2609 We'll wait for the final release, but it's fine to start working 
through the issues. Looks like the build failed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13179: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener

2023-02-06 Thread via GitHub


mjsax commented on code in PR #13179:
URL: https://github.com/apache/kafka/pull/13179#discussion_r1097933056


##
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##
@@ -37,6 +40,11 @@
  * These two interfaces serve different restoration purposes and users should 
not try to implement both of them in a single
  * class during state store registration.
  *
+ * 
+ * Also note that standby tasks restoration process are not monitored via this 
interface, since a standby task keep

Review Comment:
   > since a standby task keeps getting data from the changelogs written by the 
active stream tasks and hence would not complete restoration at all.
   
   -> `since a standby task does not actually restore state (and 
updating a standby task does never finish).`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13179: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener

2023-02-06 Thread via GitHub


mjsax commented on code in PR #13179:
URL: https://github.com/apache/kafka/pull/13179#discussion_r1097933056


##
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##
@@ -37,6 +40,11 @@
  * These two interfaces serve different restoration purposes and users should 
not try to implement both of them in a single
  * class during state store registration.
  *
+ * 
+ * Also note that standby tasks restoration process are not monitored via this 
interface, since a standby task keep

Review Comment:
   > since a standby task keeps getting data from the changelogs written by the 
active stream tasks and hence would not complete restoration at all.
   
   -> `since a standby task does not actually _restore_ state (and updating a 
standby does never finish).`



##
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##
@@ -37,6 +40,11 @@
  * These two interfaces serve different restoration purposes and users should 
not try to implement both of them in a single
  * class during state store registration.
  *
+ * 
+ * Also note that standby tasks restoration process are not monitored via this 
interface, since a standby task keep

Review Comment:
   > since a standby task keeps getting data from the changelogs written by the 
active stream tasks and hence would not complete restoration at all.
   
   -> `since a standby task does not actually _restore_ state (and updating a 
standby task does never finish).`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13179: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener

2023-02-06 Thread via GitHub


mjsax commented on code in PR #13179:
URL: https://github.com/apache/kafka/pull/13179#discussion_r1097933056


##
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##
@@ -37,6 +40,11 @@
  * These two interfaces serve different restoration purposes and users should 
not try to implement both of them in a single
  * class during state store registration.
  *
+ * 
+ * Also note that standby tasks restoration process are not monitored via this 
interface, since a standby task keep

Review Comment:
   > since a standby task keeps getting data from the changelogs written by the 
active stream tasks and hence would not complete restoration at all.
   
   -> `since a standby task does not actually _restore_ state, and updating a 
standby does never finish.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13179: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener

2023-02-06 Thread via GitHub


mjsax commented on code in PR #13179:
URL: https://github.com/apache/kafka/pull/13179#discussion_r1097931763


##
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##
@@ -37,6 +40,11 @@
  * These two interfaces serve different restoration purposes and users should 
not try to implement both of them in a single
  * class during state store registration.
  *
+ * 
+ * Also note that standby tasks restoration process are not monitored via this 
interface, since a standby task keep

Review Comment:
   `restoration` -> `update` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on a diff in pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests

2023-02-06 Thread via GitHub


philipnee commented on code in PR #13021:
URL: https://github.com/apache/kafka/pull/13021#discussion_r1097801059


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/GroupStateManager.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class GroupStateManager {

Review Comment:
   thanks for suggestions, changed.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+public class CommitRequestManager implements RequestManager {
+private final Queue stagedCommits;
+// TODO: We will need to refactor the subscriptionState
+private final SubscriptionState subscriptionState;
+private final Logger log;
+private final Optional autoCommitState;
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final GroupStateManager groupState;

Review Comment:
   Agreed. makes sense. thanks.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;

[jira] [Commented] (KAFKA-14132) Remaining PowerMock to Mockito tests

2023-02-06 Thread Hector Geraldino (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684947#comment-17684947
 ] 

Hector Geraldino commented on KAFKA-14132:
--

Perfect. Entered https://issues.apache.org/jira/browse/KAFKA-14683 and 
https://issues.apache.org/jira/browse/KAFKA-14684 to track progress

> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}InReview{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
>  # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
>  # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
>  # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
>  # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
> ([https://github.com/apache/kafka/pull/12728])
>  # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
>  # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
> ([https://github.com/apache/kafka/pull/12418])
>  # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven])
>  # RetryUtilTest (owner: [~mdedetrich-aiven] )
>  # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo)
>  # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo)
> *The coverage report for the above tests after the change should be >= to 
> what the coverage is now.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests

2023-02-06 Thread Hector Geraldino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hector Geraldino updated KAFKA-14132:
-
Description: 
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven])
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*

  was:
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}InReview{color}
{color:#00875a}Merged{color}
 # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
 # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
 # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
 # WorkerSinkTaskTest (owner: ??) 
 # WorkerSinkTaskThreadedTest (owner: ??)
 # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven])
 # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) 
([https://github.com/apache/kafka/pull/12728])
 # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven])
 # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven])
 # RetryUtilTest (owner: [~mdedetrich-aiven] )
 # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo)
 # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*


> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
> {color:#ff8b00}InReview{color}
> {color:#00875a}Merged{color}
>  # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak])
>  # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo)
>  # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij)
>  # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
>  # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: 

[jira] [Created] (KAFKA-14684) Replace EasyMock and PowerMock with Mockito in WorkerSinkTaskThreadedTest

2023-02-06 Thread Hector Geraldino (Jira)
Hector Geraldino created KAFKA-14684:


 Summary: Replace EasyMock and PowerMock with Mockito in 
WorkerSinkTaskThreadedTest
 Key: KAFKA-14684
 URL: https://issues.apache.org/jira/browse/KAFKA-14684
 Project: Kafka
  Issue Type: Sub-task
  Components: KafkaConnect
Reporter: Hector Geraldino
Assignee: Hector Geraldino






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rondagostino commented on a diff in pull request #13116: KAFKA-14351: Controller Mutation Quota for KRaft

2023-02-06 Thread via GitHub


rondagostino commented on code in PR #13116:
URL: https://github.com/apache/kafka/pull/13116#discussion_r1097911296


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -322,15 +325,37 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
 }
 // Finally, the idToName map contains all the topics that we are 
authorized to delete.
-// Perform the deletion and create responses for each one.
-controller.deleteTopics(context, idToName.keySet).thenApply { 
idToError =>
-  idToError.forEach { (id, error) =>
-appendResponse(idToName.get(id), id, error)
+// First check the controller mutation quota if necessary, and then

Review Comment:
   I believe these locks are scoped by the controller mutation quota manager 
such that nobody else would contest for them aside from the single-threaded 
controller.  @dajac might you be able to confirm this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14683) Replace EasyMock and PowerMock with Mockito in WorkerSinkTaskTest

2023-02-06 Thread Hector Geraldino (Jira)
Hector Geraldino created KAFKA-14683:


 Summary: Replace EasyMock and PowerMock with Mockito in 
WorkerSinkTaskTest
 Key: KAFKA-14683
 URL: https://issues.apache.org/jira/browse/KAFKA-14683
 Project: Kafka
  Issue Type: Sub-task
  Components: KafkaConnect
Reporter: Hector Geraldino
Assignee: Hector Geraldino






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-06 Thread via GitHub


C0urante commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1097852548


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -129,7 +131,7 @@ public class AbstractWorkerSourceTaskTest {
 @Mock private ConnectorOffsetBackingStore offsetStore;
 @Mock private StatusBackingStore statusBackingStore;
 @Mock private WorkerSourceTaskContext sourceTaskContext;
-@MockStrict private TaskStatus.Listener statusListener;

Review Comment:
   In order to retain the same guarantees we have currently w/r/t interactions 
with this class, can we add a call to 
`verifyNoMoreInteractions(statusListener);` in the `tearDown` method?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -639,93 +814,25 @@ public void 
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
 SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-expectPreliminaryCalls();
-
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-
-Capture newTopicCapture = EasyMock.newCapture();
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-expectSendRecord();
-expectSendRecord();
-
-PowerMock.replayAll();
+when(transformationChain.apply(any(SourceRecord.class)))
+.thenAnswer((Answer) invocation -> 
invocation.getArgument(0));
+when(headerConverter.fromConnectHeader(anyString(), anyString(), 
eq(Schema.STRING_SCHEMA),
+anyString()))
+.thenAnswer((Answer) invocation -> {
+String headerValue = invocation.getArgument(3, String.class);
+return headerValue.getBytes(StandardCharsets.UTF_8);
+});
+when(keyConverter.fromConnectData(eq(TOPIC), any(Headers.class), 
eq(KEY_SCHEMA), eq(KEY)))
+.thenReturn(SERIALIZED_KEY);
+when(valueConverter.fromConnectData(eq(TOPIC), any(Headers.class), 
eq(RECORD_SCHEMA),
+eq(RECORD)))
+.thenReturn(SERIALIZED_RECORD);
+when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
 workerTask.toSend = Arrays.asList(record1, record2);
 workerTask.sendRecords();
-}
-
-private Capture> expectSendRecord(
-String topic,
-boolean anyTimes,
-Headers headers
-) {
-if (headers != null)
-expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
-
-expectApplyTransformationChain(anyTimes);
-
-Capture> sent = EasyMock.newCapture();
-
-IExpectationSetters> expect = EasyMock.expect(
-producer.send(EasyMock.capture(sent), 
EasyMock.capture(producerCallbacks)));
-
-IAnswer> expectResponse = () -> {
-synchronized (producerCallbacks) {
-for (Callback cb : producerCallbacks.getValues()) {
-cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-}
-producerCallbacks.reset();
-}
-return null;
-};
-
-if (anyTimes)
-expect.andStubAnswer(expectResponse);
-else
-expect.andAnswer(expectResponse);
-
-expectTaskGetTopic(anyTimes);
-
-return sent;
-}
-
-private Capture> expectSendRecordAnyTimes() 
{
-return expectSendRecord(TOPIC, true, emptyHeaders());
-}
-
-private Capture> expectSendRecord() {
-return expectSendRecord(TOPIC, false, emptyHeaders());
-}
-
-private void expectTaskGetTopic(boolean anyTimes) {
-final Capture connectorCapture = EasyMock.newCapture();
-final Capture topicCapture = EasyMock.newCapture();
-IExpectationSetters expect = 
EasyMock.expect(statusBackingStore.getTopic(
-EasyMock.capture(connectorCapture),
-EasyMock.capture(topicCapture)));
-if (anyTimes) {
-expect.andStubAnswer(() -> new TopicStatus(
-topicCapture.getValue(),
-new ConnectorTaskId(connectorCapture.getValue(), 0),
-Time.SYSTEM.milliseconds()));
-} else {
-expect.andAnswer(() -> new TopicStatus(
-topicCapture.getValue(),
-new ConnectorTaskId(connectorCapture.getValue(), 0),
-Time.SYSTEM.milliseconds()));
-}
-if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-

[jira] [Updated] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds

2023-02-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14682:
--
Description: 
We've started using [strict 
stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
 for unit tests written with Mockito, which is supposed to automatically fail 
tests when they set up mock expectations that go unused.

However, these failures are not reported during Jenkins builds, even if they 
are reported when building/testing locally.

In at least one case, this difference appears to be because our [Jenkins 
build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
 uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
project's [Gradle build 
file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
 instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the latter 
instead of the former, which can cause tests to fail due to unnecessary 
stubbings when being run in that IDE but not when being built on Jenkins.

It's possible that, because the custom test tasks filter out some tests from 
running, Mockito does not check for unnecessary stubbings in order to avoid 
incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method.

  was:
We've started using [strict 
stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
 for unit tests written with Mockito, which is supposed to automatically fail 
tests when they set up mock expectations that go unused.

However, these failures are not reported during Jenkins builds, even if they 
are reported when building/testing locally.

In at least one case, this difference appears to be because our [Jenkins 
build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
 uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
project's [Gradle build 
file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
 instead of the {{test}} task.

It's possible that, because the custom test tasks filter out some tests from 
running, Mockito does not check for unnecessary stubbings in order to avoid 
incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method.


> Unused stubbings are not reported by Mockito during CI builds
> -
>
> Key: KAFKA-14682
> URL: https://issues.apache.org/jira/browse/KAFKA-14682
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Chris Egerton
>Priority: Major
>
> We've started using [strict 
> stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
>  for unit tests written with Mockito, which is supposed to automatically fail 
> tests when they set up mock expectations that go unused.
> However, these failures are not reported during Jenkins builds, even if they 
> are reported when building/testing locally.
> In at least one case, this difference appears to be because our [Jenkins 
> build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
>  uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
> project's [Gradle build 
> file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
>  instead of the {{test}} task. Some IDEs (such as IntelliJ) may use the 
> latter instead of the former, which can cause tests to fail due to 
> unnecessary stubbings when being run in that IDE but not when being built on 
> Jenkins.
> It's possible that, because the custom test tasks filter out some tests from 
> running, Mockito does not check for unnecessary stubbings in order to avoid 
> incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} 
> method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds

2023-02-06 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684909#comment-17684909
 ] 

Chris Egerton commented on KAFKA-14682:
---

[~divijvaidya] FYI, this may interest you.

> Unused stubbings are not reported by Mockito during CI builds
> -
>
> Key: KAFKA-14682
> URL: https://issues.apache.org/jira/browse/KAFKA-14682
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Chris Egerton
>Priority: Major
>
> We've started using [strict 
> stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
>  for unit tests written with Mockito, which is supposed to automatically fail 
> tests when they set up mock expectations that go unused.
> However, these failures are not reported during Jenkins builds, even if they 
> are reported when building/testing locally.
> In at least one case, this difference appears to be because our [Jenkins 
> build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
>  uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
> project's [Gradle build 
> file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
>  instead of the {{test}} task.
> It's possible that, because the custom test tasks filter out some tests from 
> running, Mockito does not check for unnecessary stubbings in order to avoid 
> incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} 
> method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14682) Unused stubbings are not reported by Mockito during CI builds

2023-02-06 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14682:
-

 Summary: Unused stubbings are not reported by Mockito during CI 
builds
 Key: KAFKA-14682
 URL: https://issues.apache.org/jira/browse/KAFKA-14682
 Project: Kafka
  Issue Type: Test
  Components: unit tests
Reporter: Chris Egerton


We've started using [strict 
stubbing|https://javadoc.io/static/org.mockito/mockito-core/4.6.1/org/mockito/junit/MockitoJUnitRunner.StrictStubs.html]
 for unit tests written with Mockito, which is supposed to automatically fail 
tests when they set up mock expectations that go unused.

However, these failures are not reported during Jenkins builds, even if they 
are reported when building/testing locally.

In at least one case, this difference appears to be because our [Jenkins 
build|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/Jenkinsfile#L32-L35]
 uses the custom {{unitTest}} and {{integrationTest}} tasks defined in the 
project's [Gradle build 
file|https://github.com/apache/kafka/blob/6d11261d5deaca300e273bebe309f9e4f814f815/build.gradle#L452-L543],
 instead of the {{test}} task.

It's possible that, because the custom test tasks filter out some tests from 
running, Mockito does not check for unnecessary stubbings in order to avoid 
incorrectly failing tests that set up mocks in, e.g., a {{@BeforeEach}} method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vladimirdyuzhev commented on pull request #13081: Re-using callbackHandler for refreshing Kerberos TGT when keytab is not used

2023-02-06 Thread via GitHub


vladimirdyuzhev commented on PR #13081:
URL: https://github.com/apache/kafka/pull/13081#issuecomment-1419634909

   Created JIRA [KAFKA-14681](https://issues.apache.org/jira/browse/KAFKA-14681)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14681) Refreshing Kerberos TGT is not using CallbackHandler (causing failure to refresh)

2023-02-06 Thread Vlad D. (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vlad D. updated KAFKA-14681:

Description: 
The JAAS + Kerberos authentication in KerberosLogin.java class, when obtaining 
Kerberos TGT, makes use of the client-provided callback handler. This is a 
must-have when the security configuration is not default.

However, the same code, when it is time to renew the Kerberos TGT ticket, 
ignores the provided callback. That works OK for default configuration (JAAS 
configuration, Kerberos config and keytab are available).

But when the security configuration sources are custom, and the default 
Kerberos code is not supporting them, the callback is to be used to obtain the 
configuration properties.

A fix is done to pass the same callback handler in KerberosLogin::reLogin and 
store the callback handler in the super class AbstractLogin, similar to 
contextName and configuration.

The fix is in PR [https://github.com/apache/kafka/pull/13081]

It is tested in our SFT environments and works fine.

  was:
The JAAS + Kerberos authentication in KerberosLogin.java class, when obtaining 
Kerberos TGT, makes use of the client-provided callback handler. This is a 
must-have when the security configuration is not default.

However, the same code, when it is time to renew the Kerberos TGT ticket, 
ignores the provided ticket. That works OK for default configuration (JAAS 
configuration, Kerberos config and keytab are available).

But when the security configuration sources are custom, and the default 
Kerberos code is not supporting them, the callback is to be used to obtain the 
configuration properties.

A fix is done to pass the same callback handler in KerberosLogin::reLogin and 
store the callback handler in the super class AbstractLogin, similar to 
contextName and configuration.

The fix is in PR [https://github.com/apache/kafka/pull/13081]

It is tested in our SFT environments and works fine.


> Refreshing Kerberos TGT is not using CallbackHandler (causing failure to 
> refresh)
> -
>
> Key: KAFKA-14681
> URL: https://issues.apache.org/jira/browse/KAFKA-14681
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Vlad D.
>Priority: Major
>  Labels: kerberos, security
>
> The JAAS + Kerberos authentication in KerberosLogin.java class, when 
> obtaining Kerberos TGT, makes use of the client-provided callback handler. 
> This is a must-have when the security configuration is not default.
> However, the same code, when it is time to renew the Kerberos TGT ticket, 
> ignores the provided callback. That works OK for default configuration (JAAS 
> configuration, Kerberos config and keytab are available).
> But when the security configuration sources are custom, and the default 
> Kerberos code is not supporting them, the callback is to be used to obtain 
> the configuration properties.
> A fix is done to pass the same callback handler in KerberosLogin::reLogin and 
> store the callback handler in the super class AbstractLogin, similar to 
> contextName and configuration.
> The fix is in PR [https://github.com/apache/kafka/pull/13081]
> It is tested in our SFT environments and works fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14681) Refreshing Kerberos TGT is not using CallbackHandler (causing failure to refresh)

2023-02-06 Thread Vlad D. (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vlad D. updated KAFKA-14681:

Description: 
The JAAS + Kerberos authentication in KerberosLogin.java class, when obtaining 
Kerberos TGT, makes use of the client-provided callback handler. This is a 
must-have when the security configuration is not default.

However, the same code, when it is time to renew the Kerberos TGT ticket, 
ignores the provided callback. That works OK for default configuration (JAAS 
configuration, Kerberos config and keytab are available).

But when the security configuration sources are custom, and the default 
Kerberos code is not supporting them, the callback is to be used even for TGT 
refresh to obtain the configuration properties.

A fix is done to pass the same callback handler in KerberosLogin::reLogin and 
store the callback handler in the super class AbstractLogin, similar to 
contextName and configuration.

The fix is in PR [https://github.com/apache/kafka/pull/13081]

It is tested in our SFT environments and works fine.

  was:
The JAAS + Kerberos authentication in KerberosLogin.java class, when obtaining 
Kerberos TGT, makes use of the client-provided callback handler. This is a 
must-have when the security configuration is not default.

However, the same code, when it is time to renew the Kerberos TGT ticket, 
ignores the provided callback. That works OK for default configuration (JAAS 
configuration, Kerberos config and keytab are available).

But when the security configuration sources are custom, and the default 
Kerberos code is not supporting them, the callback is to be used to obtain the 
configuration properties.

A fix is done to pass the same callback handler in KerberosLogin::reLogin and 
store the callback handler in the super class AbstractLogin, similar to 
contextName and configuration.

The fix is in PR [https://github.com/apache/kafka/pull/13081]

It is tested in our SFT environments and works fine.


> Refreshing Kerberos TGT is not using CallbackHandler (causing failure to 
> refresh)
> -
>
> Key: KAFKA-14681
> URL: https://issues.apache.org/jira/browse/KAFKA-14681
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Vlad D.
>Priority: Major
>  Labels: kerberos, security
>
> The JAAS + Kerberos authentication in KerberosLogin.java class, when 
> obtaining Kerberos TGT, makes use of the client-provided callback handler. 
> This is a must-have when the security configuration is not default.
> However, the same code, when it is time to renew the Kerberos TGT ticket, 
> ignores the provided callback. That works OK for default configuration (JAAS 
> configuration, Kerberos config and keytab are available).
> But when the security configuration sources are custom, and the default 
> Kerberos code is not supporting them, the callback is to be used even for TGT 
> refresh to obtain the configuration properties.
> A fix is done to pass the same callback handler in KerberosLogin::reLogin and 
> store the callback handler in the super class AbstractLogin, similar to 
> contextName and configuration.
> The fix is in PR [https://github.com/apache/kafka/pull/13081]
> It is tested in our SFT environments and works fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14681) Refreshing Kerberos TGT is not using CallbackHandler (causing failure to refresh)

2023-02-06 Thread Vlad D. (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vlad D. updated KAFKA-14681:

Description: 
The JAAS + Kerberos authentication in KerberosLogin.java class, when obtaining 
Kerberos TGT, makes use of the client-provided callback handler. This is a 
must-have when the security configuration is not default.

However, the same code, when it is time to renew the Kerberos TGT ticket, 
ignores the provided ticket. That works OK for default configuration (JAAS 
configuration, Kerberos config and keytab are available).

But when the security configuration sources are custom, and the default 
Kerberos code is not supporting them, the callback is to be used to obtain the 
configuration properties.

A fix is done to pass the same callback handler in KerberosLogin::reLogin and 
store the callback handler in the super class AbstractLogin, similar to 
contextName and configuration.

The fix is in PR [https://github.com/apache/kafka/pull/13081]

It is tested in our SFT environments and works fine.

  was:
The SASL + Kerberos authentication in KerberosLogin.java class, when obtaining 
Kerberos TGT, makes use of the client-provided callback handler. This is a 
must-have when the security configuration is not default.

However, the same code, when it is time to renew the Kerberos TGT ticket, 
ignores the provided ticket. That works OK for default configuration (JAAS 
configuration, Kerberos config and keytab are available).

But when the security configuration sources are custom, and the default 
Kerberos code is not supporting them, the callback is to be used to obtain the 
configuration properties.

A fix is done to pass the same callback handler in KerberosLogin::reLogin and 
store the callback handler in the super class AbstractLogin, similar to 
contextName and configuration.

The fix is in PR [https://github.com/apache/kafka/pull/13081]

It is tested in our SFT environments and works fine.


> Refreshing Kerberos TGT is not using CallbackHandler (causing failure to 
> refresh)
> -
>
> Key: KAFKA-14681
> URL: https://issues.apache.org/jira/browse/KAFKA-14681
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Reporter: Vlad D.
>Priority: Major
>  Labels: kerberos, security
>
> The JAAS + Kerberos authentication in KerberosLogin.java class, when 
> obtaining Kerberos TGT, makes use of the client-provided callback handler. 
> This is a must-have when the security configuration is not default.
> However, the same code, when it is time to renew the Kerberos TGT ticket, 
> ignores the provided ticket. That works OK for default configuration (JAAS 
> configuration, Kerberos config and keytab are available).
> But when the security configuration sources are custom, and the default 
> Kerberos code is not supporting them, the callback is to be used to obtain 
> the configuration properties.
> A fix is done to pass the same callback handler in KerberosLogin::reLogin and 
> store the callback handler in the super class AbstractLogin, similar to 
> contextName and configuration.
> The fix is in PR [https://github.com/apache/kafka/pull/13081]
> It is tested in our SFT environments and works fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14681) Refreshing Kerberos TGT is not using CallbackHandler (causing failure to refresh)

2023-02-06 Thread Vlad D. (Jira)
Vlad D. created KAFKA-14681:
---

 Summary: Refreshing Kerberos TGT is not using CallbackHandler 
(causing failure to refresh)
 Key: KAFKA-14681
 URL: https://issues.apache.org/jira/browse/KAFKA-14681
 Project: Kafka
  Issue Type: Bug
  Components: security
Reporter: Vlad D.


The SASL + Kerberos authentication in KerberosLogin.java class, when obtaining 
Kerberos TGT, makes use of the client-provided callback handler. This is a 
must-have when the security configuration is not default.

However, the same code, when it is time to renew the Kerberos TGT ticket, 
ignores the provided ticket. That works OK for default configuration (JAAS 
configuration, Kerberos config and keytab are available).

But when the security configuration sources are custom, and the default 
Kerberos code is not supporting them, the callback is to be used to obtain the 
configuration properties.

A fix is done to pass the same callback handler in KerberosLogin::reLogin and 
store the callback handler in the super class AbstractLogin, similar to 
contextName and configuration.

The fix is in PR [https://github.com/apache/kafka/pull/13081]

It is tested in our SFT environments and works fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-06 Thread via GitHub


dejan2609 commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1419585603

   Related to #13199 
   
   Thanx for heads up @ijuma 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-13972) Reassignment cancellation causes stray replicas

2023-02-06 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13972.
-
Resolution: Fixed

> Reassignment cancellation causes stray replicas
> ---
>
> Key: KAFKA-13972
> URL: https://issues.apache.org/jira/browse/KAFKA-13972
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 3.4.1
>
>
> A stray replica is one that is left behind on a broker after the partition 
> has been reassigned to other brokers or the partition has been deleted. We 
> found one case where this can happen is after a cancelled reassignment. When 
> a reassignment is cancelled, the controller sends `StopReplica` requests to 
> any of the adding replicas, but it does not necessarily bump the leader 
> epoch. Following 
> [KIP-570|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-570%3A+Add+leader+epoch+in+StopReplicaRequest],]
>  brokers will ignore `StopReplica` requests if the leader epoch matches the 
> current partition leader epoch. So we need to bump the epoch whenever we need 
> to ensure that `StopReplica` will be received.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jsancio commented on a diff in pull request #13197: Minor: Decode the envelope requests for the request log

2023-02-06 Thread via GitHub


jsancio commented on code in PR #13197:
URL: https://github.com/apache/kafka/pull/13197#discussion_r1097782211


##
core/src/main/scala/kafka/network/RequestConvertToJson.scala:
##
@@ -60,7 +61,16 @@ object RequestConvertToJson {
   case req: ElectLeadersRequest => 
ElectLeadersRequestDataJsonConverter.write(req.data, request.version)
   case req: EndTxnRequest => 
EndTxnRequestDataJsonConverter.write(req.data, request.version)
   case req: EndQuorumEpochRequest => 
EndQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
-  case req: EnvelopeRequest => 
EnvelopeRequestDataJsonConverter.write(req.data, request.version)
+  case req: EnvelopeRequest => {
+val envelopeRequestData = req.data()
+val envelopeData = envelopeRequestData.requestData().duplicate()
+val envelopeHeader = 
EnvelopeUtils.parseForwardedRequestHeader(envelopeData)
+val requestJson = 
RequestConvertToJson.request(AbstractRequest.parseRequest(envelopeHeader.apiKey(),
 envelopeHeader.apiVersion(), envelopeData).request)

Review Comment:
   Hey @mumrah, does this parse the enveloped request twice? If so, is it 
possible to implement this feature without parsing the enveloped request twice?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dejan2609 opened a new pull request, #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-06 Thread via GitHub


dejan2609 opened a new pull request, #13205:
URL: https://github.com/apache/kafka/pull/13205

   details:
* gradle upgrade: 7.6 -->> 8.0-rc-3
* spotbugs plugin upgrade: 5.0.9 -->> 5.0.13
* declaration test.dependsOn(':spotlessScalaCheck') removed for 
project(':streams:streams-scala') in order to comply with Gradle 8 defaults
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #13198: MINOR: Rename IBP_3_4_IV1 as added for KIP-405 to IBP_3_5_IV0

2023-02-06 Thread via GitHub


cmccabe merged PR #13198:
URL: https://github.com/apache/kafka/pull/13198


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14680) Gradle version upgrade 7 -->> 8

2023-02-06 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684864#comment-17684864
 ] 

Ismael Juma edited comment on KAFKA-14680 at 2/6/23 6:31 PM:
-

[https://github.com/apache/kafka/pull/13199] helps with some gradle plugin 
version bumps.


was (Author: ijuma):
[https://github.com/apache/kafka/pull/13199] helps.

> Gradle version upgrade 7 -->> 8
> ---
>
> Key: KAFKA-14680
> URL: https://issues.apache.org/jira/browse/KAFKA-14680
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Dejan Stojadinović
>Assignee: Dejan Stojadinović
>Priority: Major
>
> *Gradle 8.0.0-RC3 release notes* (note: final 8.0 version is to be released 
> soon):
>  * [https://github.com/gradle/gradle/releases/tag/v8.0.0-RC3]
>  * [https://docs.gradle.org/8.0-rc-3/release-notes.html]
> *Upgrade notes:* 
> [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14680) Gradle version upgrade 7 -->> 8

2023-02-06 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684864#comment-17684864
 ] 

Ismael Juma commented on KAFKA-14680:
-

[https://github.com/apache/kafka/pull/13199] helps.

> Gradle version upgrade 7 -->> 8
> ---
>
> Key: KAFKA-14680
> URL: https://issues.apache.org/jira/browse/KAFKA-14680
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Dejan Stojadinović
>Assignee: Dejan Stojadinović
>Priority: Major
>
> *Gradle 8.0.0-RC3 release notes* (note: final 8.0 version is to be released 
> soon):
>  * [https://github.com/gradle/gradle/releases/tag/v8.0.0-RC3]
>  * [https://docs.gradle.org/8.0-rc-3/release-notes.html]
> *Upgrade notes:* 
> [https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14680) Gradle version upgrade 7 -->> 8

2023-02-06 Thread Jira
Dejan Stojadinović created KAFKA-14680:
--

 Summary: Gradle version upgrade 7 -->> 8
 Key: KAFKA-14680
 URL: https://issues.apache.org/jira/browse/KAFKA-14680
 Project: Kafka
  Issue Type: Improvement
  Components: build
Reporter: Dejan Stojadinović
Assignee: Dejan Stojadinović


*Gradle 8.0.0-RC3 release notes* (note: final 8.0 version is to be released 
soon):
 * [https://github.com/gradle/gradle/releases/tag/v8.0.0-RC3]
 * [https://docs.gradle.org/8.0-rc-3/release-notes.html]

*Upgrade notes:* 
[https://docs.gradle.org/8.0-rc-3/userguide/upgrading_version_7.html#changes_8.0]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

2023-02-06 Thread via GitHub


mumrah commented on PR #13183:
URL: https://github.com/apache/kafka/pull/13183#issuecomment-1419529065

   Gotcha, yea the docs are still in progress. We'll have something published 
soon (hopefully before the release announcement) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13179: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener

2023-02-06 Thread via GitHub


guozhangwang commented on code in PR #13179:
URL: https://github.com/apache/kafka/pull/13179#discussion_r1097749095


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -986,8 +986,23 @@ public void unregister(final Collection 
revokedChangelogs) {
 for (final TopicPartition partition : revokedChangelogs) {
 final ChangelogMetadata changelogMetadata = 
changelogs.remove(partition);
 if (changelogMetadata != null) {
+// if the changelog is still in REGISTERED, it means it has 
not initialized and started
+// restoring yet, and hence we should not try to remove the 
changelog partition
 if 
(!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
 revokedInitializedChangelogs.add(partition);
+
+// if the changelog is not in RESTORING, it means
+// the corresponding onRestoreStart was not called; in 
this case
+// we should not call onRestoreSuspended either
+if (changelogMetadata.stateManager.taskType() == 
Task.TaskType.ACTIVE &&
+
changelogMetadata.state().equals(ChangelogState.RESTORING)) {
+try {
+final String storeName = 
changelogMetadata.storeMetadata.store().name();
+stateRestoreListener.onRestoreSuspended(partition, 
storeName, changelogMetadata.totalRestored);
+} catch (final Exception e) {
+throw new StreamsException("State restore listener 
failed on restore paused", e);

Review Comment:
   The exception returned from user instantiated functions is arbitrary while 
the exception channels we are sending from restore thread to the main thread 
currently is only expecting internal ones. So I think we either need to allow 
arbitrary exceptions to be sent to the main thread via the queue as well, or do 
that. Personally I'm in favor of this approach to avoid a `catch all` clause 
that may hide any lurking exception handling bugs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14663) High throughput topics can starve low-throughput MM2 offset syncs

2023-02-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14663.
---
Resolution: Duplicate

> High throughput topics can starve low-throughput MM2 offset syncs
> -
>
> Key: KAFKA-14663
> URL: https://issues.apache.org/jira/browse/KAFKA-14663
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.1.0, 3.0.0, 3.3.0, 3.4.0, 3.5.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>
> In MM2, a semaphore is used to throttle the number of offset syncs written to 
> the offset-syncs topic. If too many offset writes are requested (for example, 
> from high-throughput topics) then some are silently dropped and never 
> retried. This is acceptable for a single topic-partition, where a later 
> record may re-trigger the offset-sync and write the sync successfully.
> However, if there is a large variance between throughput in the topics 
> emitted by an MM2 instance, it is possible for high-throughput topics to 
> trigger many offset syncs, and cause the offset-syncs for a co-located 
> low-throughput topic to be unfairly dropped.
> This can cause the downstream offsets for the starved topic to lag behind 
> significantly, or be prevented completely.
> Instead, we should have some sort of fairness mechanism where low-thoughput 
> topics are given similar priority to high-throughput topics in producing 
> offset syncs, and cause excess sync messages from high-throughput topics to 
> be dropped instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on a diff in pull request #13179: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener

2023-02-06 Thread via GitHub


guozhangwang commented on code in PR #13179:
URL: https://github.com/apache/kafka/pull/13179#discussion_r1097746204


##
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##
@@ -37,6 +40,11 @@
  * These two interfaces serve different restoration purposes and users should 
not try to implement both of them in a single
  * class during state store registration.
  *
+ * 
+ * Also note that standby tasks restoration process are not monitored via this 
interface, since a standby task keep

Review Comment:
   ack



##
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##
@@ -85,4 +93,17 @@ void onRestoreEnd(final TopicPartition topicPartition,
   final String storeName,
   final long totalRestored);
 
+/**
+ * Method called when restoring the {@link StateStore} is suspended due to 
the task being migrated out of the host.
+ * If the migrated task is recycled or re-assigned back to the current 
host, another
+ * {@link #onRestoreStart(TopicPartition, String, long, long)} would be 
called.
+ *
+ * @param topicPartition the TopicPartition containing the values to 
restore

Review Comment:
   ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] FireBurn commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

2023-02-06 Thread via GitHub


FireBurn commented on PR #13183:
URL: https://github.com/apache/kafka/pull/13183#issuecomment-1419452307

   Fab, I've just built 3.4.0 from the tag, but couldn't find the migration 
docs, so worried it had been delayed to 3.4.1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

2023-02-06 Thread via GitHub


mumrah commented on PR #13183:
URL: https://github.com/apache/kafka/pull/13183#issuecomment-1419445476

   @FireBurn, no it hasn't been delayed. We have an early access version coming 
out in 3.4.0 which will be announced this week. We plan on back-porting the 
remaining ZK migration work to the 3.4.x line so we can continue releasing 
without waiting for the next major release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] FireBurn commented on pull request #13183: KAFKA-14668 Avoid unnecessary UMR during ZK migration

2023-02-06 Thread via GitHub


FireBurn commented on PR #13183:
URL: https://github.com/apache/kafka/pull/13183#issuecomment-1419426113

   Has ZK migration to Kraft been delayed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13122: KAFKA-14594: Move LogDirsCommand to tools module

2023-02-06 Thread via GitHub


clolov commented on code in PR #13122:
URL: https://github.com/apache/kafka/pull/13122#discussion_r1097662809


##
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##
@@ -870,7 +870,37 @@ synchronized public AlterReplicaLogDirsResult 
alterReplicaLogDirs(
 @Override
 synchronized public DescribeLogDirsResult 
describeLogDirs(Collection brokers,

Review Comment:
   This implementation is provided because I wanted to use MockAdminClient 
rather than use Mockito and mock the Admin interface. I am open to suggestions 
for improving the current logic.



##
tools/src/test/java/org/apache/kafka/tools/LogDirsCommandTest.java:
##
@@ -0,0 +1,68 @@
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogDirsCommandTest {
+
+@Test
+public void checkLogDirsCommandOutput() throws 
UnsupportedEncodingException, TerseException, ExecutionException, 
JsonProcessingException, InterruptedException {
+ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+PrintStream printStream = new PrintStream(byteArrayOutputStream, 
false, StandardCharsets.UTF_8.name());
+
+PrintStream originalStandardOut = System.out;
+System.setOut(printStream);
+
+Node broker = new Node(1, "hostname", 9092);
+
+try (MockAdminClient adminClient = new 
MockAdminClient(Collections.singletonList(broker), broker)) {

Review Comment:
   I opted to move the unit test as is, rather than improve it. If there is a 
preference to improve on it, I would either use a Mockito to mock the Admin 
interface for describeLogDirs or I would contribute a more accurate 
implementation to the MockAdminClient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13199: MINOR: Update build and test dependencies for 3.5

2023-02-06 Thread via GitHub


ijuma commented on PR #13199:
URL: https://github.com/apache/kafka/pull/13199#issuecomment-1419378585

   @showuon This is now ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13199: MINOR: Update build and test dependencies for 3.5

2023-02-06 Thread via GitHub


ijuma commented on PR #13199:
URL: https://github.com/apache/kafka/pull/13199#issuecomment-1419378251

   JDK 11 build passed, the other two had unrelated failures:
   
   >  Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, 
useInlinePem=true  20 sec  1
   >  Build / JDK 8 and Scala 2.12 / 
kafka.admin.ReassignPartitionsIntegrationTest.testProduceAndConsumeWithReassignmentInProgress(String).quorum=kraft


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM opened a new pull request, #13204: KAFKA-14593: Move LeaderElectionCommand to tools

2023-02-06 Thread via GitHub


OmniaGM opened a new pull request, #13204:
URL: https://github.com/apache/kafka/pull/13204

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14679) Add new __consumer_offsets records

2023-02-06 Thread David Jacot (Jira)
David Jacot created KAFKA-14679:
---

 Summary: Add new __consumer_offsets records
 Key: KAFKA-14679
 URL: https://issues.apache.org/jira/browse/KAFKA-14679
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #13200: KAFKA-14678: Move `__consumer_offsets` records from `core` to `group-coordinator`

2023-02-06 Thread via GitHub


dajac commented on PR #13200:
URL: https://github.com/apache/kafka/pull/13200#issuecomment-1419247177

   @mimaison Thanks! Created KAFKA-14678.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14678) Move __consumer_offsets records from core to group-coordinator

2023-02-06 Thread David Jacot (Jira)
David Jacot created KAFKA-14678:
---

 Summary: Move __consumer_offsets records from core to 
group-coordinator
 Key: KAFKA-14678
 URL: https://issues.apache.org/jira/browse/KAFKA-14678
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-02-06 Thread via GitHub


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1097469398


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.epoch;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.log.internals.EpochEntry;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular 
replica.
+ * 
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ */
+public class LeaderEpochFileCache {
+private final LeaderEpochCheckpoint checkpoint;
+private final Logger log;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final TreeMap epochs = new TreeMap<>();
+
+/**
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ */
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+this.checkpoint = checkpoint;
+LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
+log = logContext.logger(LeaderEpochFileCache.class);
+checkpoint.read().forEach(this::assign);
+}
+
+/**
+ * Assigns the supplied Leader Epoch to the supplied Offset
+ * Once the epoch is assigned it cannot be reassigned
+ */
+public void assign(int epoch, long startOffset) {
+EpochEntry entry = new EpochEntry(epoch, startOffset);
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+flush();
+}
+}
+
+public void assign(List entries) {
+entries.forEach(entry -> {
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+}
+});
+if (entries.size() > 0) flush();
+}
+
+private boolean isUpdateNeeded(EpochEntry entry) {
+return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch 
|| entry.startOffset < epochEntry.startOffset).orElse(true);
+}
+
+private boolean assign(EpochEntry entry) {
+if (entry.epoch < 0 || entry.startOffset < 0) {
+throw new IllegalArgumentException("Received invalid partition 
leader epoch entry " + entry);
+}
+
+// Check whether the append is needed before acquiring the write lock
+// in order to avoid contention with readers in the common case
+if (!isUpdateNeeded(entry)) return false;
+
+lock.writeLock().lock();
+try {
+if (isUpdateNeeded(entry)) {
+maybeTruncateNonMonotonicEntries(entry);
+epochs.put(entry.epoch, entry);
+return true;
+} else {
+return false;
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+/**
+ * Remove any entries which violate monotonicity prior to appending a new 
entry
+ */
+private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
+List removedEpochs = removeFromEnd(entry -> entry.epoch >= 
newEntry.epoch || entry.startOffset >= newEntry.startOffset);
+
+
+if (removedEpochs.size() > 1 || 

[GitHub] [kafka] clolov commented on a diff in pull request #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers

2023-02-06 Thread via GitHub


clolov commented on code in PR #13196:
URL: https://github.com/apache/kafka/pull/13196#discussion_r1097383481


##
core/src/main/scala/kafka/cluster/Partition.scala:
##
@@ -49,6 +49,25 @@ import org.apache.kafka.server.log.internals.{AppendOrigin, 
FetchDataInfo, Fetch
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+/**
+ * Listener receives notification from an Online Partition.
+ *
+ * A listener can be (re-)registered to an Online partition only. The listener
+ * is notified as long as the partition remains Online. When the partition 
fails
+ * or is deleted, respectively `onFailed` or `onDeleted` are called once. No 
further
+ * notifications are sent after this point on.
+ *
+ * Note that the callbacks are executed in the thread that triggers the change
+ * AND that locks may be hold during their execution. They are meant to be used

Review Comment:
   ```suggestion
* AND that locks may be held during their execution. They are meant to be 
used
   ```



##
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##
@@ -2799,6 +2822,206 @@ class PartitionTest extends AbstractPartitionTest {
 assertEquals(replicas, partition.assignmentState.replicas)
   }
 
+  @Test
+  def testAddAndRemoveListeners(): Unit = {
+partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+
+partition.makeLeader(
+  new LeaderAndIsrPartitionState()
+.setControllerEpoch(0)
+.setLeader(brokerId)
+.setLeaderEpoch(0)
+.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+.setPartitionEpoch(1)
+.setIsNew(true),
+  offsetCheckpoints,
+  topicId = None)
+
+val listener1 = new MockPartitionListener()
+val listener2 = new MockPartitionListener()
+
+assertTrue(partition.maybeAddListener(listener1))
+listener1.verify(expectedHighWatermark = 0L)
+
+partition.appendRecordsToLeader(
+  records = TestUtils.records(List(new SimpleRecord("k1".getBytes, 
"v1".getBytes))),
+  origin = AppendOrigin.CLIENT,
+  requiredAcks = 0,
+  requestLocal = RequestLocal.NoCaching
+)
+
+listener1.verify()
+listener2.verify()
+
+assertTrue(partition.maybeAddListener(listener2))
+listener2.verify(expectedHighWatermark = 0L)
+
+partition.appendRecordsToLeader(
+  records = TestUtils.records(List(new SimpleRecord("k2".getBytes, 
"v2".getBytes))),
+  origin = AppendOrigin.CLIENT,
+  requiredAcks = 0,
+  requestLocal = RequestLocal.NoCaching
+)
+
+fetchFollower(
+  partition = partition,
+  replicaId = brokerId + 1,
+  fetchOffset = partition.localLogOrException.logEndOffset
+)
+
+listener1.verify(expectedHighWatermark = 
partition.localLogOrException.logEndOffset)
+listener2.verify(expectedHighWatermark = 
partition.localLogOrException.logEndOffset)
+
+partition.removeListener(listener1)
+
+partition.appendRecordsToLeader(
+  records = TestUtils.records(List(new SimpleRecord("k3".getBytes, 
"v3".getBytes))),
+  origin = AppendOrigin.CLIENT,
+  requiredAcks = 0,
+  requestLocal = RequestLocal.NoCaching
+)
+
+fetchFollower(
+  partition = partition,
+  replicaId = brokerId + 1,
+  fetchOffset = partition.localLogOrException.logEndOffset
+)
+
+listener1.verify()
+listener2.verify(expectedHighWatermark = 
partition.localLogOrException.logEndOffset)
+  }
+
+  @Test
+  def testAddListenerFailsWhenPartitionIsDeleted(): Unit = {
+partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+
+partition.makeLeader(
+  new LeaderAndIsrPartitionState()
+.setControllerEpoch(0)
+.setLeader(brokerId)
+.setLeaderEpoch(0)
+.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+.setPartitionEpoch(1)
+.setIsNew(true),
+  offsetCheckpoints,
+  topicId = None)
+
+partition.delete()
+
+assertFalse(partition.maybeAddListener(new MockPartitionListener()))
+  }
+
+  @Test
+  def testPartitionListenerWhenLogOffsetsChanged(): Unit = {
+partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, topicId = None)
+
+partition.makeLeader(
+  new LeaderAndIsrPartitionState()
+.setControllerEpoch(0)
+.setLeader(brokerId)
+.setLeaderEpoch(0)
+.setIsr(List(brokerId, brokerId + 1).map(Int.box).asJava)
+.setReplicas(List(brokerId, brokerId + 1).map(Int.box).asJava)
+.setPartitionEpoch(1)
+.setIsNew(true),
+  offsetCheckpoints,
+  topicId = None)
+
+val listener = new MockPartitionListener()
+partition.maybeAddListener(listener)

Review Comment:
   Nit:
   

[GitHub] [kafka] dajac opened a new pull request, #13203: MINOR: Add KIP-848 new `__consumer_offsets` records

2023-02-06 Thread via GitHub


dajac opened a new pull request, #13203:
URL: https://github.com/apache/kafka/pull/13203

   WIP. https://github.com/apache/kafka/pull/13200 must go first.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13195: Minor: Add JmxTool note to 3.5.0 notable changes

2023-02-06 Thread via GitHub


fvaleri commented on code in PR #13195:
URL: https://github.com/apache/kafka/pull/13195#discussion_r1097421951


##
docs/upgrade.html:
##
@@ -19,6 +19,16 @@
 
 

[GitHub] [kafka] dajac opened a new pull request, #13202: KAFKA-14513; Add broker side PartitionAssignor interface

2023-02-06 Thread via GitHub


dajac opened a new pull request, #13202:
URL: https://github.com/apache/kafka/pull/13202

   This patch adds the broker side `PartitionAssignor` interface as detailed in 
KIP-848. The interfaces differs a bit from the KIP in the following ways:
   * The POJOs are not defined within the interface because the interface is to 
heavy like this.
   * The interface is kept in the `group-coordinator` module for now. We don't 
want to have it out there until KIP-848 is ready to be released. We will move 
it to its final destination later.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13195: Minor: Add JmxTool note to 3.5.0 notable changes

2023-02-06 Thread via GitHub


fvaleri commented on code in PR #13195:
URL: https://github.com/apache/kafka/pull/13195#discussion_r1097421951


##
docs/upgrade.html:
##
@@ -19,6 +19,16 @@
 
 

[GitHub] [kafka] fvaleri commented on a diff in pull request #13195: Minor: Add JmxTool note to 3.5.0 notable changes

2023-02-06 Thread via GitHub


fvaleri commented on code in PR #13195:
URL: https://github.com/apache/kafka/pull/13195#discussion_r1097421951


##
docs/upgrade.html:
##
@@ -19,6 +19,16 @@
 
 

[GitHub] [kafka] fvaleri commented on a diff in pull request #13195: Minor: Add JmxTool note to 3.5.0 notable changes

2023-02-06 Thread via GitHub


fvaleri commented on code in PR #13195:
URL: https://github.com/apache/kafka/pull/13195#discussion_r1097421951


##
docs/upgrade.html:
##
@@ -19,6 +19,16 @@
 
 

[GitHub] [kafka] clolov commented on a diff in pull request #13198: MINOR: Rename IBP_3_4_IV1 as added for KIP-405 to IBP_3_5_IV0

2023-02-06 Thread via GitHub


clolov commented on code in PR #13198:
URL: https://github.com/apache/kafka/pull/13198#discussion_r1097417323


##
core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala:
##
@@ -174,7 +174,7 @@ class FeatureCommandUnitTest {
   @Test
   def testMetadataVersionsToString(): Unit = {
 assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.4-IV1",

Review Comment:
   Already pointed out by @satishd, but I believe changing 3.4-IV1 to 3.5-IV0 
here will make the test pass.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13198: MINOR: Rename IBP_3_4_IV1 as added for KIP-405 to IBP_3_5_IV0

2023-02-06 Thread via GitHub


clolov commented on code in PR #13198:
URL: https://github.com/apache/kafka/pull/13198#discussion_r1097417323


##
core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala:
##
@@ -174,7 +174,7 @@ class FeatureCommandUnitTest {
   @Test
   def testMetadataVersionsToString(): Unit = {
 assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.4-IV1",

Review Comment:
   Already pointed out by @fvaleri, but I believe changing 3.4-IV1 to 3.5-IV0 
here will make the test pass.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13198: MINOR: Rename IBP_3_4_IV1 as added for KIP-405 to IBP_3_5_IV0

2023-02-06 Thread via GitHub


clolov commented on code in PR #13198:
URL: https://github.com/apache/kafka/pull/13198#discussion_r1097417323


##
core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala:
##
@@ -174,7 +174,7 @@ class FeatureCommandUnitTest {
   @Test
   def testMetadataVersionsToString(): Unit = {
 assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.4-IV1",

Review Comment:
   Already pointed out by @fvaleri, but I believe changing 3.4-IV1 here will 
make the test pass.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-02-06 Thread via GitHub


satishd commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1097364723


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.epoch;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.log.internals.EpochEntry;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular 
replica.
+ * 
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ */
+public class LeaderEpochFileCache {
+private final LeaderEpochCheckpoint checkpoint;
+private final Logger log;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final TreeMap epochs = new TreeMap<>();
+
+/**
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ */
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+this.checkpoint = checkpoint;
+LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
+log = logContext.logger(LeaderEpochFileCache.class);
+checkpoint.read().forEach(this::assign);
+}
+
+/**
+ * Assigns the supplied Leader Epoch to the supplied Offset
+ * Once the epoch is assigned it cannot be reassigned
+ */
+public void assign(int epoch, long startOffset) {
+EpochEntry entry = new EpochEntry(epoch, startOffset);
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+flush();
+}
+}
+
+public void assign(List entries) {
+entries.forEach(entry -> {
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+}
+});
+if (entries.size() > 0) flush();
+}
+
+private boolean isUpdateNeeded(EpochEntry entry) {
+return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch 
|| entry.startOffset < epochEntry.startOffset).orElse(true);
+}
+
+private boolean assign(EpochEntry entry) {
+if (entry.epoch < 0 || entry.startOffset < 0) {
+throw new IllegalArgumentException("Received invalid partition 
leader epoch entry " + entry);
+}
+
+// Check whether the append is needed before acquiring the write lock
+// in order to avoid contention with readers in the common case
+if (!isUpdateNeeded(entry)) return false;
+
+lock.writeLock().lock();
+try {
+if (isUpdateNeeded(entry)) {
+maybeTruncateNonMonotonicEntries(entry);
+epochs.put(entry.epoch, entry);
+return true;
+} else {
+return false;
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+/**
+ * Remove any entries which violate monotonicity prior to appending a new 
entry
+ */
+private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
+List removedEpochs = removeFromEnd(entry -> entry.epoch >= 
newEntry.epoch || entry.startOffset >= newEntry.startOffset);
+
+
+if (removedEpochs.size() > 1 || 

[GitHub] [kafka] clolov commented on a diff in pull request #13195: Minor: Add JmxTool note to 3.5.0 notable changes

2023-02-06 Thread via GitHub


clolov commented on code in PR #13195:
URL: https://github.com/apache/kafka/pull/13195#discussion_r1097398711


##
docs/upgrade.html:
##
@@ -19,6 +19,16 @@
 
 

[GitHub] [kafka] dajac commented on a diff in pull request #13200: MINOR: Move `__consumer_offsets` records from `core` to `group-coordinator`

2023-02-06 Thread via GitHub


dajac commented on code in PR #13200:
URL: https://github.com/apache/kafka/pull/13200#discussion_r1097390286


##
build.gradle:
##
@@ -1266,6 +1272,23 @@ project(':group-coordinator') {
   javadoc {
 enabled = false
   }
+
+  task processMessages(type:JavaExec) {
+mainClass = "org.apache.kafka.message.MessageGenerator"
+classpath = configurations.generator
+args = [ "-p", "org.apache.kafka.coordinator.group.generated",

Review Comment:
   Yeah, I have noticed this as well. Personally, I like having `generated` in 
the package name because it makes the intent clear in the code. I am also fine 
if folks prefer to remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13200: MINOR: Move `__consumer_offsets` records from `core` to `group-coordinator`

2023-02-06 Thread via GitHub


dajac commented on code in PR #13200:
URL: https://github.com/apache/kafka/pull/13200#discussion_r1097389168


##
checkstyle/import-control.xml:
##
@@ -341,7 +341,9 @@
   
 
   
+  

Review Comment:
   Yes. The generated code needs this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method

2023-02-06 Thread via GitHub


C0urante commented on PR #13181:
URL: https://github.com/apache/kafka/pull/13181#issuecomment-1419098772

   Thanks Mickael!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13200: MINOR: Move `__consumer_offsets` records from `core` to `group-coordinator`

2023-02-06 Thread via GitHub


clolov commented on code in PR #13200:
URL: https://github.com/apache/kafka/pull/13200#discussion_r1097379412


##
checkstyle/import-control.xml:
##
@@ -341,7 +341,9 @@
   
 
   
+  

Review Comment:
   For my curiosity, are these lines needed in the context of this pull request 
or they are a remnant from a previous change?



##
build.gradle:
##
@@ -1266,6 +1272,23 @@ project(':group-coordinator') {
   javadoc {
 enabled = false
   }
+
+  task processMessages(type:JavaExec) {
+mainClass = "org.apache.kafka.message.MessageGenerator"
+classpath = configurations.generator
+args = [ "-p", "org.apache.kafka.coordinator.group.generated",

Review Comment:
   I noticed that sometimes we append `.generated` in similar closures and 
sometimes we do not. Are we appending `.generated` going forward?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-02-06 Thread via GitHub


satishd commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1097364723


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.epoch;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.log.internals.EpochEntry;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular 
replica.
+ * 
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ */
+public class LeaderEpochFileCache {
+private final LeaderEpochCheckpoint checkpoint;
+private final Logger log;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final TreeMap epochs = new TreeMap<>();
+
+/**
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ */
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+this.checkpoint = checkpoint;
+LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
+log = logContext.logger(LeaderEpochFileCache.class);
+checkpoint.read().forEach(this::assign);
+}
+
+/**
+ * Assigns the supplied Leader Epoch to the supplied Offset
+ * Once the epoch is assigned it cannot be reassigned
+ */
+public void assign(int epoch, long startOffset) {
+EpochEntry entry = new EpochEntry(epoch, startOffset);
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+flush();
+}
+}
+
+public void assign(List entries) {
+entries.forEach(entry -> {
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+}
+});
+if (entries.size() > 0) flush();
+}
+
+private boolean isUpdateNeeded(EpochEntry entry) {
+return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch 
|| entry.startOffset < epochEntry.startOffset).orElse(true);
+}
+
+private boolean assign(EpochEntry entry) {
+if (entry.epoch < 0 || entry.startOffset < 0) {
+throw new IllegalArgumentException("Received invalid partition 
leader epoch entry " + entry);
+}
+
+// Check whether the append is needed before acquiring the write lock
+// in order to avoid contention with readers in the common case
+if (!isUpdateNeeded(entry)) return false;
+
+lock.writeLock().lock();
+try {
+if (isUpdateNeeded(entry)) {
+maybeTruncateNonMonotonicEntries(entry);
+epochs.put(entry.epoch, entry);
+return true;
+} else {
+return false;
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+/**
+ * Remove any entries which violate monotonicity prior to appending a new 
entry
+ */
+private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
+List removedEpochs = removeFromEnd(entry -> entry.epoch >= 
newEntry.epoch || entry.startOffset >= newEntry.startOffset);
+
+
+if (removedEpochs.size() > 1 || 

[GitHub] [kafka] fqaiser94 commented on a diff in pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same

2023-02-06 Thread via GitHub


fqaiser94 commented on code in PR #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r1097375552


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##
@@ -45,34 +47,85 @@ public void setIfUnset(final SerdeGetter getter) {
 }
 }
 
+@SuppressWarnings("checkstyle:cyclomaticComplexity")
+private boolean isUpgrade(final Map configs) {
+final Object upgradeFrom = 
configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+if (upgradeFrom == null) {
+return false;
+}
+
+switch ((String) upgradeFrom) {
+case StreamsConfig.UPGRADE_FROM_0100:
+case StreamsConfig.UPGRADE_FROM_0101:
+case StreamsConfig.UPGRADE_FROM_0102:
+case StreamsConfig.UPGRADE_FROM_0110:
+case StreamsConfig.UPGRADE_FROM_10:
+case StreamsConfig.UPGRADE_FROM_11:
+case StreamsConfig.UPGRADE_FROM_20:
+case StreamsConfig.UPGRADE_FROM_21:
+case StreamsConfig.UPGRADE_FROM_22:
+case StreamsConfig.UPGRADE_FROM_23:
+case StreamsConfig.UPGRADE_FROM_24:
+case StreamsConfig.UPGRADE_FROM_25:
+case StreamsConfig.UPGRADE_FROM_26:
+case StreamsConfig.UPGRADE_FROM_27:
+case StreamsConfig.UPGRADE_FROM_28:
+case StreamsConfig.UPGRADE_FROM_30:
+case StreamsConfig.UPGRADE_FROM_31:
+case StreamsConfig.UPGRADE_FROM_32:
+case StreamsConfig.UPGRADE_FROM_33:
+return true;
+default:
+return false;
+}
+}
+
+@Override
+public void configure(final Map configs, final boolean isKey) {
+this.isUpgrade = isUpgrade(configs);
+}
+
 /**
  * @throws StreamsException if both old and new values of data are null, 
or if
- * both values are not null
+ * both values are not null and is upgrading from a version less than 3.4
  */
 @Override
 public byte[] serialize(final String topic, final Headers headers, final 
Change data) {
-final byte[] serializedKey;
-
-// only one of the old / new values would be not null
-if (data.newValue != null) {
-if (data.oldValue != null) {
+final boolean oldValueIsNull = data.oldValue == null;
+final boolean newValueIsNull = data.newValue == null;
+
+final byte[] newData = inner.serialize(topic, headers, data.newValue);
+final byte[] oldData = inner.serialize(topic, headers, data.oldValue);
+
+final int newDataLength = newValueIsNull ? 0 : newData.length;
+final int oldDataLength = oldValueIsNull ? 0 : oldData.length;
+
+// The serialization format is:
+// {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
+// {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
+// {INT newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE 
newOldFlag=2}
+final ByteBuffer buf;
+if (!newValueIsNull && !oldValueIsNull) {
+if (isUpgrade) {
 throw new StreamsException("Both old and new values are not 
null (" + data.oldValue
-+ " : " + data.newValue + ") in ChangeSerializer, which is 
not allowed.");
++ " : " + data.newValue + ") in ChangeSerializer, 
which is not allowed unless upgrading.");
+} else {
+final int capacity = Integer.BYTES + newDataLength + 
oldDataLength + NEW_OLD_FLAG_SIZE;
+buf = ByteBuffer.allocate(capacity);
+buf.putInt(newDataLength).put(newData).put(oldData).put((byte) 
2);
 }
-
-serializedKey = inner.serialize(topic, headers, data.newValue);
+} else if (!newValueIsNull) {
+final int capacity = newDataLength + NEW_OLD_FLAG_SIZE;
+buf = ByteBuffer.allocate(capacity);
+buf.put(newData).put((byte) 1);
+} else if (!oldValueIsNull) {
+final int capacity = oldDataLength + NEW_OLD_FLAG_SIZE;
+buf = ByteBuffer.allocate(capacity);
+buf.put(oldData).put((byte) 0);
 } else {
-if (data.oldValue == null) {
-throw new StreamsException("Both old and new values are null 
in ChangeSerializer, which is not allowed.");
-}
-
-serializedKey = inner.serialize(topic, headers, data.oldValue);
+throw new StreamsException("Both old and new values are null in 
ChangeSerializer, which is not allowed.");
 }
 
-final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + 
NEWFLAG_SIZE);
-buf.put(serializedKey);
-buf.put((byte) (data.newValue != null ? 1 : 0));
-
 return buf.array();
 }
 

Review Comment:
   Thanks for your feedback!
   Just FYI, I've created the KIP 
[here](https://cwiki.apache.org/confluence/x/P5VbDg). 
   And started a discussion thread 

[GitHub] [kafka] clolov commented on pull request #13199: MINOR: Update build and test dependencies for 3.5

2023-02-06 Thread via GitHub


clolov commented on PR #13199:
URL: https://github.com/apache/kafka/pull/13199#issuecomment-1419072645

   Do we use some tool which suggests these newer versions or we do the checks 
manually for each release?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-02-06 Thread via GitHub


satishd commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1097364723


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.epoch;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.log.internals.EpochEntry;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular 
replica.
+ * 
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ */
+public class LeaderEpochFileCache {
+private final LeaderEpochCheckpoint checkpoint;
+private final Logger log;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final TreeMap epochs = new TreeMap<>();
+
+/**
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ */
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+this.checkpoint = checkpoint;
+LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
+log = logContext.logger(LeaderEpochFileCache.class);
+checkpoint.read().forEach(this::assign);
+}
+
+/**
+ * Assigns the supplied Leader Epoch to the supplied Offset
+ * Once the epoch is assigned it cannot be reassigned
+ */
+public void assign(int epoch, long startOffset) {
+EpochEntry entry = new EpochEntry(epoch, startOffset);
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+flush();
+}
+}
+
+public void assign(List entries) {
+entries.forEach(entry -> {
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+}
+});
+if (entries.size() > 0) flush();
+}
+
+private boolean isUpdateNeeded(EpochEntry entry) {
+return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch 
|| entry.startOffset < epochEntry.startOffset).orElse(true);
+}
+
+private boolean assign(EpochEntry entry) {
+if (entry.epoch < 0 || entry.startOffset < 0) {
+throw new IllegalArgumentException("Received invalid partition 
leader epoch entry " + entry);
+}
+
+// Check whether the append is needed before acquiring the write lock
+// in order to avoid contention with readers in the common case
+if (!isUpdateNeeded(entry)) return false;
+
+lock.writeLock().lock();
+try {
+if (isUpdateNeeded(entry)) {
+maybeTruncateNonMonotonicEntries(entry);
+epochs.put(entry.epoch, entry);
+return true;
+} else {
+return false;
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+/**
+ * Remove any entries which violate monotonicity prior to appending a new 
entry
+ */
+private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
+List removedEpochs = removeFromEnd(entry -> entry.epoch >= 
newEntry.epoch || entry.startOffset >= newEntry.startOffset);
+
+
+if (removedEpochs.size() > 1 || 

[GitHub] [kafka] Schm1tz1 commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-02-06 Thread via GitHub


Schm1tz1 commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1097355787


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+private final Map envVarMap;
+
+public EnvVarConfigProvider() {
+envVarMap = getEnvVars();
+}
+
+public EnvVarConfigProvider(Map envVarsAsArgument) {
+envVarMap = envVarsAsArgument;
+}
+
+private static final Logger log = 
LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+@Override
+public void configure(Map configs) {
+}

Review Comment:
   @OneCricketeer Good idea! Will have a look.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13200: MINOR: Move `__consumer_offsets` records from `core` to `group-coordinator`

2023-02-06 Thread via GitHub


dajac commented on PR #13200:
URL: https://github.com/apache/kafka/pull/13200#issuecomment-1419045542

   @mimaison Could you review this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-02-06 Thread via GitHub


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1097338495


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.epoch;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.log.internals.EpochEntry;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular 
replica.
+ * 
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ */
+public class LeaderEpochFileCache {
+private final LeaderEpochCheckpoint checkpoint;
+private final Logger log;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final TreeMap epochs = new TreeMap<>();
+
+/**
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ */
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+this.checkpoint = checkpoint;
+LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
+log = logContext.logger(LeaderEpochFileCache.class);
+checkpoint.read().forEach(this::assign);
+}
+
+/**
+ * Assigns the supplied Leader Epoch to the supplied Offset
+ * Once the epoch is assigned it cannot be reassigned
+ */
+public void assign(int epoch, long startOffset) {
+EpochEntry entry = new EpochEntry(epoch, startOffset);
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+flush();
+}
+}
+
+public void assign(List entries) {
+entries.forEach(entry -> {
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+}
+});
+if (entries.size() > 0) flush();
+}
+
+private boolean isUpdateNeeded(EpochEntry entry) {
+return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch 
|| entry.startOffset < epochEntry.startOffset).orElse(true);
+}
+
+private boolean assign(EpochEntry entry) {
+if (entry.epoch < 0 || entry.startOffset < 0) {
+throw new IllegalArgumentException("Received invalid partition 
leader epoch entry " + entry);
+}
+
+// Check whether the append is needed before acquiring the write lock
+// in order to avoid contention with readers in the common case
+if (!isUpdateNeeded(entry)) return false;
+
+lock.writeLock().lock();
+try {
+if (isUpdateNeeded(entry)) {
+maybeTruncateNonMonotonicEntries(entry);
+epochs.put(entry.epoch, entry);
+return true;
+} else {
+return false;
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+/**
+ * Remove any entries which violate monotonicity prior to appending a new 
entry
+ */
+private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
+List removedEpochs = removeFromEnd(entry -> entry.epoch >= 
newEntry.epoch || entry.startOffset >= newEntry.startOffset);
+
+
+if (removedEpochs.size() > 1 || 

[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-02-06 Thread via GitHub


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1097335270


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.epoch;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.log.internals.EpochEntry;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular 
replica.
+ * 
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ */
+public class LeaderEpochFileCache {
+private final LeaderEpochCheckpoint checkpoint;
+private final Logger log;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final TreeMap epochs = new TreeMap<>();
+
+/**
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ */
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+this.checkpoint = checkpoint;
+LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
+log = logContext.logger(LeaderEpochFileCache.class);
+checkpoint.read().forEach(this::assign);
+}
+
+/**
+ * Assigns the supplied Leader Epoch to the supplied Offset
+ * Once the epoch is assigned it cannot be reassigned
+ */
+public void assign(int epoch, long startOffset) {
+EpochEntry entry = new EpochEntry(epoch, startOffset);
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+flush();
+}
+}
+
+public void assign(List entries) {
+entries.forEach(entry -> {
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+}
+});
+if (entries.size() > 0) flush();
+}
+
+private boolean isUpdateNeeded(EpochEntry entry) {
+return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch 
|| entry.startOffset < epochEntry.startOffset).orElse(true);
+}
+
+private boolean assign(EpochEntry entry) {
+if (entry.epoch < 0 || entry.startOffset < 0) {
+throw new IllegalArgumentException("Received invalid partition 
leader epoch entry " + entry);
+}
+
+// Check whether the append is needed before acquiring the write lock
+// in order to avoid contention with readers in the common case
+if (!isUpdateNeeded(entry)) return false;
+
+lock.writeLock().lock();
+try {
+if (isUpdateNeeded(entry)) {
+maybeTruncateNonMonotonicEntries(entry);
+epochs.put(entry.epoch, entry);
+return true;
+} else {
+return false;
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+/**
+ * Remove any entries which violate monotonicity prior to appending a new 
entry
+ */
+private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
+List removedEpochs = removeFromEnd(entry -> entry.epoch >= 
newEntry.epoch || entry.startOffset >= newEntry.startOffset);
+
+
+if (removedEpochs.size() > 1 || 

[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-02-06 Thread via GitHub


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1097332224


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.epoch;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.log.internals.EpochEntry;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular 
replica.
+ * 
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ */
+public class LeaderEpochFileCache {
+private final LeaderEpochCheckpoint checkpoint;
+private final Logger log;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final TreeMap epochs = new TreeMap<>();
+
+/**
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ */
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+this.checkpoint = checkpoint;
+LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
+log = logContext.logger(LeaderEpochFileCache.class);
+checkpoint.read().forEach(this::assign);
+}
+
+/**
+ * Assigns the supplied Leader Epoch to the supplied Offset
+ * Once the epoch is assigned it cannot be reassigned
+ */
+public void assign(int epoch, long startOffset) {
+EpochEntry entry = new EpochEntry(epoch, startOffset);
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+flush();
+}
+}
+
+public void assign(List entries) {
+entries.forEach(entry -> {
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+}
+});
+if (entries.size() > 0) flush();
+}
+
+private boolean isUpdateNeeded(EpochEntry entry) {
+return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch 
|| entry.startOffset < epochEntry.startOffset).orElse(true);
+}
+
+private boolean assign(EpochEntry entry) {
+if (entry.epoch < 0 || entry.startOffset < 0) {
+throw new IllegalArgumentException("Received invalid partition 
leader epoch entry " + entry);
+}
+
+// Check whether the append is needed before acquiring the write lock
+// in order to avoid contention with readers in the common case
+if (!isUpdateNeeded(entry)) return false;
+
+lock.writeLock().lock();
+try {
+if (isUpdateNeeded(entry)) {
+maybeTruncateNonMonotonicEntries(entry);
+epochs.put(entry.epoch, entry);
+return true;
+} else {
+return false;
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+/**
+ * Remove any entries which violate monotonicity prior to appending a new 
entry
+ */
+private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
+List removedEpochs = removeFromEnd(entry -> entry.epoch >= 
newEntry.epoch || entry.startOffset >= newEntry.startOffset);
+
+
+if (removedEpochs.size() > 1 || 

[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-02-06 Thread via GitHub


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1097332224


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.epoch;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.log.internals.EpochEntry;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular 
replica.
+ * 
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ */
+public class LeaderEpochFileCache {
+private final LeaderEpochCheckpoint checkpoint;
+private final Logger log;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final TreeMap epochs = new TreeMap<>();
+
+/**
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ */
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+this.checkpoint = checkpoint;
+LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
+log = logContext.logger(LeaderEpochFileCache.class);
+checkpoint.read().forEach(this::assign);
+}
+
+/**
+ * Assigns the supplied Leader Epoch to the supplied Offset
+ * Once the epoch is assigned it cannot be reassigned
+ */
+public void assign(int epoch, long startOffset) {
+EpochEntry entry = new EpochEntry(epoch, startOffset);
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+flush();
+}
+}
+
+public void assign(List entries) {
+entries.forEach(entry -> {
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+}
+});
+if (entries.size() > 0) flush();
+}
+
+private boolean isUpdateNeeded(EpochEntry entry) {
+return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch 
|| entry.startOffset < epochEntry.startOffset).orElse(true);
+}
+
+private boolean assign(EpochEntry entry) {
+if (entry.epoch < 0 || entry.startOffset < 0) {
+throw new IllegalArgumentException("Received invalid partition 
leader epoch entry " + entry);
+}
+
+// Check whether the append is needed before acquiring the write lock
+// in order to avoid contention with readers in the common case
+if (!isUpdateNeeded(entry)) return false;
+
+lock.writeLock().lock();
+try {
+if (isUpdateNeeded(entry)) {
+maybeTruncateNonMonotonicEntries(entry);
+epochs.put(entry.epoch, entry);
+return true;
+} else {
+return false;
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+/**
+ * Remove any entries which violate monotonicity prior to appending a new 
entry
+ */
+private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
+List removedEpochs = removeFromEnd(entry -> entry.epoch >= 
newEntry.epoch || entry.startOffset >= newEntry.startOffset);
+
+
+if (removedEpochs.size() > 1 || 

[jira] [Commented] (KAFKA-14595) Move ReassignPartitionsCommand to tools

2023-02-06 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684639#comment-17684639
 ] 

Nikolay Izhikov commented on KAFKA-14595:
-

[~omnia_h_ibrahim] Thanks to let me know!

> Move ReassignPartitionsCommand to tools
> ---
>
> Key: KAFKA-14595
> URL: https://issues.apache.org/jira/browse/KAFKA-14595
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Nikolay Izhikov
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14595) Move ReassignPartitionsCommand to tools

2023-02-06 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684635#comment-17684635
 ] 

Omnia Ibrahim edited comment on KAFKA-14595 at 2/6/23 12:41 PM:


Hi [~nizhikov], just a note, I moved the methods 
`{{{}TestUtils.setReplicationThrottleForPartitions{}}}` and 
`{{{}TestUtils.removeReplicationThrottleForPartitions`  from 
`{}}}{{{}TestUtils` to `{}}}{{{}ToolsTestUtils{}}}{{{}` {}}}{{ as they are used 
only }} by `TopicCommand` and `ReassignPartitionCommand`.  To avoid the 
converting between Scala and Java collections. The changes are here 
[https://github.com/apache/kafka/pull/13201 
|https://github.com/apache/kafka/pull/13201]


was (Author: omnia_h_ibrahim):
Hi [~nizhikov], just a note, I moved the methods 
`{{{}TestUtils.setReplicationThrottleForPartitions{}}}` and 
`{{{}TestUtils.removeReplicationThrottleForPartitions`  from 
`{}}}{{{}TestUtils` to `{}}}{{{}ToolsTestUtils{}}}{{{}` {}}}{{ as they are used 
only }} by `TopicCommand` and `ReassignPartitionCommand`.  To avoid the 
converting between Scala and Java collections. The changes are here 
https://github.com/apache/kafka/pull/13201{{{}{}}}

> Move ReassignPartitionsCommand to tools
> ---
>
> Key: KAFKA-14595
> URL: https://issues.apache.org/jira/browse/KAFKA-14595
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Nikolay Izhikov
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14595) Move ReassignPartitionsCommand to tools

2023-02-06 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684635#comment-17684635
 ] 

Omnia Ibrahim commented on KAFKA-14595:
---

Hi [~nizhikov], just a note, I moved the methods 
`{{{}TestUtils.setReplicationThrottleForPartitions{}}}` and 
`{{{}TestUtils.removeReplicationThrottleForPartitions`  from 
`{}}}{{{}TestUtils` to `{}}}{{{}ToolsTestUtils{}}}{{{}` {}}}{{ as they are used 
only }} by `TopicCommand` and `ReassignPartitionCommand`.  To avoid the 
converting between Scala and Java collections. The changes are here 
https://github.com/apache/kafka/pull/13201{{{}{}}}

> Move ReassignPartitionsCommand to tools
> ---
>
> Key: KAFKA-14595
> URL: https://issues.apache.org/jira/browse/KAFKA-14595
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Nikolay Izhikov
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ijuma commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-02-06 Thread via GitHub


ijuma commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1097325246


##
storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochFileCache.java:
##
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular 
replica.
+ * 
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ */
+public class LeaderEpochFileCache {
+private final LeaderEpochCheckpoint checkpoint;
+private final Logger log;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final TreeMap epochs = new TreeMap<>();
+
+/**
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ */
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+this.checkpoint = checkpoint;
+LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
+log = logContext.logger(LeaderEpochFileCache.class);
+checkpoint.read().forEach(this::assign);

Review Comment:
   My bad, we are calling the private `assign` overload., not the public one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] OmniaGM opened a new pull request, #13201: KAFKA-14596: Move TopicCommand to tools

2023-02-06 Thread via GitHub


OmniaGM opened a new pull request, #13201:
URL: https://github.com/apache/kafka/pull/13201

   This pr include 
   - The changes include switching Scala code to java
   - Move TopicCommand and all test cases to tools
   - The PR depends on #13171 to replace the usage of `CoreUtils.duplicate` by 
`ToolsUtils.findDuplicates` 
   
   Some implementation notes: 
   - I added `ToolsTestUtils.createBrokerProperties` as a wrapper for 
`TestUtils.createBrokerConfig` to hide the conversion between Scala and Java 
types. 
   - Replicated `TestUtils.setReplicationThrottleForPartitions` and 
`TestUtils.removeReplicationThrottleForPartitions` to `ToolsTestUtils` as the 
methods are used only `TopicCommandIntegrationTest` and 
`ReassignPartitionsIntegrationTest`. We need to remove it from `TestUtils` once 
we migrate `ReassignPartitions`
   - Replicated `TestInfoUtils.TestWithParameterizedQuorumName` to 
`ToolsTestUtils` as java convert this into a getter function which cannot be 
used with `ParameterizedTest`
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-02-06 Thread via GitHub


satishd commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1097238384


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.epoch;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.log.internals.EpochEntry;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular 
replica.
+ * 
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ */
+public class LeaderEpochFileCache {
+private final LeaderEpochCheckpoint checkpoint;
+private final Logger log;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final TreeMap epochs = new TreeMap<>();
+
+/**
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ */
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+this.checkpoint = checkpoint;
+LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
+log = logContext.logger(LeaderEpochFileCache.class);
+checkpoint.read().forEach(this::assign);
+}
+
+/**
+ * Assigns the supplied Leader Epoch to the supplied Offset
+ * Once the epoch is assigned it cannot be reassigned
+ */
+public void assign(int epoch, long startOffset) {
+EpochEntry entry = new EpochEntry(epoch, startOffset);
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+flush();
+}
+}
+
+public void assign(List entries) {
+entries.forEach(entry -> {
+if (assign(entry)) {
+log.debug("Appended new epoch entry {}. Cache now contains {} 
entries.", entry, epochs.size());
+}
+});
+if (entries.size() > 0) flush();
+}
+
+private boolean isUpdateNeeded(EpochEntry entry) {
+return latestEntry().map(epochEntry -> entry.epoch != epochEntry.epoch 
|| entry.startOffset < epochEntry.startOffset).orElse(true);
+}
+
+private boolean assign(EpochEntry entry) {
+if (entry.epoch < 0 || entry.startOffset < 0) {
+throw new IllegalArgumentException("Received invalid partition 
leader epoch entry " + entry);
+}
+
+// Check whether the append is needed before acquiring the write lock
+// in order to avoid contention with readers in the common case
+if (!isUpdateNeeded(entry)) return false;
+
+lock.writeLock().lock();
+try {
+if (isUpdateNeeded(entry)) {
+maybeTruncateNonMonotonicEntries(entry);
+epochs.put(entry.epoch, entry);
+return true;
+} else {
+return false;
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+/**
+ * Remove any entries which violate monotonicity prior to appending a new 
entry
+ */
+private void maybeTruncateNonMonotonicEntries(EpochEntry newEntry) {
+List removedEpochs = removeFromEnd(entry -> entry.epoch >= 
newEntry.epoch || entry.startOffset >= newEntry.startOffset);
+
+
+if (removedEpochs.size() > 1 || 

[jira] [Assigned] (KAFKA-14578) Move ConsumerPerformance to tools

2023-02-06 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri reassigned KAFKA-14578:
---

Assignee: Federico Valeri

> Move ConsumerPerformance to tools
> -
>
> Key: KAFKA-14578
> URL: https://issues.apache.org/jira/browse/KAFKA-14578
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Federico Valeri
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison merged pull request #13128: MINOR: Define a root project name in the Gradle settings file

2023-02-06 Thread via GitHub


mimaison merged PR #13128:
URL: https://github.com/apache/kafka/pull/13128


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] satishd commented on a diff in pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-02-06 Thread via GitHub


satishd commented on code in PR #13046:
URL: https://github.com/apache/kafka/pull/13046#discussion_r1097245192


##
storage/src/main/java/org/apache/kafka/server/log/internals/LeaderEpochFileCache.java:
##
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.slf4j.Logger;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH;
+import static 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH_OFFSET;
+
+/**
+ * Represents a cache of (LeaderEpoch => Offset) mappings for a particular 
replica.
+ * 
+ * Leader Epoch = epoch assigned to each leader by the controller.
+ * Offset = offset of the first message in each epoch.
+ */
+public class LeaderEpochFileCache {
+private final LeaderEpochCheckpoint checkpoint;
+private final Logger log;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final TreeMap epochs = new TreeMap<>();
+
+/**
+ * @param topicPartition the associated topic partition
+ * @param checkpoint the checkpoint file
+ */
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+this.checkpoint = checkpoint;
+LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
+log = logContext.logger(LeaderEpochFileCache.class);
+checkpoint.read().forEach(this::assign);

Review Comment:
   `assign(EpochEntry entry)` is already a private method that does not pass 
this instance outside this class. Am I missing anything here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #13120: MINOR: Connect Javadocs improvements

2023-02-06 Thread via GitHub


mimaison merged PR #13120:
URL: https://github.com/apache/kafka/pull/13120


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >