[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13955: KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask
vamossagar12 commented on code in PR #13955:
URL: https://github.com/apache/kafka/pull/13955#discussion_r1255253840

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -651,6 +652,40 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         verifyTopicCreation();
     }
+    @Test
+    public void testSendRecordsRetriableException() {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+        expectTaskGetTopic();
+
+        when(transformationChain.apply(eq(record1))).thenReturn(null);
+        when(transformationChain.apply(eq(record2))).thenReturn(null);
+        when(transformationChain.apply(eq(record3))).thenReturn(record3);
+
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+        when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null);
+
+        workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+        // The producer throws a RetriableException the first time we try to send the third record

Review Comment:
nit: For better readability, could we mention here that the first 2 records have been filtered out and only `record3` will be sent? That would make it easier to follow for anyone who's new to the codebase. WDYT?

-- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
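To make the retry behaviour exercised by this test concrete, here is a minimal, self-contained Java sketch. It is not the real `AbstractWorkerSourceTask`: `FilteredRetrySketch`, its `Sender` callback and `RetriableSendException` are hypothetical stand-ins, records are plain `Integer`s, and it assumes the send loop resumes from a `processed` index via `subList` after a retriable failure. The point it shows is that records dropped by the transformation (which returns `null`) must also count as processed; otherwise a retry would run them through the transformation chain again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of a source task's send loop (not Kafka's class).
public class FilteredRetrySketch {
    /** Stand-in for a retriable producer error (assumed unchecked). */
    public static class RetriableSendException extends RuntimeException {
        public RetriableSendException(String msg) {
            super(msg);
        }
    }

    /** Stand-in for the producer; may throw RetriableSendException. */
    public interface Sender {
        void send(Integer record);
    }

    private List<Integer> toSend;
    private final Function<Integer, Integer> transform; // returns null to drop a record
    private final Sender sender;

    public FilteredRetrySketch(List<Integer> batch, Function<Integer, Integer> transform, Sender sender) {
        this.toSend = new ArrayList<>(batch);
        this.transform = transform;
        this.sender = sender;
    }

    /** Returns true if the whole batch was handled, false if the caller should retry. */
    public boolean sendRecords() {
        int processed = 0;
        for (Integer original : toSend) {
            Integer transformed = transform.apply(original);
            if (transformed == null) {
                // Dropped by the transformation: count it as processed so that a
                // retry does not pass it through the transformation chain again.
                processed++;
                continue;
            }
            try {
                sender.send(transformed);
            } catch (RetriableSendException e) {
                // Keep only the records that have not been handled yet, then stop.
                toSend = new ArrayList<>(toSend.subList(processed, toSend.size()));
                return false;
            }
            processed++;
        }
        toSend = null;
        return true;
    }
}
```

With the `processed++` in the filtered branch, the scenario from the test (two filtered records, one failed send, one retry) applies the transformation 4 times; without it, the retry would restart at index 0 and the count would be 6.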
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13955: KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask
vamossagar12 commented on code in PR #13955:
URL: https://github.com/apache/kafka/pull/13955#discussion_r1255252249

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -651,6 +652,40 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         verifyTopicCreation();
     }
+    @Test
+    public void testSendRecordsRetriableException() {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);
+        expectTaskGetTopic();
+
+        when(transformationChain.apply(eq(record1))).thenReturn(null);
+        when(transformationChain.apply(eq(record2))).thenReturn(null);
+        when(transformationChain.apply(eq(record3))).thenReturn(record3);
+
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc));
+
+        when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null);
+
+        workerTask.toSend = Arrays.asList(record1, record2, record3);
+
+        // The producer throws a RetriableException the first time we try to send the third record
+        assertFalse(workerTask.sendRecords());
+
+        // The next attempt to send the third record should succeed
+        assertTrue(workerTask.sendRecords());
+
+        // Ensure that the first two records that were filtered out by the transformation chain
+        // aren't re-processed when we retry the call to sendRecords()
+        verify(transformationChain, times(4)).apply(any(SourceRecord.class));

Review Comment:
Yeah, I agree. An explicit check that `record1` and `record2` are transformed once and `record3` twice would be good. Regarding the duplicate re-transformations, I guess it should be achievable by storing a mapping between each `SourceRecord` and its `ProducerRecord`, i.e. the pre-transformed and transformed records. We can keep clearing the map as and when the entire batch of records is processed. In theory, that could have been another way of achieving what we are trying to achieve in this PR, i.e. if an SMT transforms a record to null, we can update the mapping and skip that record on subsequent retries. But the solution in this PR is cleaner.
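The mapping idea from this comment can be sketched as a per-batch memo of transformation results. This is a hypothetical illustration, not code from the PR: `TransformCacheSketch` and its methods are invented names, and records are compared by identity (`IdentityHashMap`) on the assumption that a retry re-submits the same record objects. A stored `null` means "filtered out", which is why the lookup uses `containsKey` rather than `computeIfAbsent` (the latter records no mapping when the function returns `null`, so a filtered record would be transformed again).

```java
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical per-batch cache of transformation results (not Kafka's API).
public class TransformCacheSketch<R> {
    // Identity keys: equal-but-distinct records in one batch get separate entries.
    // IdentityHashMap permits null values, which we use to mean "filtered out".
    private final Map<R, R> transformed = new IdentityHashMap<>();
    private final Function<R, R> transformationChain;

    public TransformCacheSketch(Function<R, R> transformationChain) {
        this.transformationChain = transformationChain;
    }

    /** Applies the chain at most once per record object until the batch completes. */
    public R transformOnce(R original) {
        if (transformed.containsKey(original)) {
            return transformed.get(original); // may be null: previously filtered out
        }
        R result = transformationChain.apply(original);
        transformed.put(original, result);
        return result;
    }

    /** Called once the whole batch has been dispatched, so the map does not grow unbounded. */
    public void batchCompleted() {
        transformed.clear();
    }
}
```

The trade-off the comment points out is visible here: the cache has to be kept in sync with the batch lifecycle, whereas advancing past filtered records needs no extra state.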
[GitHub] [kafka] vamossagar12 commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1624726819

`MirrorConnectorsIntegrationBaseTest#testOffsetTranslationBehindReplicationFlow` is the only test that is relevant to the changes in this PR. I ran it locally a couple of times and it passed on both occasions. Could it be a flaky test?
[jira] [Commented] (KAFKA-14207) Add a 6.10 section for KRaft
[ https://issues.apache.org/jira/browse/KAFKA-14207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740864#comment-17740864 ]

Luke Chen commented on KAFKA-14207:
---

Thanks for the info, [~gharris1727]!

> Add a 6.10 section for KRaft
>
> Key: KAFKA-14207
> URL: https://issues.apache.org/jira/browse/KAFKA-14207
> Project: Kafka
> Issue Type: Sub-task
> Components: documentation
> Affects Versions: 3.3.0
> Reporter: Jose Armando Garcia Sancio
> Assignee: Jose Armando Garcia Sancio
> Priority: Major
> Labels: documentation, kraft
> Fix For: 3.3.0
>
> The section should talk about:
> # Limitation
> # Recommended deployment: external controller
> # How to start a KRaft cluster.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[GitHub] [kafka] showuon commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
showuon commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1255129114

## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.storage.internals.log;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;

Review Comment:
Caffeine was added in this [PR](https://github.com/apache/kafka/pull/13850) to improve concurrent read performance. We chose it because it performs well and is also used by other big projects, like Cassandra, HBase, etc. About the GC issues, I don't think we have discussed them. cc @divijvaidya, since you've done some research on Caffeine, do you have any comments on the GC issue?
[GitHub] [kafka] showuon commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
showuon commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1255120355 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.storage.internals.log; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX; + +/** + * This is a LFU (Least Frequently Used) cache of remote index files stored in `$logdir/remote-log-index-cache`. + * This is helpful to avoid re-fetching the index files like offset, time indexes from the remote storage for every + * fetch call. 
The cache is re-initialized from the index files on disk on startup, if the index files are available. + * + * The cache contains a garbage collection thread which will delete the files for entries that have been removed from + * the cache. + * + * Note that closing this cache does not delete the index files on disk. + * Note that the cache eviction policy is based on the default implementation of Caffeine i.e. + * https://github.com/ben-manes/caffeine/wiki/Efficiency;>Window TinyLfu. TinyLfu relies on a frequency + * sketch to probabilistically estimate the historic usage of an entry. + * + */ +public class RemoteIndexCache implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class); + +public static final String DIR_NAME = "remote-log-index-cache"; + +private static final String TMP_FILE_SUFFIX = ".tmp"; + +public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner"; + +/** + * Directory where the index files will be stored on disk. + */ +private final File cacheDir; + +/** + * Represents if the cache is closed or not. Closing the cache is an irreversible operation. + */ +private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false); + +/** + * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected. + */ +private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>(); + +/** + * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other + * concurrent reads in-progress. + */ +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + +/** + * Actual cache implementation that this file wraps around. + * + * The requirements for this internal cache is as follows: + * 1. Multiple threads should be
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jeffkbkim commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1255022451 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,127 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * The EventBasedCoordinatorTimer implements the CoordinatorTimer interface and provides an event based + * timer which turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which + * are executed by the {@link CoordinatorEventProcessor} used by this coordinator runtime. This is done + * to ensure that the timer respects the threading model of the coordinator runtime. + * + * The {@link CoordinatorWriteEvent} events pushed by the coordinator timer wraps the + * {@link TimeoutOperation} operations scheduled by the coordinators. + * + * It also keeps track of all the scheduled {@link TimerTask}. This allows timeout operations to be + * cancelled or rescheduled. When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got pushed to the event processor. + * + * When a timer fails with an unexpected exception, the timer is rescheduled with a backoff. + */ +class EventBasedCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); + +EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) { +this.tp = tp; +this.log = logContext.logger(EventBasedCoordinatorTimer.class); +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +TimeoutOperation operation +) { +// The TimerTask wraps the TimeoutOperation into a CoordinatorWriteEvent. 
When the TimerTask +// expires, the event is push to the queue of the coordinator runtime to be executed. This +// ensure that the threading model of the runtime is respected. Review Comment: nit: "the event is pushed", "This ensures that" ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java: ## @@ -16,12 +16,26 @@ */ package org.apache.kafka.coordinator.group.runtime; +import org.apache.kafka.common.KafkaException; + +import java.util.List; import java.util.concurrent.TimeUnit; /** * An interface to schedule and cancel operations. */ -public interface Timer { +public interface CoordinatorTimer { +/** + * Generates the records needed to implement this timeout write operation. In general, Review Comment: The actual usages are in https://github.com/apache/kafka/pull/13870; search for `"timer.schedule"` in GroupMetadataManager (the old join protocol). One usage is a join operation: after some time it expires and we try to complete the join phase. Another usage is a heartbeat operation: when it expires, we remove the member from the group. These map directly to the existing DelayedJoin/DelayedHeartbeat operations used by the purgatory. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,127 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * The EventBasedCoordinatorTimer implements the CoordinatorTimer interface and provides an event based + * timer which turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which + * are executed by the {@link CoordinatorEventProcessor} used by this coordinator runtime. This is done + * to ensure that the timer respects the threading model of the coordinator runtime.
+ * + * The {@link CoordinatorWriteEvent} events pushed by the coordinator timer wraps the + * {@link TimeoutOperation} operations scheduled by the coordinators. + * + * It also keeps track of all the scheduled {@link TimerTask}. This allows timeout operations to be + * cancelled or rescheduled. When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got pushed to the event processor. + * + * When a timer fails with an unexpected exception, the timer is rescheduled with a backoff. + */ +class EventBasedCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */
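The javadoc above states the key guarantee: an expired timer only enqueues a write event, and the event re-checks that its task is still the current one for its key, so a timer cancelled or overridden after expiring is never executed. The single-threaded Java sketch below models that idea under simplifying assumptions: `EventTimerSketch`, its manual clock (`advanceClock`) and its `poll()`-driven event queue are hypothetical stand-ins for the real `Timer`, `CoordinatorEventProcessor` and `CoordinatorWriteEvent`, and operations return plain `String` records. The guard is `Map.remove(key, task)`, which removes the entry only if it is still mapped to that exact task object, mirroring the diff's `tasks.remove(key, this)`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical single-threaded model of an event-based timer (not Kafka's classes).
public class EventTimerSketch {
    // A scheduled timeout. equals/hashCode are not overridden, so comparisons are by identity.
    static final class Task {
        final String key;
        final long deadlineMs;
        final Supplier<List<String>> operation;

        Task(String key, long deadlineMs, Supplier<List<String>> operation) {
            this.key = key;
            this.deadlineMs = deadlineMs;
            this.operation = operation;
        }
    }

    private long nowMs = 0;
    private final Map<String, Task> tasks = new HashMap<>();    // current task per key
    private final List<Task> pendingTimers = new ArrayList<>(); // scheduled, not yet expired
    private final Queue<Runnable> events = new ArrayDeque<>();  // stand-in for the event processor
    public final List<String> records = new ArrayList<>();      // records "written" so far

    public void schedule(String key, long delayMs, Supplier<List<String>> operation) {
        Task task = new Task(key, nowMs + delayMs, operation);
        Task previous = tasks.put(key, task);
        if (previous != null) {
            pendingTimers.remove(previous); // no-op if the previous timer already expired
        }
        pendingTimers.add(task);
    }

    public void cancel(String key) {
        Task previous = tasks.remove(key);
        if (previous != null) {
            pendingTimers.remove(previous);
        }
    }

    // Expired timers are not run inline: each one only enqueues an event.
    public void advanceClock(long deltaMs) {
        nowMs += deltaMs;
        Iterator<Task> it = pendingTimers.iterator();
        while (it.hasNext()) {
            Task task = it.next();
            if (task.deadlineMs <= nowMs) {
                it.remove();
                events.add(() -> {
                    // Runs the operation only if this task is still current for its key.
                    if (tasks.remove(task.key, task)) {
                        records.addAll(task.operation.get());
                    }
                });
            }
        }
    }

    public int pendingEvents() {
        return events.size();
    }

    // Executes the oldest queued event, as the event processor thread would.
    public boolean poll() {
        Runnable event = events.poll();
        if (event == null) {
            return false;
        }
        event.run();
        return true;
    }
}
```

Replaying the sequence from `testRescheduleTimer` further down (schedule `timer-1`, let it expire, reschedule it twice, let it expire again, then poll twice) leaves only `record3` in `records`: the first event finds that `timer-1` now maps to a newer task and does nothing.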
[GitHub] [kafka] ijuma commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
ijuma commented on code in PR #13275:
URL: https://github.com/apache/kafka/pull/13275#discussion_r1255034976

## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java:
##
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.storage.internals.log;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;

Review Comment:
I think this was already there for the Scala code.
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1255017069 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -893,4 +1041,250 @@ public void testOnNewMetadataImage() { future1.complete(null); verify(coordinator1).onLoaded(newImage); } + +@Test +public void testScheduleTimer() throws InterruptedException { +MockTimer timer = new MockTimer(); +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(new MockPartitionWriter()) +.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) +.build(); + +// Loads the coordinator. +runtime.scheduleLoadOperation(TP, 10); + +// Check initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0, ctx.lastWrittenOffset); +assertEquals(0, ctx.lastCommittedOffset); + +// The coordinator timer should be empty. +assertEquals(0, ctx.timer.size()); + +// Timer #1. +ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, +() -> Arrays.asList("record1", "record2")); + +// Timer #2. +ctx.coordinator.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, +() -> Arrays.asList("record3", "record4")); + +// The coordinator timer should have two pending tasks. +assertEquals(2, ctx.timer.size()); + +// Advance time to fire timer #1, +timer.advanceClock(10 + 1); + +// Verify that the operation was executed. +assertEquals(mkSet("record1", "record2"), ctx.coordinator.records()); +assertEquals(1, ctx.timer.size()); + +// Advance time to fire timer #2, +timer.advanceClock(10 + 1); + +// Verify that the operation was executed. 
+assertEquals(mkSet("record1", "record2", "record3", "record4"), ctx.coordinator.records()); +assertEquals(0, ctx.timer.size()); +} + +@Test +public void testRescheduleTimer() throws InterruptedException { +MockTimer timer = new MockTimer(); +ManualEventProcessor processor = new ManualEventProcessor(); +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withLoader(new MockCoordinatorLoader()) +.withEventProcessor(processor) +.withPartitionWriter(new MockPartitionWriter()) +.withCoordinatorBuilderSupplier(new MockCoordinatorBuilderSupplier()) +.build(); + +// Loads the coordinator. +runtime.scheduleLoadOperation(TP, 10); + +// Poll twice to process the pending events related to the loading. +processor.poll(); +processor.poll(); + +// Check initial state. +CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(0, ctx.timer.size()); + +// The processor should be empty. +assertEquals(0, processor.size()); + +// Timer #1. +ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, +() -> Collections.singletonList("record1")); + +// The coordinator timer should have one pending task. +assertEquals(1, ctx.timer.size()); + +// Advance time to fire the pending timer. +timer.advanceClock(10 + 1); + +// An event should be waiting in the processor. +assertEquals(1, processor.size()); + +// Schedule a second timer with the same key. +ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, +() -> Collections.singletonList("record2")); + +// The coordinator timer should still have one pending task. +assertEquals(1, ctx.timer.size()); + +// Schedule a third timer with the same key. +ctx.coordinator.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, +() -> Collections.singletonList("record3")); + +// The coordinator timer should still have one pending task. +assertEquals(1, ctx.timer.size()); + +// Advance time to fire the pending timer. 
+timer.advanceClock(10 + 1); + +// Another event should be waiting in the processor. +assertEquals(2, processor.size()); + +// Poll twice to execute the two pending events. +assertTrue(processor.poll()); +assertTrue(processor.poll()); + +// Verify that the correct operation was executed. Only the third +// instance should have been
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator
jeffkbkim commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1254999681 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -874,4 +1087,1348 @@ public void replay( consumerGroup.updateMember(newMember); } } + +/** + * Replays GroupMetadataKey/Value to update the soft state of + * the generic group. + * + * @param key A GroupMetadataKey key. + * @param value A GroupMetadataValue record. + */ +public void replay( +GroupMetadataKey key, +GroupMetadataValue value, +short version +) { +String groupId = key.group(); + +if (value == null) { +// Tombstone. Group should not be added. +} else { +List loadedMembers = new ArrayList<>(); +for (GroupMetadataValue.MemberMetadata member : value.members()) { +int rebalanceTimeout = version == 0 ? member.sessionTimeout() : member.rebalanceTimeout(); + +JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestProtocolCollection(); +supportedProtocols.add(new JoinGroupRequestProtocol() +.setName(value.protocol()) +.setMetadata(member.subscription())); + +GenericGroupMember loadedMember = new GenericGroupMember( +member.memberId(), +Optional.ofNullable(member.groupInstanceId()), +member.clientId(), +member.clientHost(), +rebalanceTimeout, +member.sessionTimeout(), +value.protocolType(), +supportedProtocols, +member.assignment() +); + +loadedMembers.add(loadedMember); +} + +String protocolType = value.protocolType(); + +GenericGroup genericGroup = new GenericGroup( +this.logContext, +groupId, +loadedMembers.isEmpty() ? EMPTY : STABLE, +time, +value.generation(), +protocolType == null || protocolType.isEmpty() ? Optional.empty() : Optional.of(protocolType), +Optional.ofNullable(value.protocol()), +Optional.ofNullable(value.leader()), +value.currentStateTimestamp() == -1 ? 
Optional.empty() : Optional.of(value.currentStateTimestamp()) +); + +loadedMembers.forEach(member -> { +genericGroup.add(member, null); +log.info("Loaded member {} in group {} with generation {}.", +member.memberId(), groupId, genericGroup.generationId()); +}); + +genericGroup.setSubscribedTopics( +genericGroup.computeSubscribedTopics() +); +} +} + +/** + * Handle a JoinGroupRequest. + * + * @param context The request context. + * @param request The actual JoinGroup request. + * + * @return The result that contains records to append if the join group phase completes. + */ +public CoordinatorResult genericGroupJoin( +RequestContext context, +JoinGroupRequestData request, +CompletableFuture responseFuture +) { +CoordinatorResult result = EMPTY_RESULT; + +String groupId = request.groupId(); +String memberId = request.memberId(); +int sessionTimeoutMs = request.sessionTimeoutMs(); + +if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs || +sessionTimeoutMs > genericGroupMaxSessionTimeoutMs +) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code()) +); +} else { +boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID); +// Group is created if it does not exist and the member id is UNKNOWN. if member +// is specified but group does not exist, request is rejected with GROUP_ID_NOT_FOUND +GenericGroup group; +try { +group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember); +} catch (Throwable t) { +responseFuture.complete(new JoinGroupResponseData() +.setMemberId(memberId) +.setErrorCode(Errors.forException(t).code()) +); +return EMPTY_RESULT; +} + +CoordinatorResult newGroupResult = EMPTY_RESULT; +if (group.isNew()) { Review Comment: I misunderstood; moved to using a local variable.
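The first checks in `genericGroupJoin`, plus one rule from `replay`, can be modelled in isolation. The Java sketch below is a hypothetical simplification, not the PR's code: `JoinGroupValidationSketch`, its `Outcome` enum and its method names are invented; the real handler completes a response future with an error code instead of returning an enum; and `UNKNOWN_MEMBER_ID` is assumed to be the empty string, as in Kafka's `JoinGroupRequest`. It shows the order of the checks: the session timeout is validated first, then a missing group is created only for an unknown member and otherwise rejected with `GROUP_ID_NOT_FOUND`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the JoinGroup pre-checks (not Kafka's GroupMetadataManager).
public class JoinGroupValidationSketch {
    public enum Outcome { PROCEED, INVALID_SESSION_TIMEOUT, GROUP_ID_NOT_FOUND }

    // Assumption: mirrors JoinGroupRequest.UNKNOWN_MEMBER_ID, the empty string.
    public static final String UNKNOWN_MEMBER_ID = "";

    private final int minSessionTimeoutMs;
    private final int maxSessionTimeoutMs;
    private final Set<String> groups = new HashSet<>();

    public JoinGroupValidationSketch(int minSessionTimeoutMs, int maxSessionTimeoutMs) {
        this.minSessionTimeoutMs = minSessionTimeoutMs;
        this.maxSessionTimeoutMs = maxSessionTimeoutMs;
    }

    public Outcome join(String groupId, String memberId, int sessionTimeoutMs) {
        // 1. The session timeout must lie within the configured bounds.
        if (sessionTimeoutMs < minSessionTimeoutMs || sessionTimeoutMs > maxSessionTimeoutMs) {
            return Outcome.INVALID_SESSION_TIMEOUT;
        }
        // 2. A missing group is created only when the member id is unknown.
        boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
        if (!groups.contains(groupId)) {
            if (!isUnknownMember) {
                return Outcome.GROUP_ID_NOT_FOUND;
            }
            groups.add(groupId);
        }
        return Outcome.PROCEED;
    }

    /** From replay: version 0 records carry no rebalance timeout, so the session timeout is used. */
    public static int rebalanceTimeoutFromRecord(short version, int sessionTimeoutMs, int rebalanceTimeoutMs) {
        return version == 0 ? sessionTimeoutMs : rebalanceTimeoutMs;
    }
}
```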
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254998634 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,127 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * The EventBasedCoordinatorTimer implements the CoordinatorTimer interface and provides an event based + * timer which turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which + * are executed by the {@link CoordinatorEventProcessor} used by this coordinator runtime. This is done + * to ensure that the timer respects the threading model of the coordinator runtime. + * + * The {@link CoordinatorWriteEvent} events pushed by the coordinator timer wraps the + * {@link TimeoutOperation} operations scheduled by the coordinators. + * + * It also keeps track of all the scheduled {@link TimerTask}. This allows timeout operations to be + * cancelled or rescheduled. When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got pushed to the event processor. + * + * When a timer fails with an unexpected exception, the timer is rescheduled with a backoff. + */ +class EventBasedCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); + +EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) { +this.tp = tp; +this.log = logContext.logger(EventBasedCoordinatorTimer.class); +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +TimeoutOperation operation +) { +// The TimerTask wraps the TimeoutOperation into a CoordinatorWriteEvent. 
When the TimerTask +// expires, the event is push to the queue of the coordinator runtime to be executed. This +// ensure that the threading model of the runtime is respected. +TimerTask task = new TimerTask(unit.toMillis(delay)) { +@Override +public void run() { +String eventName = "Timeout(tp=" + tp + ", key=" + key + ")"; +CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> { +log.debug("Executing write event {} for timer {}.", eventName, key); + +// If the task is different, it means that the timer has been +// cancelled while the event was waiting to be processed. +if (!tasks.remove(key, this)) { +throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled"); +} + +// Execute the timeout operation. +return new CoordinatorResult<>(operation.generateRecords(), null); +}); + +// If the write event fails, it is rescheduled with a small backoff except if the +// error is fatal. +event.future.exceptionally(ex -> { +if (ex instanceof RejectedExecutionException) { +log.debug("The delayed write event {} for the timer {} was not executed because it was " + Review Comment: I think the usage of "delayed" also confused me here. In theory, if we never hit the timeout, we will repeatedly reschedule and hit this, right? And in steady state, we don't necessarily want to write the event.
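The failure branch in this diff can be modelled as a policy attached to the event's future. The Java sketch below is hypothetical (`TimerFailurePolicySketch`, `attachRetryPolicy` and the `LongConsumer` reschedule hook are invented names): a `RejectedExecutionException`, which the diff throws when the timer was overridden or cancelled, is dropped, while any other failure triggers a reschedule after `backoffMs`. The diff's separate handling of fatal errors is cut off in the excerpt and is not modelled here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.LongConsumer;

// Hypothetical failure policy for timer write events (not Kafka's API).
public class TimerFailurePolicySketch {
    public static <T> CompletableFuture<T> attachRetryPolicy(
        CompletableFuture<T> eventFuture,
        long backoffMs,
        LongConsumer reschedule
    ) {
        return eventFuture.exceptionally(ex -> {
            // Failures coming from an upstream stage may arrive wrapped in a CompletionException.
            Throwable cause = ex instanceof CompletionException && ex.getCause() != null
                ? ex.getCause()
                : ex;
            if (!(cause instanceof RejectedExecutionException)) {
                // Unexpected failure: try the timeout operation again after a backoff.
                reschedule.accept(backoffMs);
            }
            // RejectedExecutionException: the timer was overridden or cancelled, so drop it.
            return null;
        });
    }
}
```

When the callback is attached before the future completes, it runs on the thread that completes the future, which in the runtime would be the event processor thread.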
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254998634 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,127 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * The EventBasedCoordinatorTimer implements the CoordinatorTimer interface and provides an event based + * timer which turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which + * are executed by the {@link CoordinatorEventProcessor} used by this coordinator runtime. This is done + * to ensure that the timer respects the threading model of the coordinator runtime. + * + * The {@link CoordinatorWriteEvent} events pushed by the coordinator timer wraps the + * {@link TimeoutOperation} operations scheduled by the coordinators. + * + * It also keeps track of all the scheduled {@link TimerTask}. This allows timeout operations to be + * cancelled or rescheduled. When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got pushed to the event processor. + * + * When a timer fails with an unexpected exception, the timer is rescheduled with a backoff. + */ +class EventBasedCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); + +EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) { +this.tp = tp; +this.log = logContext.logger(EventBasedCoordinatorTimer.class); +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +TimeoutOperation operation +) { +// The TimerTask wraps the TimeoutOperation into a CoordinatorWriteEvent. 
When the TimerTask +// expires, the event is push to the queue of the coordinator runtime to be executed. This +// ensure that the threading model of the runtime is respected. +TimerTask task = new TimerTask(unit.toMillis(delay)) { +@Override +public void run() { +String eventName = "Timeout(tp=" + tp + ", key=" + key + ")"; +CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> { +log.debug("Executing write event {} for timer {}.", eventName, key); + +// If the task is different, it means that the timer has been +// cancelled while the event was waiting to be processed. +if (!tasks.remove(key, this)) { +throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled"); +} + +// Execute the timeout operation. +return new CoordinatorResult<>(operation.generateRecords(), null); +}); + +// If the write event fails, it is rescheduled with a small backoff except if the +// error is fatal. +event.future.exceptionally(ex -> { +if (ex instanceof RejectedExecutionException) { +log.debug("The delayed write event {} for the timer {} was not executed because it was " + Review Comment: I think the usage of delayed also confused me here. In theory if we never hit the timeout, we will repeatedly reschedule and hit this right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
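The question above depends on which failures get retried. Below is a minimal, self-contained sketch, not the PR's code, of a `CompletableFuture.exceptionally` handler that separates the two cases. It assumes, as the class javadoc states, that only unexpected exceptions trigger a rescheduled timer with a backoff. A `RejectedExecutionException` means the timer was cancelled or overridden while its event was queued, so the sketch drops it instead of retrying. The diff is cut off right after the "was not executed" log line, so this is an assumption about the rest of the handler. `TimeoutFailurePolicy`, `BACKOFF_MS` and the `reschedule` callback are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.LongConsumer;

public class TimeoutFailurePolicy {
    // Hypothetical backoff; the PR's actual value is not shown in the diff.
    static final long BACKOFF_MS = 500L;

    // Attaches the retry policy to the future of a timeout write event.
    static CompletableFuture<Void> attach(CompletableFuture<Void> eventFuture, LongConsumer reschedule) {
        return eventFuture.exceptionally(ex -> {
            // Dependent stages may wrap the failure in a CompletionException.
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
            if (!(cause instanceof RejectedExecutionException)) {
                // Unexpected failure: re-arm the timer after the backoff.
                reschedule.accept(BACKOFF_MS);
            }
            // Cancelled or overridden timer: nothing to retry.
            return null;
        });
    }

    public static void main(String[] args) {
        long[] rescheduledWith = {-1L};

        CompletableFuture<Void> cancelled = new CompletableFuture<>();
        attach(cancelled, delay -> rescheduledWith[0] = delay);
        cancelled.completeExceptionally(new RejectedExecutionException("Timer k was overridden or cancelled"));
        System.out.println("after rejection: " + rescheduledWith[0]); // prints -1

        CompletableFuture<Void> failed = new CompletableFuture<>();
        attach(failed, delay -> rescheduledWith[0] = delay);
        failed.completeExceptionally(new IllegalStateException("unexpected"));
        System.out.println("after unexpected failure: " + rescheduledWith[0]); // prints 500
    }
}
```

Under these assumptions, only unexpected failures loop back through the backoff. A cancelled or overridden timer surfaces once and is then forgotten.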
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254995855 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorTimer.java: ## @@ -16,12 +16,26 @@ */ package org.apache.kafka.coordinator.group.runtime; +import org.apache.kafka.common.KafkaException; + +import java.util.List; import java.util.concurrent.TimeUnit; /** * An interface to schedule and cancel operations. */ -public interface Timer { +public interface CoordinatorTimer { +/** + * Generates the records needed to implement this timeout write operation. In general, Review Comment: Could we maybe give an example of a timeout operation here? I had a little trouble conceptualizing this. 22/N gave examples (e.g., we need to fence a group after a certain amount of time), and that really helped me understand. (Reviewing the metadata refresh PR, which used a different mechanism for stale data, confused me a bit too, but I now understand why that can happen on the next heartbeat and why this should be done with a timer.)
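Here is the kind of example the comment asks for, in the spirit of the fencing case from 22/N. It is a hypothetical timeout operation that fences a group member whose session expired. When the timer fires, the operation returns the records to write. `TimeoutOperation` below is a simplified stand-in for the interface in the diff, whose exact generic signature is not shown. `MemberFenced` and `sessionTimeout` are invented names, not the PR's API.

```java
import java.util.Collections;
import java.util.List;

public class SessionTimeoutExample {
    // Simplified stand-in for the TimeoutOperation in the diff: it runs when the
    // timer expires and returns the records that implement the timeout.
    @FunctionalInterface
    interface TimeoutOperation<U> {
        List<U> generateRecords();
    }

    // Hypothetical record type: removes a member from a group.
    static final class MemberFenced {
        final String groupId;
        final String memberId;

        MemberFenced(String groupId, String memberId) {
            this.groupId = groupId;
            this.memberId = memberId;
        }

        @Override
        public String toString() {
            return "MemberFenced(" + groupId + ", " + memberId + ")";
        }
    }

    // Operation that would be (re)scheduled under the member's key on each heartbeat.
    // If no heartbeat reschedules it before the session timeout, it fences the member.
    static TimeoutOperation<MemberFenced> sessionTimeout(String groupId, String memberId) {
        return () -> Collections.singletonList(new MemberFenced(groupId, memberId));
    }

    public static void main(String[] args) {
        TimeoutOperation<MemberFenced> op = sessionTimeout("group-1", "member-1");
        // Nothing is written until the timer expires and the runtime calls generateRecords().
        System.out.println(op.generateRecords()); // prints [MemberFenced(group-1, member-1)]
    }
}
```

In this reading, the operation is the action taken when the timeout fires, not the work that might time out.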
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254976555 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); Review Comment: Thanks. I also misunderstood Timer and had to read its javadoc. > turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which are executed by the {@link CoordinatorEventProcessor} With respect to the above line, is it fair to understand it as follows? A Timer, which is used to track when an operation should occur (usually on a timeout), must be adapted so that the CoordinatorWriteEvents it produces respect the threading model of the coordinator runtime. This adapted timer (CoordinatorTimer) is associated with the event via the TimerTask, and this task is then added to the timer so that timeouts are handled appropriately?
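Here is one way to read the quoted javadoc, sketched with plain JDK executors instead of Kafka's `Timer`/`TimerTask` and `CoordinatorEventProcessor` to keep it self-contained. The timer thread never runs coordinator logic. On expiry it only enqueues an event. The single event thread then runs the operation, but only after `Map.remove(key, value)` confirms that the task is still the current one for its key. `EventBasedTimerSketch`, its token-per-key scheme and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class EventBasedTimerSketch {
    // Stand-in for the regular timer: its thread only detects expirations.
    private final ScheduledExecutorService timerThread = Executors.newSingleThreadScheduledExecutor();
    // Stand-in for the event processor: the only thread that runs operations.
    private final ExecutorService eventThread = Executors.newSingleThreadExecutor();
    // Current token per key; schedule() replaces it and cancel() removes it.
    private final Map<String, Object> tasks = new ConcurrentHashMap<>();
    final List<String> executed = Collections.synchronizedList(new ArrayList<>());

    void schedule(String key, long delayMs, Runnable operation) {
        Object token = new Object(); // identifies this particular scheduling of `key`
        tasks.put(key, token);       // overrides any earlier timer for the same key
        timerThread.schedule(
            // Timer thread: never run the operation here, only enqueue an event.
            () -> eventThread.execute(() -> {
                // Event thread: remove(key, token) succeeds only if this timer is still
                // current, so a cancelled or overridden timer is skipped even if it
                // had already expired and been enqueued.
                if (tasks.remove(key, token)) {
                    operation.run();
                }
            }),
            delayMs, TimeUnit.MILLISECONDS);
    }

    void cancel(String key) {
        tasks.remove(key);
    }

    // Lets already scheduled timers fire (the default shutdown policy), then drains the event queue.
    void shutdown() {
        try {
            timerThread.shutdown();
            timerThread.awaitTermination(5, TimeUnit.SECONDS);
            eventThread.shutdown();
            eventThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        EventBasedTimerSketch timer = new EventBasedTimerSketch();
        timer.schedule("member-1", 200, () -> timer.executed.add("first"));
        timer.schedule("member-1", 400, () -> timer.executed.add("second")); // overrides "first"
        timer.schedule("member-2", 200, () -> timer.executed.add("third"));
        timer.cancel("member-2");
        timer.shutdown();
        System.out.println(timer.executed); // prints [second]
    }
}
```

The identity token plays the role that `tasks.remove(key, this)` plays in the diff. Because the check runs on the event thread, it is ordered with every other coordinator write.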
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254986327 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,127 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * The EventBasedCoordinatorTimer implements the CoordinatorTimer interface and provides an event based + * timer which turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which + * are executed by the {@link CoordinatorEventProcessor} used by this coordinator runtime. This is done + * to ensure that the timer respects the threading model of the coordinator runtime. + * + * The {@link CoordinatorWriteEvent} events pushed by the coordinator timer wraps the + * {@link TimeoutOperation} operations scheduled by the coordinators. + * + * It also keeps track of all the scheduled {@link TimerTask}. This allows timeout operations to be + * cancelled or rescheduled. When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got pushed to the event processor. + * + * When a timer fails with an unexpected exception, the timer is rescheduled with a backoff. + */ +class EventBasedCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); + +EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) { +this.tp = tp; +this.log = logContext.logger(EventBasedCoordinatorTimer.class); +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +TimeoutOperation operation +) { +// The TimerTask wraps the TimeoutOperation into a CoordinatorWriteEvent. 
When the TimerTask +// expires, the event is push to the queue of the coordinator runtime to be executed. This +// ensure that the threading model of the runtime is respected. +TimerTask task = new TimerTask(unit.toMillis(delay)) { +@Override +public void run() { +String eventName = "Timeout(tp=" + tp + ", key=" + key + ")"; +CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> { +log.debug("Executing write event {} for timer {}.", eventName, key); + +// If the task is different, it means that the timer has been +// cancelled while the event was waiting to be processed. +if (!tasks.remove(key, this)) { +throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled"); +} + +// Execute the timeout operation. Review Comment: Maybe I'm still being pedantic here, but is this the operation we perform that can time out, rather than the action we take if we time out?
[jira] [Updated] (KAFKA-14426) Add documentation for Kraft limtations that have open KIPs
[ https://issues.apache.org/jira/browse/KAFKA-14426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14426: Description: Currently there are a number of limitations for Kraft, which are described as the motivation for the following open KIPs: * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes] * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-856%3A+KRaft+Disk+Failure+Recovery] * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote] These limitations are: * No online method of resizing the controller quorum * No online method of recovering from controller disk loss * No support for heterogeneous voter lists in running controller nodes * When using a quorum size 3, there is no live-upgrade roll which is tolerant of a single unplanned machine failure. * When using a quorum size >3, there is a risk of zombie leaders causing extended outages without the pre-vote feature. These are significant enough concerns for operations of a Kraft-enabled cluster that they should be documented as official limitations in the ops documentation. Optionally, we may wish to provide or link to more detailed operations documentation about performing the offline-resize or offline-recovery stages, in addition to describing that such offline procedures are necessary. 
was: Currently there are a number of limitations for Kraft, which are described as the motivation for the following open KIPs: * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes] * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-856%3A+KRaft+Disk+Failure+Recovery] * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote] These limitations are: * No online method of resizing the controller quorum * No online method of recovering from controller disk loss * No support for heterogeneous voter lists in running controller nodes * When using a quorum size 3, there is no live-upgrade roll which is tolerant of a single unplanned machine failure. * When using a quorum size >3, there is a risk of non-linearizable reads. These are significant enough concerns for operations of a Kraft-enabled cluster that they should be documented as official limitations in the ops documentation. Optionally, we may wish to provide or link to more detailed operations documentation about performing the offline-resize or offline-recovery stages, in addition to describing that such offline procedures are necessary. 
> Add documentation for Kraft limtations that have open KIPs > -- > > Key: KAFKA-14426 > URL: https://issues.apache.org/jira/browse/KAFKA-14426 > Project: Kafka > Issue Type: Task > Components: documentation, kraft >Reporter: Greg Harris >Priority: Major > > Currently there are a number of limitations for Kraft, which are described as > the motivation for the following open KIPs: > * > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes] > * > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-856%3A+KRaft+Disk+Failure+Recovery] > * > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote] > > These limitations are: > * No online method of resizing the controller quorum > * No online method of recovering from controller disk loss > * No support for heterogeneous voter lists in running controller nodes > * When using a quorum size 3, there is no live-upgrade roll which is > tolerant of a single unplanned machine failure. > * When using a quorum size >3, there is a risk of zombie leaders causing > extended outages without the pre-vote feature. > These are significant enough concerns for operations of a Kraft-enabled > cluster that they should be documented as official limitations in the ops > documentation. > Optionally, we may wish to provide or link to more detailed operations > documentation about performing the offline-resize or offline-recovery stages, > in addition to describing that such offline procedures are necessary. -- This message was sent by Atlassian Jira (v8.20.10#820010)
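The quorum-size-3 limitation follows from majority-quorum arithmetic. A Raft quorum of n voters needs floor(n/2) + 1 of them available to make progress, so it tolerates n minus that number of failures. During a rolling upgrade one voter is deliberately offline, which leaves a 3-voter quorum with no headroom for an unplanned failure. The sketch below shows only that arithmetic. `QuorumMath` and its method names are illustrative, and it says nothing about the pre-vote issue for larger quorums.

```java
public class QuorumMath {
    // Voters that must be available for a majority quorum of n voters.
    static int majority(int voters) {
        return voters / 2 + 1;
    }

    // Unplanned failures tolerated while `plannedDown` voters are offline
    // (e.g. 1 during a rolling restart); a negative result means no quorum.
    static int toleratedFailures(int voters, int plannedDown) {
        return voters - plannedDown - majority(voters);
    }

    public static void main(String[] args) {
        for (int n : new int[] {3, 5}) {
            System.out.printf("voters=%d majority=%d tolerated=%d duringRoll=%d%n",
                n, majority(n), toleratedFailures(n, 0), toleratedFailures(n, 1));
        }
        // voters=3 majority=2 tolerated=1 duringRoll=0
        // voters=5 majority=3 tolerated=2 duringRoll=1
    }
}
```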
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254981176 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); Review Comment: Ah I see your other comment in the method.
[jira] [Commented] (KAFKA-14207) Add a 6.10 section for KRaft
[ https://issues.apache.org/jira/browse/KAFKA-14207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740818#comment-17740818 ] Greg Harris commented on KAFKA-14207: - [~showuon] The only information accompanying that documentation is this GitHub comment: [https://github.com/apache/kafka/pull/12642#discussion_r980338126], which links to [https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote]. I mentioned this limitation in KAFKA-14426, but I did not find any other sources describing it or explaining how pre-vote removes it. If KIP-650 is relevant, I think we should add this to its `Motivation` section, indicating that the feature is required for larger quorums. > Add a 6.10 section for KRaft > > > Key: KAFKA-14207 > URL: https://issues.apache.org/jira/browse/KAFKA-14207 > Project: Kafka > Issue Type: Sub-task > Components: documentation >Affects Versions: 3.3.0 >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > Labels: documentation, kraft > Fix For: 3.3.0 > > > The section should talk about: > # Limitation > # Recommended deployment: external controller > # How to start a KRaft cluster.
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254976555 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); Review Comment: Thanks. I also misunderstood Timer and had to read its javadoc. > turns timeouts of a regular {@link Timer} into {@link CoordinatorWriteEvent} events which are executed by the {@link CoordinatorEventProcessor} With respect to the above line, is it fair to understand it as follows? A Timer, which is used to track when an operation should time out, must be adapted so that the CoordinatorWriteEvents it produces respect the threading model of the coordinator runtime. This adapted timer (CoordinatorTimer) is associated with the event via the TimerTask, and this task is then added to the timer so that timeouts are handled appropriately?
[jira] [Updated] (KAFKA-14426) Add documentation for Kraft limtations that have open KIPs
[ https://issues.apache.org/jira/browse/KAFKA-14426?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14426: Description: Currently there are a number of limitations for Kraft, which are described as the motivation for the following open KIPs: * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes] * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-856%3A+KRaft+Disk+Failure+Recovery] * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote] These limitations are: * No online method of resizing the controller quorum * No online method of recovering from controller disk loss * No support for heterogeneous voter lists in running controller nodes * When using a quorum size 3, there is no live-upgrade roll which is tolerant of a single unplanned machine failure. * When using a quorum size >3, there is a risk of non-linearizable reads. These are significant enough concerns for operations of a Kraft-enabled cluster that they should be documented as official limitations in the ops documentation. Optionally, we may wish to provide or link to more detailed operations documentation about performing the offline-resize or offline-recovery stages, in addition to describing that such offline procedures are necessary. 
was: Currently there are a number of limitations for Kraft, which are described as the motivation for the following open KIPs: * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes] * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-856%3A+KRaft+Disk+Failure+Recovery] * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote] These limitations are: * No online method of resizing the controller quorum * No online method of recovering from controller disk loss * No support for heterogeneous voter lists in running controller nodes * When using a quorum size 3, there is no live-upgrade roll which is tolerant of a single unplanned machine failure. * When using a quorum size >3, there is a risk of non-linearizable reads. These are significant enough concerns for operations of a Kraft-enabled cluster that they should be documented as official limitations in the ops documentation. Optionally, we may wish to provide or link to more detailed operations documentation about performing the offline-resize or offline-recovery stages, in addition to describing that such offline procedures are necessary. 
> Add documentation for Kraft limtations that have open KIPs > -- > > Key: KAFKA-14426 > URL: https://issues.apache.org/jira/browse/KAFKA-14426 > Project: Kafka > Issue Type: Task > Components: documentation, kraft >Reporter: Greg Harris >Priority: Major > > Currently there are a number of limitations for Kraft, which are described as > the motivation for the following open KIPs: > * > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Controller+Membership+Changes] > * > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-856%3A+KRaft+Disk+Failure+Recovery] > * > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-650%3A+Enhance+Kafkaesque+Raft+semantics#KIP650:EnhanceKafkaesqueRaftsemantics-Pre-vote] > > These limitations are: > * No online method of resizing the controller quorum > * No online method of recovering from controller disk loss > * No support for heterogeneous voter lists in running controller nodes > * When using a quorum size 3, there is no live-upgrade roll which is > tolerant of a single unplanned machine failure. > * When using a quorum size >3, there is a risk of non-linearizable reads. > These are significant enough concerns for operations of a Kraft-enabled > cluster that they should be documented as official limitations in the ops > documentation. > Optionally, we may wish to provide or link to more detailed operations > documentation about performing the offline-resize or offline-recovery stages, > in addition to describing that such offline procedures are necessary.
[GitHub] [kafka] junrao commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
junrao commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1254942551 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -16,6 +16,9 @@ */ package org.apache.kafka.storage.internals.log; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; Review Comment: We introduced a new dependency, Caffeine, here. Could you explain why Caffeine was chosen and how stable it is? The Caffeine docs mention the use of weak references. A few years back, we avoided using weak references in a PR because of poor GC behavior. Have we done any experiments to understand the GC impact?
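The GC question is easier to discuss against a baseline. Below is a stdlib-only sketch of a size-bounded LRU cache with an eviction callback that holds only strong references, so entries leave the cache because of the size bound and never because the GC cleared a reference. This comparison is built on `LinkedHashMap`'s access order and `removeEldestEntry`. It is not the Caffeine API and not what the PR uses. In Caffeine, reference-based eviction is enabled through builder options such as `weakKeys()`, `weakValues()` and `softValues()`. `BoundedLruCache` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class BoundedLruCache<K, V> {
    private final LinkedHashMap<K, V> map;

    public BoundedLruCache(int maxEntries, BiConsumer<K, V> onEvict) {
        // accessOrder = true: iteration order runs from least to most recently accessed.
        this.map = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > maxEntries) {
                    // Eviction is driven purely by size, e.g. to release index files.
                    onEvict.accept(eldest.getKey(), eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    public synchronized V get(K key) {
        return map.get(key);
    }

    public synchronized void put(K key, V value) {
        map.put(key, value);
    }

    public synchronized int size() {
        return map.size();
    }

    public static void main(String[] args) {
        List<String> evicted = new ArrayList<>();
        BoundedLruCache<String, String> cache = new BoundedLruCache<>(2, (k, v) -> evicted.add(k));
        cache.put("a", "A");
        cache.put("b", "B");
        cache.get("a");      // "a" becomes the most recently used entry
        cache.put("c", "C"); // evicts "b", the least recently used entry
        System.out.println(evicted); // prints [b]
    }
}
```

The trade-off is that a global lock and strict LRU are simpler to reason about for GC, but give up the concurrency and admission policy a dedicated cache library provides.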
[GitHub] [kafka] gaurav-narula commented on a diff in pull request #13949: KAFKA-15141: init logger statically on hot codepaths
gaurav-narula commented on code in PR #13949: URL: https://github.com/apache/kafka/pull/13949#discussion_r1254921844 ## core/src/main/scala/kafka/server/DelayedProduce.scala: ## @@ -58,6 +62,8 @@ class DelayedProduce(delayMs: Long, lockOpt: Option[Lock] = None) extends DelayedOperation(delayMs, lockOpt) { + override lazy val logger = DelayedProduce.logger Review Comment: @ijuma, any thoughts on the above? Perhaps I'm overlooking something; it would help if you could elaborate with an example on Scastie.
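Here is a Java analogue of the change under discussion; the PR itself is Scala. It contrasts a per-instance, lazily initialized logger (like the `lazy val logger` the diff overrides) with one held in a static field (like `DelayedProduce.logger` in the companion object). The lookup counter and class names are hypothetical instrumentation. They only make visible how many logger lookups happen when many short-lived objects, such as delayed produce operations, are created on a hot path.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

public class LoggerInitSketch {
    // Counts logger lookups (hypothetical instrumentation, not a logging API).
    static final AtomicInteger lookups = new AtomicInteger();

    static Logger lookup(String name) {
        lookups.incrementAndGet();
        // getLogger may return a cached instance, but each call still performs a lookup.
        return Logger.getLogger(name);
    }

    // Per-instance: every new object performs its own lookup on first use.
    static final class PerInstanceOp {
        private Logger logger;

        Logger logger() {
            if (logger == null) {
                logger = lookup(PerInstanceOp.class.getName());
            }
            return logger;
        }
    }

    // Shared: a single lookup when the class is initialized, reused by every instance.
    static final class SharedOp {
        private static final Logger LOGGER = lookup(SharedOp.class.getName());

        Logger logger() {
            return LOGGER;
        }
    }

    public static void main(String[] args) {
        lookups.set(0);
        for (int i = 0; i < 1000; i++) new PerInstanceOp().logger();
        System.out.println("per-instance lookups: " + lookups.get()); // 1000

        lookups.set(0);
        for (int i = 0; i < 1000; i++) new SharedOp().logger();
        System.out.println("shared lookups: " + lookups.get()); // 1
    }
}
```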
[jira] [Comment Edited] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740748#comment-17740748 ] Edoardo Comar edited comment on KAFKA-15144 at 7/6/23 8:56 PM: --- I'd rather rewrite our test to expect the record to be found after consuming no more than offset.lag.max=100 records from the target cluster, starting from the last checkpoint. was (Author: ecomar): I'd rather rewrite our test to expect the record to be found after consuming no more than offset.lag.max=100 records from the target cluster > MM2 Checkpoint downstreamOffset stuck to 1 > -- > > Key: KAFKA-15144 > URL: https://issues.apache.org/jira/browse/KAFKA-15144 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: edo-connect-mirror-maker-sourcetarget.properties > > > Steps to reproduce: > 1. Start the source cluster > 2. Start the target cluster > 3. Start connect-mirror-maker.sh using a config like the attached one > 4. Create a topic in the source cluster > 5. Produce a few messages > 6. Consume them all with autocommit enabled > > 7. Dump the Checkpoint topic content, e.g. > {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic > source.checkpoints.internal --from-beginning --formatter > org.apache.kafka.connect.mirror.formatters.CheckpointFormatter}} > Checkpoint{consumerGroupId=edogroup, > topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, > downstreamOffset=1, metadata=} > > The downstreamOffset remains at 1, whereas in a fresh cluster pair, with > the source topic created while MM2 is running, > I'd expect the downstreamOffset to match the upstreamOffset.
> Note that dumping the offset sync topic shows matching initial offsets: > {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic > mm2-offset-syncs.source.internal --from-beginning --formatter > org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter}} > OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, > downstreamOffset=0}
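Here is a minimal sketch of the test expectation described in the comment. It models the downstream partition as a list of values indexed by offset. Starting at the checkpoint's downstreamOffset, the expected record must appear within at most offset.lag.max (100 here) records. `CheckpointLagCheck`, `foundWithinLag` and the list-based model are hypothetical. A real test would poll a consumer on the target cluster instead.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointLagCheck {
    // True if `expected` appears among at most `maxRecords` records of `downstream`,
    // starting at `checkpointOffset` (in this model, list index == offset).
    static boolean foundWithinLag(List<String> downstream, long checkpointOffset, int maxRecords, String expected) {
        long end = Math.min(downstream.size(), checkpointOffset + maxRecords);
        for (long offset = checkpointOffset; offset < end; offset++) {
            if (downstream.get((int) offset).equals(expected)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> downstream = new ArrayList<>();
        for (int i = 0; i < 300; i++) downstream.add("record-" + i);
        // e.g. the checkpoint says downstreamOffset=1 while the next record to read sits at offset 3
        System.out.println(foundWithinLag(downstream, 1, 100, "record-3"));   // true
        System.out.println(foundWithinLag(downstream, 1, 100, "record-250")); // false
    }
}
```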
[jira] [Comment Edited] (KAFKA-15148) Some integration tests are running as unit tests
[ https://issues.apache.org/jira/browse/KAFKA-15148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740696#comment-17740696 ] Ezio Xie edited comment on KAFKA-15148 at 7/6/23 8:05 PM: -- I filtered integration tests using "integration" as a keyword and got these integration tests: {code:java}
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsReporterIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.QueryableStateIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableEfficientRangeQueryTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopCounterTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html
{code}
I added @Tag("integration") to the "streams" module tests and @Category(IntegrationTest.class) to the "connect" module tests, so these integration tests are now excluded from the "unitTest" task. The "streams" module's unitTest time dropped from 25 minutes to 17 minutes, and the "connect" module's unitTest time dropped from 18 minutes to 12 minutes. [~divijvaidya] Is this result in line with your expectations? Do you have any advice?
was (Author: JIRAUSER290463): I filtered integration tests with (find . -name "*.*integration*.html") and got these integration tests: {code:java}
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsReporterIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.QueryableStateIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableEfficientRangeQueryTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopCounterTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html
{code}
I added @Tag("integration") to the "streams" module tests and @Category(IntegrationTest.class) to the "connect" module tests, so these integration tests are now excluded from the "unitTest" task. The "streams" module's unitTest time dropped from 25 minutes to 17 minutes, and the "connect" module's unitTest time dropped from 18 minutes to 12 minutes. [~divijvaidya] Is this result in line with your expectations? Do you have any advice? > Some integration tests are running as unit tests > > > Key: KAFKA-15148 > URL:
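For anyone who wants to reproduce the report-filtering step outside a shell, here is a minimal Java sketch of the (find . -name "*.*integration*.html") filter, built on java.nio.file.PathMatcher. The class name IntegrationReportFilter is hypothetical, and the sketch assumes the report paths have already been collected as strings. Like the shell filter, it keys off the word "integration" in the file name, so it also catches classes in the integration package whose names lack the IntegrationTest suffix (for example StartAndStopLatchTest).

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

public class IntegrationReportFilter {
    // Same glob as the find command; `find -name` matches only the file name,
    // so the matcher is applied to getFileName() rather than the full path.
    private static final PathMatcher MATCHER =
            FileSystems.getDefault().getPathMatcher("glob:*.*integration*.html");

    static List<String> integrationReports(List<String> reportPaths) {
        return reportPaths.stream()
                .filter(p -> {
                    Path fileName = Paths.get(p).getFileName();
                    return fileName != null && MATCHER.matches(fileName);
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> reports = List.of(
                "./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html",
                "./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html",
                // Hypothetical non-integration report, included for contrast.
                "./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.KafkaStreamsTest.html");
        // Prints the first two paths only.
        integrationReports(reports).forEach(System.out::println);
    }
}
```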
[GitHub] [kafka] eziosudo opened a new pull request, #13973: exclude integration tests in "./gradlew unitTest" by adding @Tag("integration") and @Category(IntegrationTest.class)
eziosudo opened a new pull request, #13973: URL: https://github.com/apache/kafka/pull/13973 This PR adds @Tag("integration") (streams) and @Category(IntegrationTest.class) (connect) to the following tests so that they are excluded from the unitTest task: `
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsReporterIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.QueryableStateIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableEfficientRangeQueryTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopCounterTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html
`
The "streams" module's unitTest time dropped from 25 minutes to 17 minutes, and the "connect" module's unitTest time dropped from 18 minutes to 12 minutes. https://github.com/apache/kafka/assets/54128896/dc0e0518-b675-4140-b140-91ea0bd992a5 https://github.com/apache/kafka/assets/54128896/7d0afcce-47c9-45b8-b4be-a9715d3faeff
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [x] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
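The mechanism the PR relies on is that the unitTest task skips any test class carrying the "integration" tag. The following is a simplified Java sketch of that selection, done with plain reflection and no JUnit. TestTag is a hypothetical stand-in for JUnit 5's @Tag, and in the real build the filtering is performed by Gradle and the JUnit Platform, not by code like this.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.stream.Collectors;

public class TagFilterSketch {
    // Hypothetical stand-in for JUnit 5's @Tag; NOT org.junit.jupiter.api.Tag.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface TestTag {
        String value();
    }

    // One class named after a test from the list above, one hypothetical unit test.
    @TestTag("integration")
    static class LagFetchIntegrationTest { }

    static class KafkaStreamsTest { }

    static boolean hasTag(Class<?> testClass, String tag) {
        TestTag annotation = testClass.getAnnotation(TestTag.class);
        return annotation != null && annotation.value().equals(tag);
    }

    // Keeps only the classes that do NOT carry the excluded tag, mirroring an
    // "exclude the integration tag" rule on a unitTest-style task.
    static List<String> unitTestSelection(List<Class<?>> testClasses, String excludedTag) {
        return testClasses.stream()
                .filter(c -> !hasTag(c, excludedTag))
                .map(Class::getSimpleName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints [KafkaStreamsTest]
        System.out.println(unitTestSelection(
                List.of(LagFetchIntegrationTest.class, KafkaStreamsTest.class), "integration"));
    }
}
```

A test that is missing the tag simply falls through the filter and runs as a unit test, which is the symptom KAFKA-15148 describes.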
[jira] [Comment Edited] (KAFKA-15148) Some integration tests are running as unit tests
[ https://issues.apache.org/jira/browse/KAFKA-15148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740696#comment-17740696 ] Ezio Xie edited comment on KAFKA-15148 at 7/6/23 7:56 PM: -- I filtered integration tests with (find . -name "*.*integration*.html") and got these integration tests: {code:java}
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsReporterIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.QueryableStateIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableEfficientRangeQueryTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopCounterTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html
{code}
I added @Tag("integration") to the "streams" module tests and @Category(IntegrationTest.class) to the "connect" module tests, so these integration tests are now excluded from the "unitTest" task. The "streams" module's unitTest time dropped from 25 minutes to 17 minutes, and the "connect" module's unitTest time dropped from 18 minutes to 12 minutes. [~divijvaidya] Is this result in line with your expectations? Do you have any advice?
was (Author: JIRAUSER290463): I filtered integration tests with (find . -name "*.*integration*.html") and got these integration tests: {code:java}
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsReporterIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.QueryableStateIntegrationTest.html
./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableEfficientRangeQueryTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopCounterTest.html
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html
{code}
I added @Tag("integration") to the "streams" module tests and @Category(IntegrationTest.class) to the "connect" module tests, so these integration tests are now excluded from the "unitTest" task. The "streams" module's unitTest time dropped from 25 minutes to 16 minutes, and the "connect" module's unitTest time dropped from 18 minutes to 12 minutes. [~divijvaidya] Is this result in line with your expectations? Do you have any advice? > Some integration tests are running as unit tests > > > Key: KAFKA-15148 > URL:
[GitHub] [kafka] aneelkkhatri opened a new pull request, #13972: Fix one typo in javadoc
aneelkkhatri opened a new pull request, #13972: URL: https://github.com/apache/kafka/pull/13972 (no comment)
[GitHub] [kafka] joobisb commented on pull request #13862: KAFKA-15050: format the prompts in the quickstart
joobisb commented on PR #13862: URL: https://github.com/apache/kafka/pull/13862#issuecomment-1624188820 @tombentley, could you please take a look? I have addressed the comments.
[GitHub] [kafka] dajac commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
dajac commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254819460 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); Review Comment: Updated the javadoc. Let me know if it makes things clearer.
[GitHub] [kafka] philipnee commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer
philipnee commented on PR #13914: URL: https://github.com/apache/kafka/pull/13914#issuecomment-1624179241 @erikvanoosten - I'll try to help you out on this.
[GitHub] [kafka] dajac commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
dajac commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254807923 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); + +InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) { +this.tp = tp; +this.log = logContext.logger(InternalCoordinatorTimer.class); +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +TimeoutOperation operation +) { +TimerTask task = new TimerTask(unit.toMillis(delay)) { +@Override +public void run() { +String eventName = "Timeout(tp=" + tp + ", key=" + key + ")"; +CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> { +log.debug("Executing write event {} for timer {}.", eventName, key); + +// If the task is different, it means that the timer has been +// cancelled while the event was waiting to be processed. +if (!tasks.remove(key, this)) { +throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled"); +} + +// Execute the real operation. 
+return new CoordinatorResult<>(operation.generateRecords(), null); +}); + +event.future.exceptionally(ex -> { +// Otherwise, we handle the error if the timer is still active. +if (ex instanceof RejectedExecutionException) { +log.debug("The delayed write event {} for the timer {} was not executed because it was " + +"cancelled or overridden.", event.name, key); +return null; +} + +if (ex instanceof NotCoordinatorException || ex instanceof CoordinatorLoadInProgressException) { +log.debug("The delayed write event {} for the timer {} failed due to {}. Ignoring it because " + +"the coordinator is not active.", event.name, key, ex.getMessage()); +return null; +} + +log.info("The delayed write event {} for the timer {} failed due to {}. Rescheduling it. ", +event.name, key, ex.getMessage(), ex); +schedule(key, 500, TimeUnit.MILLISECONDS, operation); + +return null; +}); + +log.debug("Scheduling write event {} for timer {}.", event.name, key); +try { +enqueue(event); Review Comment: The confusion may come from [here](https://github.com/apache/kafka/pull/13961/files#diff-4b9dd21d7748bfb375634ef6e39037e99f53ed2546519bec32fa1707bd262fe8R271). CoordinatorResult is the part that can now accept a null response. In this particular case, the response is null because there is no response, only records. The null response will eventually be used to complete event.future. Since we only use event.future.exceptionally, the null response is never used here either.
[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740751#comment-17740751 ] ASF GitHub Bot commented on KAFKA-14995: stevenbooke commented on PR #521: URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1624159774 @mimaison, could you please review this and merge it if there are no issues? > Automate asf.yaml collaborators refresh > --- > > Key: KAFKA-14995 > URL: https://issues.apache.org/jira/browse/KAFKA-14995 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Assignee: Steven Booke >Priority: Minor > Labels: newbie > > We have added a policy to use the asf.yaml GitHub Collaborators: > [https://github.com/apache/kafka-site/pull/510] > The policy states that we set this list to be the top 20 commit authors who > are not Kafka committers. Unfortunately, it's not trivial to compute this > list. > Here is the process I followed to generate the list the first time (note that > I generated this list on 2023-04-28, so the lookback is one year): > 1. List authors by commit volume in the last year: > {code:java} > $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code} > 2. Manually filter out the authors who are committers, based on > [https://kafka.apache.org/committers] > 3. Truncate the list to 20 authors > 4. For each author > 4a. Find a commit in the `git log` that they were the author on: > {code:java} > commit 440bed2391338dc10fe4d36ab17dc104b61b85e8 > Author: hudeqi <1217150...@qq.com> > Date: Fri May 12 14:03:17 2023 +0800 > ...{code} > 4b. Look up that commit in GitHub: > [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8] > 4c. Copy their GitHub username into .asf.yaml under both the PR whitelist and > the Collaborators lists. > 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713] > > This is pretty time-consuming and is very scriptable.
Two complications: > * To do the filtering, we need to map from Git log "Author" to documented > Kafka "Committer" that we can use to perform the filter. Suggestion: just > update the structure of the "Committers" page to include their Git "Author" > name and email > ([https://github.com/apache/kafka-site/blob/asf-site/committers.html]) > * To generate the YAML lists, we need to map from Git log "Author" to GitHub > username. There's presumably some way to do this in the GitHub REST API (the > mapping is based on the email, IIUC), or we could just update the > Committers page to also document each committer's GitHub username. > > Ideally, we would write this script (to be stored in the Apache Kafka repo) > and create a GitHub Action to run it every three months. >
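Steps 1 to 3 of the process above (rank authors by commit count, drop committers, keep the top 20) are the easiest part to script. Below is a minimal Java sketch under a few assumptions: the output of `git shortlog --email --numbered --summary` has already been captured as lines, and committers can be identified by their Git email, which is the mapping the first complication asks for. The class and method names are hypothetical, and the GitHub-username lookup in step 4 is not covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CollaboratorCandidates {
    // One line of `git shortlog --email --numbered --summary` output:
    // right-aligned commit count, a tab, then "Name <email>".
    private static final Pattern LINE =
            Pattern.compile("^\\s*(\\d+)\\t(.+?)\\s*<([^>]+)>\\s*$");

    // Returns up to `limit` authors, in shortlog order (already sorted by commit
    // count because of --numbered), skipping anyone whose email is a committer's.
    static List<String> topNonCommitters(List<String> shortlogLines,
                                         Set<String> committerEmails,
                                         int limit) {
        List<String> result = new ArrayList<>();
        for (String line : shortlogLines) {
            if (result.size() >= limit) {
                break;
            }
            Matcher m = LINE.matcher(line);
            if (!m.matches()) {
                continue; // ignore anything that is not a shortlog entry
            }
            String email = m.group(3);
            if (committerEmails.contains(email)) {
                continue;
            }
            result.add(m.group(2) + " <" + email + ">");
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical authors and emails.
        List<String> shortlog = List.of(
                "   120\tAlice <alice@example.com>",
                "    95\tBob <bob@example.com>",
                "    40\tCarol <carol@example.com>");
        System.out.println(topNonCommitters(shortlog, Set.of("bob@example.com"), 20));
    }
}
```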
[jira] [Commented] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740748#comment-17740748 ] Edoardo Comar commented on KAFKA-15144: --- I'd rather rewrite our test to expect the record to be found after consuming no more than offset.lag.max=100 records from the target cluster. > MM2 Checkpoint downstreamOffset stuck to 1 > -- > > Key: KAFKA-15144 > URL: https://issues.apache.org/jira/browse/KAFKA-15144 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: edo-connect-mirror-maker-sourcetarget.properties > > > Steps to reproduce: > 1. Start the source cluster > 2. Start the target cluster > 3. Start connect-mirror-maker.sh using a config like the attached one > 4. Create a topic in the source cluster > 5. Produce a few messages > 6. Consume them all with autocommit enabled > > 7. Then dump the Checkpoint topic content, e.g. > {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic > source.checkpoints.internal --from-beginning --formatter > org.apache.kafka.connect.mirror.formatters.CheckpointFormatter}} > {{Checkpoint{consumerGroupId=edogroup, > topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, > *downstreamOffset=1*, metadata=}}} > > The downstreamOffset remains at 1, whereas in a fresh cluster pair, with > the source topic created while MM2 is running, > I'd expect the downstreamOffset to match the upstreamOffset. > Note that dumping the offset sync topic shows matching initial offsets > {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic > mm2-offset-syncs.source.internal --from-beginning --formatter > org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter}} > {{OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, > downstreamOffset=0}}} > > >
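One way to express the relaxed test condition is as a bound on how many records a consumer reads from the checkpointed downstream offset before it reaches the expected record. This is a hypothetical Java helper, not MirrorMaker code. It assumes that the consumer starts at the checkpoint's downstreamOffset and that "found within offset.lag.max records" means the count of records read, including the target, is at most offset.lag.max.

```java
public class CheckpointLagCheck {
    // Hypothetical helper: records read when starting at startOffset and stopping
    // once the record at targetOffset has been consumed (inclusive count).
    static long recordsRead(long startOffset, long targetOffset) {
        if (targetOffset < startOffset) {
            throw new IllegalArgumentException("target record precedes the checkpointed offset");
        }
        return targetOffset - startOffset + 1;
    }

    // True if the target record is reached within offsetLagMax records.
    static boolean foundWithinLag(long checkpointDownstreamOffset, long targetOffset, long offsetLagMax) {
        return targetOffset >= checkpointDownstreamOffset
                && recordsRead(checkpointDownstreamOffset, targetOffset) <= offsetLagMax;
    }

    public static void main(String[] args) {
        // downstreamOffset=1 and offset.lag.max=100 come from the report;
        // the target offsets are illustrative.
        System.out.println(foundWithinLag(1, 3, 100));   // true: offsets 1, 2, 3 are read
        System.out.println(foundWithinLag(1, 150, 100)); // false: 150 records would be read
    }
}
```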
[GitHub] [kafka] erikvanoosten commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer
erikvanoosten commented on PR #13914: URL: https://github.com/apache/kafka/pull/13914#issuecomment-1624147284 > Have you gotten much feedback from the discussion thread yet? Not a single comment.
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254792945 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); + +InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) { +this.tp = tp; +this.log = logContext.logger(InternalCoordinatorTimer.class); +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +TimeoutOperation operation +) { +TimerTask task = new TimerTask(unit.toMillis(delay)) { +@Override +public void run() { +String eventName = "Timeout(tp=" + tp + ", key=" + key + ")"; +CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> { +log.debug("Executing write event {} for timer {}.", eventName, key); + +// If the task is different, it means that the timer has been +// cancelled while the event was waiting to be processed. +if (!tasks.remove(key, this)) { +throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled"); +} + +// Execute the real operation. 
+return new CoordinatorResult<>(operation.generateRecords(), null); +}); + +event.future.exceptionally(ex -> { +// Otherwise, we handle the error if the timer is still active. +if (ex instanceof RejectedExecutionException) { +log.debug("The delayed write event {} for the timer {} was not executed because it was " + +"cancelled or overridden.", event.name, key); +return null; +} + +if (ex instanceof NotCoordinatorException || ex instanceof CoordinatorLoadInProgressException) { +log.debug("The delayed write event {} for the timer {} failed due to {}. Ignoring it because " + +"the coordinator is not active.", event.name, key, ex.getMessage()); +return null; +} + +log.info("The delayed write event {} for the timer {} failed due to {}. Rescheduling it. ", +event.name, key, ex.getMessage(), ex); +schedule(key, 500, TimeUnit.MILLISECONDS, operation); + +return null; +}); + +log.debug("Scheduling write event {} for timer {}.", event.name, key); +try { +enqueue(event); Review Comment: Right, so maybe I need to figure out where this future is looked at, because it seems we put this code, with a future that could be null, into the queue. I guess I'm just struggling to understand why something that was required to be non-null before can now be changed to accept nulls, and where that code is.
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254790531 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); Review Comment: OK, that confused me quite a bit, so maybe we can add comments about how this works. I will need to reread a lot of this.
[GitHub] [kafka] dajac commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
dajac commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254782962 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); + +InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) { +this.tp = tp; +this.log = logContext.logger(InternalCoordinatorTimer.class); +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +TimeoutOperation operation +) { +TimerTask task = new TimerTask(unit.toMillis(delay)) { +@Override +public void run() { +String eventName = "Timeout(tp=" + tp + ", key=" + key + ")"; +CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> { +log.debug("Executing write event {} for timer {}.", eventName, key); + +// If the task is different, it means that the timer has been +// cancelled while the event was waiting to be processed. +if (!tasks.remove(key, this)) { +throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled"); +} + +// Execute the real operation. 
+return new CoordinatorResult<>(operation.generateRecords(), null); +}); + +event.future.exceptionally(ex -> { +// Otherwise, we handle the error if the timer is still active. +if (ex instanceof RejectedExecutionException) { +log.debug("The delayed write event {} for the timer {} was not executed because it was " + +"cancelled or overridden.", event.name, key); +return null; +} + +if (ex instanceof NotCoordinatorException || ex instanceof CoordinatorLoadInProgressException) { +log.debug("The delayed write event {} for the timer {} failed due to {}. Ignoring it because " + +"the coordinator is not active.", event.name, key, ex.getMessage()); +return null; +} + +log.info("The delayed write event {} for the timer {} failed due to {}. Rescheduling it. ", +event.name, key, ex.getMessage(), ex); +schedule(key, 500, TimeUnit.MILLISECONDS, operation); + +return null; +}); + +log.debug("Scheduling write event {} for timer {}.", event.name, key); +try { +enqueue(event); Review Comment: I was referring to L274. event.future is the future that is completed when (or after) the event is executed (and committed). When you call event.future.exceptionally, it registers a callback that is executed when the future is completed with an exception. exceptionally returns a new future that is completed with the result of this callback. The returned future is not used here, so the null we discussed does not go anywhere.
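The CompletableFuture behaviour described in this comment can be checked in isolation. This is a standalone Java sketch, not the coordinator code: eventFuture only stands in for event.future, and the RejectedExecutionException mimics the cancelled-timer path. It shows that the callback runs only on exceptional completion, and that the null it returns completes the new future returned by exceptionally, not the original one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;

public class ExceptionallyDemo {
    // Returns "<callback saw>,<original failed?>,<derived failed?>" for one run.
    static String runScenario(boolean fail) {
        // Stand-in for event.future, completed once the write event has been processed.
        CompletableFuture<Void> eventFuture = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>("none");

        // exceptionally() registers a callback and returns a NEW future. The callback
        // runs only if eventFuture completes exceptionally; its return value (null here)
        // completes the derived future and never flows back into eventFuture.
        CompletableFuture<Void> derived = eventFuture.exceptionally(ex -> {
            seen.set(ex.getClass().getSimpleName());
            return null;
        });

        if (fail) {
            eventFuture.completeExceptionally(new RejectedExecutionException("cancelled"));
        } else {
            eventFuture.complete(null); // e.g. a result whose response is null
        }
        return seen.get() + "," + eventFuture.isCompletedExceptionally()
                + "," + derived.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(runScenario(true));  // RejectedExecutionException,true,false
        System.out.println(runScenario(false)); // none,false,false
    }
}
```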
[GitHub] [kafka] dajac commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
dajac commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254778449 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); Review Comment: No. The timer schedules an event when the timer expires. The event executes the timeout operation. If the timeout operation raises an error, the error is used to complete the future of the event. This is how the write event works.
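The write-event flow dajac describes can be illustrated with a heavily simplified sketch. `SketchWriteEvent` is a hypothetical stand-in and not the real `CoordinatorWriteEvent`. The event runs an operation, and anything the operation throws is used to complete the event's future exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical, heavily simplified stand-in for a coordinator write event.
final class SketchWriteEvent<T> {
    final String name;
    final CompletableFuture<T> future = new CompletableFuture<>();
    private final Supplier<T> operation;

    SketchWriteEvent(String name, Supplier<T> operation) {
        this.name = name;
        this.operation = operation;
    }

    // Invoked by the (hypothetical) event processor once the event is dequeued.
    void run() {
        try {
            future.complete(operation.get());
        } catch (RuntimeException e) {
            // An error raised by the operation completes the event's future.
            future.completeExceptionally(e);
        }
    }
}
```

In this reading, the timer's "timeout" means "run this operation once the delay elapses". It is not a deadline after which an in-flight event fails.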
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254741731 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); Review Comment: Hmmm --- maybe I'm very confused now. Is the timeout actually a timeout -- i.e., if we haven't done the event by a given time, then we return with an error? I think I missed that and thought we were waiting for the timer to hit a time to execute the event.
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254740479 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); + +InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) { +this.tp = tp; +this.log = logContext.logger(InternalCoordinatorTimer.class); +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +TimeoutOperation operation +) { +TimerTask task = new TimerTask(unit.toMillis(delay)) { +@Override +public void run() { +String eventName = "Timeout(tp=" + tp + ", key=" + key + ")"; +CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> { +log.debug("Executing write event {} for timer {}.", eventName, key); + +// If the task is different, it means that the timer has been +// cancelled while the event was waiting to be processed. +if (!tasks.remove(key, this)) { +throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled"); +} + +// Execute the real operation. 
+return new CoordinatorResult<>(operation.generateRecords(), null); +}); + +event.future.exceptionally(ex -> { +// Otherwise, we handle the error if the timer is still active. +if (ex instanceof RejectedExecutionException) { +log.debug("The delayed write event {} for the timer {} was not executed because it was " + +"cancelled or overridden.", event.name, key); +return null; +} + +if (ex instanceof NotCoordinatorException || ex instanceof CoordinatorLoadInProgressException) { +log.debug("The delayed write event {} for the timer {} failed due to {}. Ignoring it because " + +"the coordinator is not active.", event.name, key, ex.getMessage()); +return null; +} + +log.info("The delayed write event {} for the timer {} failed due to {}. Rescheduling it. ", +event.name, key, ex.getMessage(), ex); +schedule(key, 500, TimeUnit.MILLISECONDS, operation); + +return null; +}); + +log.debug("Scheduling write event {} for timer {}.", event.name, key); +try { +enqueue(event); Review Comment: Hmm -- can you point me to the code that "reacts to its result" and show how we don't use the resulting future? (Maybe I'm just confused.) But it looks like we always enqueue the event regardless of the future, and I don't see where we check if it's null and not use it.
[jira] [Resolved] (KAFKA-15069) Refactor scanning hierarchy out of DelegatingClassLoader
[ https://issues.apache.org/jira/browse/KAFKA-15069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-15069. - Reviewer: Chris Egerton Resolution: Fixed > Refactor scanning hierarchy out of DelegatingClassLoader > > > Key: KAFKA-15069 > URL: https://issues.apache.org/jira/browse/KAFKA-15069 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > Fix For: 3.6.0 > > > The DelegatingClassLoader is involved in both scanning and using the results > of scanning to process classloading. > Instead, the scanning should take place outside of the DelegatingClassLoader, > and results of scanning be passed back into the DelegatingClassLoader for > classloading functionality. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] satishd commented on a diff in pull request #13275: KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module.
satishd commented on code in PR #13275: URL: https://github.com/apache/kafka/pull/13275#discussion_r1254737901 ## storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java: ## @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.storage.internals.log; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; +import org.apache.kafka.server.util.ShutdownableThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.apache.kafka.storage.internals.log.LogFileUtils.INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TIME_INDEX_FILE_SUFFIX; +import static org.apache.kafka.storage.internals.log.LogFileUtils.TXN_INDEX_FILE_SUFFIX; + +/** + * This is a LFU (Least Frequently Used) cache of remote index files stored in `$logdir/remote-log-index-cache`. + * This is helpful to avoid re-fetching the index files like offset, time indexes from the remote storage for every + * fetch call. 
The cache is re-initialized from the index files on disk on startup, if the index files are available. + * + * The cache contains a garbage collection thread which will delete the files for entries that have been removed from + * the cache. + * + * Note that closing this cache does not delete the index files on disk. + * Note that the cache eviction policy is based on the default implementation of Caffeine i.e. + * Window TinyLfu (https://github.com/ben-manes/caffeine/wiki/Efficiency). TinyLfu relies on a frequency + * sketch to probabilistically estimate the historic usage of an entry. + * + */ +public class RemoteIndexCache implements Closeable { + +private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class); + +public static final String DIR_NAME = "remote-log-index-cache"; + +private static final String TMP_FILE_SUFFIX = ".tmp"; + +public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner"; + +/** + * Directory where the index files will be stored on disk. + */ +private final File cacheDir; + +/** + * Represents if the cache is closed or not. Closing the cache is an irreversible operation. + */ +private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false); + +/** + * Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected. + */ +private final LinkedBlockingQueue expiredIndexes = new LinkedBlockingQueue<>(); + +/** + * Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other + * concurrent reads in-progress. + */ +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + +/** + * Actual cache implementation that this file wraps around. + * + * The requirements for this internal cache is as follows: + * 1. Multiple threads should be
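The eviction-plus-cleaner design described in the Javadoc can be sketched with the JDK alone. This is an illustration under stated assumptions. It swaps Caffeine's Window TinyLfu for a plain access-ordered LRU (`LinkedHashMap`), and `EvictingIndexCache`, `getOrLoad` and `pollExpired` are hypothetical names. Evicted values are handed to an unbounded `LinkedBlockingQueue`, so cleanup (such as deleting index files) can happen off the read path.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Sketch only: an access-ordered LRU stands in for Caffeine's Window TinyLfu policy.
final class EvictingIndexCache<K, V> {
    // Unbounded queue of evicted values awaiting cleanup (e.g. deleting index files).
    private final LinkedBlockingQueue<V> expired = new LinkedBlockingQueue<>();
    private final Map<K, V> entries;

    EvictingIndexCache(int maxEntries) {
        // accessOrder = true: get() moves an entry to the most-recently-used end.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > maxEntries) {
                    expired.add(eldest.getValue()); // defer cleanup to the cleaner
                    return true;
                }
                return false;
            }
        };
    }

    // Returns the cached value, loading it (and possibly evicting another) on a miss.
    synchronized V getOrLoad(K key, Function<K, V> loader) {
        V value = entries.get(key);
        if (value == null) {
            value = loader.apply(key);
            entries.put(key, value);
        }
        return value;
    }

    // A real cleaner thread would loop on expired.take(); poll() keeps this sketch non-blocking.
    V pollExpired() {
        return expired.poll();
    }
}
```

The LRU stand-in evicts by recency rather than by estimated frequency, so its eviction choices will differ from TinyLfu's. Only the eviction-queue-cleaner hand-off is the point here.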
[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740736#comment-17740736 ] ASF GitHub Bot commented on KAFKA-14995: stevenbooke commented on PR #521: URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1624069492 Ok, I will make the change now. > Automate asf.yaml collaborators refresh > --- > > Key: KAFKA-14995 > URL: https://issues.apache.org/jira/browse/KAFKA-14995 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Assignee: Steven Booke >Priority: Minor > Labels: newbie > > We have added a policy to use the asf.yaml Github Collaborators: > [https://github.com/apache/kafka-site/pull/510] > The policy states that we set this list to be the top 20 commit authors who > are not Kafka committers. Unfortunately, it's not trivial to compute this > list. > Here is the process I followed to generate the list the first time (note that > I generated this list on 2023-04-28, so the lookback is one year): > 1. List authors by commit volume in the last year: > {code:java} > $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code} > 2. manually filter out the authors who are committers, based on > [https://kafka.apache.org/committers] > 3. truncate the list to 20 authors > 4. for each author > 4a. Find a commit in the `git log` that they were the author on: > {code:java} > commit 440bed2391338dc10fe4d36ab17dc104b61b85e8 > Author: hudeqi <1217150...@qq.com> > Date: Fri May 12 14:03:17 2023 +0800 > ...{code} > 4b. Look up that commit in Github: > [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8] > 4c. Copy their Github username into .asf.yaml under both the PR whitelist and > the Collaborators lists. > 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713] > > This is pretty time-consuming and is very scriptable.
Two complications: > * To do the filtering, we need to map from Git log "Author" to documented > Kafka "Committer" that we can use to perform the filter. Suggestion: just > update the structure of the "Committers" page to include their Git "Author" > name and email > ([https://github.com/apache/kafka-site/blob/asf-site/committers.html]) > * To generate the YAML lists, we need to map from Git log "Author" to Github > username. There's presumably some way to do this in the Github REST API (the > mapping is based on the email, IIUC), or we could also just update the > Committers page to also document each committer's Github username. > > Ideally, we would write this script (to be stored in the Apache Kafka repo) > and create a Github Action to run it every three months. >
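Steps 1 to 3 of the process above lend themselves to a script. Here is a minimal Java sketch. It assumes the input is the raw output of `git shortlog --email --numbered --summary`, with one `count<TAB>Name <email>` line per author already sorted by count. It also assumes committers can be matched by email, a mapping the issue notes is not yet documented on the Committers page. `TopContributors` and `topNonCommitters` are hypothetical names, and step 4 (mapping authors to GitHub usernames) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class TopContributors {
    // Keeps the first `limit` shortlog authors whose email is not a committer email.
    static List<String> topNonCommitters(List<String> shortlogLines, Set<String> committerEmails, int limit) {
        List<String> result = new ArrayList<>();
        for (String line : shortlogLines) {
            if (result.size() >= limit) {
                break;                                   // step 3: truncate to `limit` authors
            }
            int tab = line.indexOf('\t');
            if (tab < 0) {
                continue;                                // not a shortlog summary line
            }
            String author = line.substring(tab + 1).trim();
            int lt = author.lastIndexOf('<');
            int gt = author.lastIndexOf('>');
            if (lt < 0 || gt < lt) {
                continue;                                // no "<email>" part
            }
            String email = author.substring(lt + 1, gt);
            if (committerEmails.contains(email)) {
                continue;                                // step 2: drop committers
            }
            result.add(author);
        }
        return result;
    }
}
```

Because `--numbered` already sorts authors by commit count, the sketch only has to filter and truncate. It never re-sorts.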
[jira] [Updated] (KAFKA-15069) Refactor scanning hierarchy out of DelegatingClassLoader
[ https://issues.apache.org/jira/browse/KAFKA-15069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15069: Fix Version/s: 3.6.0 > Refactor scanning hierarchy out of DelegatingClassLoader > > > Key: KAFKA-15069 > URL: https://issues.apache.org/jira/browse/KAFKA-15069 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > Fix For: 3.6.0 > > > The DelegatingClassLoader is involved in both scanning and using the results > of scanning to process classloading. > Instead, the scanning should take place outside of the DelegatingClassLoader, > and results of scanning be passed back into the DelegatingClassLoader for > classloading functionality.
[GitHub] [kafka] dajac commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
dajac commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254735232 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); Review Comment: Scheduling a timeout is only done from within the execution of an event. For the usages, you can take a look at the next PR: https://github.com/apache/kafka/pull/13963. In short, we need timers to implement session and revocation timeouts.
[GitHub] [kafka] gharris1727 opened a new pull request, #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 opened a new pull request, #13971: URL: https://github.com/apache/kafka/pull/13971 Add the ServiceLoaderScanner, a companion to the ReflectionScanner which does not use reflection to discover plugins. This will be utilized later in the Plugins class for the configurable-discovery-mode startup scanning, and in the connect-plugin-path script. In this PR, the PluginScannerTest is parameterized to ensure that the ServiceLoaderScanner has parity on the TestPlugins. This required adding manifests for the valid TestPlugins. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
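For readers unfamiliar with the mechanism, `java.util.ServiceLoader` finds implementations through `META-INF/services/<fully-qualified interface name>` manifest files rather than by scanning every class. Below is a minimal sketch, assuming a hypothetical `ExamplePlugin` interface. It is not the PR's `ServiceLoaderScanner`. `ServiceLoader.Provider.type()` (Java 9+) reports the implementation class without instantiating it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical plugin interface used only for this sketch.
interface ExamplePlugin {
    String name();
}

final class ManifestDiscovery {
    // Returns the implementation class names listed in
    // META-INF/services/<pluginType's binary name> files visible to `loader`.
    static <T> List<String> discover(Class<T> pluginType, ClassLoader loader) {
        List<String> found = new ArrayList<>();
        ServiceLoader.load(pluginType, loader)
                .stream()                                   // lazy Provider handles (Java 9+)
                .forEach(provider -> found.add(provider.type().getName()));
        return found;
    }
}
```

If no manifest for the interface is on the class path, the result is empty. Plugins therefore need manifests before manifest-based discovery can find them, which matches the PR's note about adding manifests for the valid TestPlugins.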
[GitHub] [kafka] dajac commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
dajac commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254733078 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); + +InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) { +this.tp = tp; +this.log = logContext.logger(InternalCoordinatorTimer.class); +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +TimeoutOperation operation +) { +TimerTask task = new TimerTask(unit.toMillis(delay)) { +@Override +public void run() { +String eventName = "Timeout(tp=" + tp + ", key=" + key + ")"; +CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> { +log.debug("Executing write event {} for timer {}.", eventName, key); + +// If the task is different, it means that the timer has been +// cancelled while the event was waiting to be processed. +if (!tasks.remove(key, this)) { +throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled"); +} + +// Execute the real operation. 
+return new CoordinatorResult<>(operation.generateRecords(), null); +}); + +event.future.exceptionally(ex -> { +// Otherwise, we handle the error if the timer is still active. +if (ex instanceof RejectedExecutionException) { +log.debug("The delayed write event {} for the timer {} was not executed because it was " + +"cancelled or overridden.", event.name, key); +return null; +} + +if (ex instanceof NotCoordinatorException || ex instanceof CoordinatorLoadInProgressException) { +log.debug("The delayed write event {} for the timer {} failed due to {}. Ignoring it because " + +"the coordinator is not active.", event.name, key, ex.getMessage()); +return null; +} + +log.info("The delayed write event {} for the timer {} failed due to {}. Rescheduling it. ", +event.name, key, ex.getMessage(), ex); +schedule(key, 500, TimeUnit.MILLISECONDS, operation); + +return null; +}); + +log.debug("Scheduling write event {} for timer {}.", event.name, key); +try { +enqueue(event); Review Comment: Are you referring to the null returns in exceptionally above? If yes, those are not used anywhere because we don’t use the resulting future. So we enqueue the event and we react to its result. If it failed, we retry it under some conditions.
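The "react to its result" step can be sketched as a callback that classifies the failure. It mirrors the structure of the quoted `exceptionally` block, with two substitutions: `CoordinatorInactiveException` is a hypothetical stand-in for `NotCoordinatorException`/`CoordinatorLoadInProgressException`, and the `reschedule` callback stands in for `schedule(key, 500, TimeUnit.MILLISECONDS, operation)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical stand-in for NotCoordinatorException / CoordinatorLoadInProgressException.
final class CoordinatorInactiveException extends RuntimeException {
    CoordinatorInactiveException(String message) {
        super(message);
    }
}

final class TimerFailureHandling {
    // Reacts to the event's outcome; the future returned by exceptionally() is discarded.
    static void onFailure(CompletableFuture<Void> eventFuture, Runnable reschedule) {
        eventFuture.exceptionally(ex -> {
            if (ex instanceof RejectedExecutionException) {
                return null;          // timer cancelled or overridden: nothing to do
            }
            if (ex instanceof CoordinatorInactiveException) {
                return null;          // coordinator no longer active: drop the timer
            }
            reschedule.run();         // unexpected error: retry later (a backoff in the real code)
            return null;
        });
    }
}
```

A successful completion never invokes the callback, so only unexpected failures lead to a reschedule.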
[GitHub] [kafka] hgeraldino commented on pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest
hgeraldino commented on PR #13383: URL: https://github.com/apache/kafka/pull/13383#issuecomment-1624048937 Committed the suggestions, thanks again for reviewing @C0urante! BTW, I also opened https://github.com/apache/kafka/pull/13951 to have `WorkerSinkTaskThreadedTest` migrated. There's only one more in my bucket (WorkerSinkTaskTest), which has proven to be quite challenging.
[jira] [Commented] (KAFKA-15144) MM2 Checkpoint downstreamOffset stuck to 1
[ https://issues.apache.org/jira/browse/KAFKA-15144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740732#comment-17740732 ] Greg Harris commented on KAFKA-15144: - Yes [~ecomar], I think the documentation about the checkpoints topic is lacking; I see that the MM2 KIP just gives the schema and the RemoteClusterUtils/MirrorClient implementations and no further details. In some discussions recently some people have questioned if anyone is reading the checkpoints topic directly; it's good to know that the contents of the topic are relevant for you. > in the past one of our tests was written expecting to find the exact match > for upstream/downstream because that was the externally visible behavior when > all was good. Unfortunately, this is only true in rather specific circumstances, before or after the recent changes. If MirrorSource were to uncleanly restart, or if the upstream topic had transaction markers, then the offsets of the source and target records will not correspond directly. For testing the translation, I found it helpful to set `offset.lag.max=0`, and only rely on offset translation at the _end_ of the topic between batches. I hope fixing your tests goes well. > MM2 Checkpoint downstreamOffset stuck to 1 > -- > > Key: KAFKA-15144 > URL: https://issues.apache.org/jira/browse/KAFKA-15144 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Attachments: edo-connect-mirror-maker-sourcetarget.properties > > > Steps to reproduce : > 1.Start the source cluster > 2.Start the target cluster > 3.Start connect-mirror-maker.sh using a config like the attached > 4.Create a topic in source cluster > 5.produce a few messages > 6.consume them all with autocommit enabled > > 7. then dump the Checkpoint topic content e.g.
> {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic > source.checkpoints.internal --from-beginning --formatter > org.apache.kafka.connect.mirror.formatters.CheckpointFormatter}} > {{{}Checkpoint{consumerGroupId=edogroup, > topicPartition=source.vf-mirroring-test-edo-0, upstreamOffset=3, > {*}downstreamOffset=1{*}, metadata={ > > the downstreamOffset remains at 1, while, in a fresh cluster pair like with > the source topic created while MM2 is running, > I'd expect the downstreamOffset to match the upstreamOffset. > Note that dumping the offset sync topic shows matching initial offsets > {{% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 --topic > mm2-offset-syncs.source.internal --from-beginning --formatter > org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter}} > {{{}OffsetSync{topicPartition=vf-mirroring-test-edo-0, upstreamOffset=0, > downstreamOffset=0{ > > >
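The following simplified sketch shows how `downstreamOffset=1` can arise from a single offset sync at (0, 0) and a committed upstream offset of 3. It assumes that MirrorMaker 2 translates a committed upstream offset using the latest sync at or below it, and adds 1 when the two upstream offsets do not match exactly. This is conservative: a downstream consumer may re-read records but never skips them. `CheckpointTranslation` is a hypothetical name, and real MM2 keeps more state than a single sorted map.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

final class CheckpointTranslation {
    // syncs: upstream offset -> downstream offset, as recorded by offset syncs.
    static OptionalLong translate(TreeMap<Long, Long> syncs, long upstreamOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamOffset); // latest sync at or below
        if (sync == null) {
            return OptionalLong.empty();                 // nothing to translate from yet
        }
        // Exact match: use the synced downstream offset. Otherwise point just past
        // the synced record, so consumers may re-read records but never skip them.
        long step = sync.getKey() == upstreamOffset ? 0 : 1;
        return OptionalLong.of(sync.getValue() + step);
    }
}
```

Under this assumption, a sync for upstream offset 3 would let the checkpoint report downstream offset 3. That is consistent with the suggestion to set `offset.lag.max=0` when testing translation.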
[GitHub] [kafka] C0urante merged pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner
C0urante merged PR #13821: URL: https://github.com/apache/kafka/pull/13821
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254713760 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); Review Comment: I guess I also realize I don't fully understand why we need to have a timer for these events. But maybe I need to re-read the KIP.
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254711785 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); Review Comment: I guess I wasn't sure about adding to the map and running the event. We may be only doing one operation that adds to the map at a time, but does that also mean we will only hit the time to do the event during this time?
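Here is a minimal sketch of the `tasks.remove(key, this)` guard quoted in the diff. It assumes, as dajac explains elsewhere in the thread, that timers are scheduled from within event execution and that the expiration is processed as another event on the same event-processing thread. Under that assumption, a plain `HashMap` needs no extra locking. `KeyedTimers` and its token objects are hypothetical. The key point is that `Map.remove(key, value)` succeeds only if the expiring timer is still the one registered under its key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class KeyedTimers {
    // Only touched from the single event thread, so no synchronization.
    private final Map<String, Object> tasks = new HashMap<>();
    private final List<String> executed = new ArrayList<>();

    // Registers (or overrides) the timer for `key` and returns its identity token.
    Object schedule(String key) {
        Object token = new Object();
        tasks.put(key, token);   // replaces any earlier timer for the same key
        return token;
    }

    void cancel(String key) {
        tasks.remove(key);
    }

    // Runs when the expiration event for (key, token) is processed.
    boolean onExpired(String key, Object token) {
        if (!tasks.remove(key, token)) {
            return false;        // stale: overridden or cancelled after it fired
        }
        executed.add(key);       // the real timeout operation would run here
        return true;
    }

    List<String> executed() {
        return executed;
    }
}
```

For example, if each heartbeat re-schedules a member's session timeout under the same key, an older expiration that was already queued becomes a no-op.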
[jira] [Commented] (KAFKA-15155) Follow PEP 8 best practice in Python to check if a container is empty
[ https://issues.apache.org/jira/browse/KAFKA-15155?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740713#comment-17740713 ] Yi-Sheng Lien commented on KAFKA-15155: --- Hi [~divijvaidya], I'm interested in the Kafka project and have read the [HOW TO CONTRIBUTE.|https://kafka.apache.org/contributing] May I take this issue as my first Kafka issue, if you don't mind? > Follow PEP 8 best practice in Python to check if a container is empty > - > > Key: KAFKA-15155 > URL: https://issues.apache.org/jira/browse/KAFKA-15155 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Trivial > Labels: newbie > > *This is a good task for first time contributors to Kafka* > At release.py Line:94 and at Line:60, we don't follow PEP 8 [1] best > practices. > To check if a container or sequence (string, list, tuple) is empty, use if > not val. Do not compare its length using if len(val) == 0 > [1] > [https://peps.python.org/pep-0008/#programming-recommendations#:~:text=if%20not%20seq]
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254698316 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); + +InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) { +this.tp = tp; +this.log = logContext.logger(InternalCoordinatorTimer.class); +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +TimeoutOperation operation +) { +TimerTask task = new TimerTask(unit.toMillis(delay)) { +@Override +public void run() { +String eventName = "Timeout(tp=" + tp + ", key=" + key + ")"; +CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> { +log.debug("Executing write event {} for timer {}.", eventName, key); + +// If the task is different, it means that the timer has been +// cancelled while the event was waiting to be processed. +if (!tasks.remove(key, this)) { +throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled"); +} + +// Execute the real operation. 
+return new CoordinatorResult<>(operation.generateRecords(), null); +}); + +event.future.exceptionally(ex -> { +// Otherwise, we handle the error if the timer is still active. +if (ex instanceof RejectedExecutionException) { +log.debug("The delayed write event {} for the timer {} was not executed because it was " + +"cancelled or overridden.", event.name, key); +return null; +} + +if (ex instanceof NotCoordinatorException || ex instanceof CoordinatorLoadInProgressException) { +log.debug("The delayed write event {} for the timer {} failed due to {}. Ignoring it because " + +"the coordinator is not active.", event.name, key, ex.getMessage()); +return null; +} + +log.info("The delayed write event {} for the timer {} failed due to {}. Rescheduling it. ", +event.name, key, ex.getMessage(), ex); +schedule(key, 500, TimeUnit.MILLISECONDS, operation); + +return null; +}); + +log.debug("Scheduling write event {} for timer {}.", event.name, key); +try { +enqueue(event); Review Comment: But I suppose you are saying that a null future means a null response. I guess I wasn't sure how we can write records but have a null response, and what that means.
[GitHub] [kafka] jolshan commented on a diff in pull request #13961: KAFKA-14462; [21/N] Add CoordinatorTimer implementation in CoordinatorRuntime
jolshan commented on code in PR #13961: URL: https://github.com/apache/kafka/pull/13961#discussion_r1254697080 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -190,6 +216,118 @@ boolean canTransitionFrom(CoordinatorState state) { abstract boolean canTransitionFrom(CoordinatorState state); } +/** + * Implements the CoordinatorTimer interface. This class keeps track of all the + * schedule timers for a coordinator/partition. + * + * When a timer is cancelled or overridden, the previous timer is guaranteed to + * not be executed even if it already expired and got push to the event processor. + * + * When a timer fails with an unexpected exception, the timer is reschedule with + * a backoff. + */ +class InternalCoordinatorTimer implements CoordinatorTimer { +/** + * The logger. + */ +final Logger log; + +/** + * The topic partition. + */ +final TopicPartition tp; + +/** + * The scheduled timers keyed by their key. + */ +final Map tasks = new HashMap<>(); + +InternalCoordinatorTimer(TopicPartition tp, LogContext logContext) { +this.tp = tp; +this.log = logContext.logger(InternalCoordinatorTimer.class); +} + +@Override +public void schedule( +String key, +long delay, +TimeUnit unit, +TimeoutOperation operation +) { +TimerTask task = new TimerTask(unit.toMillis(delay)) { +@Override +public void run() { +String eventName = "Timeout(tp=" + tp + ", key=" + key + ")"; +CoordinatorWriteEvent event = new CoordinatorWriteEvent<>(eventName, tp, coordinator -> { +log.debug("Executing write event {} for timer {}.", eventName, key); + +// If the task is different, it means that the timer has been +// cancelled while the event was waiting to be processed. +if (!tasks.remove(key, this)) { +throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled"); +} + +// Execute the real operation. 
+return new CoordinatorResult<>(operation.generateRecords(), null); +}); + +event.future.exceptionally(ex -> { +// Otherwise, we handle the error if the timer is still active. +if (ex instanceof RejectedExecutionException) { +log.debug("The delayed write event {} for the timer {} was not executed because it was " + +"cancelled or overridden.", event.name, key); +return null; +} + +if (ex instanceof NotCoordinatorException || ex instanceof CoordinatorLoadInProgressException) { +log.debug("The delayed write event {} for the timer {} failed due to {}. Ignoring it because " + +"the coordinator is not active.", event.name, key, ex.getMessage()); +return null; +} + +log.info("The delayed write event {} for the timer {} failed due to {}. Rescheduling it. ", +event.name, key, ex.getMessage(), ex); +schedule(key, 500, TimeUnit.MILLISECONDS, operation); + +return null; +}); + +log.debug("Scheduling write event {} for timer {}.", event.name, key); +try { +enqueue(event); Review Comment: In the event.future.exceptionally code above, we return null on most code paths. I was confused about how we can return null for that future but still enqueue the event.
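The confusion above hinges on standard `CompletableFuture` semantics. Below is a minimal Java sketch that assumes `event.future` behaves like a plain `CompletableFuture` completed by the event processor. `ExceptionallySketch` is a hypothetical name. The sketch shows two things. First, `exceptionally(...)` only registers a callback and returns immediately, so the code after it (the `enqueue(event)` call in the diff) still runs. Second, returning `null` from the handler completes a new, derived future with `null`. The original future stays exceptionally completed.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the CompletableFuture behaviour the diff relies on.
class ExceptionallySketch {
    static String lastHandled = null;

    static CompletableFuture<Void> attachHandler(CompletableFuture<Void> eventFuture) {
        // exceptionally() registers a callback and returns at once; it neither blocks
        // nor prevents the caller from going on to enqueue the event afterwards.
        return eventFuture.exceptionally(ex -> {
            lastHandled = ex.getClass().getSimpleName();
            // Returning null completes the *derived* future normally with null.
            // It means "error handled, nothing to return", not a response to a caller.
            return null;
        });
    }
}
```

Under this reading, the `null` returned in the handler and the `null` response in `CoordinatorResult` are unrelated: the former only ends the error-handling chain.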
[jira] [Commented] (KAFKA-15148) Some integration tests are running as unit tests
[ https://issues.apache.org/jira/browse/KAFKA-15148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740696#comment-17740696 ] Ezio Xie commented on KAFKA-15148: -- I filtered integration tests by (find . -name "*.*integration*.html") and got these integration tests: {code:java} ./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsReporterIntegrationTest.html ./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.LagFetchIntegrationTest.html ./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.OptimizedKTableIntegrationTest.html ./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.RegexSourceIntegrationTest.html ./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest.html ./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.html ./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.MetricsIntegrationTest.html ./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.html ./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.html ./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.html ./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.QueryableStateIntegrationTest.html ./streams/build/reports/tests/unitTest/classes/org.apache.kafka.streams.integration.KTableEfficientRangeQueryTest.html ./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopCounterTest.html 
./connect/runtime/build/reports/tests/unitTest/classes/org.apache.kafka.connect.integration.StartAndStopLatchTest.html {code} I added @Tag("integration") to the "streams" module tests and @Category(IntegrationTest.class) to the "connect" module tests, which excludes these integration tests from the "unitTest" task. The "streams" module's unitTest time dropped from 25 minutes to 16 minutes, and the "connect" module's unitTest time dropped from 18 minutes to 12 minutes. [~divijvaidya] Is this result in line with your expectations? Any advice? > Some integration tests are running as unit tests > > > Key: KAFKA-15148 > URL: https://issues.apache.org/jira/browse/KAFKA-15148 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Divij Vaidya >Assignee: Ezio Xie >Priority: Minor > Labels: newbie > > *This is a good item for a newcomer to the Kafka code base to pick up* > > When we run `./gradlew unitTest`, it is supposed to run all unit tests. > However, we are also running some integration tests as part of it, which makes the > overall process of running unitTest take longer than expected. > Examples of such tests: > > :streams:unitTest > Executing test > > org.apache...integration.NamedTopologyIntegrationTest > > :streams:unitTest > Executing test > > org.apache...integration.QueryableStateIntegrationTest > After this task, we should not run these tests as part of `./gradlew > unitTest`; instead, they should be run as part of `./gradlew integrationTest`. > As part of the acceptance criteria, please add a snapshot of the generated HTML summary > to verify that these tests are indeed running as part of > integrationTest.
[GitHub] [kafka] C0urante commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest
C0urante commented on code in PR #13383: URL: https://github.com/apache/kafka/pull/13383#discussion_r1254675710 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ## @@ -739,62 +693,42 @@ public void testSlowTaskStart() throws Exception { createWorkerTask(); -offsetStore.start(); -EasyMock.expectLastCall(); -sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); -EasyMock.expectLastCall(); -sourceTask.start(TASK_PROPS); -EasyMock.expectLastCall().andAnswer(() -> { +doAnswer((Answer) invocation -> { startupLatch.countDown(); -assertTrue(awaitLatch(finishStartupLatch)); +ConcurrencyUtils.awaitLatch(finishStartupLatch, "Timeout waiting for task to finish"); Review Comment: ```suggestion ConcurrencyUtils.awaitLatch(finishStartupLatch, "Timeout waiting for main test thread to allow task startup to complete"); ``` ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ## @@ -315,157 +319,130 @@ public void testPause() throws Exception { taskFuture.get(); -PowerMock.verifyAll(); +verifyCleanStartup(); +verifyTaskGetTopic(count.get()); +verifyOffsetFlush(true); +verifyTopicCreation(TOPIC); +verify(statusListener).onPause(taskId); +verify(statusListener).onShutdown(taskId); +verify(sourceTask).stop(); +verify(offsetWriter).offset(PARTITION, OFFSET); +verifyClose(); } @Test public void testPollsInBackground() throws Exception { createWorkerTask(); -expectCleanStartup(); - final CountDownLatch pollLatch = expectPolls(10); -// In this test, we don't flush, so nothing goes any further than the offset writer expectTopicCreation(TOPIC); - -sourceTask.stop(); -EasyMock.expectLastCall(); - -offsetWriter.offset(PARTITION, OFFSET); -PowerMock.expectLastCall(); -expectOffsetFlush(true); - -statusListener.onShutdown(taskId); -EasyMock.expectLastCall(); - -expectClose(); - -PowerMock.replayAll(); +expectOffsetFlush(); workerTask.initialize(TASK_CONFIG); Future taskFuture = 
executor.submit(workerTask); -assertTrue(awaitLatch(pollLatch)); +ConcurrencyUtils.awaitLatch(pollLatch, POLL_TIMEOUT_MSG); workerTask.stop(); assertTrue(workerTask.awaitStop(1000)); taskFuture.get(); assertPollMetrics(10); - -PowerMock.verifyAll(); +verifyCleanStartup(); +verifyOffsetFlush(true); +verify(offsetWriter).offset(PARTITION, OFFSET); +verify(statusListener).onShutdown(taskId); +verifyClose(); } @Test public void testFailureInPoll() throws Exception { createWorkerTask(); -expectCleanStartup(); - final CountDownLatch pollLatch = new CountDownLatch(1); final RuntimeException exception = new RuntimeException(); -EasyMock.expect(sourceTask.poll()).andAnswer(() -> { +when(sourceTask.poll()).thenAnswer(invocation -> { pollLatch.countDown(); throw exception; }); -statusListener.onFailure(taskId, exception); -EasyMock.expectLastCall(); - -sourceTask.stop(); -EasyMock.expectLastCall(); expectEmptyOffsetFlush(); -expectClose(); - -PowerMock.replayAll(); - workerTask.initialize(TASK_CONFIG); Future taskFuture = executor.submit(workerTask); -assertTrue(awaitLatch(pollLatch)); +ConcurrencyUtils.awaitLatch(pollLatch, POLL_TIMEOUT_MSG); //Failure in poll should trigger automatic stop of the task assertTrue(workerTask.awaitStop(1000)); -assertShouldSkipCommit(); taskFuture.get(); assertPollMetrics(0); -PowerMock.verifyAll(); +verifyCleanStartup(); +verify(statusListener).onFailure(taskId, exception); +verify(sourceTask).stop(); +assertShouldSkipCommit(); +verifyOffsetFlush(true); +verifyClose(); } @Test public void testFailureInPollAfterCancel() throws Exception { createWorkerTask(); -expectCleanStartup(); - final CountDownLatch pollLatch = new CountDownLatch(1); final CountDownLatch workerCancelLatch = new CountDownLatch(1); final RuntimeException exception = new RuntimeException(); -EasyMock.expect(sourceTask.poll()).andAnswer(() -> { +when(sourceTask.poll()).thenAnswer(invocation -> { pollLatch.countDown(); -assertTrue(awaitLatch(workerCancelLatch)); 
+ConcurrencyUtils.awaitLatch(workerCancelLatch, "Timeout waiting for task
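The migration above replaces `assertTrue(awaitLatch(...))` with `ConcurrencyUtils.awaitLatch(latch, message)`, and the review suggestion is about making that message specific. Below is a minimal Java sketch of what such a helper might look like, to show why the message matters. The class name `LatchSketch`, the timeout overload, and the 1-second default are hypothetical. The diff does not show the real helper's timeout or implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical latch helper; not the actual ConcurrencyUtils implementation.
class LatchSketch {
    // Assumed default timeout; the real helper's value is not shown in the diff.
    static final long DEFAULT_TIMEOUT_MS = 1_000L;

    static void awaitLatch(CountDownLatch latch, String failureMessage) {
        awaitLatch(latch, DEFAULT_TIMEOUT_MS, failureMessage);
    }

    static void awaitLatch(CountDownLatch latch, long timeoutMs, String failureMessage) {
        try {
            // await() returns false on timeout; fail with the caller's specific message
            // so the test report says *which* wait timed out.
            if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new AssertionError(failureMessage);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted: " + failureMessage, e);
        }
    }
}
```

A descriptive message such as "Timeout waiting for main test thread to allow task startup to complete" pinpoints the stalled step, whereas a bare `assertTrue` only reports that some latch timed out.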
[GitHub] [kafka] C0urante commented on a diff in pull request #13955: KAFKA-15145: Don't re-process records filtered out by SMTs on Kafka client retriable exceptions in AbstractWorkerSourceTask
C0urante commented on code in PR #13955: URL: https://github.com/apache/kafka/pull/13955#discussion_r1254656021 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -651,6 +652,40 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() { verifyTopicCreation(); } +@Test +public void testSendRecordsRetriableException() { +createWorkerTask(); + +SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); +SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); +SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); + +expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC); +expectTaskGetTopic(); + +when(transformationChain.apply(eq(record1))).thenReturn(null); +when(transformationChain.apply(eq(record2))).thenReturn(null); +when(transformationChain.apply(eq(record3))).thenReturn(record3); + +TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList()); +TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo)); + when(admin.describeTopics(TOPIC)).thenReturn(Collections.singletonMap(TOPIC, topicDesc)); + +when(producer.send(any(), any())).thenThrow(new RetriableException("Retriable exception")).thenReturn(null); + +workerTask.toSend = Arrays.asList(record1, record2, record3); + +// The producer throws a RetriableException the first time we try to send the third record +assertFalse(workerTask.sendRecords()); + +// The next attempt to send the third record should succeed +assertTrue(workerTask.sendRecords()); + +// Ensure that the first two records that were filtered out by the transformation chain +// aren't re-processed when we retry the call to sendRecords() +verify(transformationChain, times(4)).apply(any(SourceRecord.class)); 
Review Comment: Do you think it might be worth it to be more explicit in the verifications here? I.e., verify that `record1` and `record2` were transformed only once, but `record3` was transformed twice? Also, it's a little strange that we're re-transforming a record for each retry. If it's not too much work, we may want to refine that logic to transform each record at most once.
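The second suggestion above (transform each record at most once) can be sketched as a retry loop that remembers both its position and the already-transformed record. This is a hypothetical Java sketch, not AbstractWorkerSourceTask. It makes two simplifying assumptions. First, `transform` returns `null` to filter a record out, mirroring the mocked transformation chain in the test. Second, `send` returns `false` to stand in for the producer throwing a `RetriableException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical sketch: records are transformed at most once, and filtered records
// are never revisited when sendRecords() is retried.
class RetryingSenderSketch<R> {
    private final UnaryOperator<R> transform; // returns null to filter a record out
    private final Predicate<R> send;          // returns false on a retriable failure
    private final List<R> toSend;
    private int nextIndex = 0;                // first record not yet fully handled
    private R pendingTransformed = null;      // transformed record awaiting a re-send

    RetryingSenderSketch(UnaryOperator<R> transform, Predicate<R> send, List<R> records) {
        this.transform = transform;
        this.send = send;
        this.toSend = new ArrayList<>(records);
    }

    /** Returns true once every record was sent or filtered, false on a retriable failure. */
    boolean sendRecords() {
        while (nextIndex < toSend.size()) {
            R transformed = pendingTransformed;
            if (transformed == null) {
                transformed = transform.apply(toSend.get(nextIndex));
                if (transformed == null) {
                    // Filtered out: advance past it so a later retry never sees it again.
                    nextIndex++;
                    continue;
                }
            }
            if (!send.test(transformed)) {
                // Keep the transformed record so the retry does not re-run the SMTs.
                pendingTransformed = transformed;
                return false;
            }
            pendingTransformed = null;
            nextIndex++;
        }
        return true;
    }
}
```

With this shape, the explicit per-record verification in the test would expect exactly one transformation for each of `record1`, `record2`, and `record3` (e.g. Mockito's `verify(transformationChain, times(1)).apply(record3)`), rather than two for `record3`.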
[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740686#comment-17740686 ] ASF GitHub Bot commented on KAFKA-14995: mimaison commented on PR #521: URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1623947447 The latest committer is Divij Vaidya, see at the bottom of https://kafka.apache.org/committers. His Github username is `divijvaidya`. > Automate asf.yaml collaborators refresh > --- > > Key: KAFKA-14995 > URL: https://issues.apache.org/jira/browse/KAFKA-14995 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Assignee: Steven Booke >Priority: Minor > Labels: newbie > > We have added a policy to use the asf.yaml Github Collaborators: > [https://github.com/apache/kafka-site/pull/510] > The policy states that we set this list to be the top 20 commit authors who > are not Kafka committers. Unfortunately, it's not trivial to compute this > list. > Here is the process I followed to generate the list the first time (note that > I generated this list on 2023-04-28, so the lookback is one year: > 1. List authors by commit volume in the last year: > {code:java} > $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code} > 2. manually filter out the authors who are committers, based on > [https://kafka.apache.org/committers] > 3. truncate the list to 20 authors > 4. for each author > 4a. Find a commit in the `git log` that they were the author on: > {code:java} > commit 440bed2391338dc10fe4d36ab17dc104b61b85e8 > Author: hudeqi <1217150...@qq.com> > Date: Fri May 12 14:03:17 2023 +0800 > ...{code} > 4b. Look up that commit in Github: > [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8] > 4c. Copy their Github username into .asf.yaml under both the PR whitelist and > the Collaborators lists. > 5. 
Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713] > > This is pretty time-consuming and is very scriptable. Two complications: > * To do the filtering, we need to map from Git log "Author" to documented > Kafka "Committer" that we can use to perform the filter. Suggestion: just > update the structure of the "Committers" page to include their Git "Author" > name and email > ([https://github.com/apache/kafka-site/blob/asf-site/committers.html]) > * To generate the YAML lists, we need to map from Git log "Author" to GitHub > username. There's presumably some way to do this in the GitHub REST API (the > mapping is based on the email, IIUC), or we could also just update the > Committers page to also document each committer's GitHub username. > > Ideally, we would write this script (to be stored in the Apache Kafka repo) > and create a GitHub Action to run it every three months. >
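The ticket calls steps 1-3 of this process "very scriptable". Below is a minimal Java sketch of those steps, under three assumptions. First, `git shortlog --email --numbered --summary` prints one `<count>\t<Name> <email>` line per author, already sorted by commit count. Second, committer emails are available as a lowercase set, which is what the ticket proposes adding to the Committers page. Third, the sample data is illustrative only. `CollaboratorsSketch` and its methods are hypothetical. The GitHub-username lookup in step 4 is not covered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch of steps 1-3: parse shortlog output, drop committers, keep the top N.
class CollaboratorsSketch {
    static final class Author {
        final int commits;
        final String name;
        final String email;

        Author(int commits, String name, String email) {
            this.commits = commits;
            this.name = name;
            this.email = email;
        }
    }

    /** Parses one shortlog line such as "   42\tJane Doe <jane@example.com>". */
    static Author parseLine(String line) {
        String trimmed = line.trim();
        int tab = trimmed.indexOf('\t');
        if (tab < 0) {
            return null;
        }
        int commits = Integer.parseInt(trimmed.substring(0, tab).trim());
        String rest = trimmed.substring(tab + 1).trim();
        int lt = rest.lastIndexOf('<');
        int gt = rest.lastIndexOf('>');
        if (lt < 0 || gt < lt) {
            return null;
        }
        String name = rest.substring(0, lt).trim();
        String email = rest.substring(lt + 1, gt).trim().toLowerCase(Locale.ROOT);
        return new Author(commits, name, email);
    }

    /** Keeps the first `limit` authors (shortlog already sorts by count) who are not committers. */
    static List<Author> topNonCommitters(List<String> shortlogLines, Set<String> committerEmails, int limit) {
        List<Author> result = new ArrayList<>();
        for (String line : shortlogLines) {
            Author author = parseLine(line);
            if (author == null || committerEmails.contains(author.email)) {
                continue;
            }
            result.add(author);
            if (result.size() == limit) {
                break;
            }
        }
        return result;
    }
}
```

Matching on email rather than display name follows the ticket's observation that the Git "Author" to committer mapping needs a stable key.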
[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740683#comment-17740683 ] ASF GitHub Bot commented on KAFKA-14995: stevenbooke commented on PR #521: URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1623938985 Who is the new Committer? What is their Github username?
[GitHub] [kafka] hudeqi commented on a diff in pull request #13959: KAFKA-15129;[5/N] Remove metrics in ControllerChannelManager when broker shutdown
hudeqi commented on code in PR #13959: URL: https://github.com/apache/kafka/pull/13959#discussion_r1254636348 ## core/src/main/scala/kafka/controller/KafkaController.scala: ## @@ -537,6 +537,7 @@ class KafkaController(val config: KafkaConfig, private def removeMetrics(): Unit = { KafkaController.MetricNames.foreach(metricsGroup.removeMetric) +controllerChannelManager.removeMetrics() Review Comment: The current ControllerChannelManager only removes the tagged metrics (that is, gaugeMetricNameWithTag and timerMetricNameWithTag) of all brokers in brokerStateInfo during shutdown, but it omits TotalQueueSizeMetricName, so I added the removeMetrics method to remove it during broker shutdown. Do you mean combining the two into the shutdown method of ControllerChannelManager? @divijvaidya
[GitHub] [kafka] philipnee commented on pull request #13797: KAFKA-14950: implement assign() and assignment()
philipnee commented on PR #13797: URL: https://github.com/apache/kafka/pull/13797#issuecomment-1623905234 @kirktrue thanks for taking over this. @junrao - would you be up for another round of review?
[GitHub] [kafka] mimaison commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1
mimaison commented on PR #13260: URL: https://github.com/apache/kafka/pull/13260#issuecomment-1623903798 Also we need to update the `LICENSE-binary` file as mentioned in https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L73-L74
[GitHub] [kafka] philipnee commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer
philipnee commented on PR #13914: URL: https://github.com/apache/kafka/pull/13914#issuecomment-1623900690 Hey, at first look, I think this is a reasonable suggestion. Have you gotten much feedback from the discussion thread yet?
[GitHub] [kafka] hudeqi commented on a diff in pull request #13960: KAFKA-15129;[6/N] Remove metrics in ControllerStats when broker shutdown
hudeqi commented on code in PR #13960: URL: https://github.com/apache/kafka/pull/13960#discussion_r1254619269 ## core/src/main/scala/kafka/controller/KafkaController.scala: ## @@ -537,6 +538,7 @@ class KafkaController(val config: KafkaConfig, private def removeMetrics(): Unit = { KafkaController.MetricNames.foreach(metricsGroup.removeMetric) +controllerContext.stats.removeMetrics() Review Comment: My thinking is that this PR removes the ControllerStats metrics when the broker shuts down. Before this change, every broker in the cluster registers all of the ControllerStats metrics. If we follow your suggestion, only the broker that is actually the controller will register the ControllerStats metrics, while the other brokers will not. I'm not sure whether that logic change is necessary. @divijvaidya
[jira] [Updated] (KAFKA-15159) Update minor dependencies in preparation for 3.5.1
[ https://issues.apache.org/jira/browse/KAFKA-15159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15159: - Fix Version/s: 3.5.1 > Update minor dependencies in preparation for 3.5.1 > -- > > Key: KAFKA-15159 > URL: https://issues.apache.org/jira/browse/KAFKA-15159 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Major > Fix For: 3.5.1 > > > Go through the list of dependencies in dependencies.gradle. Check if newer > minor versions (backward compatible) are available; if yes, create a PR > for them. Especially important are the ones that fix a CVE. > We will merge them into trunk and backport them to the 3.5 branch, similar to > [https://github.com/apache/kafka/pull/13673].
[jira] [Updated] (KAFKA-15159) Update minor dependencies in preparation for 3.5.1
[ https://issues.apache.org/jira/browse/KAFKA-15159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15159: - Description: Go through the list of dependencies in dependencies.gradle. Check if newer minor versions (backward compatible) are available; if yes, create a PR for them. Especially important are the ones that fix a CVE. We will merge them into trunk and backport them to the 3.5 branch, similar to [https://github.com/apache/kafka/pull/13673]. was: Go through the list of dependencies in dependencies.gradle. Check if newer minor versions (backward compatible) are available; if yes, create a PR for them. Especially important are the ones that fix a CVE. We will merge them into trunk and backport them to the 3.5 branch. > Update minor dependencies in preparation for 3.5.1 > -- > > Key: KAFKA-15159 > URL: https://issues.apache.org/jira/browse/KAFKA-15159 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Priority: Major > > Go through the list of dependencies in dependencies.gradle. Check if newer > minor versions (backward compatible) are available; if yes, create a PR > for them. Especially important are the ones that fix a CVE. > We will merge them into trunk and backport them to the 3.5 branch, similar to > [https://github.com/apache/kafka/pull/13673].
[jira] [Created] (KAFKA-15159) Update minor dependencies in preparation for 3.5.1
Divij Vaidya created KAFKA-15159: Summary: Update minor dependencies in preparation for 3.5.1 Key: KAFKA-15159 URL: https://issues.apache.org/jira/browse/KAFKA-15159 Project: Kafka Issue Type: Improvement Reporter: Divij Vaidya Go through the list of dependencies in dependencies.gradle. Check if newer minor versions (backward compatible) are available; if yes, create a PR for them. Especially important are the ones that fix a CVE. We will merge them into trunk and backport them to the 3.5 branch.
[jira] [Commented] (KAFKA-15102) Mirror Maker 2 - KIP690 backward compatibility
[ https://issues.apache.org/jira/browse/KAFKA-15102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740663#comment-17740663 ] Chris Egerton commented on KAFKA-15102: --- [~omnia_h_ibrahim] I've assigned you this ticket so nobody else picks it up by mistake; feel free to unassign if you no longer want to work on this. > Mirror Maker 2 - KIP690 backward compatibility > -- > > Key: KAFKA-15102 > URL: https://issues.apache.org/jira/browse/KAFKA-15102 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.1.0 >Reporter: David Dufour >Priority: Major > > According to KIP690, "When users upgrade an existing MM2 cluster they don’t > need to change any of their current configuration as this proposal maintains > the default behaviour for MM2." > However, the separator in internal topic names is now subject to customization. > As a consequence, when an MM2 upgrade is performed, if the separator was > customized with replication.policy.separator, the name of the offset-syncs internal topic > changes. This then causes errors like: > Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.InvalidTopicException: Topic > 'mm2-offset-syncs_bkts28_internal' collides with existing topics: > mm2-offset-syncs.bkts28.internal > It has been observed that replication can sometimes break > several days after the upgrade (reason not identified). Deleting the topic with the old > name makes it recover.
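The error above can be reproduced in miniature. The following Java sketch rests on an assumption inferred from the two topic names in the error message: the offset-syncs topic is named `mm2-offset-syncs` + separator + cluster alias + separator + `internal`. It relies on one fact about Kafka: topic names that differ only in `.` versus `_` are rejected as colliding, because both characters map to the same metric name. `OffsetSyncsTopicSketch` and its methods are hypothetical helpers, not MirrorMaker 2 APIs.

```java
// Hypothetical helpers illustrating the reported collision; not MM2 code.
class OffsetSyncsTopicSketch {
    // Naming scheme inferred from the error message in this ticket (an assumption).
    static String offsetSyncsTopic(String clusterAlias, String separator) {
        return "mm2-offset-syncs" + separator + clusterAlias + separator + "internal";
    }

    // Kafka treats topic names that differ only in '.' vs '_' as colliding,
    // so normalizing one character to the other exposes the conflict.
    static boolean collides(String a, String b) {
        return a.replace('.', '_').equals(b.replace('.', '_'));
    }
}
```

Under these assumptions, an upgrade that switches the separator from `.` to a customized `_` produces a distinct topic name that nonetheless collides with the existing one, which matches the `InvalidTopicException` reported here.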
[jira] [Assigned] (KAFKA-15102) Mirror Maker 2 - KIP690 backward compatibility
[ https://issues.apache.org/jira/browse/KAFKA-15102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-15102: - Assignee: Omnia Ibrahim > Mirror Maker 2 - KIP690 backward compatibility > -- > > Key: KAFKA-15102 > URL: https://issues.apache.org/jira/browse/KAFKA-15102 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.1.0 >Reporter: David Dufour >Assignee: Omnia Ibrahim >Priority: Major > > According to KIP690, "When users upgrade an existing MM2 cluster they don’t > need to change any of their current configuration as this proposal maintains > the default behaviour for MM2." > Now, the separator is subject to customization. > As a consequence, when an MM2 upgrade is performed, if the separator was > customized with replication.policy.separator, the name of this internal topic > changes. It then generates issues like: > Caused by: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.InvalidTopicException: Topic > 'mm2-offset-syncs_bkts28_internal' collides with existing topics: > mm2-offset-syncs.bkts28.internal > It has been observed that the replication can then be broken sometimes > several days after the upgrade (reason not identified). By deleting the old > topic name, it recovers.
[jira] [Assigned] (KAFKA-15157) Print startup time for RemoteIndexCache
[ https://issues.apache.org/jira/browse/KAFKA-15157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lan Ding reassigned KAFKA-15157: Assignee: Lan Ding > Print startup time for RemoteIndexCache > --- > > Key: KAFKA-15157 > URL: https://issues.apache.org/jira/browse/KAFKA-15157 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Assignee: Lan Ding >Priority: Major > > When RemoteIndexCache starts up, it will try to rebuild the in-memory cache > using the files already present on the disk in the remote-index-cache > directory. The process involves: > 1. deleting existing files that are pending deletion, i.e. that have a .delete suffix > 2. reading the cached index files, if present > 3. creating the indexes (this step creates a memory-mapped buffer) > 4. performing a sanity check on the indexes > 5. adding them to the internal cache. > Steps 2-5 are bounded by the maximum number of entries in the cache, but > step 1 could be arbitrarily large. > To debug a slow cache startup, we should add an info-level log statement that prints the > time it took to initialize the cache.
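A minimal sketch of the requested startup-time log, assuming nothing about RemoteIndexCache's internals: `StartupTimer` and `timeInit` are hypothetical, `java.util.logging` stands in for Kafka's own logger, and the clock is injected as a `LongSupplier` so the elapsed time can be tested deterministically.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.logging.Logger;

// Hypothetical helper, not part of Kafka: wraps an initialization step and logs how
// long it took. `initStep` stands in for the cache's startup work (steps 1-5 above).
final class StartupTimer {
    private static final Logger LOG = Logger.getLogger(StartupTimer.class.getName());

    private StartupTimer() {}

    // Runs the step, logs the elapsed time at INFO level, and returns it in milliseconds.
    static long timeInit(String name, Runnable initStep, LongSupplier nanoClock) {
        long startNs = nanoClock.getAsLong();
        initStep.run();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nanoClock.getAsLong() - startNs);
        LOG.info(() -> name + " initialized in " + elapsedMs + " ms");
        return elapsedMs;
    }
}
```

In production code the clock would simply be `System::nanoTime`; a monotonic clock is used rather than wall-clock time so the measurement is not skewed by clock adjustments during startup.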
[GitHub] [kafka] DL1231 opened a new pull request, #13970: KAFKA-15157: Print startup time for RemoteIndexCache
DL1231 opened a new pull request, #13970: URL: https://github.com/apache/kafka/pull/13970 https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-15157 Print startup time for RemoteIndexCache. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi commented on a diff in pull request #13929: KAFKA-15129;[3/N] Remove metrics in AbstractFetcherManager when fetcher manager instance shutdown
hudeqi commented on code in PR #13929: URL: https://github.com/apache/kafka/pull/13929#discussion_r1254581656 ## core/src/main/scala/kafka/server/AbstractFetcherManager.scala: ## @@ -226,6 +240,29 @@ abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: Stri fetcherThreadMap.clear() } } + + def shutdown(): Unit = { +info("shutting down") +try { + closeAllFetchers() +} finally { + removeMetrics() +} +info("shutdown completed") + } + + private[server] def removeMetrics(): Unit = { +metricNamesToTags.foreach(metricTags => { Review Comment: updated.
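The point of the `try`/`finally` in the Scala diff is that `removeMetrics()` runs even when `closeAllFetchers()` throws, so the manager's metrics are not leaked. A minimal Java sketch of the same pattern follows; `FetcherManagerSketch` and its `events` log are hypothetical stand-ins, not Kafka's `AbstractFetcherManager`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Java analogue of the Scala shutdown() above; `events` records the
// order of operations so the behaviour can be checked.
final class FetcherManagerSketch {
    final List<String> events = new ArrayList<>();
    private final Runnable closeAllFetchers;

    FetcherManagerSketch(Runnable closeAllFetchers) {
        this.closeAllFetchers = closeAllFetchers;
    }

    void shutdown() {
        events.add("shutting down");
        try {
            closeAllFetchers.run();
        } finally {
            // Runs even if closing the fetchers throws.
            removeMetrics();
        }
        // Like the Scala version, this is skipped when closing the fetchers fails.
        events.add("shutdown completed");
    }

    void removeMetrics() {
        events.add("metrics removed");
    }
}
```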
[jira] [Updated] (KAFKA-15153) Use Python `is` instead of `==` to compare for None
[ https://issues.apache.org/jira/browse/KAFKA-15153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15153: - Fix Version/s: 3.6.0 > Use Python `is` instead of `==` to compare for None > > > Key: KAFKA-15153 > URL: https://issues.apache.org/jira/browse/KAFKA-15153 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Assignee: Lan Ding >Priority: Minor > Labels: newbie > Fix For: 3.6.0 > > > *This is a good item to be picked up by first-time contributors to the Kafka code > base* > At release_notes.py Line: 47 > The {{==}} and {{!=}} operators use the compared objects' {{__eq__}} and {{__ne__}} methods > to test if they are equal. To compare an object against a singleton such as > {{None}}, we recommend that you use the {{is}} identity comparison > operator.
[jira] [Resolved] (KAFKA-15153) Use Python `is` instead of `==` to compare for None
[ https://issues.apache.org/jira/browse/KAFKA-15153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya resolved KAFKA-15153. -- Reviewer: Divij Vaidya Resolution: Fixed > Use Python `is` instead of `==` to compare for None > > > Key: KAFKA-15153 > URL: https://issues.apache.org/jira/browse/KAFKA-15153 > Project: Kafka > Issue Type: Improvement >Reporter: Divij Vaidya >Assignee: Lan Ding >Priority: Minor > Labels: newbie > Fix For: 3.6.0 > > > *This is a good item to be picked up by first-time contributors to the Kafka code > base* > At release_notes.py Line: 47 > The {{==}} and {{!=}} operators use the compared objects' {{__eq__}} and {{__ne__}} methods > to test if they are equal. To compare an object against a singleton such as > {{None}}, we recommend that you use the {{is}} identity comparison > operator.
[GitHub] [kafka] divijvaidya merged pull request #13964: KAFKA-15153: Use Python 'is' instead of '==' to compare for None
divijvaidya merged PR #13964: URL: https://github.com/apache/kafka/pull/13964
[jira] [Commented] (KAFKA-12969) Add cluster or broker level config for topic level tiered storage confgs.
[ https://issues.apache.org/jira/browse/KAFKA-12969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740642#comment-17740642 ] Divij Vaidya commented on KAFKA-12969: -- Please ensure that we also add validations here that throw ConfigException on failure, e.g. remote storage cannot be enabled for a topic if it is not enabled at the cluster level. > Add cluster or broker level config for topic level tiered storage confgs. > -- > > Key: KAFKA-12969 > URL: https://issues.apache.org/jira/browse/KAFKA-12969 > Project: Kafka > Issue Type: Sub-task >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Major >
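A minimal sketch of the kind of validation requested, under stated assumptions: the topic-level key `remote.storage.enable` is used for illustration only, the cluster-level flag is passed in as a plain boolean, and the nested `ConfigException` is a local stand-in so the example compiles without Kafka on the classpath (real code would use Kafka's own `ConfigException`).

```java
import java.util.Map;

// Hypothetical validator, not Kafka code. The config key and the exception class are
// illustrative stand-ins.
final class TieredStorageConfigValidator {
    // Local stand-in for Kafka's ConfigException.
    static final class ConfigException extends RuntimeException {
        ConfigException(String message) {
            super(message);
        }
    }

    private TieredStorageConfigValidator() {}

    // Rejects a topic that enables remote storage while the cluster-level switch is off.
    static void validate(boolean clusterRemoteStorageEnabled, Map<String, String> topicConfigs) {
        boolean topicWantsRemote =
            Boolean.parseBoolean(topicConfigs.getOrDefault("remote.storage.enable", "false"));
        if (topicWantsRemote && !clusterRemoteStorageEnabled) {
            throw new ConfigException(
                "Cannot enable remote storage for a topic when it is not enabled at the cluster level");
        }
    }
}
```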
[GitHub] [kafka] vamossagar12 commented on pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on PR #13801: URL: https://github.com/apache/kafka/pull/13801#issuecomment-1623807270 Thanks @yashmayya. I have addressed the rest of your comments.
[jira] [Updated] (KAFKA-15158) Add metrics for RemoteRequestsPerSec
[ https://issues.apache.org/jira/browse/KAFKA-15158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-15158: - Description: Add the following metrics for better observability into RemoteLog-related activities inside the broker. 1. RemoteWriteRequestsPerSec 2. RemoteDeleteRequestsPerSec 3. BuildRemoteLogAuxStateRequestsPerSec These metrics will be calculated at the topic level (we can add them at brokerTopicStats) *RemoteWriteRequestsPerSec* will be marked on every call to RemoteLogManager#copyLogSegmentsToRemote() *RemoteDeleteRequestsPerSec* will be marked on every call to RemoteLogManager#cleanupExpiredRemoteLogSegments(). This method is introduced in [https://github.com/apache/kafka/pull/13561] *BuildRemoteLogAuxStateRequestsPerSec* will be marked on every call to ReplicaFetcherTierStateMachine#buildRemoteLogAuxState() (Note that this requires a change to KIP-405 and hence must be approved by the KIP author [~satishd].) was: Add the following metrics for better observability into RemoteLog-related activities inside the broker. 1. RemoteWriteRequestsPerSec 2. RemoteDeleteRequestsPerSec 3. BuildRemoteLogAuxStateRequestsPerSec These metrics will be calculated at the topic level (we can add them at brokerTopicStats) *RemoteWriteRequestsPerSec* will be marked on every call to RemoteLogManager#copyLogSegmentsToRemote() *RemoteDeleteRequestsPerSec* will be marked on every call to RemoteLogManager#cleanupExpiredRemoteLogSegments(). This method is introduced in [https://github.com/apache/kafka/pull/13561] *BuildRemoteLogAuxStateRequestsPerSec* will be marked on every call to ReplicaFetcherTierStateMachine#buildRemoteLogAuxState() > Add metrics for RemoteRequestsPerSec > > > Key: KAFKA-15158 > URL: https://issues.apache.org/jira/browse/KAFKA-15158 > Project: Kafka > Issue Type: Sub-task >Reporter: Divij Vaidya >Priority: Major > Fix For: 3.6.0 > > > Add the following metrics for better observability into RemoteLog-related > activities inside the broker. > 1. RemoteWriteRequestsPerSec > 2. RemoteDeleteRequestsPerSec > 3. BuildRemoteLogAuxStateRequestsPerSec > > These metrics will be calculated at the topic level (we can add them at > brokerTopicStats) > *RemoteWriteRequestsPerSec* will be marked on every call to > RemoteLogManager#copyLogSegmentsToRemote() > > *RemoteDeleteRequestsPerSec* will be marked on every call to > RemoteLogManager#cleanupExpiredRemoteLogSegments(). This method is introduced > in [https://github.com/apache/kafka/pull/13561] > *BuildRemoteLogAuxStateRequestsPerSec* will be marked on every call to > ReplicaFetcherTierStateMachine#buildRemoteLogAuxState() > > > (Note that this requires a change to KIP-405 and hence must be approved by > the KIP author [~satishd].) >
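The "marked on every call" semantics can be sketched as per-topic counters keyed by metric name. This is a minimal illustration only: `RemoteRequestCounters` is hypothetical, it counts marks rather than computing a per-second rate (Kafka would register a rate meter via `BrokerTopicStats`), and the `topic:metric` key relies on `:` not being a legal character in Kafka topic names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: one thread-safe counter per (topic, metric) pair. Only the
// "mark on every call" part is shown; rate calculation is left to a real meter.
final class RemoteRequestCounters {
    static final String REMOTE_WRITE = "RemoteWriteRequestsPerSec";
    static final String REMOTE_DELETE = "RemoteDeleteRequestsPerSec";
    static final String BUILD_AUX_STATE = "BuildRemoteLogAuxStateRequestsPerSec";

    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    // Called once per invocation of the instrumented method, e.g. at the start of
    // copyLogSegmentsToRemote() for REMOTE_WRITE.
    void mark(String topic, String metric) {
        counters.computeIfAbsent(topic + ":" + metric, k -> new LongAdder()).increment();
    }

    long count(String topic, String metric) {
        LongAdder adder = counters.get(topic + ":" + metric);
        return adder == null ? 0L : adder.sum();
    }
}
```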
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1254544674 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java: ## @@ -16,23 +16,36 @@ */ package org.apache.kafka.connect.storage; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.powermock.reflect.Whitebox; Review Comment: Got it. I removed the usage of Whitebox.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1254543662 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java: ## @@ -192,6 +209,236 @@ public void testCancelAfterAwaitFlush() throws Exception { flushFuture.get(1000, TimeUnit.MILLISECONDS); } +@Test +public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException { + +KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false); +KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true); + +extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null); + +ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( +() -> LoggingContext.forConnector("source-connector"), +workerStore, +connectorStore, +"offsets-topic", +mock(TopicAdmin.class)); + +OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter); + +offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE); +assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS)); +Future flushFuture = offsetStorageWriter.doFlush((error, result) -> { +assertEquals(PRODUCE_EXCEPTION, error); +assertNull(result); +}); +assertThrows(ExecutionException.class, () -> flushFuture.get(1000L, TimeUnit.MILLISECONDS)); +} + +@Test +public void testFlushSuccessWhenWritesSucceedToBothPrimaryAndSecondaryStoresForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException { + +KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false); +KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false); + +extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null); + +ConnectorOffsetBackingStore offsetBackingStore = 
ConnectorOffsetBackingStore.withConnectorAndWorkerStores( +() -> LoggingContext.forConnector("source-connector"), +workerStore, +connectorStore, +"offsets-topic", +mock(TopicAdmin.class)); + +OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter); + +offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE); +assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS)); +offsetStorageWriter.doFlush((error, result) -> { +assertNull(error); +assertNull(result); +}).get(1000L, TimeUnit.MILLISECONDS); +} + +@Test +public void testFlushSuccessWhenWriteToSecondaryStoreFailsForNonTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException { + +KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false); +KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true); + +extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED); + +ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( +() -> LoggingContext.forConnector("source-connector"), +workerStore, +connectorStore, +"offsets-topic", +mock(TopicAdmin.class)); + +OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter); + +offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE); +assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS)); +offsetStorageWriter.doFlush((error, result) -> { +assertNull(error); +assertNull(result); +}).get(1000L, TimeUnit.MILLISECONDS); +} + +@Test +public void testFlushSuccessWhenWritesToPrimaryAndSecondaryStoreSucceeds() throws InterruptedException, TimeoutException, ExecutionException { + +KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false); +KafkaOffsetBackingStore workerStore = 
setupOffsetBackingStoreWithProducer("topic2", false); + +extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED); + +ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores( +() -> LoggingContext.forConnector("source-connector"), +workerStore, +
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1254543055 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java: ## @@ -192,6 +209,236 @@ public void testCancelAfterAwaitFlush() throws Exception { flushFuture.get(1000, TimeUnit.MILLISECONDS); } +@Test +public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException { Review Comment: Removed such instances from other tests as well.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records
vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1254542495 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ## @@ -279,10 +284,61 @@ public Future set(Map values, Callback callb throw new IllegalStateException("At least one non-null offset store must be provided"); } +boolean containsTombstones = values.containsValue(null); + +// If there are tombstone offsets, then the failure to write to secondary store will +// not be ignored. Also, for tombstone records, we first write to secondary store and +// then to primary stores. +if (secondaryStore != null && containsTombstones) { +AtomicReference secondaryStoreTombstoneWriteError = new AtomicReference<>(); +Future secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> { +try (LoggingContext context = loggingContext()) { +if (secondaryWriteError != null) { +log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError); +secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError); +} else { +log.debug("Successfully flushed tombstone offsets to secondary backing store"); +} +} +}); Review Comment: I made the change and it worked. Thanks! ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java: ## @@ -192,6 +209,236 @@ public void testCancelAfterAwaitFlush() throws Exception { flushFuture.get(1000, TimeUnit.MILLISECONDS); } +@Test +public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException { Review Comment: Done.
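The write ordering described in the diff comment (tombstones go to the secondary store first, and a secondary failure is not ignored; for regular offsets a secondary failure is tolerated) can be sketched with `CompletableFuture`. This is a minimal sketch under stated assumptions: `OffsetStore` and `DualOffsetWriter` are hypothetical stand-ins for Connect's `Future`-plus-`Callback` store API, and skipping the primary write when the secondary tombstone write fails is a choice made here, since the excerpt does not show what the PR does at that point.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an offset backing store; not Kafka Connect's API.
interface OffsetStore {
    CompletableFuture<Void> set(Map<String, String> values);
}

final class DualOffsetWriter {
    private final OffsetStore primary;
    private final OffsetStore secondary; // may be null

    DualOffsetWriter(OffsetStore primary, OffsetStore secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    CompletableFuture<Void> set(Map<String, String> values) {
        boolean containsTombstones = values.containsValue(null);
        if (secondary != null && containsTombstones) {
            // Tombstones: write to the secondary store first and surface its failure,
            // instead of silently leaving a stale offset behind in that store.
            // In this sketch the primary write only happens if the secondary write succeeds.
            return secondary.set(values).thenCompose(ignored -> primary.set(values));
        }
        CompletableFuture<Void> primaryWrite = primary.set(values);
        if (secondary != null) {
            // Regular offsets: a secondary failure is tolerated (here simply swallowed).
            secondary.set(values).exceptionally(e -> null);
        }
        return primaryWrite;
    }
}
```

Note that test maps must allow `null` values (e.g. `HashMap`), because `Map.of` rejects nulls and therefore cannot represent tombstones.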
[jira] [Updated] (KAFKA-15149) Fix not sending UMR and LISR RPCs in dual-write mode when there are new partitions
[ https://issues.apache.org/jira/browse/KAFKA-15149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-15149: - Fix Version/s: 3.5.1 > Fix not sending UMR and LISR RPCs in dual-write mode when there are new > partitions > -- > > Key: KAFKA-15149 > URL: https://issues.apache.org/jira/browse/KAFKA-15149 > Project: Kafka > Issue Type: Bug >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Major > Fix For: 3.5.1 > > > In AK, in {{KRaftMigrationZkWriter}} > [here|https://github.com/apache/kafka/blame/trunk/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java#L294] > we keep references to both the new and changed partitions maps from the > {{TopicsDelta}} instance. We mutate {{changedPartitions}}, which can also > mutate the {{TopicsDelta}} instance that is provided as input to > the method. After making the ZK writes, when we try to figure out the UMR and > LISR requests we need to make in > {{MigrationPropagator.sendRPCsToBrokersFromMetadataDelta}}, the > {{TopicsDelta}} has lost the changed partitions metadata. As a result, we > might not send the expected UMR and LISR requests.
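The bug pattern is mutating a map that is shared with the caller's delta object. A minimal sketch of the problem and the usual defensive-copy fix follows; `DefensiveCopyExample` is hypothetical, and modelling the mutation as "remove new partitions from the changed-partitions map" is an assumption made only to have a concrete mutation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: the input map stands in for TopicsDelta's changed-partitions map.
final class DefensiveCopyExample {
    private DefensiveCopyExample() {}

    // Buggy variant: removes entries from the caller's map, so later readers of the
    // delta no longer see them.
    static Map<Integer, String> changedOnlyInPlace(Map<Integer, String> changedPartitions, Set<Integer> newPartitions) {
        changedPartitions.keySet().removeAll(newPartitions);
        return changedPartitions;
    }

    // Safer variant: mutate a private copy and leave the input untouched.
    static Map<Integer, String> changedOnlyCopy(Map<Integer, String> changedPartitions, Set<Integer> newPartitions) {
        Map<Integer, String> copy = new HashMap<>(changedPartitions);
        copy.keySet().removeAll(newPartitions);
        return copy;
    }
}
```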
[jira] [Created] (KAFKA-15158) Add metrics for RemoteRequestsPerSec
Divij Vaidya created KAFKA-15158: Summary: Add metrics for RemoteRequestsPerSec Key: KAFKA-15158 URL: https://issues.apache.org/jira/browse/KAFKA-15158 Project: Kafka Issue Type: Sub-task Reporter: Divij Vaidya Fix For: 3.6.0 Add the following metrics for better observability into RemoteLog-related activities inside the broker. 1. RemoteWriteRequestsPerSec 2. RemoteDeleteRequestsPerSec 3. BuildRemoteLogAuxStateRequestsPerSec These metrics will be calculated at the topic level (we can add them at brokerTopicStats) *RemoteWriteRequestsPerSec* will be marked on every call to RemoteLogManager#copyLogSegmentsToRemote() *RemoteDeleteRequestsPerSec* will be marked on every call to RemoteLogManager#cleanupExpiredRemoteLogSegments(). This method is introduced in [https://github.com/apache/kafka/pull/13561] *BuildRemoteLogAuxStateRequestsPerSec* will be marked on every call to ReplicaFetcherTierStateMachine#buildRemoteLogAuxState()
[jira] [Updated] (KAFKA-14509) Add ConsumerGroupDescribe API
[ https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-14509: Description: The goal of this task is to implement the ConsumerGroupDescribe API as described [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI] and to implement the related changes in the admin client as described [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups]. On the server side, this mainly requires the following steps: # The request/response schemas must be defined (see ListGroupsRequest/Response.json for an example); # Request/response classes must be defined (see ListGroupsRequest/Response.java for an example); # The API must be defined in KafkaApis (see KafkaApis#handleDescribeGroupsRequest for an example); # The GroupCoordinator interface (Java file) must be extended for the new operations. # The new operation must be implemented in GroupCoordinatorService (new coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in Scala) should just reject the request. We could probably do 1) and 2) in one pull request and the remaining ones in another. On the admin client side, this mainly requires the following steps: * Define all the new Java classes as defined in the KIP. * Add the new API to the KafkaAdminClient class.
was:The goal of this task is to implement the > Add ConsumerGroupDescribe API > - > > Key: KAFKA-14509 > URL: https://issues.apache.org/jira/browse/KAFKA-14509 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Max Riedel >Priority: Major > > The goal of this task is to implement the ConsumerGroupDescribe API as > described > [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI] > and to implement the related changes in the admin client as described > [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups]. > On the server side, this mainly requires the following steps: > # The request/response schemas must be defined (see > ListGroupsRequest/Response.json for an example); > # Request/response classes must be defined (see > ListGroupsRequest/Response.java for an example); > # The API must be defined in KafkaApis (see > KafkaApis#handleDescribeGroupsRequest for an example); > # The GroupCoordinator interface (Java file) must be extended for the new > operations. > # The new operation must be implemented in GroupCoordinatorService (new > coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in > Scala) should just reject the request. > We could probably do 1) and 2) in one pull request and the remaining ones in > another. > On the admin client side, this mainly requires the following steps: > * Define all the new Java classes as defined in the KIP. > * Add the new API to the KafkaAdminClient class.
[GitHub] [kafka] hudeqi commented on pull request #13958: MINOR:Fix modification exception introduced in KAFKA-15129;[1/N]
hudeqi commented on PR #13958: URL: https://github.com/apache/kafka/pull/13958#issuecomment-1623755996 > Could we please update the description/title to clearly state that this is fixing a regression introduced in [#13924 (comment)](https://github.com/apache/kafka/pull/13924#discussion_r1254371528)? I have changed the title. If anything is still inappropriate, feel free to change it. Thank you! @dajac
[GitHub] [kafka] hudeqi commented on a diff in pull request #13958: MINOR:Refactor variable name for code specification
hudeqi commented on code in PR #13958: URL: https://github.com/apache/kafka/pull/13958#discussion_r1254494044 ## core/src/main/scala/kafka/log/LogCleanerManager.scala: ## @@ -549,7 +550,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], def removeMetrics(): Unit = { GaugeMetricNameNoTag.foreach(metricsGroup.removeMetric) -GaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => { +gaugeMetricNameWithTag.asScala.foreach(metricNameAndTags => { Review Comment: Sorry for introducing the problem, and thanks @dajac @divijvaidya. I have updated the PR.
[jira] [Created] (KAFKA-15157) Print startup time for RemoteIndexCache
Divij Vaidya created KAFKA-15157: Summary: Print startup time for RemoteIndexCache Key: KAFKA-15157 URL: https://issues.apache.org/jira/browse/KAFKA-15157 Project: Kafka Issue Type: Sub-task Reporter: Divij Vaidya When RemoteIndexCache starts up, it will try to rebuild the in-memory cache using the files already present on the disk in the remote-index-cache directory. The process involves: 1. deleting existing files that are pending deletion, i.e. that have a .delete suffix 2. reading the cached index files, if present 3. creating the indexes (this step creates a memory-mapped buffer) 4. performing a sanity check on the indexes 5. adding them to the internal cache. Steps 2-5 are bounded by the maximum number of entries in the cache, but step 1 could be arbitrarily large. To debug a slow cache startup, we should add an info-level log statement that prints the time it took to initialize the cache.
[jira] [Updated] (KAFKA-14509) Add ConsumerGroupDescribe API
[ https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-14509: Description: The goal of this task is to implement the > Add ConsumerGroupDescribe API > - > > Key: KAFKA-14509 > URL: https://issues.apache.org/jira/browse/KAFKA-14509 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Max Riedel >Priority: Major > > The goal of this task is to implement the
[jira] [Assigned] (KAFKA-15154) Potential bug: We don't acquire lock when reading checkQuotas
[ https://issues.apache.org/jira/browse/KAFKA-15154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lan Ding reassigned KAFKA-15154: Assignee: Lan Ding > Potential bug: We don't acquire lock when reading checkQuotas > - > > Key: KAFKA-15154 > URL: https://issues.apache.org/jira/browse/KAFKA-15154 > Project: Kafka > Issue Type: Bug >Reporter: Divij Vaidya >Assignee: Lan Ding >Priority: Major > Labels: newbie > > At Sensor.java line 254, we call `this.metrics.values()`. metrics is not a > thread-safe map, which is why we acquire a lock whenever we want to > add/remove entries from it. For example, see the add() and hasMetrics() methods. > However, we don't acquire a lock when calling Sensor#checkQuotas(timeMs). > This could lead to a situation where this metrics map may be left in an > inconsistent state (since it is not thread-safe for concurrent read/write > access). > The objective of this task is to validate whether what I said above is correct and, if > so, to fix the situation by enclosing this read in a lock. As a stretch > task, we should consider whether we can replace the metrics data structure with one that > allows concurrent reads but exclusive writes.
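A minimal sketch of both the fix and the stretch goal, assuming a `ReentrantReadWriteLock` is acceptable: iteration in `checkQuotas` happens under the read lock (so concurrent readers do not block each other), while `add` takes the exclusive write lock. `GuardedSensor` is hypothetical, not Kafka's `Sensor`; its values are plain doubles, and it returns the names of violating metrics instead of throwing `QuotaViolationException`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch, not Kafka's Sensor: a non-thread-safe map guarded by a
// read/write lock, so reads can proceed concurrently but writes are exclusive.
final class GuardedSensor {
    private final Map<String, Double> metrics = new LinkedHashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    void add(String name, double value) {
        lock.writeLock().lock();
        try {
            metrics.put(name, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Iterates over the map only while holding the read lock and returns the names
    // whose value exceeds the bound.
    List<String> checkQuotas(double bound) {
        lock.readLock().lock();
        try {
            List<String> violations = new ArrayList<>();
            for (Map.Entry<String, Double> e : metrics.entrySet()) {
                if (e.getValue() > bound) {
                    violations.add(e.getKey());
                }
            }
            return violations;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

If reads vastly outnumber writes, a `ConcurrentHashMap` (whose iterators are weakly consistent and never throw `ConcurrentModificationException`) is an alternative design; the lock-based version keeps the existing map type and makes the read/write boundary explicit.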
[GitHub] [kafka] DL1231 opened a new pull request, #13969: KAFKA-15154: Acquire lock when reading checkQuotas
DL1231 opened a new pull request, #13969: URL: https://github.com/apache/kafka/pull/13969 https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-15154 Acquire lock when reading checkQuotas. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] mimaison commented on pull request #13842: KAFKA-14995: Automate asf.yaml collaborators refresh
mimaison commented on PR #13842: URL: https://github.com/apache/kafka/pull/13842#issuecomment-1623715979 Thanks @stevenbooke for the updates. The changes seem fine to me. @vvcephei, do you have any further comments? Also, let's update https://github.com/apache/kafka-site/pull/521, as we'll need to merge it first.
[jira] [Assigned] (KAFKA-14509) Add ConsumerGroupDescribe API
[ https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Max Riedel reassigned KAFKA-14509: Assignee: Max Riedel
> Add ConsumerGroupDescribe API
> Key: KAFKA-14509
> URL: https://issues.apache.org/jira/browse/KAFKA-14509
> Project: Kafka
> Issue Type: Sub-task
> Reporter: David Jacot
> Assignee: Max Riedel
> Priority: Major
[GitHub] [kafka] mimaison merged pull request #13967: MINOR: Update connector status metric description to include 'stopped' as a potential value
mimaison merged PR #13967: URL: https://github.com/apache/kafka/pull/13967
[jira] [Commented] (KAFKA-15154) Potential bug: We don't acquire lock when reading checkQuotas
[ https://issues.apache.org/jira/browse/KAFKA-15154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17740596#comment-17740596 ] Lan Ding commented on KAFKA-15154: I think what you said is correct. "metrics.values()" returns a Collection view of the values contained in the map, and changes to the map are reflected in that collection. Reading and writing the metrics map concurrently may therefore lead to unpredictable results.
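The point about the `values()` view can be checked in isolation. This sketch uses a plain `HashMap` as a stand-in for the metrics map (an assumption; it says nothing about the map type `Sensor` actually uses), and `ValuesViewDemo` is a hypothetical name. In a single thread the failure is deterministic: adding a key while an iterator over `values()` is open makes the iterator's next call throw `ConcurrentModificationException`. Across threads there is no such guarantee, since fail-fast detection is only best-effort; an unsynchronized reader may instead observe stale or partially updated state with no exception at all.

```java
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class ValuesViewDemo {
    // Returns true if a structural change made while iterating values() is detected.
    static boolean modificationDuringIterationFails() {
        Map<String, Integer> metrics = new HashMap<>();
        metrics.put("a", 1);
        metrics.put("b", 2);
        Iterator<Integer> it = metrics.values().iterator();
        it.next();            // consume one of the two values
        metrics.put("c", 3);  // structural change behind the iterator's back
        try {
            it.next();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // values() is a live view: its size tracks later changes to the backing map.
    static int viewSizeAfterPut() {
        Map<String, Integer> metrics = new HashMap<>();
        Collection<Integer> view = metrics.values();
        metrics.put("a", 1);
        metrics.put("b", 2);
        return view.size();
    }
}
```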
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13960: KAFKA-15129;[6/N] Remove metrics in ControllerStats when broker shutdown
divijvaidya commented on code in PR #13960: URL: https://github.com/apache/kafka/pull/13960#discussion_r1254402700
## core/src/main/scala/kafka/controller/KafkaController.scala: ##
@@ -537,6 +538,7 @@ class KafkaController(val config: KafkaConfig,
   private def removeMetrics(): Unit = {
     KafkaController.MetricNames.foreach(metricsGroup.removeMetric)
+    controllerContext.stats.removeMetrics()
Review Comment: We should be registering these metrics in initializeControllerContext and removing them in resetControllerContext. Correctly initializing and resetting controllerContext is the responsibility of KafkaController.
## core/src/main/scala/kafka/controller/KafkaController.scala: ##
@@ -2788,6 +2796,20 @@ private[controller] class ControllerStats {
   def removeMetric(name: String): Unit = {
     metricsGroup.removeMetric(name)
   }
+
+  def removeMetrics(): Unit = {
+    MeterMetricNames.foreach(metricsGroup.removeMetric(_))
+    timerMetricNames.asScala.foreach(metricsGroup.removeMetric(_))
Review Comment: Remove the entry from timerMetricNames as well; otherwise, it keeps growing.
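A hypothetical Java sketch of the second review comment (the code under review is Scala). `FakeRegistry` stands in for the metrics group and `StatsSketch` for `ControllerStats`; neither is Kafka's API. `removeMetrics()` drops each timer from the registry and from the tracking set, so repeated initialize/reset cycles leave nothing behind. It assumes `removeMetrics()` is not called concurrently with timer creation, for example because it runs only from a reset path such as the `resetControllerContext` suggested in the first comment.

```java
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the metrics group; it only tracks registered names.
class FakeRegistry {
    final Set<String> registered = ConcurrentHashMap.newKeySet();

    void newTimer(String name) { registered.add(name); }

    void removeMetric(String name) { registered.remove(name); }
}

// Hypothetical stand-in for ControllerStats.
class StatsSketch {
    private final FakeRegistry registry;
    // Names of timers created on demand, tracked so they can be removed later.
    private final Set<String> timerMetricNames = ConcurrentHashMap.newKeySet();

    StatsSketch(FakeRegistry registry) { this.registry = registry; }

    // Register a timer the first time its name is seen.
    void timer(String name) {
        if (timerMetricNames.add(name)) {
            registry.newTimer(name);
        }
    }

    // Unregister every timer and drop it from the tracking set, so the set
    // does not keep growing across initialize/reset cycles.
    void removeMetrics() {
        Iterator<String> it = timerMetricNames.iterator();
        while (it.hasNext()) {
            registry.removeMetric(it.next());
            it.remove();
        }
    }

    int trackedTimerCount() { return timerMetricNames.size(); }
}
```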
[GitHub] [kafka] dajac merged pull request #13954: MINOR: Move MockTimer to server-common
dajac merged PR #13954: URL: https://github.com/apache/kafka/pull/13954
[GitHub] [kafka] dajac commented on pull request #13958: MINOR:Refactor variable name for code specification
dajac commented on PR #13958: URL: https://github.com/apache/kafka/pull/13958#issuecomment-1623614568 Could we please update the description to clearly state that this is fixing a regression introduced in https://github.com/apache/kafka/pull/13924#discussion_r1254371528?