[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13955: KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask

2023-07-06 Thread via GitHub


vamossagar12 commented on code in PR #13955:
URL: https://github.com/apache/kafka/pull/13955#discussion_r1255253840


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -651,6 +652,40 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
 verifyTopicCreation();
 }
 
+@Test
+public void testSendRecordsRetriableException() {
+createWorkerTask();
+
+SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+expectTaskGetTopic();
+
+when(transformationChain.apply(eq(record1))).thenReturn(null);
+when(transformationChain.apply(eq(record2))).thenReturn(null);
+when(transformationChain.apply(eq(record3))).thenReturn(record3);
+
+TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null);
+
+workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+// The producer throws a RetriableException the first time we try to send the third record

Review Comment:
   nit: For better readability, just mention here that the first 2 records have 
been filtered out and only record3 would be sent. It would be easier to follow 
for anyone who's new to the codebase. WDYT?
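
   A possible wording for the suggested comment (illustrative only, not from 
the PR):

       // record1 and record2 are filtered out by the transformation chain, so
       // only record3 reaches the producer. The producer then throws a
       // RetriableException on the first attempt to send it.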



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13955: KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask

2023-07-06 Thread via GitHub


vamossagar12 commented on code in PR #13955:
URL: https://github.com/apache/kafka/pull/13955#discussion_r1255252249


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -651,6 +652,40 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
 verifyTopicCreation();
 }
 
+@Test
+public void testSendRecordsRetriableException() {
+createWorkerTask();
+
+SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+expectTaskGetTopic();
+
+when(transformationChain.apply(eq(record1))).thenReturn(null);
+when(transformationChain.apply(eq(record2))).thenReturn(null);
+when(transformationChain.apply(eq(record3))).thenReturn(record3);
+
+TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null);
+
+workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+// The producer throws a RetriableException the first time we try to send the third record
+assertFalse(workerTask.sendRecords());
+
+// The next attempt to send the third record should succeed
+assertTrue(workerTask.sendRecords());
+
+// Ensure that the first two records that were filtered out by the transformation chain
+// aren't re-processed when we retry the call to sendRecords()
+verify(transformationChain, times(4)).apply(any(SourceRecord.class));

Review Comment:
   Yeah I agree. An explicit check for `record1` and `record2` being 
transformed once and `record3` twice would be good.
   Regarding the duplicate re-transformations, I guess it should be achievable 
by storing a mapping between the `SourceRecord` and the `ProducerRecord`, i.e. 
pre-transformed and transformed. We can keep clearing the map as and when the 
entire batch of records is processed. In theory, that could have been another 
way to achieve what this PR does: if an SMT outputs a record's transformation 
as null, we update the mapping and skip that record on subsequent retries. But 
the solution in this PR is cleaner. A minimal sketch of this alternative 
follows below.
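
   A minimal, self-contained sketch of that mapping-based alternative, using a 
plain `Function<SourceRecord, SourceRecord>` to stand in for the runtime's 
transformation chain; the class and method names here are illustrative, not 
from the PR:

       import java.util.HashMap;
       import java.util.Map;
       import java.util.function.Function;
       import org.apache.kafka.connect.source.SourceRecord;

       class TransformOnceCache {
           // Sentinel marking a record whose SMT output was null (filtered out).
           private static final SourceRecord FILTERED =
               new SourceRecord(null, null, "filtered-sentinel", null, null);

           // Pre-transformed record -> transformed result (or FILTERED).
           private final Map<SourceRecord, SourceRecord> cache = new HashMap<>();

           // Applies the chain at most once per record per batch; retries of the
           // same batch reuse the cached result instead of re-transforming.
           SourceRecord applyOnce(Function<SourceRecord, SourceRecord> chain, SourceRecord record) {
               SourceRecord result = cache.computeIfAbsent(record, r -> {
                   SourceRecord out = chain.apply(r);
                   return out == null ? FILTERED : out;
               });
               return result == FILTERED ? null : result;
           }

           // Called once the entire batch has been dispatched successfully.
           void clearBatch() {
               cache.clear();
           }
       }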



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-06 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1624726819

   
`MirrorConnectorsIntegrationBaseTest#testOffsetTranslationBehindReplicationFlow` 
is the only test that is relevant w.r.t. the changes in this PR. I ran it 
locally a couple of times and it passed on both occasions. Could it be a flaky 
test?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14207) Add a 6.10 section for KRaft

2023-07-06 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740864#comment-17740864
 ] 

Luke Chen commented on KAFKA-14207:
---

Thanks for the info, [~gharris1727] !

> Add a 6.10 section for KRaft
> 
>
> Key: KAFKA-14207
> URL: https://issues.apache.org/jira/browse/KAFKA-14207
> Project: Kafka
>  Issue Type: Sub-task
>  Components: documentation
>Affects Versions: 3.3.0
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: documentation, kraft
> Fix For: 3.3.0
>
>
> The section should talk about:
>  # Limitation
>  # Recommended deployment: external controller
>  # How to start a KRaft cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-06 Thread via GitHub


showuon commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1255129114


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;

Review Comment:
   Caffeine is added in this [PR](https://github.com/apache/kafka/pull/13850) 
to improve concurrent read performance. We chose it since it has good 
performance and is also adopted by other big projects, like Cassandra, HBase 
etc. About the GC issues, I don't think we have discussed about it. cc 
@divijvaidya , since you've done some research on Caffeine, do you have any 
comment to the GC issue?
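
   For reference, a minimal sketch of the kind of Caffeine setup these imports 
support (a size-bounded cache with a removal listener); the key/value types are 
illustrative and this is not the exact code from the PR:

       import com.github.benmanes.caffeine.cache.Cache;
       import com.github.benmanes.caffeine.cache.Caffeine;
       import com.github.benmanes.caffeine.cache.RemovalCause;

       public class CaffeineSketch {
           public static void main(String[] args) {
               // Size-bounded cache; eviction uses Caffeine's default
               // Window TinyLfu policy once the bound is reached.
               Cache<String, byte[]> cache = Caffeine.newBuilder()
                       .maximumSize(1024)
                       .removalListener((String key, byte[] value, RemovalCause cause) ->
                               // Evicted entries can be queued for cleanup here, much like
                               // the RemoteIndexCache does with its expiredIndexes queue.
                               System.out.println("Removed " + key + " due to " + cause))
                       .build();

               cache.put("segment-1", new byte[0]);
               cache.getIfPresent("segment-1"); // concurrent, mostly lock-free read path
           }
       }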



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-06 Thread via GitHub


showuon commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1255120355


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is a LFU (Least Frequently Used) cache of remote index files stored in `$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation of Caffeine i.e.
+ * <a href="https://github.com/ben-manes/caffeine/wiki/Efficiency">Window TinyLfu</a>. TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner";
+
+/**
+ * Directory where the index files will be stored on disk.
+ */
+private final File cacheDir;
+
+/**
+ * Represents if the cache is closed or not. Closing the cache is an irreversible operation.
+ */
+private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false);
+
+/**
+ * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected.
+ */
+private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>();
+
+/**
+ * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other
+ * concurrent reads in-progress.
+ */
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+/**
+ * Actual cache implementation that this file wraps around.
+ *
+ * The requirements for this internal cache is as follows:
+ * 1. Multiple threads should be 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jeffkbkim commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1255022451


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,127 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * The EventBasedCoordinatorTimer implements the CoordinatorTimer interface and provides an event based
+ * timer which turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which
+ * are executed by the {@link CoordinatorEventProcessor} used by this coordinator runtime. This is done
+ * to ensure that the timer respects the threading model of the coordinator runtime.
+ *
+ * The {@link CoordinatorWriteEvent} events pushed by the coordinator timer wraps the
+ * {@link TimeoutOperation} operations scheduled by the coordinators.
+ *
+ * It also keeps track of all the scheduled {@link TimerTask}. This allows timeout operations to be
+ * cancelled or rescheduled. When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled with a backoff.
+ */
+class EventBasedCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();
+
+EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) {
+this.tp = tp;
+this.log = logContext.logger(EventBasedCoordinatorTimer.class);
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+TimeoutOperation operation
+) {
+// The TimerTask wraps the TimeoutOperation into a CoordinatorWriteEvent. When the TimerTask
+// expires, the event is push to the queue of the coordinator runtime to be executed. This
+// ensure that the threading model of the runtime is respected.

Review Comment:
   nit: "the event is pushed", "This ensures that"



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java:
##
@@ -16,12 +16,26 @@
  */
 package org.apache.kafka.coordinator.group.runtime;
 
+import org.apache.kafka.common.KafkaException;
+
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
  * An interface to schedule and cancel operations.
  */
-public interface Timer {
+public interface CoordinatorTimer {
+/**
+ * Generates the records needed to implement this timeout write operation. In general,

Review Comment:
   Actual usages are in https://github.com/apache/kafka/pull/13870, if you 
search for `"timer.schedule"` under GroupMetadataManager (the old join protocol).
   
   One usage is a join operation: after some time it expires and we try to 
complete the join phase.
   
   Another usage is a heartbeat operation: when it expires, we remove the 
member from the group.
   
   These are mapped directly from the existing DelayedJoin/DelayedHeartbeat 
used by the purgatory.
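
   To illustrate, a hedged sketch of the join-phase usage described above, 
based on the `schedule` signature in this PR; the key format and the 
`completeJoinPhaseRecords` helper are hypothetical, not taken from PR #13870:

       // Schedule the join-phase expiration for a group; scheduling again under
       // the same key would override the previously pending timer.
       timer.schedule(
           "join-" + groupId,
           rebalanceTimeoutMs,
           TimeUnit.MILLISECONDS,
           () -> completeJoinPhaseRecords(groupId)); // records appended when the phase expires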



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,127 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * The EventBasedCoordinatorTimer implements the CoordinatorTimer interface and provides an event based
+ * timer which turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which
+ * are executed by the {@link CoordinatorEventProcessor} used by this coordinator runtime. This is done
+ * to ensure that the timer respects the threading model of the coordinator runtime.
+ *
+ * The {@link CoordinatorWriteEvent} events pushed by the coordinator timer wraps the
+ * {@link TimeoutOperation} operations scheduled by the coordinators.
+ *
+ * It also keeps track of all the scheduled {@link TimerTask}. This allows timeout operations to be
+ * cancelled or rescheduled. When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled with a backoff.
+ */
+class EventBasedCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
[GitHub] [kafka] ijuma commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-06 Thread via GitHub


ijuma commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1255034976


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;

Review Comment:
   I think this was already there for the Scala code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1255017069


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -893,4 +1041,250 @@ public void testOnNewMetadataImage() {
 future1.complete(null);
 verify(coordinator1).onLoaded(newImage);
 }
+
+@Test
+public void testScheduleTimer() throws InterruptedException {
+MockTimer timer = new MockTimer();
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(new MockPartitionWriter())
+.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier())
+.build();
+
+// Loads the coordinator.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Check initial state.
+CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+assertEquals(0, ctx.lastWrittenOffset);
+assertEquals(0, ctx.lastCommittedOffset);
+
+// The coordinator timer should be empty.
+assertEquals(0, ctx.timer.size());
+
+// Timer #1.
+ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS,
+() -> Arrays.asList("record1", "record2"));
+
+// Timer #2.
+ctx.coordinator.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS,
+() -> Arrays.asList("record3", "record4"));
+
+// The coordinator timer should have two pending tasks.
+assertEquals(2, ctx.timer.size());
+
+// Advance time to fire timer #1,
+timer.advanceClock(10 + 1);
+
+// Verify that the operation was executed.
+assertEquals(mkSet("record1", "record2"), ctx.coordinator.records());
+assertEquals(1, ctx.timer.size());
+
+// Advance time to fire timer #2,
+timer.advanceClock(10 + 1);
+
+// Verify that the operation was executed.
+assertEquals(mkSet("record1", "record2", "record3", "record4"), ctx.coordinator.records());
+assertEquals(0, ctx.timer.size());
+}
+
+@Test
+public void testRescheduleTimer() throws InterruptedException {
+MockTimer timer = new MockTimer();
+ManualEventProcessor processor = new ManualEventProcessor();
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withLoader(new MockCoordinatorLoader())
+.withEventProcessor(processor)
+.withPartitionWriter(new MockPartitionWriter())
+.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier())
+.build();
+
+// Loads the coordinator.
+runtime.scheduleLoadOperation(TP, 10);
+
+// Poll twice to process the pending events related to the loading.
+processor.poll();
+processor.poll();
+
+// Check initial state.
+CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+assertEquals(0, ctx.timer.size());
+
+// The processor should be empty.
+assertEquals(0, processor.size());
+
+// Timer #1.
+ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS,
+() -> Collections.singletonList("record1"));
+
+// The coordinator timer should have one pending task.
+assertEquals(1, ctx.timer.size());
+
+// Advance time to fire the pending timer.
+timer.advanceClock(10 + 1);
+
+// An event should be waiting in the processor.
+assertEquals(1, processor.size());
+
+// Schedule a second timer with the same key.
+ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS,
+() -> Collections.singletonList("record2"));
+
+// The coordinator timer should still have one pending task.
+assertEquals(1, ctx.timer.size());
+
+// Schedule a third timer with the same key.
+ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS,
+() -> Collections.singletonList("record3"));
+
+// The coordinator timer should still have one pending task.
+assertEquals(1, ctx.timer.size());
+
+// Advance time to fire the pending timer.
+timer.advanceClock(10 + 1);
+
+// Another event should be waiting in the processor.
+assertEquals(2, processor.size());
+
+// Poll twice to execute the two pending events.
+assertTrue(processor.poll());
+assertTrue(processor.poll());
+
+// Verify that the correct operation was executed. Only the third
+// instance should have been 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-06 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1254999681


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -874,4 +1087,1348 @@ public void replay(
 consumerGroup.updateMember(newMember);
 }
 }
+
+/**
+ * Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value,
+short version
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should not be added.
+} else {
+List<GenericGroupMember> loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = version == 0 ? member.sessionTimeout() : member.rebalanceTimeout();
+
+JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection();
+supportedProtocols.add(new JoinGroupRequestProtocol()
+.setName(value.protocol())
+.setMetadata(member.subscription()));
+
+GenericGroupMember loadedMember = new GenericGroupMember(
+member.memberId(),
+Optional.ofNullable(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+rebalanceTimeout,
+member.sessionTimeout(),
+value.protocolType(),
+supportedProtocols,
+member.assignment()
+);
+
+loadedMembers.add(loadedMember);
+}
+
+String protocolType = value.protocolType();
+
+GenericGroup genericGroup = new GenericGroup(
+this.logContext,
+groupId,
+loadedMembers.isEmpty() ? EMPTY : STABLE,
+time,
+value.generation(),
+protocolType == null || protocolType.isEmpty() ? Optional.empty() : Optional.of(protocolType),
+Optional.ofNullable(value.protocol()),
+Optional.ofNullable(value.leader()),
+value.currentStateTimestamp() == -1 ? Optional.empty() : Optional.of(value.currentStateTimestamp())
+);
+
+loadedMembers.forEach(member -> {
+genericGroup.add(member, null);
+log.info("Loaded member {} in group {} with generation {}.",
+member.memberId(), groupId, genericGroup.generationId());
+});
+
+genericGroup.setSubscribedTopics(
+genericGroup.computeSubscribedTopics()
+);
+}
+}
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group phase completes.
+ */
+public CoordinatorResult genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture<JoinGroupResponseData> responseFuture
+) {
+CoordinatorResult result = EMPTY_RESULT;
+
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is UNKNOWN. if member
+// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND
+GenericGroup group;
+try {
+group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return EMPTY_RESULT;
+}
+
+CoordinatorResult newGroupResult = EMPTY_RESULT;
+if (group.isNew()) {
Review Comment:
   i misunderstood, moved to using a local variable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to 

[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254998634


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,127 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * The EventBasedCoordinatorTimer implements the CoordinatorTimer interface and provides an event based
+ * timer which turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which
+ * are executed by the {@link CoordinatorEventProcessor} used by this coordinator runtime. This is done
+ * to ensure that the timer respects the threading model of the coordinator runtime.
+ *
+ * The {@link CoordinatorWriteEvent} events pushed by the coordinator timer wraps the
+ * {@link TimeoutOperation} operations scheduled by the coordinators.
+ *
+ * It also keeps track of all the scheduled {@link TimerTask}. This allows timeout operations to be
+ * cancelled or rescheduled. When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled with a backoff.
+ */
+class EventBasedCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();
+
+EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) {
+this.tp = tp;
+this.log = logContext.logger(EventBasedCoordinatorTimer.class);
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+TimeoutOperation operation
+) {
+// The TimerTask wraps the TimeoutOperation into a CoordinatorWriteEvent. When the TimerTask
+// expires, the event is push to the queue of the coordinator runtime to be executed. This
+// ensure that the threading model of the runtime is respected.
+TimerTask task = new TimerTask(unit.toMillis(delay)) {
+@Override
+public void run() {
+String eventName = "Timeout(tp=" + tp + ", key=" + key + ")";
+CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> {
+log.debug("Executing write event {} for timer {}.", eventName, key);
+
+// If the task is different, it means that the timer has been
+// cancelled while the event was waiting to be processed.
+if (!tasks.remove(key, this)) {
+throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled");
+}
+
+// Execute the timeout operation.
+return new CoordinatorResult<>(operation.generateRecords(), null);
+});
+
+// If the write event fails, it is rescheduled with a small backoff except if the
+// error is fatal.
+event.future.exceptionally(ex -> {
+if (ex instanceof RejectedExecutionException) {
+log.debug("The delayed write event {} for the timer {} was not executed because it was " +

Review Comment:
   I think the usage of "delayed" also confused me here. In theory, if we never 
hit the timeout, we will repeatedly reschedule and hit this, right? And in 
steady state, we don't necessarily want to write the event.
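
   To make the steady-state pattern concrete, a hedged illustration (the key 
format and the `removeMemberRecords` helper are hypothetical): each heartbeat 
re-schedules under the same key, overriding the previous session timer, so an 
already-expired-but-overridden event is dropped through the 
RejectedExecutionException path above.

       void onHeartbeat(String groupId, String memberId, int sessionTimeoutMs) {
           // Scheduling with the same key overrides the prior timer; if that
           // timer's event was already queued, tasks.remove(key, this) fails
           // inside the event and it is dropped at debug level.
           timer.schedule(
               "session-timeout-" + groupId + "-" + memberId,
               sessionTimeoutMs,
               TimeUnit.MILLISECONDS,
               () -> removeMemberRecords(groupId, memberId));
       }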



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254995855


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java:
##
@@ -16,12 +16,26 @@
  */
 package org.apache.kafka.coordinator.group.runtime;
 
+import org.apache.kafka.common.KafkaException;
+
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
  * An interface to schedule and cancel operations.
  */
-public interface Timer {
+public interface CoordinatorTimer {
+/**
+ * Generates the records needed to implement this timeout write operation. In general,

Review Comment:
   Could we maybe give an example of a timeout operation here? I had a little 
trouble conceptualizing this.
   22/N gave examples -- i.e. that we need to fence a group after a certain 
amount of time -- which really made me understand better.
   
   (I think reviewing the metadata refresh PR -- which had a different 
mechanism for stale data -- confused me a bit too, but I now understand why 
that can happen on the next heartbeat and why this should be done with a timer.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254976555


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * schedule timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got push to the event processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is reschedule with
+ * a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();

Review Comment:
   Thanks. I also misunderstood Timer and had to read its javadoc.
   > turns timeouts of a regular {@link Timer} into {@link 
CoordinatorWriteEvent} events which are executed by the {@link 
CoordinatorEventProcessor}
   
   With respect to the above line, is it fair to understand this as: a Timer 
that is used to track when an operation should occur (it usually occurs on a 
timeout) must be modified so that handling the CoordinatorWriteEvents respects 
the threading model of the coordinator runtime? This modified Timer 
(CoordinatorTimer) is associated with the event via the TimerTask.
   
   Then this task will be added and will handle timeouts appropriately now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254986327


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,127 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * The EventBasedCoordinatorTimer implements the CoordinatorTimer interface and provides an event based
+ * timer which turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which
+ * are executed by the {@link CoordinatorEventProcessor} used by this coordinator runtime. This is done
+ * to ensure that the timer respects the threading model of the coordinator runtime.
+ *
+ * The {@link CoordinatorWriteEvent} events pushed by the coordinator timer wraps the
+ * {@link TimeoutOperation} operations scheduled by the coordinators.
+ *
+ * It also keeps track of all the scheduled {@link TimerTask}. This allows timeout operations to be
+ * cancelled or rescheduled. When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled with a backoff.
+ */
+class EventBasedCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();
+
+EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) {
+this.tp = tp;
+this.log = logContext.logger(EventBasedCoordinatorTimer.class);
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+TimeoutOperation operation
+) {
+// The TimerTask wraps the TimeoutOperation into a CoordinatorWriteEvent. When the TimerTask
+// expires, the event is push to the queue of the coordinator runtime to be executed. This
+// ensure that the threading model of the runtime is respected.
+TimerTask task = new TimerTask(unit.toMillis(delay)) {
+@Override
+public void run() {
+String eventName = "Timeout(tp=" + tp + ", key=" + key + ")";
+CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> {
+log.debug("Executing write event {} for timer {}.", eventName, key);
+
+// If the task is different, it means that the timer has been
+// cancelled while the event was waiting to be processed.
+if (!tasks.remove(key, this)) {
+throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled");
+}
+
+// Execute the timeout operation.

Review Comment:
   Maybe I'm still being pedantic here, but this is the operation that we will 
do that can time out right?
   Not the action we take if we time out?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14426) Add documentation for Kraft limitations that have open KIPs

2023-07-06 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14426:

Description: 
Currently there are a number of limitations for Kraft, which are described as 
the motivation for the following open KIPs:
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes]
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-856%3A+KRaft+Disk+Failure+Recovery]
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote]
 

These limitations are:
 * No online method of resizing the controller quorum
 * No online method of recovering from controller disk loss
 * No support for heterogeneous voter lists in running controller nodes
 * When using a quorum size 3, there is no live-upgrade roll which is tolerant 
of a single unplanned machine failure.
 * When using a quorum size >3, there is a risk of zombie leaders causing 
extended outages without the pre-vote feature.

These are significant enough concerns for operations of a Kraft-enabled cluster 
that they should be documented as official limitations in the ops documentation.

Optionally, we may wish to provide or link to more detailed operations 
documentation about performing the offline-resize or offline-recovery stages, 
in addition to describing that such offline procedures are necessary.

  was:
Currently there are a number of limitations for Kraft, which are described as 
the motivation for the following open KIPs:
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes]
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-856%3A+KRaft+Disk+Failure+Recovery]
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote]
 

These limitations are:
 * No online method of resizing the controller quorum
 * No online method of recovering from controller disk loss
 * No support for heterogeneous voter lists in running controller nodes
 * When using a quorum size 3, there is no live-upgrade roll which is tolerant 
of a single unplanned machine failure.
 * When using a quorum size >3, there is a risk of non-linearizable reads.

These are significant enough concerns for operations of a Kraft-enabled cluster 
that they should be documented as official limitations in the ops documentation.

Optionally, we may wish to provide or link to more detailed operations 
documentation about performing the offline-resize or offline-recovery stages, 
in addition to describing that such offline procedures are necessary.


> Add documentation for Kraft limitations that have open KIPs
> --
>
> Key: KAFKA-14426
> URL: https://issues.apache.org/jira/browse/KAFKA-14426
> Project: Kafka
>  Issue Type: Task
>  Components: documentation, kraft
>Reporter: Greg Harris
>Priority: Major
>
> Currently there are a number of limitations for Kraft, which are described as 
> the motivation for the following open KIPs:
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes]
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-856%3A+KRaft+Disk+Failure+Recovery]
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote]
>  
> These limitations are:
>  * No online method of resizing the controller quorum
>  * No online method of recovering from controller disk loss
>  * No support for heterogeneous voter lists in running controller nodes
>  * When using a quorum size 3, there is no live-upgrade roll which is 
> tolerant of a single unplanned machine failure.
>  * When using a quorum size >3, there is a risk of zombie leaders causing 
> extended outages without the pre-vote feature.
> These are significant enough concerns for operations of a Kraft-enabled 
> cluster that they should be documented as official limitations in the ops 
> documentation.
> Optionally, we may wish to provide or link to more detailed operations 
> documentation about performing the offline-resize or offline-recovery stages, 
> in addition to describing that such offline procedures are necessary.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254981176


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * schedule timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got push to the event processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is reschedule with
+ * a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();

Review Comment:
   Ah I see your other comment in the method.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14207) Add a 6.10 section for KRaft

2023-07-06 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740818#comment-17740818
 ] 

Greg Harris commented on KAFKA-14207:
-

[~showuon] The only accompanying information about that documentation is this 
github comment: 
[https://github.com/apache/kafka/pull/12642#discussion_r980338126] which links 
to 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote]
 

I mentioned this limitation in KAFKA-14426, but I did not find any other 
sources describing it, or how pre-vote removes it. I think that if KIP-650 is 
relevant, we should add to its `Motivation` section an indication that this 
feature is required for larger quorums.

> Add a 6.10 section for KRaft
> 
>
> Key: KAFKA-14207
> URL: https://issues.apache.org/jira/browse/KAFKA-14207
> Project: Kafka
>  Issue Type: Sub-task
>  Components: documentation
>Affects Versions: 3.3.0
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>  Labels: documentation, kraft
> Fix For: 3.3.0
>
>
> The section should talk about:
>  # Limitation
>  # Recommended deployment: external controller
>  # How to start a KRaft cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Updated] (KAFKA-14426) Add documentation for Kraft limitations that have open KIPs

2023-07-06 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14426:

Description: 
Currently there are a number of limitations for Kraft, which are described as 
the motivation for the following open KIPs:
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes]
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-856%3A+KRaft+Disk+Failure+Recovery]
 * 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote]
 

These limitations are:
 * No online method of resizing the controller quorum
 * No online method of recovering from controller disk loss
 * No support for heterogeneous voter lists in running controller nodes
 * When using a quorum size 3, there is no live-upgrade roll which is tolerant 
of a single unplanned machine failure.
 * When using a quorum size >3, there is a risk of non-linearizable reads.

These are significant enough concerns for operations of a Kraft-enabled cluster 
that they should be documented as official limitations in the ops documentation.

Optionally, we may wish to provide or link to more detailed operations 
documentation about performing the offline-resize or offline-recovery stages, 
in addition to describing that such offline procedures are necessary.

  was:
Currently there are a number of limitations for Kraft, which are described as 
the motivation for the following open KIPs:
* 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes]
* 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-856%3A+KRaft+Disk+Failure+Recovery]
* 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote]
 

These limitations are:
* No online method of resizing the controller quorum
* No online method of recovering from controller disk loss
* No support for heterogeneous voter lists in running controller nodes
* When using a quorum size 3, there is no live-upgrade roll which is tolerant 
of a single unplanned machine failure.
* When using a quorum size >3, there is a risk of non-linearizable reads.

These are significant enough concerns for operations of a Kraft-enabled cluster 
that they should be documented as official limitations in the ops documentation.

Optionally, we may wish to provide or link to more detailed operations 
documentation about performing the offline-resize or offline-recovery stages, 
in addition to describing that such offline procedures are necessary.


> Add documentation for KRaft limitations that have open KIPs
> --
>
> Key: KAFKA-14426
> URL: https://issues.apache.org/jira/browse/KAFKA-14426
> Project: Kafka
>  Issue Type: Task
>  Components: documentation, kraft
>Reporter: Greg Harris
>Priority: Major
>
> Currently there are a number of limitations for Kraft, which are described as 
> the motivation for the following open KIPs:
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes]
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-856%3A+KRaft+Disk+Failure+Recovery]
>  * 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote]
>  
> These limitations are:
>  * No online method of resizing the controller quorum
>  * No online method of recovering from controller disk loss
>  * No support for heterogeneous voter lists in running controller nodes
>  * When using a quorum size 3, there is no live-upgrade roll which is 
> tolerant of a single unplanned machine failure.
>  * When using a quorum size >3, there is a risk of non-linearizable reads.
> These are significant enough concerns for operations of a Kraft-enabled 
> cluster that they should be documented as official limitations in the ops 
> documentation.
> Optionally, we may wish to provide or link to more detailed operations 
> documentation about performing the offline-resize or offline-recovery stages, 
> in addition to describing that such offline procedures are necessary.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] junrao commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-06 Thread via GitHub


junrao commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1254942551


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;

Review Comment:
   We introduced a new dependency, caffeine, here. Could you explain why caffeine 
   was chosen and how stable it is? The doc for caffeine mentions the use of weak 
   references. A few years back, we avoided using weak references in a PR because 
   of their poor GC behavior. Have we done any experiments to understand the GC 
   impact?
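
   For context, a minimal sketch (names illustrative, not the exact 
   RemoteIndexCache wiring) of how a bounded Caffeine cache with a removal 
   listener is typically set up. One detail relevant to the GC question: Caffeine 
   holds strong references by default; weak or soft references are opt-in via 
   weakKeys()/weakValues()/softValues().

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

public class CaffeineSketch {
    // Hypothetical stand-in for the cached index entry type.
    static class IndexEntry {
        void markForCleanup() { /* e.g. enqueue index files for deletion */ }
    }

    public static void main(String[] args) {
        // Bounded cache: eviction is driven by size and the Window TinyLfu
        // policy, not by weak references (those are opt-in via weakKeys() etc.).
        Cache<String, IndexEntry> cache = Caffeine.newBuilder()
                .maximumSize(1024)
                .removalListener((String key, IndexEntry entry, RemovalCause cause) -> {
                    if (entry != null) {
                        entry.markForCleanup(); // evicted entries are cleaned up later
                    }
                })
                .build();

        cache.put("segment-0", new IndexEntry());
        System.out.println(cache.getIfPresent("segment-0"));
    }
}
```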



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gaurav-narula commented on a diff in pull request #13949: KAFKA-15141: init logger statically on hot codepaths

2023-07-06 Thread via GitHub


gaurav-narula commented on code in PR #13949:
URL: https://github.com/apache/kafka/pull/13949#discussion_r1254921844


##
core/src/main/scala/kafka/server/DelayedProduce.scala:
##
@@ -58,6 +62,8 @@ class DelayedProduce(delayMs: Long,
  lockOpt: Option[Lock] = None)
   extends DelayedOperation(delayMs, lockOpt) {
 
+  override lazy val logger = DelayedProduce.logger

Review Comment:
   @ijuma any thoughts on the above? Perhaps I'm overlooking something and it 
would help if you could elaborate with an example on scastie.
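
   For reference, a minimal Java sketch of the pattern this PR applies 
   (DelayedProduce itself is Scala; this only illustrates the idea): hoist the 
   logger into a single static field so hot-path constructors stop paying a 
   LoggerFactory lookup per instance.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DelayedOperationSketch {
    // Initialized once per class load and shared by all instances, instead of
    // calling LoggerFactory.getLogger(getClass()) in every constructor.
    private static final Logger LOG = LoggerFactory.getLogger(DelayedOperationSketch.class);

    DelayedOperationSketch() {
        LOG.debug("created"); // the hot-path constructor does no logger lookup
    }
}
```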



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1

2023-07-06 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740748#comment-17740748
 ] 

Edoardo Comar edited comment on KAFKA-15144 at 7/6/23 8:56 PM:
---

I'd rather rewrite our test to expect the record to be found by consuming no 
more than offset.lag.max=100 records from the target cluster, starting from the 
last checkpoint.


was (Author: ecomar):
I'd rather rewrite our test to expect the record to be found by consuming no 
more than offset.lag.max=100 records from the target cluster

> MM2 Checkpoint downstreamOffset stuck to 1
> --
>
> Key: KAFKA-15144
> URL: https://issues.apache.org/jira/browse/KAFKA-15144
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Major
> Attachments: edo-connect-mirror-maker-sourcetarget.properties
>
>
> Steps to reproduce :
> 1.Start the source cluster
> 2.Start the target cluster
> 3.Start connect-mirror-maker.sh using a config like the attached
> 4.Create a topic in source cluster
> 5.produce a few messages
> 6.consume them all with autocommit enabled
>  
> 7. then dump the Checkpoint topic content e.g.
> {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic 
> source.checkpoints.internal --from-beginning --formatter 
> org.apache.kafka.connect.mirror.formatters.CheckpointFormatter}}
> {{{}Checkpoint{consumerGroupId=edogroup, 
> topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, 
> {*}downstreamOffset=1{*}, metadata={
>  
> the downstreamOffset remains at 1, while, in a fresh cluster pair like with 
> the source topic created while MM2 is running, 
> I'd expect the downstreamOffset to match the upstreamOffset.
> Note that dumping the offset sync topic, shows matching initial offsets
> {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic 
> mm2-offset-syncs.source.internal --from-beginning --formatter 
> org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter}}
> {{{}OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, 
> downstreamOffset=0{
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15148) Some integration tests are running as unit tests

2023-07-06 Thread Ezio Xie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740696#comment-17740696
 ] 

Ezio Xie edited comment on KAFKA-15148 at 7/6/23 8:05 PM:
--

I filtered integration tests with "integration" as the keyword and got these 
integration tests:
{code:java}
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsReporterIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.QueryableStateIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableEfficientRangeQueryTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopCounterTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html
 {code}
I added @Tag("integration") to "streams" module tests and 
@Category(IntegrationTest.class) to "connect" module tests, so these 
integration tests are now excluded from the unitTest task.

The "streams" module's unitTest time was reduced from 25 minutes to 17 minutes, 
and the "connect" module's from 18 minutes to 12 minutes.

[~divijvaidya] Do you think this result is in line with your expectations? Or 
any advice?


was (Author: JIRAUSER290463):
I filtered integration tests by (find . -name "*.{*}integration{*}.html") and 
got these integration tests:
{code:java}
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsReporterIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.QueryableStateIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableEfficientRangeQueryTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopCounterTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html
 {code}
I added @Tag("integration") to "streams" module tests and 
@Category(IntegrationTest.class) to "connect" module tests, then these 
integration tests are excluded from "task unitTest".

"stream" module's unitTest time reduced from 25 minutes to 17 minutes, 
"connect" module's unitTest time reduced from 18 minutes to 12 minutes.

[~divijvaidya] Do you think this result is in line with your expectations? Or 
any advice?

> Some integration tests are running as unit tests
> 
>
> Key: KAFKA-15148
> URL: 

[GitHub] [kafka] eziosudo opened a new pull request, #13973: exclude integration tests in "./gradlew unitTest" by adding @Tag("integration") and @Category(IntegrationTest.class)

2023-07-06 Thread via GitHub


eziosudo opened a new pull request, #13973:
URL: https://github.com/apache/kafka/pull/13973

   Change these tests by adding @Tag("integration") and 
@Category(IntegrationTest.class).
   `
   
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsReporterIntegrationTest.html
   
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html
   
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.html
   
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html
   
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.html
   
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.html
   
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsIntegrationTest.html
   
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.html
   
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.html
   
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.html
   
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.QueryableStateIntegrationTest.html
   
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableEfficientRangeQueryTest.html
   
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopCounterTest.html
   
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html
   `
   
   "stream" module's unitTest time reduced from 25 minutes to 17 minutes, 
"connect" module's unitTest time reduced from 18 minutes to 12 minutes.
   Screenshots of the reduced run times:
   https://github.com/apache/kafka/assets/54128896/dc0e0518-b675-4140-b140-91ea0bd992a5
   https://github.com/apache/kafka/assets/54128896/7d0afcce-47c9-45b8-b4be-a9715d3faeff
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
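
   For readers unfamiliar with the two test frameworks involved, a minimal 
   sketch of the annotations this PR adds (class names are hypothetical): JUnit 5 
   tests in "streams" get @Tag("integration") and JUnit 4 tests in "connect" get 
   @Category(IntegrationTest.class), which is what lets the Gradle unitTest task 
   exclude them.

```java
// JUnit 5 style (streams module):
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration")
class SomeStreamsIntegrationTest {
    @Test
    void roundTrip() { /* exercises a real embedded cluster */ }
}

// JUnit 4 style (connect module), in its own file:
// import org.junit.Test;
// import org.junit.experimental.categories.Category;
// import org.apache.kafka.test.IntegrationTest; // marker interface (package assumed)
//
// @Category(IntegrationTest.class)
// public class SomeConnectIntegrationTest {
//     @Test
//     public void roundTrip() { /* ... */ }
// }
```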
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-15148) Some integration tests are running as unit tests

2023-07-06 Thread Ezio Xie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740696#comment-17740696
 ] 

Ezio Xie edited comment on KAFKA-15148 at 7/6/23 7:56 PM:
--

I filtered integration tests by (find . -name "*.{*}integration{*}.html") and 
got these integration tests:
{code:java}
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsReporterIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.QueryableStateIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableEfficientRangeQueryTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopCounterTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html
 {code}
I added @Tag("integration") to "streams" module tests and 
@Category(IntegrationTest.class) to "connect" module tests, then these 
integration tests are excluded from "task unitTest".

"stream" module's unitTest time reduced from 25 minutes to 17 minutes, 
"connect" module's unitTest time reduced from 18 minutes to 12 minutes.

[~divijvaidya] Do you think this result is in line with your expectations? Or 
any advice?


was (Author: JIRAUSER290463):
I filtered integration tests by (find . -name "*.*integration*.html") and got 
these integration tests:
{code:java}
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsReporterIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.QueryableStateIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableEfficientRangeQueryTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopCounterTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html
 {code}
I added @Tag("integration") to "streams" module tests and 
@Category(IntegrationTest.class) to "connect" module tests, then these 
integration tests are excluded from "task unitTest".

"stream" module's unitTest time reduced from 25 minutes to 16 minutes, 
"connect" module's unitTest time reduced from 18 minutes to 12 minutes.


[~divijvaidya] Do you think this result is in line with your expectations? Or 
any advice?

> Some integration tests are running as unit tests
> 
>
> Key: KAFKA-15148
> URL: 

[GitHub] [kafka] aneelkkhatri opened a new pull request, #13972: Fix one typo in javadoc

2023-07-06 Thread via GitHub


aneelkkhatri opened a new pull request, #13972:
URL: https://github.com/apache/kafka/pull/13972

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] joobisb commented on pull request #13862: KAFKA-15050: format the prompts in the quickstart

2023-07-06 Thread via GitHub


joobisb commented on PR #13862:
URL: https://github.com/apache/kafka/pull/13862#issuecomment-1624188820

   @tombentley could you please have a look, have addressed the comments


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


dajac commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254819460


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event
+ * processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled
+ * with a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();

Review Comment:
   Updated the javadoc. Let me know if it makes things clearer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer

2023-07-06 Thread via GitHub


philipnee commented on PR #13914:
URL: https://github.com/apache/kafka/pull/13914#issuecomment-1624179241

   @erikvanoosten - I'll try to help you out on this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


dajac commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254807923


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event
+ * processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled
+ * with a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();
+
+InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) {
+this.tp = tp;
+this.log = logContext.logger(InternalCoordinatorTimer.class);
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+TimeoutOperation operation
+) {
+TimerTask task = new TimerTask(unit.toMillis(delay)) {
+@Override
+public void run() {
+String eventName = "Timeout(tp=" + tp + ", key=" + key + 
")";
+CoordinatorWriteEvent<Void> event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> {
+log.debug("Executing write event {} for timer {}.", 
eventName, key);
+
+// If the task is different, it means that the timer 
has been
+// cancelled while the event was waiting to be 
processed.
+if (!tasks.remove(key, this)) {
+throw new RejectedExecutionException("Timer " + 
key + " was overridden or cancelled");
+}
+
+// Execute the real operation.
+return new 
CoordinatorResult<>(operation.generateRecords(), null);
+});
+
+event.future.exceptionally(ex -> {
+// Otherwise, we handle the error if the timer is 
still active.
+if (ex instanceof RejectedExecutionException) {
+log.debug("The delayed write event {} for the 
timer {} was not executed because it was " +
+"cancelled or overridden.", event.name, key);
+return null;
+}
+
+if (ex instanceof NotCoordinatorException || ex 
instanceof CoordinatorLoadInProgressException) {
+log.debug("The delayed write event {} for the 
timer {} failed due to {}. Ignoring it because " +
+"the coordinator is not active.", event.name, 
key, ex.getMessage());
+return null;
+}
+
+log.info("The delayed write event {} for the timer {} 
failed due to {}. Rescheduling it. ",
+event.name, key, ex.getMessage(), ex);
+schedule(key, 500, TimeUnit.MILLISECONDS, operation);
+
+return null;
+});
+
+log.debug("Scheduling write event {} for timer {}.", 
event.name, key);
+try {
+enqueue(event);

Review Comment:
   The confusion may come from 
   [here](https://github.com/apache/kafka/pull/13961/files#diff-4b9dd21d7748bfb375634ef6e39037e99f53ed2546519bec32fa1707bd262fe8R271).
   CoordinatorResult is the part which can accept a null response now. In this 
   particular case, it will be null because we have no response, only records. 
   The null response will eventually be used to complete the event.future. As we 
   use event.future.exceptionally, the null response is never used here either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-07-06 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740751#comment-17740751
 ] 

ASF GitHub Bot commented on KAFKA-14995:


stevenbooke commented on PR #521:
URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1624159774

   @mimaison Could you review this and merge if there are no issues, please?




> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: Steven Booke
>Priority: Minor
>  Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: 
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who 
> are not Kafka committers. Unfortunately, it's not trivial to compute this 
> list.
> Here is the process I followed to generate the list the first time (note that 
> I generated this list on 2023-04-28, so the lookback is one year):
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. manually filter out the authors who are committers, based on 
> [https://kafka.apache.org/committers]
> 3. truncate the list to 20 authors
> 4. for each author
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date:   Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: 
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>  
> This is pretty time consuming and is very scriptable. Two complications:
>  * To do the filtering, we need to map from Git log "Author" to documented 
> Kafka "Committer" that we can use to perform the filter. Suggestion: just 
> update the structure of the "Committers" page to include their Git "Author" 
> name and email 
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)]
>  * To generate the YAML lists, we need to map from Git log "Author" to Github 
> username. There's presumably some way to do this in the Github REST API (the 
> mapping is based on the email, IIUC), or we could also just update the 
> Committers page to also document each committer's Github username.
>  
> Ideally, we would write this script (to be stored in the Apache Kafka repo) 
> and create a Github Action to run it every three months.
>  
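
As a starting point, a rough sketch of steps 1-3 of that process in Java (the 
committer list below is a placeholder; a real script would scrape it from the 
committers page):
{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Set;

public class CollaboratorListSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder: a real script would parse https://kafka.apache.org/committers
        Set<String> committers = Set.of("Jane Committer", "John Committer");

        Process p = new ProcessBuilder("git", "shortlog", "HEAD", "--email",
                "--numbered", "--summary", "--since=2022-04-28").start();
        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
            r.lines()
                    .map(String::trim)                                   // "42\tName <email>"
                    .filter(line -> line.contains("\t"))
                    .map(line -> line.substring(line.indexOf('\t') + 1)) // drop the commit count
                    .filter(author -> committers.stream().noneMatch(author::startsWith))
                    .limit(20)                                           // top 20 non-committers
                    .forEach(System.out::println);
        }
        p.waitFor();
    }
}
{code}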



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1

2023-07-06 Thread Edoardo Comar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740748#comment-17740748
 ] 

Edoardo Comar commented on KAFKA-15144:
---

I'd rather rewrite our test to expect the record to be found by consuming no 
more than offset.lag.max=100 records from the target cluster.
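
For illustration, a sketch of what such a test could look like (topic, 
properties and names are illustrative; assumes byte[] deserializers in the 
consumer config): seek to the checkpointed downstream offset on the target 
cluster and expect the record within the next offset.lag.max (100) records.
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class CheckpointScanSketch {
    static boolean foundWithinLagMax(Properties targetProps, long checkpointedOffset, byte[] expected) {
        TopicPartition tp = new TopicPartition("source.vf-mirroring-test-edo", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(targetProps)) {
            consumer.assign(Collections.singleton(tp));
            consumer.seek(tp, checkpointedOffset); // start from the last checkpoint
            int scanned = 0;
            for (int attempts = 0; attempts < 10 && scanned < 100; attempts++) {
                for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(1))) {
                    if (Arrays.equals(rec.value(), expected)) {
                        return true; // found within offset.lag.max records
                    }
                    if (++scanned >= 100) {
                        break;
                    }
                }
            }
            return false;
        }
    }
}
{code}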

> MM2 Checkpoint downstreamOffset stuck to 1
> --
>
> Key: KAFKA-15144
> URL: https://issues.apache.org/jira/browse/KAFKA-15144
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Major
> Attachments: edo-connect-mirror-maker-sourcetarget.properties
>
>
> Steps to reproduce :
> 1.Start the source cluster
> 2.Start the target cluster
> 3.Start connect-mirror-maker.sh using a config like the attached
> 4.Create a topic in source cluster
> 5.produce a few messages
> 6.consume them all with autocommit enabled
>  
> 7. then dump the Checkpoint topic content e.g.
> {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic 
> source.checkpoints.internal --from-beginning --formatter 
> org.apache.kafka.connect.mirror.formatters.CheckpointFormatter}}
> {{{}Checkpoint{consumerGroupId=edogroup, 
> topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, 
> {*}downstreamOffset=1{*}, metadata={
>  
> the downstreamOffset remains at 1, while, in a fresh cluster pair like with 
> the source topic created while MM2 is running, 
> I'd expect the downstreamOffset to match the upstreamOffset.
> Note that dumping the offset sync topic, shows matching initial offsets
> {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic 
> mm2-offset-syncs.source.internal --from-beginning --formatter 
> org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter}}
> {{{}OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, 
> downstreamOffset=0{
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] erikvanoosten commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer

2023-07-06 Thread via GitHub


erikvanoosten commented on PR #13914:
URL: https://github.com/apache/kafka/pull/13914#issuecomment-1624147284

   > Have you gotten much feedback from the discussion thread yet?
   
   Not a single comment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254792945


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event
+ * processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled
+ * with a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();
+
+InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) {
+this.tp = tp;
+this.log = logContext.logger(InternalCoordinatorTimer.class);
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+TimeoutOperation operation
+) {
+TimerTask task = new TimerTask(unit.toMillis(delay)) {
+@Override
+public void run() {
+String eventName = "Timeout(tp=" + tp + ", key=" + key + 
")";
+CoordinatorWriteEvent<Void> event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> {
+log.debug("Executing write event {} for timer {}.", 
eventName, key);
+
+// If the task is different, it means that the timer 
has been
+// cancelled while the event was waiting to be 
processed.
+if (!tasks.remove(key, this)) {
+throw new RejectedExecutionException("Timer " + 
key + " was overridden or cancelled");
+}
+
+// Execute the real operation.
+return new 
CoordinatorResult<>(operation.generateRecords(), null);
+});
+
+event.future.exceptionally(ex -> {
+// Otherwise, we handle the error if the timer is 
still active.
+if (ex instanceof RejectedExecutionException) {
+log.debug("The delayed write event {} for the 
timer {} was not executed because it was " +
+"cancelled or overridden.", event.name, key);
+return null;
+}
+
+if (ex instanceof NotCoordinatorException || ex 
instanceof CoordinatorLoadInProgressException) {
+log.debug("The delayed write event {} for the 
timer {} failed due to {}. Ignoring it because " +
+"the coordinator is not active.", event.name, 
key, ex.getMessage());
+return null;
+}
+
+log.info("The delayed write event {} for the timer {} 
failed due to {}. Rescheduling it. ",
+event.name, key, ex.getMessage(), ex);
+schedule(key, 500, TimeUnit.MILLISECONDS, operation);
+
+return null;
+});
+
+log.debug("Scheduling write event {} for timer {}.", 
event.name, key);
+try {
+enqueue(event);

Review Comment:
   Right, so maybe I need to figure out where this future's result is actually 
   consumed. It seems we put this event, whose future's response could be null, 
   in the queue. I guess I'm just struggling to understand why something that was 
   required to be non-null before can just be changed to accept nulls, and where 
   that code is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254790531


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event
+ * processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled
+ * with a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();

Review Comment:
   Ok -- that very much confused me, so maybe we can add comments about how this 
   works.  
   I will need to reread a lot of this.
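
   For reference, the cancellation guard in the PR relies on the two-argument 
   Map.remove(key, value), which only removes the entry when the key is still 
   mapped to this exact task, so a stale (overridden) task sees false and bails 
   out. A tiny illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class RemoveGuardSketch {
    public static void main(String[] args) {
        Map<String, Object> tasks = new HashMap<>();
        Object oldTask = new Object();
        Object newTask = new Object();

        tasks.put("heartbeat", oldTask);
        tasks.put("heartbeat", newTask); // timer overridden before oldTask ran

        // Stale task: the key now maps to newTask, so nothing is removed.
        System.out.println(tasks.remove("heartbeat", oldTask)); // false -> treat as cancelled
        // Live task: removes its own mapping and proceeds.
        System.out.println(tasks.remove("heartbeat", newTask)); // true -> execute
    }
}
```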






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


dajac commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254782962


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event
+ * processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled
+ * with a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();
+
+InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) {
+this.tp = tp;
+this.log = logContext.logger(InternalCoordinatorTimer.class);
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+TimeoutOperation operation
+) {
+TimerTask task = new TimerTask(unit.toMillis(delay)) {
+@Override
+public void run() {
+String eventName = "Timeout(tp=" + tp + ", key=" + key + 
")";
+CoordinatorWriteEvent<Void> event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> {
+log.debug("Executing write event {} for timer {}.", 
eventName, key);
+
+// If the task is different, it means that the timer 
has been
+// cancelled while the event was waiting to be 
processed.
+if (!tasks.remove(key, this)) {
+throw new RejectedExecutionException("Timer " + 
key + " was overridden or cancelled");
+}
+
+// Execute the real operation.
+return new 
CoordinatorResult<>(operation.generateRecords(), null);
+});
+
+event.future.exceptionally(ex -> {
+// Otherwise, we handle the error if the timer is 
still active.
+if (ex instanceof RejectedExecutionException) {
+log.debug("The delayed write event {} for the 
timer {} was not executed because it was " +
+"cancelled or overridden.", event.name, key);
+return null;
+}
+
+if (ex instanceof NotCoordinatorException || ex 
instanceof CoordinatorLoadInProgressException) {
+log.debug("The delayed write event {} for the 
timer {} failed due to {}. Ignoring it because " +
+"the coordinator is not active.", event.name, 
key, ex.getMessage());
+return null;
+}
+
+log.info("The delayed write event {} for the timer {} 
failed due to {}. Rescheduling it. ",
+event.name, key, ex.getMessage(), ex);
+schedule(key, 500, TimeUnit.MILLISECONDS, operation);
+
+return null;
+});
+
+log.debug("Scheduling write event {} for timer {}.", 
event.name, key);
+try {
+enqueue(event);

Review Comment:
   I was referring to L274. event.future is the future which is completed when 
(or after) the event is executed (and committed).
   
   When you call event.future.exceptionally, it registers a callback/function 
that is executed when the future is completed with an exception. Exceptionally 
returns a new future that is completed with the result of this callback. The 
returned future is not used here so the null we discussed is not going anywhere.
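
   A minimal, self-contained illustration of that CompletableFuture behavior 
   (plain JDK semantics, not the PR's code): exceptionally registers a handler 
   and returns a new future; discarding that returned future is fine, and the 
   handler's null return value only becomes the value of the discarded future.

```java
import java.util.concurrent.CompletableFuture;

public class ExceptionallySketch {
    public static void main(String[] args) {
        CompletableFuture<Void> future = new CompletableFuture<>();

        // Registers a callback invoked on exceptional completion. The new
        // future returned by exceptionally() is intentionally discarded,
        // so returning null from the handler goes nowhere.
        future.exceptionally(ex -> {
            System.out.println("handled: " + ex);
            return null; // value of the *discarded* derived future
        });

        future.completeExceptionally(new RuntimeException("boom"));
    }
}
```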



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


dajac commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254778449


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event
+ * processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled
+ * with a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();

Review Comment:
   No. The timer schedules an event when the timer expires. The event executes 
the timeout operation. If the timeout operation raises an error, the error is 
used to complete the future of the event. This is how the write event works.
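
   A stripped-down sketch of that flow (types simplified well beyond the PR's 
   actual classes): the expired TimerTask enqueues an event, the event executes 
   the timeout operation, and either outcome completes the event's future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Simplified stand-in for the runtime's write event.
class TimeoutEventSketch {
    final CompletableFuture<Void> future = new CompletableFuture<>();
    final Supplier<Object> timeoutOperation; // stands in for generateRecords()

    TimeoutEventSketch(Supplier<Object> timeoutOperation) {
        this.timeoutOperation = timeoutOperation;
    }

    // Invoked later by the event processor thread, not by the timer itself.
    void run() {
        try {
            timeoutOperation.get();          // execute the timeout operation
            future.complete(null);           // success: null response
        } catch (Throwable t) {
            future.completeExceptionally(t); // error completes the future
        }
    }
}
```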



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254741731


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event
+ * processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled
+ * with a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();

Review Comment:
   Hmmm --- maybe I'm very confused now. Is the timeout actually a timeout -- 
   i.e. if we haven't done the event by a given time then we return with an 
   error? I think I missed that and thought we were waiting for the timer to hit 
   a time to execute the event.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254740479


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event
+ * processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled
+ * with a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map<String, TimerTask> tasks = new HashMap<>();
+
+InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) {
+this.tp = tp;
+this.log = logContext.logger(InternalCoordinatorTimer.class);
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+TimeoutOperation operation
+) {
+TimerTask task = new TimerTask(unit.toMillis(delay)) {
+@Override
+public void run() {
+String eventName = "Timeout(tp=" + tp + ", key=" + key + 
")";
+CoordinatorWriteEvent<Void> event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> {
+log.debug("Executing write event {} for timer {}.", 
eventName, key);
+
+// If the task is different, it means that the timer 
has been
+// cancelled while the event was waiting to be 
processed.
+if (!tasks.remove(key, this)) {
+throw new RejectedExecutionException("Timer " + 
key + " was overridden or cancelled");
+}
+
+// Execute the real operation.
+return new 
CoordinatorResult<>(operation.generateRecords(), null);
+});
+
+event.future.exceptionally(ex -> {
+// Otherwise, we handle the error if the timer is 
still active.
+if (ex instanceof RejectedExecutionException) {
+log.debug("The delayed write event {} for the 
timer {} was not executed because it was " +
+"cancelled or overridden.", event.name, key);
+return null;
+}
+
+if (ex instanceof NotCoordinatorException || ex 
instanceof CoordinatorLoadInProgressException) {
+log.debug("The delayed write event {} for the 
timer {} failed due to {}. Ignoring it because " +
+"the coordinator is not active.", event.name, 
key, ex.getMessage());
+return null;
+}
+
+log.info("The delayed write event {} for the timer {} 
failed due to {}. Rescheduling it. ",
+event.name, key, ex.getMessage(), ex);
+schedule(key, 500, TimeUnit.MILLISECONDS, operation);
+
+return null;
+});
+
+log.debug("Scheduling write event {} for timer {}.", 
event.name, key);
+try {
+enqueue(event);

Review Comment:
   Hmm -- can you point me to the code that "reacts to its result" and how we 
   don't use the resulting future? (Maybe I'm just confused) but it looks like we 
   always enqueue the event regardless of the future, and I don't see where we 
   check if it's null and not use it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15069) Refactor scanning hierarchy out of DelegatingClassLoader

2023-07-06 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-15069.
-
  Reviewer: Chris Egerton
Resolution: Fixed

> Refactor scanning hierarchy out of DelegatingClassLoader
> 
>
> Key: KAFKA-15069
> URL: https://issues.apache.org/jira/browse/KAFKA-15069
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.6.0
>
>
> The DelegatingClassLoader is involved in both scanning and using the results 
> of scanning to process classloading.
> Instead, the scanning should take place outside of the DelegatingClassLoader, 
> and results of scanning be passed back into the DelegatingClassLoader for 
> classloading functionality.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.

2023-07-06 Thread via GitHub


satishd commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1254737901


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX;
+import static 
org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX;
+
+/**
+ * This is an LFU (Least Frequently Used) cache of remote index files stored in
+ * `$logdir/remote-log-index-cache`.
+ * This is helpful to avoid re-fetching the index files like offset, time 
indexes from the remote storage for every
+ * fetch call. The cache is re-initialized from the index files on disk on 
startup, if the index files are available.
+ *
+ * The cache contains a garbage collection thread which will delete the files 
for entries that have been removed from
+ * the cache.
+ *
+ * Note that closing this cache does not delete the index files on disk.
+ * Note that the cache eviction policy is based on the default implementation of
+ * Caffeine, i.e. Window TinyLfu
+ * (https://github.com/ben-manes/caffeine/wiki/Efficiency). TinyLfu relies on a frequency
+ * sketch to probabilistically estimate the historic usage of an entry.
+ *
+ */
+public class RemoteIndexCache implements Closeable {
+
+private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class);
+
+public static final String DIR_NAME = "remote-log-index-cache";
+
+private static final String TMP_FILE_SUFFIX = ".tmp";
+
+public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner";
+
+/**
+ * Directory where the index files will be stored on disk.
+ */
+private final File cacheDir;
+
+/**
+ * Represents whether the cache is closed or not. Closing the cache is an irreversible operation.
+ */
+private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false);
+
+/**
+ * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected.
+ */
+private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>();
+
+/**
+ * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any
+ * other concurrent reads in progress.
+ */
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+/**
+ * Actual cache implementation that this class wraps around.
+ *
+ * The requirements for this internal cache are as follows:
+ * 1. Multiple threads should be 
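
For readers unfamiliar with Caffeine, here is a minimal, self-contained sketch of 
the eviction wiring the javadoc above describes; the key/value types, size bound, 
and synchronous executor are stand-ins for illustration, not the actual Kafka code.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

import java.util.concurrent.LinkedBlockingQueue;

public class TinyLfuEvictionSketch {
    public static void main(String[] args) {
        LinkedBlockingQueue<String> expired = new LinkedBlockingQueue<>();
        Cache<String, String> cache = Caffeine.newBuilder()
                .maximumSize(2)                // Window TinyLfu picks the eviction victim
                .executor(Runnable::run)       // run the listener synchronously for the demo
                // Mirror the pattern above: evicted entries are queued for a cleaner
                // thread instead of having their files deleted inline.
                .removalListener((String key, String value, RemovalCause cause) ->
                        expired.offer(value))
                .build();
        cache.put("a", "offset-index-a");
        cache.put("b", "offset-index-b");
        cache.put("c", "offset-index-c");      // exceeds the bound, triggers an eviction
        cache.cleanUp();                       // flush pending maintenance work
        System.out.println("awaiting cleanup: " + expired);
    }
}
```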

[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-07-06 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740736#comment-17740736
 ] 

ASF GitHub Bot commented on KAFKA-14995:


stevenbooke commented on PR #521:
URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1624069492

   Ok, I will make the change now.




> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: Steven Booke
>Priority: Minor
>  Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: 
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who 
> are not Kafka committers. Unfortunately, it's not trivial to compute this 
> list.
> Here is the process I followed to generate the list the first time (note that 
> I generated this list on 2023-04-28, so the lookback is one year):
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. manually filter out the authors who are committers, based on 
> [https://kafka.apache.org/committers]
> 3. truncate the list to 20 authors
> 4. for each author
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date:   Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: 
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>  
> This is pretty time-consuming and is very scriptable. Two complications:
>  * To do the filtering, we need to map from Git log "Author" to documented 
> Kafka "Committer" that we can use to perform the filter. Suggestion: just 
> update the structure of the "Committers" page to include their Git "Author" 
> name and email 
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)]
>  * To generate the YAML lists, we need to map from Git log "Author" to Github 
> username. There's presumably some way to do this in the Github REST API (the 
> mapping is based on the email, IIUC), or we could also just update the 
> Committers page to also document each committer's Github username.
>  
> Ideally, we would write this script (to be stored in the Apache Kafka repo) 
> and create a Github Action to run it every three months.
>  
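
A rough sketch of how steps 1-3 could be scripted (the committer-email set below 
is a placeholder that would need to be scraped from committers.html, per the 
complications listed above; step 4's GitHub-username mapping is not shown):

{code:python}
#!/usr/bin/env python3
import subprocess

SINCE = "2022-04-28"
COMMITTER_EMAILS = {"committer@example.org"}  # placeholder, not a real list

# Step 1: list authors by commit volume in the last year.
log = subprocess.run(
    ["git", "shortlog", "--email", "--numbered", "--summary", f"--since={SINCE}"],
    capture_output=True, text=True, check=True,
).stdout

# Steps 2-3: filter out committers and truncate to 20 authors.
top_authors = []
for line in log.splitlines():
    # Each line looks like: "   42\tJane Doe <jane@example.org>"
    _count, _, author = line.strip().partition("\t")
    email = author[author.rfind("<") + 1 : author.rfind(">")]
    if email not in COMMITTER_EMAILS:
        top_authors.append(author)
    if len(top_authors) == 20:
        break

print("\n".join(top_authors))
{code}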



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15069) Refactor scanning hierarchy out of DelegatingClassLoader

2023-07-06 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-15069:

Fix Version/s: 3.6.0

> Refactor scanning hierarchy out of DelegatingClassLoader
> 
>
> Key: KAFKA-15069
> URL: https://issues.apache.org/jira/browse/KAFKA-15069
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.6.0
>
>
> The DelegatingClassLoader is involved in both scanning and using the results 
> of scanning to process classloading.
> Instead, the scanning should take place outside of the DelegatingClassLoader, 
> and results of scanning be passed back into the DelegatingClassLoader for 
> classloading functionality.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


dajac commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254735232


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled with
+ * a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map tasks = new HashMap<>();

Review Comment:
   Scheduling a timeout is only done from within the execution of an event.
   
   For the usages, you can take a look at the next PR: 
https://github.com/apache/kafka/pull/13963. In short, we need timers to 
implement session and revocation timeouts.
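
As a rough usage sketch based only on the schedule(...) signature quoted in this 
diff (the stand-in interfaces, key format, and timeout value are assumptions, not 
the actual group coordinator code):

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TimerUsageSketch {
    // Stand-ins for the real types in CoordinatorRuntime.
    interface TimeoutOperation { List<String> generateRecords(); }
    interface CoordinatorTimer {
        void schedule(String key, long delay, TimeUnit unit, TimeoutOperation operation);
    }

    static void onMemberHeartbeat(CoordinatorTimer timer, String memberId) {
        // Re-scheduling under the same key overrides the previous timer, so each
        // heartbeat pushes the member's session expiration forward.
        timer.schedule("session-" + memberId, 45, TimeUnit.SECONDS,
            () -> List.of("expiration-record-for-" + memberId));
    }
}
```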



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 opened a new pull request, #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-06 Thread via GitHub


gharris1727 opened a new pull request, #13971:
URL: https://github.com/apache/kafka/pull/13971

   Add the ServiceLoaderScanner, a companion to the ReflectionScanner that 
discovers plugins without using reflection; a rough sketch of the mechanism 
follows below.
   
   This will be utilized later in the Plugins class for the 
configurable-discovery-mode startup scanning, and in the connect-plugin-path 
script. In this PR, the PluginScannerTest is parameterized to ensure that the 
ServiceLoaderScanner has parity on the TestPlugins. This required adding 
manifests for the valid TestPlugins.
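
A minimal sketch of ServiceLoader-based discovery (Runnable stands in for a 
Connect plugin interface; this is not the PR's actual ServiceLoaderScanner API):

```java
// Illustrative sketch only: java.util.ServiceLoader discovers implementations
// through META-INF/services manifest files instead of reflective classpath
// scanning.
import java.util.ServiceLoader;

public class ServiceLoaderDiscoverySketch {
    public static void main(String[] args) {
        ClassLoader pluginLoader = ServiceLoaderDiscoverySketch.class.getClassLoader();
        // Reads META-INF/services/java.lang.Runnable from the classpath and
        // instantiates every implementation listed there.
        for (Runnable plugin : ServiceLoader.load(Runnable.class, pluginLoader)) {
            System.out.println("Discovered plugin: " + plugin.getClass().getName());
        }
    }
}
```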
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


dajac commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254733078


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled with
+ * a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map tasks = new HashMap<>();
+
+InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) {
+this.tp = tp;
+this.log = logContext.logger(InternalCoordinatorTimer.class);
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+TimeoutOperation operation
+) {
+TimerTask task = new TimerTask(unit.toMillis(delay)) {
+@Override
+public void run() {
+String eventName = "Timeout(tp=" + tp + ", key=" + key + ")";
+CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> {
+log.debug("Executing write event {} for timer {}.", eventName, key);
+
+// If the task is different, it means that the timer has been
+// cancelled while the event was waiting to be processed.
+if (!tasks.remove(key, this)) {
+throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled");
+}
+
+// Execute the real operation.
+return new CoordinatorResult<>(operation.generateRecords(), null);
+});
+
+event.future.exceptionally(ex -> {
+// Otherwise, we handle the error if the timer is still active.
+if (ex instanceof RejectedExecutionException) {
+log.debug("The delayed write event {} for the timer {} was not executed because it was " +
+"cancelled or overridden.", event.name, key);
+return null;
+}
+
+if (ex instanceof NotCoordinatorException || ex instanceof CoordinatorLoadInProgressException) {
+log.debug("The delayed write event {} for the timer {} failed due to {}. Ignoring it because " +
+"the coordinator is not active.", event.name, key, ex.getMessage());
+return null;
+}
+
+log.info("The delayed write event {} for the timer {} failed due to {}. Rescheduling it. ",
+event.name, key, ex.getMessage(), ex);
+schedule(key, 500, TimeUnit.MILLISECONDS, operation);
+
+return null;
+});
+
+log.debug("Scheduling write event {} for timer {}.", event.name, key);
+try {
+enqueue(event);

Review Comment:
   Are you referring to the null returns in `exceptionally` above? If yes, those 
are not used anywhere because we don't use the resulting future.
   
   So we enqueue the event and we react to its result. If it fails, we retry it 
under some conditions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hgeraldino commented on pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-07-06 Thread via GitHub


hgeraldino commented on PR #13383:
URL: https://github.com/apache/kafka/pull/13383#issuecomment-1624048937

   Committed the suggestions, thanks again for reviewing @C0urante!
   
   BTW, I also opened https://github.com/apache/kafka/pull/13951 to have 
`WorkerSinkTaskThreadedTest` migrated. There's only one more in my bucket 
(WorkerSinkTaskTest), which has proven to be quite challenging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1

2023-07-06 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740732#comment-17740732
 ] 

Greg Harris commented on KAFKA-15144:
-

Yes [~ecomar], I think the documentation about the checkpoints topic is lacking; 
the MM2 KIP just gives the schema and the RemoteClusterUtils/MirrorClient 
implementations, with no further details.
In some recent discussions people have questioned whether anyone reads the 
checkpoints topic directly; it's good to know that the contents of the topic 
are relevant for you.

> in the past one of our tests was written expecting to find the exact match 
> for upstream/downstream because that was the externally visible behavior when 
> all was good.

Unfortunately this is only true in rather specific circumstances, before or 
after the recent changes. If MirrorSource were to uncleanly restart, or if the 
upstream topic had transaction markers, then the offsets of the source and 
target records would not correspond directly.
For testing the translation, I found it helpful to set `offset.lag.max=0`, and 
only rely on offset translation at the _end_ of the topic between batches. I 
hope fixing your tests goes well.
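
For reference, a sketch of that test setting as it might appear in a dedicated 
MM2 properties file like the attached one (illustrative only, not a recommended 
production value):
{code}
# With offset.lag.max=0 offset syncs are emitted as aggressively as possible,
# so translation at the end of the topic is exact (at the cost of more syncs).
offset.lag.max = 0
{code}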

> MM2 Checkpoint downstreamOffset stuck to 1
> --
>
> Key: KAFKA-15144
> URL: https://issues.apache.org/jira/browse/KAFKA-15144
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Major
> Attachments: edo-connect-mirror-maker-sourcetarget.properties
>
>
> Steps to reproduce :
> 1.Start the source cluster
> 2.Start the target cluster
> 3.Start connect-mirror-maker.sh using a config like the attached
> 4.Create a topic in source cluster
> 5.produce a few messages
> 6.consume them all with autocommit enabled
>  
> 7. then dump the Checkpoint topic content e.g.
> {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic 
> source.checkpoints.internal --from-beginning --formatter 
> org.apache.kafka.connect.mirror.formatters.CheckpointFormatter}}
> {{{}Checkpoint{consumerGroupId=edogroup, 
> topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, 
> {*}downstreamOffset=1{*}, metadata={
>  
> the downstreamOffset remains at 1, while, in a fresh cluster pair like with 
> the source topic created while MM2 is running, 
> I'd expect the downstreamOffset to match the upstreamOffset.
> Note that dumping the offset sync topic, shows matching initial offsets
> {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic 
> mm2-offset-syncs.source.internal --from-beginning --formatter 
> org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter}}
> {{{}OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, 
> downstreamOffset=0{
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner

2023-07-06 Thread via GitHub


C0urante merged PR #13821:
URL: https://github.com/apache/kafka/pull/13821


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254713760


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled with
+ * a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map tasks = new HashMap<>();

Review Comment:
   I guess I also realize I don't fully understand why we need to have a timer 
for these events. But maybe I need to re-read the KIP.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254711785


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled with
+ * a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map tasks = new HashMap<>();

Review Comment:
   I guess I wasn't sure about adding to the map and running the event. We may 
be only doing one operation that adds to the map at a time, but does that also 
mean we will only hit the timer for the event during that time?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15155) Follow PEP 8 best practice in Python to check if a container is empty

2023-07-06 Thread Yi-Sheng Lien (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740713#comment-17740713
 ] 

Yi-Sheng Lien commented on KAFKA-15155:
---

Hi [~divijvaidya],

I'm interested in the Kafka project and am reading the [HOW TO 
CONTRIBUTE|https://kafka.apache.org/contributing] guide.
May I take this as my first Kafka issue, if you don't mind?

> Follow PEP 8 best practice in Python to check if a container is empty
> -
>
> Key: KAFKA-15155
> URL: https://issues.apache.org/jira/browse/KAFKA-15155
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Trivial
>  Labels: newbie
>
> *This is a good task for first time contributors to Kafka*
> At release.py Line:94 and at Line:60, we don't follow PEP 8 [1] best 
> practices.
> To check if a container or sequence (string, list, tuple) is empty, use if 
> not val. Do not compare its length using if len(val) == 0.
> [1] 
> [https://peps.python.org/pep-0008/#programming-recommendations#:~:text=if%20not%20seq]
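
A quick illustration of the recommendation ({{seq}} is a stand-in name):
{code:python}
seq = []

if len(seq) == 0:  # discouraged per PEP 8
    print("empty, via len()")

if not seq:  # preferred: empty sequences and containers are falsy
    print("empty, via truthiness")
{code}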



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254698316


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled with
+ * a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map tasks = new HashMap<>();
+
+InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) {
+this.tp = tp;
+this.log = logContext.logger(InternalCoordinatorTimer.class);
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+TimeoutOperation operation
+) {
+TimerTask task = new TimerTask(unit.toMillis(delay)) {
+@Override
+public void run() {
+String eventName = "Timeout(tp=" + tp + ", key=" + key + ")";
+CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> {
+log.debug("Executing write event {} for timer {}.", eventName, key);
+
+// If the task is different, it means that the timer has been
+// cancelled while the event was waiting to be processed.
+if (!tasks.remove(key, this)) {
+throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled");
+}
+
+// Execute the real operation.
+return new CoordinatorResult<>(operation.generateRecords(), null);
+});
+
+event.future.exceptionally(ex -> {
+// Otherwise, we handle the error if the timer is still active.
+if (ex instanceof RejectedExecutionException) {
+log.debug("The delayed write event {} for the timer {} was not executed because it was " +
+"cancelled or overridden.", event.name, key);
+return null;
+}
+
+if (ex instanceof NotCoordinatorException || ex instanceof CoordinatorLoadInProgressException) {
+log.debug("The delayed write event {} for the timer {} failed due to {}. Ignoring it because " +
+"the coordinator is not active.", event.name, key, ex.getMessage());
+return null;
+}
+
+log.info("The delayed write event {} for the timer {} failed due to {}. Rescheduling it. ",
+event.name, key, ex.getMessage(), ex);
+schedule(key, 500, TimeUnit.MILLISECONDS, operation);
+
+return null;
+});
+
+log.debug("Scheduling write event {} for timer {}.", event.name, key);
+try {
+enqueue(event);

Review Comment:
   But I suppose you are saying that a null future = a null response. 
   I guess I wasn't sure how we can write records but have a null response, and 
what that means.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime

2023-07-06 Thread via GitHub


jolshan commented on code in PR #13961:
URL: https://github.com/apache/kafka/pull/13961#discussion_r1254697080


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) {
 abstract boolean canTransitionFrom(CoordinatorState state);
 }
 
+/**
+ * Implements the CoordinatorTimer interface. This class keeps track of all the
+ * scheduled timers for a coordinator/partition.
+ *
+ * When a timer is cancelled or overridden, the previous timer is guaranteed to
+ * not be executed even if it already expired and got pushed to the event processor.
+ *
+ * When a timer fails with an unexpected exception, the timer is rescheduled with
+ * a backoff.
+ */
+class InternalCoordinatorTimer implements CoordinatorTimer {
+/**
+ * The logger.
+ */
+final Logger log;
+
+/**
+ * The topic partition.
+ */
+final TopicPartition tp;
+
+/**
+ * The scheduled timers keyed by their key.
+ */
+final Map tasks = new HashMap<>();
+
+InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) {
+this.tp = tp;
+this.log = logContext.logger(InternalCoordinatorTimer.class);
+}
+
+@Override
+public void schedule(
+String key,
+long delay,
+TimeUnit unit,
+TimeoutOperation operation
+) {
+TimerTask task = new TimerTask(unit.toMillis(delay)) {
+@Override
+public void run() {
+String eventName = "Timeout(tp=" + tp + ", key=" + key + ")";
+CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> {
+log.debug("Executing write event {} for timer {}.", eventName, key);
+
+// If the task is different, it means that the timer has been
+// cancelled while the event was waiting to be processed.
+if (!tasks.remove(key, this)) {
+throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled");
+}
+
+// Execute the real operation.
+return new CoordinatorResult<>(operation.generateRecords(), null);
+});
+
+event.future.exceptionally(ex -> {
+// Otherwise, we handle the error if the timer is still active.
+if (ex instanceof RejectedExecutionException) {
+log.debug("The delayed write event {} for the timer {} was not executed because it was " +
+"cancelled or overridden.", event.name, key);
+return null;
+}
+
+if (ex instanceof NotCoordinatorException || ex instanceof CoordinatorLoadInProgressException) {
+log.debug("The delayed write event {} for the timer {} failed due to {}. Ignoring it because " +
+"the coordinator is not active.", event.name, key, ex.getMessage());
+return null;
+}
+
+log.info("The delayed write event {} for the timer {} failed due to {}. Rescheduling it. ",
+event.name, key, ex.getMessage(), ex);
+schedule(key, 500, TimeUnit.MILLISECONDS, operation);
+
+return null;
+});
+
+log.debug("Scheduling write event {} for timer {}.", event.name, key);
+try {
+enqueue(event);

Review Comment:
   In the code above, in `event.future.exceptionally` we return null for most 
code paths. I was getting confused about how we can return null for that future 
but still enqueue the event.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15148) Some integration tests are running as unit tests

2023-07-06 Thread Ezio Xie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740696#comment-17740696
 ] 

Ezio Xie commented on KAFKA-15148:
--

I filtered integration tests by running (find . -name "*.*integration*.html") 
and got these integration tests:
{code:java}
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsReporterIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.QueryableStateIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableEfficientRangeQueryTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopCounterTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html
 {code}
I added @Tag("integration") to "streams" module tests and 
@Category(IntegrationTest.class) to "connect" module tests; these integration 
tests are then excluded from the unitTest task (see the sketch below).
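
A minimal sketch of the JUnit 5 tagging used for the streams module (the test 
class here is a stand-in, not one of the actual tests):
{code:java}
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration")
class ExampleIntegrationTest {
    @Test
    void roundTripsThroughTheCluster() {
        // exercises a real cluster; excluded from ./gradlew unitTest
    }
}
{code}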

"stream" module's unitTest time reduced from 25 minutes to 16 minutes, 
"connect" module's unitTest time reduced from 18 minutes to 12 minutes.


[~divijvaidya] Do you think this result is in line with your expectations? Or 
any advice?

> Some integration tests are running as unit tests
> 
>
> Key: KAFKA-15148
> URL: https://issues.apache.org/jira/browse/KAFKA-15148
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Divij Vaidya
>Assignee: Ezio Xie
>Priority: Minor
>  Labels: newbie
>
> *This is a good item for a newcomer into Kafka code base to pick up*
>  
> When we run `./gradlew unitTest`, it is supposed to run all unit tests. 
> However, we are running some integration tests as part of it, which makes the 
> overall process of running unitTest take longer than expected.
> Example of such tests:
> > :streams:unitTest > Executing test 
> > org.apache...integration.NamedTopologyIntegrationTest
> > :streams:unitTest > Executing test 
> > org.apache...integration.QueryableStateIntegrationTest
> After this task, we should not run these tests as part of `./gradlew 
> unitTest`, instead they should be run as part of `./gradlew integrationTest`.
> As part of acceptance criteria, please add the snapshot of html summary 
> generated to verify that these tests are indeed running as part of 
> integrationTest.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-07-06 Thread via GitHub


C0urante commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1254675710


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##
@@ -739,62 +693,42 @@ public void testSlowTaskStart() throws Exception {
 
 createWorkerTask();
 
-offsetStore.start();
-EasyMock.expectLastCall();
-sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
-EasyMock.expectLastCall();
-sourceTask.start(TASK_PROPS);
-EasyMock.expectLastCall().andAnswer(() -> {
+doAnswer((Answer) invocation -> {
 startupLatch.countDown();
-assertTrue(awaitLatch(finishStartupLatch));
+ConcurrencyUtils.awaitLatch(finishStartupLatch, "Timeout waiting for task to finish");

Review Comment:
   ```suggestion
    ConcurrencyUtils.awaitLatch(finishStartupLatch, "Timeout waiting for main test thread to allow task startup to complete");
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##
@@ -315,157 +319,130 @@ public void testPause() throws Exception {
 
 taskFuture.get();
 
-PowerMock.verifyAll();
+verifyCleanStartup();
+verifyTaskGetTopic(count.get());
+verifyOffsetFlush(true);
+verifyTopicCreation(TOPIC);
+verify(statusListener).onPause(taskId);
+verify(statusListener).onShutdown(taskId);
+verify(sourceTask).stop();
+verify(offsetWriter).offset(PARTITION, OFFSET);
+verifyClose();
 }
 
 @Test
 public void testPollsInBackground() throws Exception {
 createWorkerTask();
 
-expectCleanStartup();
-
 final CountDownLatch pollLatch = expectPolls(10);
-// In this test, we don't flush, so nothing goes any further than the offset writer
 
 expectTopicCreation(TOPIC);
-
-sourceTask.stop();
-EasyMock.expectLastCall();
-
-offsetWriter.offset(PARTITION, OFFSET);
-PowerMock.expectLastCall();
-expectOffsetFlush(true);
-
-statusListener.onShutdown(taskId);
-EasyMock.expectLastCall();
-
-expectClose();
-
-PowerMock.replayAll();
+expectOffsetFlush();
 
 workerTask.initialize(TASK_CONFIG);
 Future taskFuture = executor.submit(workerTask);
 
-assertTrue(awaitLatch(pollLatch));
+ConcurrencyUtils.awaitLatch(pollLatch, POLL_TIMEOUT_MSG);
 workerTask.stop();
 assertTrue(workerTask.awaitStop(1000));
 
 taskFuture.get();
 assertPollMetrics(10);
-
-PowerMock.verifyAll();
+verifyCleanStartup();
+verifyOffsetFlush(true);
+verify(offsetWriter).offset(PARTITION, OFFSET);
+verify(statusListener).onShutdown(taskId);
+verifyClose();
 }
 
 @Test
 public void testFailureInPoll() throws Exception {
 createWorkerTask();
 
-expectCleanStartup();
-
 final CountDownLatch pollLatch = new CountDownLatch(1);
 final RuntimeException exception = new RuntimeException();
-EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
+when(sourceTask.poll()).thenAnswer(invocation -> {
 pollLatch.countDown();
 throw exception;
 });
 
-statusListener.onFailure(taskId, exception);
-EasyMock.expectLastCall();
-
-sourceTask.stop();
-EasyMock.expectLastCall();
 expectEmptyOffsetFlush();
 
-expectClose();
-
-PowerMock.replayAll();
-
 workerTask.initialize(TASK_CONFIG);
 Future taskFuture = executor.submit(workerTask);
 
-assertTrue(awaitLatch(pollLatch));
+ConcurrencyUtils.awaitLatch(pollLatch, POLL_TIMEOUT_MSG);
 //Failure in poll should trigger automatic stop of the task
 assertTrue(workerTask.awaitStop(1000));
-assertShouldSkipCommit();
 
 taskFuture.get();
 assertPollMetrics(0);
 
-PowerMock.verifyAll();
+verifyCleanStartup();
+verify(statusListener).onFailure(taskId, exception);
+verify(sourceTask).stop();
+assertShouldSkipCommit();
+verifyOffsetFlush(true);
+verifyClose();
 }
 
 @Test
 public void testFailureInPollAfterCancel() throws Exception {
 createWorkerTask();
 
-expectCleanStartup();
-
 final CountDownLatch pollLatch = new CountDownLatch(1);
 final CountDownLatch workerCancelLatch = new CountDownLatch(1);
 final RuntimeException exception = new RuntimeException();
-EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
+when(sourceTask.poll()).thenAnswer(invocation -> {
 pollLatch.countDown();
-assertTrue(awaitLatch(workerCancelLatch));
+ConcurrencyUtils.awaitLatch(workerCancelLatch, "Timeout waiting 
for task 

[GitHub] [kafka] C0urante commented on a diff in pull request #13955: KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask

2023-07-06 Thread via GitHub


C0urante commented on code in PR #13955:
URL: https://github.com/apache/kafka/pull/13955#discussion_r1254656021


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -651,6 +652,40 @@ public void 
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
 verifyTopicCreation();
 }
 
+@Test
+public void testSendRecordsRetriableException() {
+createWorkerTask();
+
+SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+expectTaskGetTopic();
+
+when(transformationChain.apply(eq(record1))).thenReturn(null);
+when(transformationChain.apply(eq(record2))).thenReturn(null);
+when(transformationChain.apply(eq(record3))).thenReturn(record3);
+
+TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, 
null, Collections.emptyList(), Collections.emptyList());
+TopicDescription topicDesc = new TopicDescription(TOPIC, false, 
Collections.singletonList(topicPartitionInfo));
+
when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, 
topicDesc));
+
+when(producer.send(any(), any())).thenThrow(new 
RetriableException("Retriable exception")).thenReturn(null);
+
+workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+// The producer throws a RetriableException the first time we try to 
send the third record
+assertFalse(workerTask.sendRecords());
+
+// The next attempt to send the third record should succeed
+assertTrue(workerTask.sendRecords());
+
+// Ensure that the first two records that were filtered out by the 
transformation chain
+// aren't re-processed when we retry the call to sendRecords()
+verify(transformationChain, times(4)).apply(any(SourceRecord.class));

Review Comment:
   Do you think it might be worth it to be more explicit in the verifications 
here? I.e., verify that `record1` and `record2` were transformed only once, but 
`record3` was transformed twice?
   
   Also, it's a little strange that we're re-transforming a record for each 
retry. If it's not too much we may want to refine that logic to only transform 
each record at most once.
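
For concreteness, the more explicit verifications suggested above might look 
like this sketch against the test's mocks (a suggestion only, not the PR's 
current code):
```java
// Hypothetical sketch: record1/record2 are filtered out, so transformed once;
// record3 is transformed on the failed attempt and again on the retry.
verify(transformationChain, times(1)).apply(eq(record1));
verify(transformationChain, times(1)).apply(eq(record2));
verify(transformationChain, times(2)).apply(eq(record3));
```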



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-07-06 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740686#comment-17740686
 ] 

ASF GitHub Bot commented on KAFKA-14995:


mimaison commented on PR #521:
URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1623947447

   The latest committer is Divij Vaidya; see the bottom of 
https://kafka.apache.org/committers.
   His Github username is `divijvaidya`.




> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: Steven Booke
>Priority: Minor
>  Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: 
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who 
> are not Kafka committers. Unfortunately, it's not trivial to compute this 
> list.
> Here is the process I followed to generate the list the first time (note that 
> I generated this list on 2023-04-28, so the lookback is one year):
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. manually filter out the authors who are committers, based on 
> [https://kafka.apache.org/committers]
> 3. truncate the list to 20 authors
> 4. for each author
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date:   Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: 
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>  
> This is pretty time-consuming and is very scriptable. Two complications:
>  * To do the filtering, we need to map from Git log "Author" to documented 
> Kafka "Committer" that we can use to perform the filter. Suggestion: just 
> update the structure of the "Committers" page to include their Git "Author" 
> name and email 
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)]
>  * To generate the YAML lists, we need to map from Git log "Author" to Github 
> username. There's presumably some way to do this in the Github REST API (the 
> mapping is based on the email, IIUC), or we could also just update the 
> Committers page to also document each committer's Github username.
>  
> Ideally, we would write this script (to be stored in the Apache Kafka repo) 
> and create a Github Action to run it every three months.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh

2023-07-06 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740683#comment-17740683
 ] 

ASF GitHub Bot commented on KAFKA-14995:


stevenbooke commented on PR #521:
URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1623938985

   Who is the new Committer? What is their Github username?




> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
>  Issue Type: Improvement
>Reporter: John Roesler
>Assignee: Steven Booke
>Priority: Minor
>  Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: 
> [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who 
> are not Kafka committers. Unfortunately, it's not trivial to compute this 
> list.
> Here is the process I followed to generate the list the first time (note that 
> I generated this list on 2023-04-28, so the lookback is one year):
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. manually filter out the authors who are committers, based on 
> [https://kafka.apache.org/committers]
> 3. truncate the list to 20 authors
> 4. for each author
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date:   Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: 
> [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and 
> the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>  
> This is pretty time-consuming and is very scriptable. Two complications:
>  * To do the filtering, we need to map from Git log "Author" to documented 
> Kafka "Committer" that we can use to perform the filter. Suggestion: just 
> update the structure of the "Committers" page to include their Git "Author" 
> name and email 
> ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)]
>  * To generate the YAML lists, we need to map from Git log "Author" to Github 
> username. There's presumably some way to do this in the Github REST API (the 
> mapping is based on the email, IIUC), or we could also just update the 
> Committers page to also document each committer's Github username.
>  
> Ideally, we would write this script (to be stored in the Apache Kafka repo) 
> and create a Github Action to run it every three months.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hudeqi commented on a diff in pull request #13959: KAFKA-15129;[5/N] Remove metrics in ControllerChannelManager when broker shutdown

2023-07-06 Thread via GitHub


hudeqi commented on code in PR #13959:
URL: https://github.com/apache/kafka/pull/13959#discussion_r1254636348


##
core/src/main/scala/kafka/controller/KafkaController.scala:
##
@@ -537,6 +537,7 @@ class KafkaController(val config: KafkaConfig,
 
   private def removeMetrics(): Unit = {
 KafkaController.MetricNames.foreach(metricsGroup.removeMetric)
+controllerChannelManager.removeMetrics()

Review Comment:
   The current ControllerChannelManager only removes the tagged metrics (that 
is, gaugeMetricNameWithTag and timerMetricNameWithTag) for all brokers in 
brokerStateInfo during shutdown, but omits TotalQueueSizeMetricName, so I added 
the removeMetrics method to remove it during broker shutdown. Do you mean we 
should combine the two into the shutdown method of ControllerChannelManager? 
@divijvaidya 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13797: KAFKA-14950: implement assign() and assignment()

2023-07-06 Thread via GitHub


philipnee commented on PR #13797:
URL: https://github.com/apache/kafka/pull/13797#issuecomment-1623905234

   @kirktrue thanks for taking over this. @junrao - would you be up for another 
round of review?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1

2023-07-06 Thread via GitHub


mimaison commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1623903798

   Also we need to update the `LICENSE-binary` file as mentioned in 
https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L73-L74


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer

2023-07-06 Thread via GitHub


philipnee commented on PR #13914:
URL: https://github.com/apache/kafka/pull/13914#issuecomment-1623900690

   Hey, from a first look I think this is a reasonable suggestion. Have you 
gotten much feedback from the discussion thread yet?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #13960: KAFKA-15129;[6/N] Remove metrics in ControllerStats when broker shutdown

2023-07-06 Thread via GitHub


hudeqi commented on code in PR #13960:
URL: https://github.com/apache/kafka/pull/13960#discussion_r1254619269


##
core/src/main/scala/kafka/controller/KafkaController.scala:
##
@@ -537,6 +538,7 @@ class KafkaController(val config: KafkaConfig,
 
   private def removeMetrics(): Unit = {
 KafkaController.MetricNames.foreach(metricsGroup.removeMetric)
+controllerContext.stats.removeMetrics()

Review Comment:
   My thought is: this PR removes the ControllerStats metrics when the broker 
shuts down. Before this change, every broker in the cluster registered all of 
the ControllerStats metrics. If we followed your suggestion, only the broker 
that is actually the controller would register the ControllerStats metrics, 
while the other brokers would not. I'm not sure whether that logical change is 
necessary. @divijvaidya 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15159) Update minor dependencies in preparation for 3.5.1

2023-07-06 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15159:
-
Fix Version/s: 3.5.1

> Update minor dependencies in preparation for 3.5.1
> --
>
> Key: KAFKA-15159
> URL: https://issues.apache.org/jira/browse/KAFKA-15159
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Major
> Fix For: 3.5.1
>
>
> Go through the list of dependencies in dependencies.gradle. Check if newer 
> minor versions (backward compatible) are available; if yes, then create a PR 
> for them. Especially important are the ones which fix a CVE.
> We will merge them into trunk and backport it to 3.5 branch. Similar to 
> [https://github.com/apache/kafka/pull/13673] here.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15159) Update minor dependencies in preparation for 3.5.1

2023-07-06 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15159:
-
Description: 
Go through the list of dependencies in dependencies.gradle. Check if newer 
minor versions (backward compatible) are available; if yes, then create a PR 
for them. Especially important are the ones which fix a CVE.

We will merge them into trunk and backport it to 3.5 branch. Similar to 
[https://github.com/apache/kafka/pull/13673] here.

  was:
Go through the list of dependencies in dependencies.gradle. Check if newer 
minor versions (backward compatible) are available; if yes, then create a PR 
for them. Especially important are the ones which fix a CVE.

We will merge them into trunk and backport it to 3.5 branch.


> Update minor dependencies in preparation for 3.5.1
> --
>
> Key: KAFKA-15159
> URL: https://issues.apache.org/jira/browse/KAFKA-15159
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Major
>
> Go through the list of dependencies in dependencies.gradle. Check if newer 
> minor versions (backward compatible) are available; if yes, then create a PR 
> for them. Especially important are the ones which fix a CVE.
> We will merge them into trunk and backport it to 3.5 branch. Similar to 
> [https://github.com/apache/kafka/pull/13673] here.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15159) Update minor dependencies in preparation for 3.5.1

2023-07-06 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-15159:


 Summary: Update minor dependencies in preparation for 3.5.1
 Key: KAFKA-15159
 URL: https://issues.apache.org/jira/browse/KAFKA-15159
 Project: Kafka
  Issue Type: Improvement
Reporter: Divij Vaidya


Go through the list of dependencies in dependencies.gradle. Check if newer 
minor versions (backward compatible) are available; if yes, then create a PR 
for them. Especially important are the ones which fix a CVE.

We will merge them into trunk and backport it to 3.5 branch.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15102) Mirror Maker 2 - KIP690 backward compatibility

2023-07-06 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740663#comment-17740663
 ] 

Chris Egerton commented on KAFKA-15102:
---

 [~omnia_h_ibrahim] I've assigned you this ticket so nobody else picks it up 
by mistake; feel free to unassign if you no longer want to work on this.

> Mirror Maker 2 - KIP690 backward compatibility
> --
>
> Key: KAFKA-15102
> URL: https://issues.apache.org/jira/browse/KAFKA-15102
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.1.0
>Reporter: David Dufour
>Priority: Major
>
> According to KIP690, "When users upgrade an existing MM2 cluster they don’t 
> need to change any of their current configuration as this proposal maintains 
> the default behaviour for MM2."
> Now, the separator is subject to customization.
> As a consequence, when an MM2 upgrade is performed, if the separator was 
> customized with replication.policy.separator, the name of this internal topic 
> changes. It then generates issues like:
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.InvalidTopicException: Topic 
> 'mm2-offset-syncs_bkts28_internal' collides with existing topics: 
> mm2-offset-syncs.bkts28.internal
> It has been observed that replication can then break, sometimes several days 
> after the upgrade (the reason has not been identified). Deleting the old 
> topic allows it to recover.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15102) Mirror Maker 2 - KIP690 backward compatibility

2023-07-06 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-15102:
-

Assignee: Omnia Ibrahim

> Mirror Maker 2 - KIP690 backward compatibility
> --
>
> Key: KAFKA-15102
> URL: https://issues.apache.org/jira/browse/KAFKA-15102
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.1.0
>Reporter: David Dufour
>Assignee: Omnia Ibrahim
>Priority: Major
>
> According to KIP690, "When users upgrade an existing MM2 cluster they don’t 
> need to change any of their current configuration as this proposal maintains 
> the default behaviour for MM2."
> Now, the separator is subject to customization.
> As a consequence, when an MM2 upgrade is performed, if the separator was 
> customized with replication.policy.separator, the name of this internal topic 
> changes. It then generates issues like:
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.InvalidTopicException: Topic 
> 'mm2-offset-syncs_bkts28_internal' collides with existing topics: 
> mm2-offset-syncs.bkts28.internal
> It has been observed that replication can then break, sometimes several days 
> after the upgrade (the reason has not been identified). Deleting the old 
> topic allows it to recover.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15157) Print startup time for RemoteIndexCache

2023-07-06 Thread Lan Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lan Ding reassigned KAFKA-15157:


Assignee: Lan Ding

> Print startup time for RemoteIndexCache
> ---
>
> Key: KAFKA-15157
> URL: https://issues.apache.org/jira/browse/KAFKA-15157
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Assignee: Lan Ding
>Priority: Major
>
> When RemoteIndexCache starts up, it will try to re-build the in-memory cache 
> using the files already present on the disk in the remote-index-cache 
> directory. The process involves:
> 1. deleting existing files which are pending delete, i.e. have a .delete suffix
> 2. reading the cached index files, if present
> 3. creating the indexes (this step creates a memory-mapped buffer)
> 4. performing a sanity check on the indexes
> 5. adding the entry to the internal cache
> Steps 2-5 are bounded by the maximum number of entries in the cache, but step 
> 1 could take arbitrarily long.
> To debug a slow cache startup, we should add an info statement that prints the 
> time it took to initialize the cache.
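
A minimal, self-contained sketch of such a timing log (the logger name and the 
init() stand-in are assumptions, not the actual RemoteIndexCache code):
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StartupTimingSketch {
    private static final Logger log = LoggerFactory.getLogger(StartupTimingSketch.class);

    public static void main(String[] args) {
        long startTimeMs = System.currentTimeMillis();
        init();  // steps 1-5 above would run here
        log.info("RemoteIndexCache initialized in {} ms", System.currentTimeMillis() - startTimeMs);
    }

    private static void init() { /* no-op stand-in */ }
}
{code}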



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] DL1231 opened a new pull request, #13970: KAFKA-15157: Print startup time for RemoteIndexCache

2023-07-06 Thread via GitHub


DL1231 opened a new pull request, #13970:
URL: https://github.com/apache/kafka/pull/13970

   https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-15157
   Print startup time for RemoteIndexCache.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #13929: KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown

2023-07-06 Thread via GitHub


hudeqi commented on code in PR #13929:
URL: https://github.com/apache/kafka/pull/13929#discussion_r1254581656


##
core/src/main/scala/kafka/server/AbstractFetcherManager.scala:
##
@@ -226,6 +240,29 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri
       fetcherThreadMap.clear()
     }
   }
+
+  def shutdown(): Unit = {
+    info("shutting down")
+    try {
+      closeAllFetchers()
+    } finally {
+      removeMetrics()
+    }
+    info("shutdown completed")
+  }
+
+  private[server] def removeMetrics(): Unit = {
+    metricNamesToTags.foreach(metricTags => {

Review Comment:
   updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15153) Use Python `is` instead of `==` to compare for None

2023-07-06 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15153:
-
Fix Version/s: 3.6.0

> Use Python `is` instead of  `==` to compare for None
> 
>
> Key: KAFKA-15153
> URL: https://issues.apache.org/jira/browse/KAFKA-15153
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Assignee: Lan Ding
>Priority: Minor
>  Labels: newbie
> Fix For: 3.6.0
>
>
> *This is a good item to be picked by first time contributors to Kafka code 
> base*
> At release_notes.py Line: 47
> The {{==}} and {{!=}} operators use the compared objects' {{__eq__}} method 
> to test if they are equal. To check if an object is a singleton, such as 
> {{None}}, we recommend that you use the {{is}} identity comparison 
> operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15153) Use Python `is` instead of `==` to compare for None

2023-07-06 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-15153.
--
  Reviewer: Divij Vaidya
Resolution: Fixed

> Use Python `is` instead of  `==` to compare for None
> 
>
> Key: KAFKA-15153
> URL: https://issues.apache.org/jira/browse/KAFKA-15153
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Assignee: Lan Ding
>Priority: Minor
>  Labels: newbie
> Fix For: 3.6.0
>
>
> *This is a good item to be picked by first time contributors to Kafka code 
> base*
> At release_notes.py Line: 47
> The {{==}} and {{!=}} operators use the compared objects' {{__eq__}} method 
> to test if they are equal. To check if an object is a singleton, such as 
> {{None}}, we recommend that you use the {{is}} identity comparison 
> operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya merged pull request #13964: KAFKA-15153: Use Python 'is' instead of '==' to compare for None

2023-07-06 Thread via GitHub


divijvaidya merged PR #13964:
URL: https://github.com/apache/kafka/pull/13964


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12969) Add cluster or broker level config for topic level tiered storage confgs.

2023-07-06 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740642#comment-17740642
 ] 

Divij Vaidya commented on KAFKA-12969:
--

Please ensure that we also add validations here which throw a ConfigException 
on failure, e.g. we cannot enable remote storage for a topic if it is not 
enabled at the cluster level.
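
A sketch of one such validation; the boolean flag names below are illustrative 
placeholders, not the actual config objects:

    import org.apache.kafka.common.config.ConfigException;

    public class RemoteStorageConfigValidation {
        // Hypothetical check: reject enabling tiered storage on a topic when
        // the broker/cluster-level remote log storage system is disabled.
        static void validate(boolean topicRemoteStorageEnable, boolean clusterRemoteStorageEnable) {
            if (topicRemoteStorageEnable && !clusterRemoteStorageEnable) {
                throw new ConfigException("remote.storage.enable", true,
                        "Tiered storage cannot be enabled for a topic while it is disabled at the cluster level");
            }
        }
    }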

> Add  cluster or broker level config for topic level tiered storage confgs.
> --
>
> Key: KAFKA-12969
> URL: https://issues.apache.org/jira/browse/KAFKA-12969
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-06 Thread via GitHub


vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1623807270

   Thanks @yashmayya. I addressed the rest of your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15158) Add metrics for RemoteRequestsPerSec

2023-07-06 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15158:
-
Description: 
Add the following metrics for better observability into the RemoteLog related 
activities inside the broker.

1. RemoteWriteRequestsPerSec

2. RemoteDeleteRequestsPerSec

3. BuildRemoteLogAuxStateRequestsPerSec

 

These metrics will be calculated at topic level (we can add them at 
brokerTopicStats)

*RemoteWriteRequestsPerSec* will be marked on every call to RemoteLogManager#
copyLogSegmentsToRemote()
 
*RemoteDeleteRequestsPerSec* will be marked on every call to 
RemoteLogManager#cleanupExpiredRemoteLogSegments(). This method is introduced 
in [https://github.com/apache/kafka/pull/13561] 

*BuildRemoteLogAuxStateRequestsPerSec* will be marked on every call to 
ReplicaFetcherTierStateMachine#buildRemoteLogAuxState()

 

 

(Note that this requires a change in KIP-405 and hence must be approved by the 
KIP author [~satishd].)
 

  was:
Add the following metrics for better observability into the RemoteLog related 
activities inside the broker.

1. RemoteWriteRequestsPerSec


2. RemoteDeleteRequestsPerSec


3. BuildRemoteLogAuxStateRequestsPerSec

 

These metrics will be calculated at topic level (we can add them at 
brokerTopicStats)


*RemoteWriteRequestsPerSec* will be marked on every call to RemoteLogManager#
copyLogSegmentsToRemote()
 
*RemoteDeleteRequestsPerSec* will be marked on every call to 
RemoteLogManager#cleanupExpiredRemoteLogSegments(). This method is introduced 
in [https://github.com/apache/kafka/pull/13561] 

*BuildRemoteLogAuxStateRequestsPerSec* will be marked on every call to 
ReplicaFetcherTierStateMachine#buildRemoteLogAuxState()
 


> Add metrics for RemoteRequestsPerSec
> 
>
> Key: KAFKA-15158
> URL: https://issues.apache.org/jira/browse/KAFKA-15158
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Divij Vaidya
>Priority: Major
> Fix For: 3.6.0
>
>
> Add the following metrics for better observability into the RemoteLog related 
> activities inside the broker.
> 1. RemoteWriteRequestsPerSec
> 2. RemoteDeleteRequestsPerSec
> 3. BuildRemoteLogAuxStateRequestsPerSec
>  
> These metrics will be calculated at topic level (we can add them at 
> brokerTopicStats)
> *RemoteWriteRequestsPerSec* will be marked on every call to RemoteLogManager#
> copyLogSegmentsToRemote()
>  
> *RemoteDeleteRequestsPerSec* will be marked on every call to 
> RemoteLogManager#cleanupExpiredRemoteLogSegments(). This method is introduced 
> in [https://github.com/apache/kafka/pull/13561] 
> *BuildRemoteLogAuxStateRequestsPerSec* will be marked on every call to 
> ReplicaFetcherTierStateMachine#buildRemoteLogAuxState()
>  
>  
> (Note that this requires a change in KIP-405 and hence must be approved by 
> the KIP author [~satishd].)
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-06 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1254544674


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -16,23 +16,36 @@
  */
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.powermock.reflect.Whitebox;

Review Comment:
   Got it. I removed the usage of Whitebox.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-06 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1254543662


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -192,6 +209,236 @@ public void testCancelAfterAwaitFlush() throws Exception {
         flushFuture.get(1000, TimeUnit.MILLISECONDS);
     }
 
+    @Test
+    public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        Future flushFuture = offsetStorageWriter.doFlush((error, result) -> {
+            assertEquals(PRODUCE_EXCEPTION, error);
+            assertNull(result);
+        });
+        assertThrows(ExecutionException.class, () -> flushFuture.get(1000L, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testFlushSuccessWhenWritesSucceedToBothPrimaryAndSecondaryStoresForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        offsetStorageWriter.doFlush((error, result) -> {
+            assertNull(error);
+            assertNull(result);
+        }).get(1000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testFlushSuccessWhenWriteToSecondaryStoreFailsForNonTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        offsetStorageWriter.doFlush((error, result) -> {
+            assertNull(error);
+            assertNull(result);
+        }).get(1000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testFlushSuccessWhenWritesToPrimaryAndSecondaryStoreSucceeds() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+

[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-06 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1254543055


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -192,6 +209,236 @@ public void testCancelAfterAwaitFlush() throws Exception {
         flushFuture.get(1000, TimeUnit.MILLISECONDS);
     }
 
+    @Test
+    public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {

Review Comment:
   Removed such instances from other tests as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-06 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1254542495


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +284,61 @@ public Future set(Map values, Callback callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            Future secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone offsets to secondary backing store");
+                    }
+                }
+            });

Review Comment:
   I made the change and it worked. Thanks!
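
For readers following along, a compact Java sketch of the resulting pattern, 
using hypothetical store interfaces rather than the real Connect classes: 
tombstone offsets go to the secondary store first, and the primary write only 
proceeds if that write succeeded.

    import java.util.Map;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class TombstoneFirstFlush {
        // Hypothetical minimal store abstraction, for illustration only.
        interface OffsetStore {
            Future<Void> set(Map<byte[], byte[]> values);
        }

        static void flushTombstones(OffsetStore secondaryStore, OffsetStore primaryStore,
                                    Map<byte[], byte[]> values) throws Exception {
            // Wait for the secondary write so its failure is surfaced ...
            secondaryStore.set(values).get(10, TimeUnit.SECONDS);
            // ... before the primary store is touched at all.
            primaryStore.set(values).get(10, TimeUnit.SECONDS);
        }
    }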



##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -192,6 +209,236 @@ public void testCancelAfterAwaitFlush() throws Exception {
         flushFuture.get(1000, TimeUnit.MILLISECONDS);
     }
 
+    @Test
+    public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15149) Fix not sending UMR and LISR RPCs in dual-write mode when there are new partitions

2023-07-06 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-15149:
-
Fix Version/s: 3.5.1

> Fix not sending UMR and LISR RPCs in dual-write mode when there are new 
> partitions
> --
>
> Key: KAFKA-15149
> URL: https://issues.apache.org/jira/browse/KAFKA-15149
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
> Fix For: 3.5.1
>
>
> In AK, in {{KRaftMigrationZkWriter}} 
> [here|https://github.com/apache/kafka/blame/trunk/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java#L294],
>  we keep references to both the new and changed partitions maps from the 
> {{TopicsDelta}} instance. We mutate {{changedPartitions}}, which can mutate 
> the {{TopicsDelta}} instance that is provided as input to the method. After 
> making the ZK writes, when we try to figure out the UMR and LISR requests we 
> need to make in {{MigrationPropagator.sendRPCsToBrokersFromMetadataDelta}}, 
> the {{TopicsDelta}} has lost the changed-partitions metadata. As a result, we 
> might not send the expected UMR and LISR requests. 
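
An illustrative, self-contained Java sketch of the underlying pitfall (not the 
actual migration code): mutating a map handed out by a shared object corrupts 
that object for every later reader, while a defensive copy keeps it intact.

    import java.util.HashMap;
    import java.util.Map;

    public class SharedMapPitfall {
        public static void main(String[] args) {
            Map<Integer, String> changedPartitions = new HashMap<>();
            changedPartitions.put(0, "partition-0 change");

            Map<Integer, String> sharedView = changedPartitions;  // shared reference, no copy
            sharedView.remove(0);                                 // the "delta" silently loses the change
            System.out.println(changedPartitions.containsKey(0)); // false

            changedPartitions.put(0, "partition-0 change");
            Map<Integer, String> safeCopy = new HashMap<>(changedPartitions); // defensive copy
            safeCopy.remove(0);
            System.out.println(changedPartitions.containsKey(0)); // true: original intact
        }
    }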



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15158) Add metrics for RemoteRequestsPerSec

2023-07-06 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-15158:


 Summary: Add metrics for RemoteRequestsPerSec
 Key: KAFKA-15158
 URL: https://issues.apache.org/jira/browse/KAFKA-15158
 Project: Kafka
  Issue Type: Sub-task
Reporter: Divij Vaidya
 Fix For: 3.6.0


Add the following metrics for better observability into the RemoteLog related 
activities inside the broker.

1. RemoteWriteRequestsPerSec


2. RemoteDeleteRequestsPerSec


3. BuildRemoteLogAuxStateRequestsPerSec

 

These metrics will be calculated at topic level (we can add them at 
brokerTopicStats)


*RemoteWriteRequestsPerSec* will be marked on every call to RemoteLogManager#
copyLogSegmentsToRemote()
 
*RemoteDeleteRequestsPerSec* will be marked on every call to 
RemoteLogManager#cleanupExpiredRemoteLogSegments(). This method is introduced 
in [https://github.com/apache/kafka/pull/13561] 

*BuildRemoteLogAuxStateRequestsPerSec* will be marked on every call to 
ReplicaFetcherTierStateMachine#buildRemoteLogAuxState()
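
A hand-rolled Java sketch of a per-topic request counter in the spirit of 
these metrics; real broker metrics go through brokerTopicStats and the Yammer 
registry, which this deliberately does not model:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;

    public class RemoteRequestCounters {
        private final Map<String, LongAdder> remoteWriteRequests = new ConcurrentHashMap<>();

        // Would be called on every RemoteLogManager#copyLogSegmentsToRemote() for `topic`.
        public void markRemoteWrite(String topic) {
            remoteWriteRequests.computeIfAbsent(topic, t -> new LongAdder()).increment();
        }

        public long remoteWriteCount(String topic) {
            LongAdder adder = remoteWriteRequests.get(topic);
            return adder == null ? 0 : adder.sum();
        }
    }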
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14509) Add ConsumerGroupDescribe API

2023-07-06 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-14509:

Description: 
The goal of this task is to implement the ConsumerGroupDescribe API as 
described 
[here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI];
 and to implement the related changes in the admin client as described 
[here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups].

On the server side, this mainly requires the following steps:
 # The request/response schemas must be defined (see 
ListGroupsRequest/Response.json for an example);
 # Request/response classes must be defined (see 
ListGroupsRequest/Response.java for an example);
 # The API must be defined in KafkaApis (see 
KafkaApis#handleDescribeGroupsRequest for an example);
 # The GroupCoordinator interface (java file) must be extended for the new 
operations.
 # The new operation must be implemented in GroupCoordinatorService (new 
coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in 
Scala) should just reject the request.

We could probably do 1) and 2) in one pull request and the remaining ones in 
another.

On the admin client side, this mainly requires the following steps:
 * Define all the new java classes as defined in the KIP.
 * Add the new API to KafkaAdminClient class.

  was:The goal of this task is to implement the 


> Add ConsumerGroupDescribe API
> -
>
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Max Riedel
>Priority: Major
>
> The goal of this task is to implement the ConsumerGroupDescribe API as 
> described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI];
>  and to implement the related changes in the admin client as described 
> [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups].
> On the server side, this mainly requires the following steps:
>  # The request/response schemas must be defined (see 
> ListGroupsRequest/Response.json for an example);
>  # Request/response classes must be defined (see 
> ListGroupsRequest/Response.java for an example);
>  # The API must be defined in KafkaApis (see 
> KafkaApis#handleDescribeGroupsRequest for an example);
>  # The GroupCoordinator interface (java file) must be extended for the new 
> operations.
>  # The new operation must be implemented in GroupCoordinatorService (new 
> coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in 
> Scala) should just reject the request.
> We could probably do 1) and 2) in one pull request and the remaining ones in 
> another.
> On the admin client side, this mainly requires the following steps (a usage 
> sketch follows below):
>  * Define all the new java classes as defined in the KIP.
>  * Add the new API to KafkaAdminClient class.
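
As context for the admin-client half, the existing entry point that the KIP 
extends looks like this today; presumably the new ConsumerGroupDescribe RPC 
stays behind the same describeConsumerGroups() surface:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ConsumerGroupDescription;
    import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;

    public class DescribeGroupExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                DescribeConsumerGroupsResult result =
                        admin.describeConsumerGroups(Collections.singletonList("my-group"));
                ConsumerGroupDescription description = result.describedGroups().get("my-group").get();
                System.out.println(description);
            }
        }
    }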



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hudeqi commented on pull request #13958: MINOR:Fix modification exception introduced in KAFKA-15129;[1/N]

2023-07-06 Thread via GitHub


hudeqi commented on PR #13958:
URL: https://github.com/apache/kafka/pull/13958#issuecomment-1623755996

   > Could we please update the description/title to clearly state that this is 
fixing a regression introduced in [#13924 
(comment)](https://github.com/apache/kafka/pull/13924#discussion_r1254371528)?
   
   I have changed the title. If anything is inappropriate, feel free to change 
it. Thank you! @dajac 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #13958: MINOR:Refactor variable name for code specification

2023-07-06 Thread via GitHub


hudeqi commented on code in PR #13958:
URL: https://github.com/apache/kafka/pull/13958#discussion_r1254494044


##
core/src/main/scala/kafka/log/LogCleanerManager.scala:
##
@@ -549,7 +550,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
 
   def removeMetrics(): Unit = {
     GaugeMetricNameNoTag.foreach(metricsGroup.removeMetric)
-    GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {
+    gaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => {

Review Comment:
   I am sorry for introducing this problem. Thanks @dajac @divijvaidya, I have 
updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15157) Print startup time for RemoteIndexCache

2023-07-06 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-15157:


 Summary: Print startup time for RemoteIndexCache
 Key: KAFKA-15157
 URL: https://issues.apache.org/jira/browse/KAFKA-15157
 Project: Kafka
  Issue Type: Sub-task
Reporter: Divij Vaidya


When RemoteIndexCache starts up, it tries to re-build the in-memory cache 
using the files already present on disk in the remote-index-cache directory. 
The process involves:
1. deleting existing files which are pending deletion, i.e. have a .delete suffix
2. reading the cached index files, if present
3. creating the indexes (this step creates an mmap'ed buffer)
4. performing a sanity check on the indexes
5. adding the entries to the internal cache

Steps 2-5 are bounded by the maximum number of entries in the cache, but step 
1 could be arbitrarily large.

To debug a slow cache startup, we should add an info statement that prints the 
time it took to initialize the cache.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14509) Add ConsumerGroupDescribe API

2023-07-06 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-14509:

Description: The goal of this task is to implement the 

> Add ConsumerGroupDescribe API
> -
>
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Max Riedel
>Priority: Major
>
> The goal of this task is to implement the 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15154) Potential bug: We don't acquire lock when reading checkQuotas

2023-07-06 Thread Lan Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lan Ding reassigned KAFKA-15154:


Assignee: Lan Ding

> Potential bug: We don't acquire lock when reading checkQuotas
> -
>
> Key: KAFKA-15154
> URL: https://issues.apache.org/jira/browse/KAFKA-15154
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Assignee: Lan Ding
>Priority: Major
>  Labels: newbie
>
> At Sensor.java line 254, we call `this.metrics.values()`. metrics is not a 
> thread-safe map, and that is why we acquire a lock whenever we want to 
> add/remove entries from it; see, for example, the add() and hasMetrics() 
> methods.
> However, we don't acquire a lock when calling Sensor#checkQuotas(timeMs).
> This could leave the metrics map in an inconsistent state, since it is not 
> safe for concurrent read/write access.
> The objective of this task is to validate that the above is correct and, if 
> so, fix the situation by enclosing this read in a lock. As a stretch task, 
> we should consider whether we can replace the metrics data structure with one 
> that allows concurrent reads but exclusive writes.
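
A minimal Java sketch of the proposed fix, assuming the map is guarded by the 
Sensor instance's monitor the way the writers already guard it; the names 
loosely mirror Sensor and are not the actual implementation:

    import java.util.ArrayList;
    import java.util.List;

    public class QuotaCheckSketch {
        private final List<String> metrics = new ArrayList<>(); // stand-in for the metrics map

        public void checkQuotas(long timeMs) {
            synchronized (this) { // take the same lock as add()/hasMetrics() before iterating
                for (String metric : metrics) {
                    // evaluate the quota for `metric` at `timeMs`
                }
            }
        }
    }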



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] DL1231 opened a new pull request, #13969: KAFKA-15154: Acquire lock when reading checkQuotas

2023-07-06 Thread via GitHub


DL1231 opened a new pull request, #13969:
URL: https://github.com/apache/kafka/pull/13969

   https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-15154
   Acquire lock when reading checkQuotas.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13842: KAFKA-14995: Automate asf.yaml collaborators refresh

2023-07-06 Thread via GitHub


mimaison commented on PR #13842:
URL: https://github.com/apache/kafka/pull/13842#issuecomment-1623715979

   Thanks @stevenbooke for the updates. The changes seem fine to me. @vvcephei 
do you have further comments?
   
   Also let's update https://github.com/apache/kafka-site/pull/521 as we'll 
need to merge it first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14509) Add ConsumerGroupDescribe API

2023-07-06 Thread Max Riedel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Riedel reassigned KAFKA-14509:
--

Assignee: Max Riedel

> Add ConsumerGroupDescribe API
> -
>
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Max Riedel
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison merged pull request #13967: MINOR: Update connector status metric description to include 'stopped' as a potential value

2023-07-06 Thread via GitHub


mimaison merged PR #13967:
URL: https://github.com/apache/kafka/pull/13967


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15154) Potential bug: We don't acquire lock when reading checkQuotas

2023-07-06 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740596#comment-17740596
 ] 

Lan Ding commented on KAFKA-15154:
--

I think what you said is correct: "metrics.values()" returns a Collection view 
of the values contained in the map, and changes to the map are reflected in 
the collection. Reading from and writing to the metrics map concurrently may 
therefore lead to unpredictable results.
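
A quick, runnable Java demonstration of the live-view semantics described above:

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    public class ValuesViewDemo {
        public static void main(String[] args) {
            Map<String, Integer> metrics = new HashMap<>();
            metrics.put("a", 1);
            Collection<Integer> view = metrics.values();
            metrics.put("b", 2);              // the map change is visible through the view
            System.out.println(view.size());  // 2 -- values() is a live view, not a snapshot
        }
    }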

> Potential bug: We don't acquire lock when reading checkQuotas
> -
>
> Key: KAFKA-15154
> URL: https://issues.apache.org/jira/browse/KAFKA-15154
> Project: Kafka
>  Issue Type: Bug
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: newbie
>
> At Sensor.java line 254, we call `this.metrics.values()`. metrics is not a 
> thread-safe map, and that is why we acquire a lock whenever we want to 
> add/remove entries from it; see, for example, the add() and hasMetrics() 
> methods.
> However, we don't acquire a lock when calling Sensor#checkQuotas(timeMs).
> This could leave the metrics map in an inconsistent state, since it is not 
> safe for concurrent read/write access.
> The objective of this task is to validate that the above is correct and, if 
> so, fix the situation by enclosing this read in a lock. As a stretch task, 
> we should consider whether we can replace the metrics data structure with one 
> that allows concurrent reads but exclusive writes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13960: KAFKA-15129;[6/N] Remove metrics in ControllerStats when broker shutdown

2023-07-06 Thread via GitHub


divijvaidya commented on code in PR #13960:
URL: https://github.com/apache/kafka/pull/13960#discussion_r1254402700


##
core/src/main/scala/kafka/controller/KafkaController.scala:
##
@@ -537,6 +538,7 @@ class KafkaController(val config: KafkaConfig,
 
   private def removeMetrics(): Unit = {
     KafkaController.MetricNames.foreach(metricsGroup.removeMetric)
+    controllerContext.stats.removeMetrics()

Review Comment:
   We should be registering these metrics in initializeControllerContext and 
removing them in resetControllerContext. The responsibility of correctly 
resetting and initializing the controllerContext lies with KafkaController.



##
core/src/main/scala/kafka/controller/KafkaController.scala:
##
@@ -2788,6 +2796,20 @@ private[controller] class ControllerStats {
   def removeMetric(name: String): Unit = {
     metricsGroup.removeMetric(name)
   }
+
+  def removeMetrics(): Unit = {
+    MeterMetricNames.foreach(metricsGroup.removeMetric(_))
+    timerMetricNames.asScala.foreach(metricsGroup.removeMetric(_))

Review Comment:
   We should also remove the entry from timerMetricNames; otherwise the set 
keeps growing. A sketch of this follows below.
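
   A Java sketch of the suggestion (the real code is Scala); MetricsGroup is a 
hypothetical stand-in for the Yammer metrics group:

    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class TimerMetricCleanup {
        interface MetricsGroup { void removeMetric(String name); }

        private final Set<String> timerMetricNames = ConcurrentHashMap.newKeySet();

        void removeMetrics(MetricsGroup metricsGroup) {
            Iterator<String> it = timerMetricNames.iterator();
            while (it.hasNext()) {
                metricsGroup.removeMetric(it.next()); // remove the metric itself...
                it.remove();                          // ...and drop its name so the set cannot grow unboundedly
            }
        }
    }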



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #13954: MINOR: Move MockTimer to server-common

2023-07-06 Thread via GitHub


dajac merged PR #13954:
URL: https://github.com/apache/kafka/pull/13954


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #13958: MINOR:Refactor variable name for code specification

2023-07-06 Thread via GitHub


dajac commented on PR #13958:
URL: https://github.com/apache/kafka/pull/13958#issuecomment-1623614568

   Could we please update the description to clearly state that this is fixing 
a regression introduced in 
https://github.com/apache/kafka/pull/13924#discussion_r1254371528?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


